[Spark] Transformations & Actions 함수

2022. 5. 1. 11:25🛠 Data Engineering/Apache Spark

 

 

 

Spark Operation = Transformations + Actions

 

Tranformations는 결괏값으로 새로운 RDD를 반환하고, Lazy Execution을 한다는 점이 특징이다. Actions는 결괏값을 연산하여 출력하거나 저장하며, Eager Execution을 한다는 점이 특징이다. Transformations와 Actions 함수들을 더 자세히 살펴보자.

 

 

 

 

conf 설정

 

import os
import findspark
findspark.init(os.environ.get('SPARK_HOME'))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()

conf = SparkConf().setMaster('local').setAppName('my-RDD-transformation-action')
sc = SparkContext(conf=conf)

# RDD 생성
foods = sc.parallelize(['짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥'])

 

 

 

 

 

 

 

Actions


 

Actions 예시 

 

1) collect()

 

# action 1
# 사실 이 함수는 모든 데이터를 다 가져오기 때문에 실전에선 지양해야 한다.
foods.collect()


['짜장면',
 '짬뽕',
 '볶음밥',
 '떡볶이',
 '라면',
 '우동',
 '짜장면',
 '짬뽕',
 '볶음밥',
 '떡볶이',
 '라면',
 '우동',
 '짜장면',
 '짬뽕',
 '볶음밥']

 

 

2) count()

 

# action 2
# 요소 개수를 센다.
foods.count()


15

 

 

3) countByValue()

 

# action 3
# Value별 개수를 센다.
foods.countByValue()


defaultdict(int, {'짜장면': 3, '짬뽕': 3, '볶음밥': 3, '떡볶이': 2, '라면': 2, '우동': 2})

 

 

4) take()

 

# action 4
# 무작위로 특정수만큼 뽑는다.
foods.take(3)


['짜장면', '짬뽕', '볶음밥']

 

 

5) first()

 

# action 5
# 첫 번재 요소를 뽑는다.
foods.first()


'짜장면'

 

 

6) foreach()

 

# action 6
# 특정 함수를 각 요소에 대해 수행한다.
# foreach - work load에서 이뤄지므로, Script엔 나타나지 않는다.
foods.foreach(lambda x: print(x))

 

 

 

 

 

 

 

Transformations


Transformations = Narrow + Wide

Transformation은 Narrow 변형과 Wide 변형이 존재한다. 

 

 

 

 

Narrow Transformations

 

 

 

1:1 변환

filter(), map(), flatMap(), sample(), union()

1열을 조적하기 위해 다른 열/파티션의 데이터를 참고할 필요가 없다.

No Shuffling

 

 

 

1) map()

 

# transformation 1
# narrow - map
# RDD 요소들에 함수 적용
sc.parallelize([1,2,3]).map(lambda x: x+2).collect()


[3, 4, 5]

 

 

2) flatMap()

 

# transformation 2
# narrow - flatMap
# 데이터에 함수를 적용한 후, 쫙 펼친다.
movies = [
    '그린 북',
    '해리포터',
    '백 투 더 퓨처',
    '반지의 제왕'
]

movies_rdd = sc.parallelize(movies)
flatMovies = movies_rdd.flatMap(lambda x: x.split(' '))
flatMovies.collect()


['그린', '북', '해리포터', '백', '투', '더', '퓨처', '반지의', '제왕']

 

 

3) filter()

 

# transformation 3
# narrow - filter
# 조건에 맞는 요소만 뽑아낸다.
filteredMovies = flatMovies.filter(lambda x: x != '해리포터')
filteredMovies.collect()


['그린', '북', '백', '투', '더', '퓨처', '반지의', '제왕']

 

 

4) intersection(), union(), substraction()

 

# transformation 4
# narrow - intersection, union, substraction
# 공통점 찾기, 합치기, 빼기
save1 = sc.parallelize([1,2,3,4])
save2 = sc.parallelize([4,5,6,7,8,9,10])


---

save1.intersection(save2).collect()

[4]

---

save1.union(save2).collect()

[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

---

save1.subtract(save2).collect()

[2, 1, 3]

 

 

5) sample()

 

# transformation 5
# narrow - sample
# 무작위로 특정 비율만큼 뽑아내기
num_union = save1.union(save2)
num_union.collect()

[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]

---

# False를 인자로 넘기면, RDD에서 중복으로 특정 요소를 뽑을 수 없다.
num_union.sample(False, .2).collect()

[1, 3, 8]

---

# 확률적으로 뽑기 때문에 길이와 내용을 보장할 수 없다.
# seed 라는 파라미터를 통해서 고정 가능
num_union.sample(True, .5, seed=2).collect()

[1, 1, 2, 4, 4, 5, 6, 7, 9]

 

 

 

 

 

Wide Transformations

 

 

 

Shuffling

intersection and join, distinct, cartesian, reduceByKey(), groupByKey()

아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있다.

 

 

 

1) groupBy

 

# transformation 6
# wide - groupBy
# RDD 생성
foods = sc.parallelize(['짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥'])
foods.collect()


['짜장면',
 '짬뽕',
 '볶음밥',
 '떡볶이',
 '라면',
 '우동',
 '짜장면',
 '짬뽕',
 '볶음밥',
 '떡볶이',
 '라면',
 '우동',
 '짜장면',
 '짬뽕',
 '볶음밥']
 
 
---

# lambda를 통해 지정한 조건을 기준으로 group을 묶은 후, 데이터를 변형한다.
foodGroup = foods.groupBy(lambda x: x[0])
result = foodGroup.collect()

for (k,v) in result:
    print(k, list(v))



짜 ['짜장면', '짜장면', '짜장면']
짬 ['짬뽕', '짬뽕', '짬뽕']
볶 ['볶음밥', '볶음밥', '볶음밥']
떡 ['떡볶이', '떡볶이']
라 ['라면', '라면']
우 ['우동', '우동']

 

또 다른 groupBy 예시.

 

# 2로 나누었을 때, 나머지를 기준으로 grouping 해준다.
sura = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
sura.groupBy(lambda x: x % 2).collect()


[(1, <pyspark.resultiterable.ResultIterable at 0x2623ea46f88>),
 (0, <pyspark.resultiterable.ResultIterable at 0x2623ea46e88>)]
 
 
 ---
 
list(sura.groupBy(lambda x: x % 2).collect()[1][1])

[2, 4, 6, 8, 10]

 

 

 

지금까지 Transformations와 Actions 함수들을 살펴보았다. 

이 함수들을 데이터 정제, 변형에 활용하면 된다. 

다음 포스팅에선 Cache & Persist를 주제로 만나보자.