[Spark] Spark Speed Optimization: cache() & persist()

2022. 5. 1. 16:32 · 🛠 Data Engineering/Apache Spark

 

 

Two ways to optimize Spark's speed are cache() and persist().

 

 

By design, Spark carries out transformations with lazy execution.

The reason is that eager execution (running every step immediately) would introduce the following inefficiency.

Every time a task runs, its result has to be saved to disk and then loaded back in.

This wastes disk space, and the extra I/O traffic slows everything down.

So what happens if the work is done with lazy execution instead?

With lazy execution, intermediate results are not saved to disk. Instead, data can be handed from task to task in memory. cache() and persist() are what you use to keep that data in memory.
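Before moving on to Spark itself, here is a minimal plain-Python sketch of the lazy idea. `LazyDataset` and `tag` are hypothetical names made up for this illustration and are not part of the Spark API; the point is only that a transformation records work and an action triggers it.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them only on an action."""

    def __init__(self, source, steps=()):
        self._source = list(source)
        self._steps = tuple(steps)  # pending transformations, not yet executed

    def map(self, fn):
        # Transformation: remember fn and return a new dataset; nothing is computed.
        return LazyDataset(self._source, self._steps + (fn,))

    def collect(self):
        # Action: the whole chain runs now, element by element, in memory.
        out = []
        for item in self._source:
            for fn in self._steps:
                item = fn(item)
            out.append(item)
        return out


calls = []

def tag(x):
    calls.append(x)  # side effect so we can see when the function really runs
    return (x, 1)

pairs = LazyDataset(["a", "b", "c"]).map(tag)
calls_before_action = len(calls)  # map() only recorded the step
result = pairs.collect()
calls_after_action = len(calls)   # the action triggered the work
print(calls_before_action, calls_after_action, result)
```

No intermediate result is written anywhere between the steps; each element flows straight through the recorded functions when the action runs.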

Let's try it out in code right away.

 

 

 

 

 

conf setup

 

import os
import findspark
findspark.init(os.environ.get('SPARK_HOME'))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()

conf = SparkConf().setMaster('local').setAppName('my-RDD-transformation-action')
sc = SparkContext(conf=conf)

# Create the RDD
foods = sc.parallelize(['짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥'])

Code


 

Without cache() or persist().

 

def parse(row):
    return (row, 1)

key_foods = foods.map(parse)
key_foods.collect()


[('짜장면', 1),
 ('짬뽕', 1),
 ('볶음밥', 1),
 ('떡볶이', 1),
 ('라면', 1),
 ('우동', 1),
 ('짜장면', 1),
 ('짬뽕', 1),
 ('볶음밥', 1),
 ('떡볶이', 1),
 ('라면', 1),
 ('우동', 1),
 ('짜장면', 1),
 ('짬뽕', 1),
 ('볶음밥', 1)]

 

We used map() to build the key_foods RDD.

 

result1 = key_foods.take(3)
result2 = key_foods.mapValues(lambda x: x + 1).collect()
print(result2)


[('짜장면', 2), ('짬뽕', 2), ('볶음밥', 2), 
('떡볶이', 2), ('라면', 2), ('우동', 2), ('짜장면', 2), 
('짬뽕', 2), ('볶음밥', 2), ('떡볶이', 2), ('라면', 2), 
('우동', 2), ('짜장면', 2), ('짬뽕', 2), ('볶음밥', 2)]

 

Because neither cache() nor persist() was used, key_foods is built twice in the code above: once when take() runs, and once more when mapValues(...).collect() runs. Every action replays the map(parse) step from the source data, and this kind of inefficiency can slow a job down.
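The recomputation can be made visible with a counter. The following is a toy model in plain Python, not Spark: `key_foods()` is a hypothetical function that stands in for an uncached RDD, and unlike a real take(3) it always evaluates every element. It only illustrates that each use replays the parse step.

```python
parse_calls = 0

def parse(row):
    global parse_calls
    parse_calls += 1  # count how often the "map" step really runs
    return (row, 1)

foods = ["짜장면", "짬뽕", "볶음밥", "떡볶이", "라면", "우동"]

def key_foods():
    # Uncached "RDD": every call replays map(parse) from the source list.
    return [parse(row) for row in foods]

result1 = key_foods()[:3]                       # stands in for take(3)
result2 = [(k, v + 1) for k, v in key_foods()]  # stands in for mapValues(...).collect()

print(parse_calls)  # 6 elements parsed twice
```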

 

 

 

 

With cache() or persist().

 

def parse(row):
    return (row, 1)

key_foods = foods.map(parse).persist()
key_foods.collect()


[('짜장면', 1),
 ('짬뽕', 1),
 ('볶음밥', 1),
 ('떡볶이', 1),
 ('라면', 1),
 ('우동', 1),
 ('짜장면', 1),
 ('짬뽕', 1),
 ('볶음밥', 1),
 ('떡볶이', 1),
 ('라면', 1),
 ('우동', 1),
 ('짜장면', 1),
 ('짬뽕', 1),
 ('볶음밥', 1)]

 

We applied map() and immediately called persist(), so the result is kept in memory the first time an action computes it.

 

result1 = key_foods.take(3)
result2 = key_foods.mapValues(lambda x: x + 1).collect()
print(result2)


[('짜장면', 2), ('짬뽕', 2), ('볶음밥', 2), ('떡볶이', 2), 
('라면', 2), ('우동', 2), ('짜장면', 2), ('짬뽕', 2), 
('볶음밥', 2), ('떡볶이', 2), ('라면', 2), ('우동', 2), 
('짜장면', 2), ('짬뽕', 2), ('볶음밥', 2)]

 

Now the key_foods computation runs only once, and later actions simply reuse the key_foods already held in memory.
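To see the "compute once, then reuse" behaviour in isolation, here is a plain-Python toy model. `PersistedStep` is a hypothetical helper invented for this sketch, not Spark's implementation; it only mimics the idea that persist() marks a step, the first action fills the cache, and later actions read from it.

```python
parse_calls = 0

def parse(row):
    global parse_calls
    parse_calls += 1  # count how often the "map" step really runs
    return (row, 1)

foods = ["짜장면", "짬뽕", "볶음밥", "떡볶이", "라면", "우동"]

class PersistedStep:
    """Toy model of persist(): nothing runs until first use, then the result is kept."""

    def __init__(self, compute):
        self._compute = compute
        self._data = None  # filled in by the first "action"

    def get(self):
        if self._data is None:
            self._data = self._compute()
        return self._data

key_foods = PersistedStep(lambda: [parse(row) for row in foods])
calls_after_persist = parse_calls  # marking the step did no work yet

result1 = key_foods.get()[:3]                       # first use computes and stores
result2 = [(k, v + 1) for k, v in key_foods.get()]  # second use reads the stored copy

print(calls_after_persist, parse_calls)
```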

 

 

 

 

Beyond this, caching is also useful when the same data is processed over and over, as in machine learning training.

 

 

Source: Wikipedia

 

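# parsePoint, gradient_descent, n, w, learning_rate and ITERATIONS are assumed to be defined elsewhere.
points = sc.textFile('...').map(parsePoint).cache()  # parsed once, then reused in every iteration

for i in range(ITERATIONS):
    # Sum the per-point gradients, then divide by n once to get the mean.
    gradient = points.map(gradient_descent).reduce(lambda x, y: x + y) / n
    w -= gradient * learning_rate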

Without cache(), we would be stuck in the dreadful situation of recomputing points every single time the gradient is calculated.
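The same loop can be tried without a cluster. Below is a small plain-Python sketch under made-up assumptions: the three input lines, the model y = w * x, the learning rate and the iteration count are all invented for illustration, and `parse_point` is a hypothetical stand-in for parsePoint. Keeping the parsed list around plays the role of cache().

```python
parse_calls = 0

def parse_point(line):
    """Parse 'x,y' into a pair of floats and count the call."""
    global parse_calls
    parse_calls += 1
    x, y = line.split(",")
    return float(x), float(y)

lines = ["1,2", "2,4", "3,6"]  # made-up data lying on the line y = 2x

# "cache()": parse once and keep the parsed points in memory for every iteration.
points = [parse_point(line) for line in lines]
n = len(points)

w = 0.0
learning_rate = 0.05
for _ in range(100):
    # Sum the per-point gradients of the squared error, then average once.
    gradient = sum(2 * (w * x - y) * x for x, y in points) / n
    w -= gradient * learning_rate

print(round(w, 6), parse_calls)
```

Had the parsing been placed inside the loop, `parse_point` would have run 300 times instead of 3, which is the cost that cache() avoids in the Spark version.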

 

 

 

Below are some notes to keep in mind when using cache() and persist().

 

Storage Level

Several storage levels exist, and the user can pick a specific one.

1) MEMORY_ONLY - store in memory only

2) MEMORY_AND_DISK - store in memory and on disk (partitions that do not fit in memory go to disk)

3) MEMORY_ONLY_SER - serialize, then store in memory only

4) MEMORY_AND_DISK_SER - serialize, then store in memory and on disk

5) DISK_ONLY - store on disk only

 

 

Cache

How cache() is configured:

1) Always uses the default storage level

2) Default for an RDD - MEMORY_ONLY

3) Default for a DataFrame - MEMORY_AND_DISK

 

 

Persist

How persist() is configured:

1) The user can specify whichever storage level they want