[Spark] Key-Value RDD ๊ฐœ๋… ๋ฐ ์ฝ”๋“œ

2022. 5. 1. 10:08ใ†๐Ÿ›  Data Engineering/Apache Spark

 

 

 

Key-Value RDD

(Key, Value) ์Œ์„ ๊ฐ€์ง€๋Š” RDD๋กœ, Paris RDD๋ผ๊ณ  ๋ถˆ๋ฆฌ๊ธฐ๋„ ํ•œ๋‹ค.

Single Value RDD์™€ Key-Value RDD๋Š” ํ™œ์šฉ ์–‘์ƒ์ด ์กฐ๊ธˆ ๋‹ค๋ฅธ๋ฐ, ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

 

Single Value RDD

์˜ˆ) ํŠน์ • ๋‹จ์–ด ์ˆ˜ ์„ธ๊ธฐ

 

Key-Value RDD

์˜ˆ) ํŠน์ • ๋“œ๋ผ๋งˆ๊ฐ€ ๋ฐ›์€ ๋ณ„์ ์˜ ํ‰๊ท 

 

 

RDD๊ฐ€ ๋ฌด์—‡์ธ์ง€ ๋ชจ๋ฅด๊ฒ ๋‹ค๋ฉด? https://mengu.tistory.com/27?category=932924

 

[Spark] RDD ๊ฐœ๋… ๋ฐ ์ฝ”๋“œ

RDD(Resilient Distributed Dataset)๋ž€? ์ง์—ญํ•˜๋ฉด ํƒ„๋ ฅ์ ์ธ ๋ถ„์‚ฐ ๋ฐ์ดํ„ฐ์…‹์ด๋‹ค. ๋ง ๊ทธ๋Œ€๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ํด๋Ÿฌ์Šคํ„ฐ์— ๋ถ„์‚ฐํ•˜์—ฌ ์ €์žฅํ•˜๋ฉฐ, ๊ทธ๋Ÿฐ ๋ฐ์ดํ„ฐ๋ฅผ ํƒ„๋ ฅ์ ์œผ๋กœ ์ด์šฉ(์ด์Šˆ ๋ฐœ์ƒ ์‹œ ๊ณผ๊ฑฐ ๋ฐ์ดํ„ฐ๋กœ ๋Œ์•„๊ฐ€๊ธฐ

mengu.tistory.com

 

 

Key-Value RDD์˜ ๊ฒฝ์šฐ, Key ๊ฐ’์„ ๊ธฐ์ค€์œผ๋กœ Value ํ†ต๊ณ„๋ฅผ ๋‚ด๊ฑฐ๋‚˜ ๋ณ€ํ˜•ํ•  ์ˆ˜ ์žˆ๋‹ค.

๋‹ค๋ฅธ ์˜ˆ์‹œ๋กœ๋Š”,

 

์ง€์—ญ๋ณ„ ์•”ํ™˜์ž ์ˆ˜๋ฅผ ์•Œ๊ณ  ์‹ถ์„ ๋•Œ, ๋‹ค์Œ๊ณผ ๊ฐ™์€ Key-Value RDD๋ฅผ ์ด์šฉํ•˜๋ฉด ์†์‰ฝ๊ฒŒ ๊ตฌํ•  ์ˆ˜ ์žˆ๋‹ค.

Key: ์ง€์—ญ ID

Value: ์•”ํ™˜์ž ์ˆ˜

 

์ฝ”๋“œ๋กœ ๋” ์ž์„ธํžˆ ์‚ดํŽด๋ณด์ž.

 

 

 

 

 

Code


 

๋จผ์ €, ์ฝ”๋“œ ์‹ค์Šต์„ ์œ„ํ•ด conf๋ฅผ ์„ค์ •ํ•ด์ฃผ์ž. 

 

import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler; faulthandler.enable()


# Spark ์„ค์ •
conf = SparkConf().setMaster('local').setAppName('category-review-averag')
sc = SparkContext(conf=conf)

 

 

 

1) Single RDD ์ƒ์„ฑ

 

single_rdd = sc.parallelize(['์งœ์žฅ๋ฉด', '์งฌ๋ฝ•', '๊น€๋ฐฅ', '์งฌ๋ฝ•', '๊น€๋ฐฅ', '๋–ก๋ณถ์ด'])
single_rdd.collect()


