Reduction
A reduction is the operation of gathering elements and combining them into a single value.
Most Spark operations can reasonably be viewed as reductions.
* There are also actions that are not reductions, such as saving to a file or collect().
Let's try this out in code.
conf setup
import os
import findspark
# Point findspark at the local Spark installation
findspark.init(os.environ.get('SPARK_HOME'))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
# Run locally on a single thread; the app name is arbitrary
conf = SparkConf().setMaster('local').setAppName('my-RDD-transformation-action')
sc = SparkContext(conf=conf)
Representative Reduction Actions
1) Reduce
RDD.reduce(function)
from operator import add
sc.parallelize([1,2,3,4,5]).reduce(add)
15
The elements were summed using the add function.
The result can also differ depending on partitioning. Look at the code below.
# Partition - 1
sc.parallelize([1,2,3,4], 1).reduce(lambda x,y: (x*2) + y)
26
----
# Partition - 2
sc.parallelize([1,2,3,4], 2).reduce(lambda x,y: (x*2) + y)
18
----
# Partition - 3
sc.parallelize([1,2,3,4], 3).reduce(lambda x,y: (x*2) + y)
18
As shown, the result can differ depending on how the partitions are split, so be careful: reduce only guarantees a deterministic result when the function is commutative and associative, and the lambda above is neither. The check below illustrates this.
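As a quick check (a minimal sketch added here, not from the original post): with a commutative and associative function such as operator.add, the result is the same regardless of the partition count.
from operator import add
# Same data, three different partition counts -> identical results
for n in [1, 2, 3]:
    print(n, sc.parallelize([1, 2, 3, 4], n).reduce(add))
# 1 10
# 2 10
# 3 10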
2) Fold
RDD.fold(zeroValue, function)
from operator import add
sc.parallelize([1,2,3,4,5]).fold(0, add)
15
Like reduce, it is a function that combines the data, but its form is slightly different.
It folds the RDD together with a starting value: the function is applied with the zeroValue inside each partition, and once more when the partition results are merged.
rdd = sc.parallelize([2,3,4], 4)
rdd.fold(1, lambda x, y: x+y)
# zeroValue 1 is applied in each of the 4 partitions (one is empty)
# and once more in the final merge:
# 1 + (1) + (1+2) + (1+3) + (1+4) = 14
14
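In other words, the zeroValue is used once per partition plus once in the final merge. A quick way to verify this (a sketch added here, not from the original post) is to count against getNumPartitions():
rdd = sc.parallelize([2, 3, 4], 4)
# zeroValue (1) is added once per partition plus once in the merge
expected = sum([2, 3, 4]) + 1 * (rdd.getNumPartitions() + 1)
print(expected, rdd.fold(1, lambda x, y: x + y))
# 14 14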
Another example:
rdd = sc.parallelize([2,3,4], 4)
rdd.fold(3, lambda x, y: x*y)
# 3 * (3) * (3*2) * (3*3) * (3*4) = 5832
# (zeroValue 3 appears once per partition, including the empty one, plus once in the merge)
5832
3) GroupBy
RDD.groupBy(function)
rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd_group = rdd.groupBy(lambda x: x % 2).collect()
print(rdd_group)
[(1, <pyspark.resultiterable.ResultIterable at 0x134519253c8>),
(0, <pyspark.resultiterable.ResultIterable at 0x13452665ec8>)]
----
list(rdd_group[0][1])
[1, 3, 5, 7, 9]
This groups the data according to the value the given function returns for each element. A compact way to materialize all groups at once is shown below.
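Since groupBy yields lazy ResultIterable values, a common follow-up (a sketch added here, not from the original post) is to convert each group into a plain list with mapValues before collecting:
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# Materialize each group as a list; sort by key for stable output
rdd_lists = rdd.groupBy(lambda x: x % 2).mapValues(list).collect()
print(sorted(rdd_lists))
# [(0, [2, 4, 6, 8, 10]), (1, [1, 3, 5, 7, 9])]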
4) Aggregate
RDD.aggregate(zeroValue, seqOp, combOp)
- zeroValue: the starting value accumulated in each partition (and in the final merge)
- seqOp: the function that accumulates elements within a partition; it may change the type
- combOp: the function that merges the results from each partition
seqOp = (lambda x,y: (x[0] + y, x[1] + 1))
combOp = (lambda x,y: (x[0] + y[0], x[1] + y[1]))
sc.parallelize([1,2,3,4,5]).aggregate((0,0), seqOp, combOp)
(15, 5)
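The (sum, count) pair produced here is the classic building block for an average; as a short follow-up (added here, not part of the original post):
total, count = sc.parallelize([1, 2, 3, 4, 5]).aggregate((0, 0), seqOp, combOp)
print(total / count)  # mean of the elements
# 3.0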
A frequently used reduction action.
Most data work transforms large, complex data types into refined, summarized data.