[Spark] Reduction: Concept and Code


Reduction

์š”์†Œ๋“ค์„ ๋ชจ์•„์„œ ํ•˜๋‚˜๋กœ ํ•ฉ์น˜๋Š” ์ž‘์—…์„ ๋งํ•œ๋‹ค.

๋งŽ์€ Spark ์—ฐ์‚ฐ๋“ค์ด Reduction์ด๋ผ๊ณ  ๋ด๋„ ๋ฌด๋ฐฉํ•˜๋‹ค.

* ํŒŒ์ผ ์ €์žฅ, collect() ๋“ฑ๊ณผ ๊ฐ™์ด Reduction์ด ์•„๋‹Œ ์•ก์…˜๋„ ์กด์žฌ.

Let's practice with code.


conf setup

 

import os
import findspark
findspark.init(os.environ.get('SPARK_HOME'))  # point PySpark at the local Spark installation
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()

# Run Spark locally and name the application
conf = SparkConf().setMaster('local').setAppName('my-RDD-transformation-action')
sc = SparkContext(conf=conf)


๋Œ€ํ‘œ์ ์ธ Reduction Actions


 

1) Reduce

 

RDD.reduce(function)

 

from operator import add
sc.parallelize([1,2,3,4,5]).reduce(add)


15

 

 

The data were summed using the add function from the operator module.

Partition์— ๋”ฐ๋ผ ๊ฒฐ๊ณผ ๊ฐ’์ด ๋‹ฌ๋ผ์ง€๊ธฐ๋„ ํ•œ๋‹ค. ์•„๋ž˜ ์ฝ”๋“œ๋ฅผ ๋ณด์ž. 


# Partition - 1
sc.parallelize([1,2,3,4], 1).reduce(lambda x,y: (x*2) + y)


26

----
# Partition - 2
sc.parallelize([1,2,3,4], 2).reduce(lambda x,y: (x*2) + y)


18

----
# Partition - 3
sc.parallelize([1,2,3,4], 3).reduce(lambda x,y: (x*2) + y)


18

 

 

์ด์ฒ˜๋Ÿผ ํŒŒํ‹ฐ์…˜์ด ์–ด๋–ป๊ฒŒ ๋‚˜๋‰˜๋ƒ์— ๋”ฐ๋ผ ๊ฒฐ๊ณผ ๊ฐ’์ด ๋‹ค๋ฅผ ์ˆ˜ ์žˆ์œผ๋ฏ€๋กœ, ์‹ ๊ฒฝ ์จ์ค˜์•ผ ํ•œ๋‹ค.


2) Fold

 

RDD.fold(zeroValue, function)

 

 

from operator import add
sc.parallelize([1,2,3,4,5]).fold(0, add)


15

 

 

Like reduce, fold combines the data, but its behavior is slightly different.

It applies the function together with a zeroValue: the zeroValue is used as the starting value within every partition and again when the per-partition results are combined.
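
One practical consequence of the zeroValue (a minimal sketch, assuming the same SparkContext as above): fold() also works on an empty RDD, whereas reduce() raises an error.

from operator import add

empty = sc.parallelize([])
empty.fold(0, add)     # returns the zeroValue, 0
# empty.reduce(add)    # raises ValueError because there is no element to start from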

 

 

rdd = sc.parallelize([2,3,4], 4)
rdd.fold(1, lambda x, y: x+y)


# 4 partitions of [2,3,4] -> per-partition folds: 1 (empty partition), 1+2, 1+3, 1+4
# final combine, starting again from 1: 1 + 1 + 3 + 4 + 5 = 14
14

 

 

Another usage example

 

 

rdd = sc.parallelize([2,3,4], 4)
rdd.fold(3, lambda x, y: x*y)


# per-partition folds: 3 (empty partition), 3*2, 3*3, 3*4
# final combine, starting again from 3: 3 * 3 * 6 * 9 * 12 = 5832
5832
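
The zeroValue is applied once in every partition and once more in the final combine. Since numSlices=4 but the RDD has only three elements, one partition is empty, which is where the extra factor of 3 comes from. We can check the layout with glom() (a sketch under the same setup):

rdd = sc.parallelize([2,3,4], 4)
rdd.glom().collect()


[[], [2], [3], [4]]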


3) GroupBy

 

RDD.groupBy(function)

 

 

rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
rdd_group = rdd.groupBy(lambda x: x % 2).collect()
print(rdd_group)


[(1, <pyspark.resultiterable.ResultIterable at 0x134519253c8>),
 (0, <pyspark.resultiterable.ResultIterable at 0x13452665ec8>)]

----
list(rdd_group[0][1])


[1, 3, 5, 7, 9]

 

 

์—ฐ์‚ฐ ๊ธฐ์ค€์„ ํ†ตํ•ด ๋ฐ์ดํ„ฐ๋ฅผ groupingํ•˜๋Š” ์‹œํ–‰์ด๋‹ค.


4) Aggregate

 

RDD.aggregate(zeroValue, seqOp, combOp)

- zeroValue: the initial accumulated value, used at the start of every partition (and again when the partition results are combined)

- seqOp: the function that merges each element into the accumulator within a partition (this is where the type can change)

- combOp: the function that merges the accumulators from different partitions

 

 

# Accumulator is a (sum, count) pair
seqOp = (lambda x, y: (x[0] + y, x[1] + 1))          # fold element y into the accumulator x
combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))   # merge two per-partition accumulators
sc.parallelize([1,2,3,4,5]).aggregate((0,0), seqOp, combOp)


(15, 5)

 

 

๋งŽ์ด ์“ฐ์ด๋Š” reduction action.

๋Œ€๋ถ€๋ถ„์˜ ๋ฐ์ดํ„ฐ ์ž‘์—…์€ ํฌ๊ณ  ๋ณต์žกํ•œ ๋ฐ์ดํ„ฐ ํƒ€์ž… -> ์ •์ œ๋œ ๋ฐ์ดํ„ฐ๋กœ ๋ณ€ํ™˜.