2022. 5. 1. 16:32ใ๐ Data Engineering/Apache Spark
Spark ์๋๋ฅผ ์ต์ ํ์ํค๋ ๋ฐฉ๋ฒ์ผ๋ก ,
Cashe(), Persist()๊ฐ ์๋ค.
Spark ํน์ฑ์, Transformations๋ Lazy Execution ๋ฐฉ์์ผ๋ก ์์ ์ ์ํํ๋ค.
๊ทธ ์ด์ ๋, Eager Execution(์ฆ์ ์ํ)์ ๋ฐ๋ฅผ ๊ฒฝ์ฐ ๋ค์๊ณผ ๊ฐ์ ๋นํจ์จ์ด ๋ฐ์ํ๋ค.
Task๋ฅผ ์ํํ ๋๋ง๋ค Disk์ ์ ์ฅ์ ํ๊ณ , ๋ค์ ์ด๋ฅผ ๋ถ๋ฌ๋ค์ฌ์ผ ํ๋ค.
์ด ๊ณผ์ ์์ Disk ๊ณต๊ฐ์ ๋ฌผ๋ก , ํต์ ์ผ๋ก ์ธํ ์๋ ์ ํ ๋ฑ ๋นํจ์จ์ด ๋ฐ์ํ๊ฒ ๋๋ค.
๊ทธ๋ ๋ค๋ฉด Lazy Execution ๋ฐฉ์์ผ๋ก ์์ ์ ์ํํ๋ค๋ฉด ์ด๋จ๊น?
Lazy Execution ๋ฐฉ์์ด๋ผ๋ฉด, Disk์ ์ ์ฅํ์ง ์๋๋ค. ๋์ Task์์ Task๋ก ์ ๋ฌ๋ ๋ In-Memory ๋ฐฉ์์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ์ ์ ์๋ค. In-Memory ๋ฐฉ์์ผ๋ก ์ฃผ๊ณ ๋ฐ๊ธฐ ์ํด ์ฌ์ฉํ๋ ๊ฒ์ด ๋ฐ๋ก, Cashe(), Persist() ์ด๋ค.
์ฝ๋๋ฅผ ํตํด ๋ฐ๋ก ์ค์ตํด๋ณด์.
conf ์ค์
import os
import findspark
findspark.init(os.environ.get('SPARK_HOME'))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
conf = SparkConf().setMaster('local').setAppName('my-RDD-transformation-action')
sc = SparkContext(conf=conf)
# RDD ์์ฑ
foods = sc.parallelize(['์ง์ฅ๋ฉด', '์งฌ๋ฝ', '๋ณถ์๋ฐฅ', '๋ก๋ณถ์ด', '๋ผ๋ฉด', '์ฐ๋', '์ง์ฅ๋ฉด', '์งฌ๋ฝ', '๋ณถ์๋ฐฅ', '๋ก๋ณถ์ด', '๋ผ๋ฉด', '์ฐ๋', '์ง์ฅ๋ฉด', '์งฌ๋ฝ', '๋ณถ์๋ฐฅ'])
Code
Cache(), Persist()๋ฅผ ํ์ฉํ์ง ์์ ๊ฒฝ์ฐ.
def parse(row):
return (row, 1)
key_foods = foods.map(parse)
key_foods.collect()
[('์ง์ฅ๋ฉด', 1),
('์งฌ๋ฝ', 1),
('๋ณถ์๋ฐฅ', 1),
('๋ก๋ณถ์ด', 1),
('๋ผ๋ฉด', 1),
('์ฐ๋', 1),
('์ง์ฅ๋ฉด', 1),
('์งฌ๋ฝ', 1),
('๋ณถ์๋ฐฅ', 1),
('๋ก๋ณถ์ด', 1),
('๋ผ๋ฉด', 1),
('์ฐ๋', 1),
('์ง์ฅ๋ฉด', 1),
('์งฌ๋ฝ', 1),
('๋ณถ์๋ฐฅ', 1)]
map() ํจ์๋ฅผ ํ์ฉํด์ key_foods RDD๋ฅผ ๋ง๋ค์๋ค.
result1 = key_foods.take(3)
result2 = key_foods.mapValues(lambda x: x + 1).collect()
print(result2)
[('์ง์ฅ๋ฉด', 2), ('์งฌ๋ฝ', 2), ('๋ณถ์๋ฐฅ', 2),
('๋ก๋ณถ์ด', 2), ('๋ผ๋ฉด', 2), ('์ฐ๋', 2), ('์ง์ฅ๋ฉด', 2),
('์งฌ๋ฝ', 2), ('๋ณถ์๋ฐฅ', 2), ('๋ก๋ณถ์ด', 2), ('๋ผ๋ฉด', 2),
('์ฐ๋', 2), ('์ง์ฅ๋ฉด', 2), ('์งฌ๋ฝ', 2), ('๋ณถ์๋ฐฅ', 2)]
Cache(), Persist() ํจ์๋ฅผ ์ฌ์ฉํ์ง ์์๊ธฐ์, ์์ ์ฝ๋์์ key_foods๊ฐ ๋ ๋ฒ ๋ง๋ค์ด์ง๋ค. take() ํจ์๋ฅผ ์คํํ๋ฉด์ 1ํ ๋ง๋ค์ด์ง๊ณ , mapValues๋ฅผ ์คํํ๋ฉด์ ๋๋ค์ ํ๋ฒ ๋ง๋ค์ด์ง๋ ๊ฒ์ด๋ค. ์ด๋ฌํ ๋นํจ์จ์ ์๋๋ฅผ ์ ํ์ํฌ ์ ์๋ค.
Cache(), Persist()๋ฅผ ํ์ฉํ ๊ฒฝ์ฐ.
def parse(row):
return (row, 1)
key_foods = foods.map(parse).persist()
key_foods.collect()
[('์ง์ฅ๋ฉด', 1),
('์งฌ๋ฝ', 1),
('๋ณถ์๋ฐฅ', 1),
('๋ก๋ณถ์ด', 1),
('๋ผ๋ฉด', 1),
('์ฐ๋', 1),
('์ง์ฅ๋ฉด', 1),
('์งฌ๋ฝ', 1),
('๋ณถ์๋ฐฅ', 1),
('๋ก๋ณถ์ด', 1),
('๋ผ๋ฉด', 1),
('์ฐ๋', 1),
('์ง์ฅ๋ฉด', 1),
('์งฌ๋ฝ', 1),
('๋ณถ์๋ฐฅ', 1)]
map() ํจ์๋ฅผ ์ ์ฉํ๊ณ , ๊ณง๋ฐ๋ก persist() ํจ์๋ฅผ ์ด์ฉํด ๋ฉ๋ชจ๋ฆฌ์ ์ ์ฅํ๋๋ก ํ๋ค.
result1 = key_foods.take(3)
result2 = key_foods.mapValues(lambda x: x + 1).collect()
print(result2)
[('์ง์ฅ๋ฉด', 2), ('์งฌ๋ฝ', 2), ('๋ณถ์๋ฐฅ', 2), ('๋ก๋ณถ์ด', 2),
('๋ผ๋ฉด', 2), ('์ฐ๋', 2), ('์ง์ฅ๋ฉด', 2), ('์งฌ๋ฝ', 2),
('๋ณถ์๋ฐฅ', 2), ('๋ก๋ณถ์ด', 2), ('๋ผ๋ฉด', 2), ('์ฐ๋', 2),
('์ง์ฅ๋ฉด', 2), ('์งฌ๋ฝ', 2), ('๋ณถ์๋ฐฅ', 2)]
์ด์ ๋ key_foods ์ฐ์ฐ์ด ํ ๋ฒ๋ง ์คํ๋๊ณ , ๋ฉ๋ชจ๋ฆฌ์ ์๋ key_foods๊ฐ ํ์ฉ๋๊ธฐ๋ง ํ ๋ฟ์ด๋ค.
์ด์ธ์๋ ๋จธ์ ๋ฌ๋ ํ์ต๊ฐ์ด, ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃจ๋ task๊ฐ ๋ฐ๋ณต๋๋ ๊ฒฝ์ฐ๊ฐ ๋ง์ ๋๋ ํ์ฉํ ์ ์๋ค.
points = sc.textFile('...').map(parsePoint).cache()
for i in range(ITERATIONS):
gradient = points.map(gradient_descent).reduce(lambda x, y: (x + y) / n)
w -= gradient * learning_rate
cache() ํจ์๋ฅผ ์ด์ฉํ์ง ์์๋ค๋ฉด, gradient๋ฅผ ๊ณ์ฐํ ๋๋ง๋ค points๋ฅผ ์ฐ์ฐํ๋ ๋์ฐํ ์ํฉ์ด ์ฐ์ถ๋์์ ๊ฒ์ด๋ค.
๋ค์์ Cache(), Persist()๋ฅผ ํ์ฉํ ๋, ์ฐธ๊ณ ์ฌํญ์ด๋ค.
Storage Level
๋ค์ํ ์คํ ๋ฆฌ์ง ๋ ๋ฒจ์ด ์กด์ฌํ๋ค. ์ฌ์ฉ์๋ ํน์ ๋ ๋ฒจ์ ์ง์ ํด์ ํ์ฉํ ์ ์๋ค.
1) MEMORY_ONLY - ๋ฉ๋ชจ๋ฆฌ์๋ง ์ ์ฅ
2) MEMORY_AND_DISK - ๋ฉ๋ชจ๋ฆฌ์ ๋์คํฌ์ ์ ์ฅ
3) MEMORY_ONLY_SER - ๋ฉ๋ชจ๋ฆฌ์๋ง ์๋ฆฌ์ผ ๋ผ์ด์ ธ ์ํจ ๋ค์ ์ ์ฅ
4) MEMORY_AND_DISK_SER - ๋ฉ๋ชจ๋ฆฌ์ ๋์คํฌ์ ์๋ฆฌ์ผ๋ผ์ด์ ธ ์ํจ ๋ค์ ์ ์ฅ
5) DISK_ONLY - ๋์คํฌ์๋ง ์ ์ฅ
Cache
Cache()์ ์ค์ ์ต์ ์ด๋ค.
1) Default Storage Level ์ฌ์ฉ
2) RDD - MEMORY_ONLY
3) DataFrame - MEMORY_AND_DISK
Persist
Persist()์ ์ค์ ์ต์ ์ด๋ค.
1) Storage Level์ ์ฌ์ฉ์๊ฐ ์ํ๋ ๋๋ก ์ง์ ๊ฐ๋ฅ
'๐ Data Engineering > Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] SparkSQL ๊ฐ๋ ๋ฐ ์ฝ๋ (0) | 2022.05.06 |
---|---|
[Spark] Reduction ๊ฐ๋ ๋ฐ ์ฝ๋ (0) | 2022.05.01 |
[Spark] Transformations & Actions ํจ์ (0) | 2022.05.01 |
[Spark] Key-Value RDD ๊ฐ๋ ๋ฐ ์ฝ๋ (0) | 2022.05.01 |
[Spark] ๋ถ์ฐ/๋ณ๋ ฌ ์ฒ๋ฆฌํ ๋ ์ฃผ์ํ ์ (0) | 2022.05.01 |