['์งœ์žฅ๋ฉด', '์งฌ๋ฝ•', '๊น€๋ฐฅ', '์งฌ๋ฝ•', '๊น€๋ฐฅ', '๋–ก๋ณถ์ด']

sc.parallelize() ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•˜์—ฌ Single RDD๋ฅผ ์ƒ์„ฑํ•œ๋‹ค.

RDD๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ Lazy Execution์ด๊ธฐ ๋•Œ๋ฌธ์—, collect() ํ•จ์ˆ˜๋กœ action์„ ํ•ด์ค˜์•ผ์ง€ ํ˜•ํƒœ๋ฅผ ๋ณผ ์ˆ˜ ์žˆ๋‹ค. 

 

 

 

 

2) Single RDD -> Key-Value RDD 

 

def single2key(rdd):
    foods = rdd
    return (foods, 1)

key_rdd = single_rdd.map(single2key)
key_rdd.collect()


[('์งœ์žฅ๋ฉด', 1), ('์งฌ๋ฝ•', 1), ('๊น€๋ฐฅ', 1), ('์งฌ๋ฝ•', 1), ('๊น€๋ฐฅ', 1), ('๋–ก๋ณถ์ด', 1)]

single2key() ํ•จ์ˆ˜๋ฅผ ํ†ตํ•ด (food, 1)์„ ๋ฐ˜ํ™˜ํ•˜๋„๋ก ํ–ˆ๊ณ , map() ํ•จ์ˆ˜๋กœ task๋ฅผ ์ˆ˜ํ–‰์‹œ์ผฐ๋‹ค. ๊ทธ ๊ฒฐ๊ณผ, (Key, Value) ํ˜•ํƒœ์˜ RDD๋ฅผ ์–ป์—ˆ๋‹ค. 

 

 

 

 

3) Key-Value RDD ํ™œ์šฉ

 

# reduceByKey()

count = key_rdd.reduceByKey(lambda a, b: a + b)
count.collect()


[('์งœ์žฅ๋ฉด', 1), ('์งฌ๋ฝ•', 2), ('๊น€๋ฐฅ', 2), ('๋–ก๋ณถ์ด', 1)]

reduceByKey() ํ•จ์ˆ˜๋Š” ๋ฐ์ดํ„ฐ๋ฅผ Key ๊ฐ’์„ ๊ธฐ์ค€์œผ๋กœ ์š”์•ฝํ•œ๋‹ค.

 

 

 

# mapValues()

count_mapvalues = count.mapValues(lambda x: (x,1))
count_mapvalues.collect()


[('์งœ์žฅ๋ฉด', (1, 1)), ('์งฌ๋ฝ•', (2, 1)), ('๊น€๋ฐฅ', (2, 1)), ('๋–ก๋ณถ์ด', (1, 1))]

Key-Value RDD์—์„œ Key๋ฅผ ๊ฑด๋“œ๋ฆฌ์ง€ ์•Š๋Š” ๊ฒฝ์šฐ, map() ํ•จ์ˆ˜๋ณด๋‹จ mapValues() ํ•จ์ˆ˜๋ฅผ ์จ์ฃผ๋Š” ๊ฒƒ์ด ์ข‹๋‹ค. 

value๋งŒ ๋ณ€ํ˜•ํ•จ์œผ๋กœ์จ ์ „์ฒด์ ์ธ ๋ฐ์ดํ„ฐ ํŒŒํ‹ฐ์…˜์€ ์œ ์ง€ํ•  ์ˆ˜ ์žˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. 

 

 

# mapValues()

count_values = count_mapvalues.mapValues(lambda x: (x[0]+x[1])/x[1])
count_values.collect()


[('์งœ์žฅ๋ฉด', 2.0), ('์งฌ๋ฝ•', 3.0), ('๊น€๋ฐฅ', 3.0), ('๋–ก๋ณถ์ด', 2.0)]

 

๊ฐ์ž mapValues()๋ฅผ ํ™œ์šฉํ•ด๋ณด์ž. 

์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„  Key-Value RDD์— ๋Œ€ํ•ด์„œ ์•Œ์•„๋ณด์•˜๋‹ค. 

๋‹ค์Œ ํฌ์ŠคํŒ…์—์„  Transformation, Action ํ•จ์ˆ˜๋“ค์„ ์ž์„ธํžˆ ์•Œ์•„๋ณด๋„๋ก ํ•˜๊ฒ ๋‹ค.