2022. 5. 1. 11:25ㆍ🛠 Data Engineering/Apache Spark
Spark Operation = Transformations + Actions
Tranformations는 결괏값으로 새로운 RDD를 반환하고, Lazy Execution을 한다는 점이 특징이다. Actions는 결괏값을 연산하여 출력하거나 저장하며, Eager Execution을 한다는 점이 특징이다. Transformations와 Actions 함수들을 더 자세히 살펴보자.
conf 설정
import os
import findspark
findspark.init(os.environ.get('SPARK_HOME'))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
conf = SparkConf().setMaster('local').setAppName('my-RDD-transformation-action')
sc = SparkContext(conf=conf)
# RDD 생성
foods = sc.parallelize(['짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥'])
Actions
Actions 예시
1) collect()
# action 1
# 사실 이 함수는 모든 데이터를 다 가져오기 때문에 실전에선 지양해야 한다.
foods.collect()
['짜장면',
'짬뽕',
'볶음밥',
'떡볶이',
'라면',
'우동',
'짜장면',
'짬뽕',
'볶음밥',
'떡볶이',
'라면',
'우동',
'짜장면',
'짬뽕',
'볶음밥']
2) count()
# action 2
# 요소 개수를 센다.
foods.count()
15
3) countByValue()
# action 3
# Value별 개수를 센다.
foods.countByValue()
defaultdict(int, {'짜장면': 3, '짬뽕': 3, '볶음밥': 3, '떡볶이': 2, '라면': 2, '우동': 2})
4) take()
# action 4
# 무작위로 특정수만큼 뽑는다.
foods.take(3)
['짜장면', '짬뽕', '볶음밥']
5) first()
# action 5
# 첫 번재 요소를 뽑는다.
foods.first()
'짜장면'
6) foreach()
# action 6
# 특정 함수를 각 요소에 대해 수행한다.
# foreach - work load에서 이뤄지므로, Script엔 나타나지 않는다.
foods.foreach(lambda x: print(x))
Transformations
Transformations = Narrow + Wide
Transformation은 Narrow 변형과 Wide 변형이 존재한다.
Narrow Transformations
1:1 변환
filter(), map(), flatMap(), sample(), union()
1열을 조적하기 위해 다른 열/파티션의 데이터를 참고할 필요가 없다.
No Shuffling
1) map()
# transformation 1
# narrow - map
# RDD 요소들에 함수 적용
sc.parallelize([1,2,3]).map(lambda x: x+2).collect()
[3, 4, 5]
2) flatMap()
# transformation 2
# narrow - flatMap
# 데이터에 함수를 적용한 후, 쫙 펼친다.
movies = [
'그린 북',
'해리포터',
'백 투 더 퓨처',
'반지의 제왕'
]
movies_rdd = sc.parallelize(movies)
flatMovies = movies_rdd.flatMap(lambda x: x.split(' '))
flatMovies.collect()
['그린', '북', '해리포터', '백', '투', '더', '퓨처', '반지의', '제왕']
3) filter()
# transformation 3
# narrow - filter
# 조건에 맞는 요소만 뽑아낸다.
filteredMovies = flatMovies.filter(lambda x: x != '해리포터')
filteredMovies.collect()
['그린', '북', '백', '투', '더', '퓨처', '반지의', '제왕']
4) intersection(), union(), substraction()
# transformation 4
# narrow - intersection, union, substraction
# 공통점 찾기, 합치기, 빼기
save1 = sc.parallelize([1,2,3,4])
save2 = sc.parallelize([4,5,6,7,8,9,10])
---
save1.intersection(save2).collect()
[4]
---
save1.union(save2).collect()
[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]
---
save1.subtract(save2).collect()
[2, 1, 3]
5) sample()
# transformation 5
# narrow - sample
# 무작위로 특정 비율만큼 뽑아내기
num_union = save1.union(save2)
num_union.collect()
[1, 2, 3, 4, 4, 5, 6, 7, 8, 9, 10]
---
# False를 인자로 넘기면, RDD에서 중복으로 특정 요소를 뽑을 수 없다.
num_union.sample(False, .2).collect()
[1, 3, 8]
---
# 확률적으로 뽑기 때문에 길이와 내용을 보장할 수 없다.
# seed 라는 파라미터를 통해서 고정 가능
num_union.sample(True, .5, seed=2).collect()
[1, 1, 2, 4, 4, 5, 6, 7, 9]
Wide Transformations
Shuffling
intersection and join, distinct, cartesian, reduceByKey(), groupByKey()
아웃풋 RDD의 파티션에 다른 파티션의 데이터가 들어갈 수 있다.
1) groupBy
# transformation 6
# wide - groupBy
# RDD 생성
foods = sc.parallelize(['짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥', '떡볶이', '라면', '우동', '짜장면', '짬뽕', '볶음밥'])
foods.collect()
['짜장면',
'짬뽕',
'볶음밥',
'떡볶이',
'라면',
'우동',
'짜장면',
'짬뽕',
'볶음밥',
'떡볶이',
'라면',
'우동',
'짜장면',
'짬뽕',
'볶음밥']
---
# lambda를 통해 지정한 조건을 기준으로 group을 묶은 후, 데이터를 변형한다.
foodGroup = foods.groupBy(lambda x: x[0])
result = foodGroup.collect()
for (k,v) in result:
print(k, list(v))
짜 ['짜장면', '짜장면', '짜장면']
짬 ['짬뽕', '짬뽕', '짬뽕']
볶 ['볶음밥', '볶음밥', '볶음밥']
떡 ['떡볶이', '떡볶이']
라 ['라면', '라면']
우 ['우동', '우동']
또 다른 groupBy 예시.
# 2로 나누었을 때, 나머지를 기준으로 grouping 해준다.
sura = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
sura.groupBy(lambda x: x % 2).collect()
[(1, <pyspark.resultiterable.ResultIterable at 0x2623ea46f88>),
(0, <pyspark.resultiterable.ResultIterable at 0x2623ea46e88>)]
---
list(sura.groupBy(lambda x: x % 2).collect()[1][1])
[2, 4, 6, 8, 10]
지금까지 Transformations와 Actions 함수들을 살펴보았다.
이 함수들을 데이터 정제, 변형에 활용하면 된다.
다음 포스팅에선 Cache & Persist를 주제로 만나보자.
'🛠 Data Engineering > Apache Spark' 카테고리의 다른 글
[Spark] Reduction 개념 및 코드 (0) | 2022.05.01 |
---|---|
[Spark] Spark 속도 최적화, Cashe() & Persist() (0) | 2022.05.01 |
[Spark] Key-Value RDD 개념 및 코드 (0) | 2022.05.01 |
[Spark] 분산/병렬 처리할 때 주의할 점 (0) | 2022.05.01 |
[Spark] RDD 개념 및 코드 (0) | 2022.04.24 |