2022. 5. 1. 10:08ใ๐ Data Engineering/Apache Spark
Key-Value RDD
(Key, Value) ์์ ๊ฐ์ง๋ RDD๋ก, Paris RDD๋ผ๊ณ ๋ถ๋ฆฌ๊ธฐ๋ ํ๋ค.
Single Value RDD์ Key-Value RDD๋ ํ์ฉ ์์์ด ์กฐ๊ธ ๋ค๋ฅธ๋ฐ, ๋ค์๊ณผ ๊ฐ๋ค.
Single Value RDD
์) ํน์ ๋จ์ด ์ ์ธ๊ธฐ
Key-Value RDD
์) ํน์ ๋๋ผ๋ง๊ฐ ๋ฐ์ ๋ณ์ ์ ํ๊ท
RDD๊ฐ ๋ฌด์์ธ์ง ๋ชจ๋ฅด๊ฒ ๋ค๋ฉด? https://mengu.tistory.com/27?category=932924
Key-Value RDD์ ๊ฒฝ์ฐ, Key ๊ฐ์ ๊ธฐ์ค์ผ๋ก Value ํต๊ณ๋ฅผ ๋ด๊ฑฐ๋ ๋ณํํ ์ ์๋ค.
๋ค๋ฅธ ์์๋ก๋,
์ง์ญ๋ณ ์ํ์ ์๋ฅผ ์๊ณ ์ถ์ ๋, ๋ค์๊ณผ ๊ฐ์ Key-Value RDD๋ฅผ ์ด์ฉํ๋ฉด ์์ฝ๊ฒ ๊ตฌํ ์ ์๋ค.
Key: ์ง์ญ ID
Value: ์ํ์ ์
์ฝ๋๋ก ๋ ์์ธํ ์ดํด๋ณด์.
Code
๋จผ์ , ์ฝ๋ ์ค์ต์ ์ํด conf๋ฅผ ์ค์ ํด์ฃผ์.
import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler; faulthandler.enable()
# Spark ์ค์
conf = SparkConf().setMaster('local').setAppName('category-review-averag')
sc = SparkContext(conf=conf)
1) Single RDD ์์ฑ
single_rdd = sc.parallelize(['์ง์ฅ๋ฉด', '์งฌ๋ฝ', '๊น๋ฐฅ', '์งฌ๋ฝ', '๊น๋ฐฅ', '๋ก๋ณถ์ด'])
single_rdd.collect()
['์ง์ฅ๋ฉด', '์งฌ๋ฝ', '๊น๋ฐฅ', '์งฌ๋ฝ', '๊น๋ฐฅ', '๋ก๋ณถ์ด']
sc.parallelize() ํจ์๋ฅผ ์ด์ฉํ์ฌ Single RDD๋ฅผ ์์ฑํ๋ค.
RDD๋ ๊ธฐ๋ณธ์ ์ผ๋ก Lazy Execution์ด๊ธฐ ๋๋ฌธ์, collect() ํจ์๋ก action์ ํด์ค์ผ์ง ํํ๋ฅผ ๋ณผ ์ ์๋ค.
2) Single RDD -> Key-Value RDD
def single2key(rdd):
foods = rdd
return (foods, 1)
key_rdd = single_rdd.map(single2key)
key_rdd.collect()
[('์ง์ฅ๋ฉด', 1), ('์งฌ๋ฝ', 1), ('๊น๋ฐฅ', 1), ('์งฌ๋ฝ', 1), ('๊น๋ฐฅ', 1), ('๋ก๋ณถ์ด', 1)]
single2key() ํจ์๋ฅผ ํตํด (food, 1)์ ๋ฐํํ๋๋ก ํ๊ณ , map() ํจ์๋ก task๋ฅผ ์ํ์์ผฐ๋ค. ๊ทธ ๊ฒฐ๊ณผ, (Key, Value) ํํ์ RDD๋ฅผ ์ป์๋ค.
3) Key-Value RDD ํ์ฉ
# reduceByKey()
count = key_rdd.reduceByKey(lambda a, b: a + b)
count.collect()
[('์ง์ฅ๋ฉด', 1), ('์งฌ๋ฝ', 2), ('๊น๋ฐฅ', 2), ('๋ก๋ณถ์ด', 1)]
reduceByKey() ํจ์๋ ๋ฐ์ดํฐ๋ฅผ Key ๊ฐ์ ๊ธฐ์ค์ผ๋ก ์์ฝํ๋ค.
# mapValues()
count_mapvalues = count.mapValues(lambda x: (x,1))
count_mapvalues.collect()
[('์ง์ฅ๋ฉด', (1, 1)), ('์งฌ๋ฝ', (2, 1)), ('๊น๋ฐฅ', (2, 1)), ('๋ก๋ณถ์ด', (1, 1))]
Key-Value RDD์์ Key๋ฅผ ๊ฑด๋๋ฆฌ์ง ์๋ ๊ฒฝ์ฐ, map() ํจ์๋ณด๋จ mapValues() ํจ์๋ฅผ ์จ์ฃผ๋ ๊ฒ์ด ์ข๋ค.
value๋ง ๋ณํํจ์ผ๋ก์จ ์ ์ฒด์ ์ธ ๋ฐ์ดํฐ ํํฐ์ ์ ์ ์งํ ์ ์๊ธฐ ๋๋ฌธ์ด๋ค.
# mapValues()
count_values = count_mapvalues.mapValues(lambda x: (x[0]+x[1])/x[1])
count_values.collect()
[('์ง์ฅ๋ฉด', 2.0), ('์งฌ๋ฝ', 3.0), ('๊น๋ฐฅ', 3.0), ('๋ก๋ณถ์ด', 2.0)]
๊ฐ์ mapValues()๋ฅผ ํ์ฉํด๋ณด์.
์ด๋ฒ ํฌ์คํ ์์ Key-Value RDD์ ๋ํด์ ์์๋ณด์๋ค.
๋ค์ ํฌ์คํ ์์ Transformation, Action ํจ์๋ค์ ์์ธํ ์์๋ณด๋๋ก ํ๊ฒ ๋ค.
'๐ Data Engineering > Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Spark] Spark ์๋ ์ต์ ํ, Cashe() & Persist() (0) | 2022.05.01 |
---|---|
[Spark] Transformations & Actions ํจ์ (0) | 2022.05.01 |
[Spark] ๋ถ์ฐ/๋ณ๋ ฌ ์ฒ๋ฆฌํ ๋ ์ฃผ์ํ ์ (0) | 2022.05.01 |
[Spark] RDD ๊ฐ๋ ๋ฐ ์ฝ๋ (0) | 2022.04.24 |
[Spark] Apache Spark ๊ฐ๋ ๋ฐ ๋ฒ์ (0) | 2022.04.23 |