Spark Backend: Catalyst and Tungsten
Spark relies on the two engines in the title to run queries: Catalyst and Tungsten.
Catalyst is used to optimize query statements, while Tungsten optimizes memory usage at the RDD level.
The overall execution process looks like the following: [figure omitted: query execution process]
Where these components sit in Spark looks like the following: [figure omitted: Spark architecture diagram]
As the architecture diagram shows, Catalyst is the core module that lets SQL and the DataFrame API work with structured data.
So let's take a closer look at what Catalyst actually does.
"๋ณธ ํฌ์คํ ์ ํจ์คํธ์บ ํผ์ค์ ๊ฐ์๋ฅผ ๋ฃ๊ณ , ์ ๋ฆฌํ ์๋ฃ์์ ๋ฐํ๋๋ค."
Catalyst's Features
Logical Plan -> Physical Plan
What is a Logical Plan? It is an abstraction of all the transformation steps to be performed. It defines how the data should be transformed, but not where or how that work actually executes.
What is a Physical Plan? It defines how the Logical Plan will actually execute on the cluster. Catalyst generates candidate execution strategies, evaluates them with a cost model, and optimizes by selecting the best-performing plan.
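As a quick sketch before diving deeper: for any DataFrame you can print these plans yourself. Plain explain() prints only the Physical Plan, while explain(True) also prints the Parsed, Analyzed, and Optimized Logical Plans. (This assumes the SparkSession named spark that we build in the Basic Setting section below.)
# Minimal sketch: inspecting Catalyst's plans for a trivial DataFrame.
# Assumes the SparkSession `spark` created in the Basic Setting section below.
df_tmp = spark.range(10).filter("id > 5")
df_tmp.explain()      # Physical Plan only
df_tmp.explain(True)  # Parsed / Analyzed / Optimized Logical Plans + Physical Plan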
Stages
Step 1, Analysis: resolve the relations of the DataFrame objects, checking each column's name and type.
Step 2, Logical Plan optimization: (1) constant folding: expressions made only of constants are evaluated at compile time (see the sketch after this list); (2) predicate pushdown: join & filter -> filter & join; (3) projection pruning: read only the columns the computation needs.
Step 3, Physical planning: turn the Logical Plan into plans that Spark can actually execute.
Step 4, Code generation: compile the optimized Physical Plan down to Java bytecode.
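Here is a minimal sketch of the constant folding from step 2 (again assuming the spark session from the setup below): an expression built purely from literals never reaches the executors, because the Optimized Logical Plan already contains the folded value.
# Minimal sketch of constant folding: the arithmetic is evaluated at plan time.
# The Optimized Logical Plan should show the literal 6, not 1 + 2 + 3.
spark.sql("SELECT 1 + 2 + 3 AS six").explain(True)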
Now let's confirm what Catalyst does directly in code.
Basic Setting
import os

import findspark
findspark.init(os.environ.get("SPARK_HOME"))  # locate the local Spark installation

import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd

import faulthandler
faulthandler.enable()  # print a traceback if the Python/JVM bridge crashes

from pyspark.sql import SparkSession

# A local SparkSession for all of the examples below
spark = SparkSession.builder.master('local').appName("dataframe").getOrCreate()
# Data
stocks = [
('Google', 'GOOGL', 'USA', 2984, 'USD'),
('Netflix', 'NFLX', 'USA', 645, 'USD'),
('Amazon', 'AMZN', 'USA', 3518, 'USD'),
('Tesla', 'TSLA', 'USA', 1222, 'USD'),
('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
('Toyota', '7203', 'Japan', 2006, 'JPY'),
('Samsung', '005930', 'Korea', 70600, 'KRW'),
('Kakao', '035720', 'Korea', 125000, 'KRW'),
]
# Schema: column names only; Spark infers each column's type from the data
stockSchema = ['name', 'ticker', 'country', 'price', 'currency']
# createDataFrame()
df = spark.createDataFrame(data=stocks, schema=stockSchema)
df.createOrReplaceTempView('stocks')
# Data 2
earnings = [
('Google', 27.99, 'USD'),
('Netflix', 2.56, 'USD'),
('Amazon', 6.12, 'USD'),
('Tesla', 1.86, 'USD'),
('Tencent', 11.01, 'HKD'),
('Toyota', 224.82, 'JPY'),
('Samsung', 1780., 'KRW'),
('Kakao', 705., 'KRW')
]
from pyspark.sql.types import StringType, FloatType, StructType, StructField
# The schema can also declare each column's DataType explicitly.
# Note that EPS is declared as StringType even though the values are numeric
# (FloatType is imported but never used).
earningsSchema = StructType([
    StructField("name", StringType(), True),
    StructField("EPS", StringType(), True),
    StructField("currency", StringType(), True)
])
df_earning = spark.createDataFrame(data=earnings, schema=earningsSchema)
df_earning.createOrReplaceTempView('earnings')
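Before moving on, note that because the schema above declares EPS as StringType, the EPS values are stored as strings. A quick check:
df_earning.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- EPS: string (nullable = true)
#  |-- currency: string (nullable = true)
Keep this in mind: it is why the plans further below contain cast(EPS as int) before comparing EPS to 10.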
spark.sql("select * from stocks join earnings on stocks.name = earnings.name").show()
+-------+------+---------+------+--------+-------+------+--------+
| name|ticker| country| price|currency| name| EPS|currency|
+-------+------+---------+------+--------+-------+------+--------+
| Amazon| AMZN| USA| 3518| USD| Amazon| 6.12| USD|
| Google| GOOGL| USA| 2984| USD| Google| 27.99| USD|
| Kakao|035720| Korea|125000| KRW| Kakao| 705.0| KRW|
|Netflix| NFLX| USA| 645| USD|Netflix| 2.56| USD|
|Samsung|005930| Korea| 70600| KRW|Samsung|1780.0| KRW|
|Tencent| 0700|Hong Kong| 483| HKD|Tencent| 11.01| HKD|
| Tesla| TSLA| USA| 1222| USD| Tesla| 1.86| USD|
| Toyota| 7203| Japan| 2006| JPY| Toyota|224.82| JPY|
+-------+------+---------+------+--------+-------+------+--------+
Logical Plan Optimization
Let's see how the logical plan gets optimized, using the following query.
query = '''
SELECT
earnings.currency,
count(*) AS count
FROM
stocks
LEFT JOIN
earnings
ON
stocks.name = earnings.name
WHERE
earnings.EPS > 10
GROUP BY
earnings.currency
'''
spark.sql(query).show()
+--------+-----+
|currency|count|
+--------+-----+
| KRW| 2|
| JPY| 1|
| HKD| 1|
| USD| 1|
+--------+-----+
The plan as written is:
(1) Scan -> (2) Join -> (3) Filter -> (4) Project -> (5) Aggregate
Catalyst optimizes this plan into:
(1) Scan -> (2) Filter -> (3) Join -> (4) Project -> (5) Aggregate
Notice in the output below that Catalyst also rewrites the LEFT JOIN into an inner join: the WHERE filter on earnings.EPS discards exactly the NULL rows that a left join would have kept.
Let's use the explain() function to examine Catalyst's work in detail.
spark.sql(query).explain(True)
# The Logical Plan as parsed from the query ------------------------------------------------------
== Parsed Logical Plan ==
'Aggregate ['earnings.currency], ['earnings.currency, 'count(1) AS count#875]
+- 'Filter ('earnings.EPS > 10)
+- 'Join LeftOuter, ('stocks.name = 'earnings.name)
:- 'UnresolvedRelation [stocks], [], false
+- 'UnresolvedRelation [earnings], [], false
# The Logical Plan after analysis ------------------------------------------------------
== Analyzed Logical Plan ==
currency: string, count: bigint
Aggregate [currency#655], [currency#655, count(1) AS count#875L]
+- Filter (cast(EPS#654 as int) > 10)
+- Join LeftOuter, (name#0 = name#653)
:- SubqueryAlias stocks
: +- View (`stocks`, [name#0,ticker#1,country#2,price#3L,currency#4])
: +- LogicalRDD [name#0, ticker#1, country#2, price#3L, currency#4], false
+- SubqueryAlias earnings
+- View (`earnings`, [name#653,EPS#654,currency#655])
+- LogicalRDD [name#653, EPS#654, currency#655], false
# The Logical Plan after optimization --------------------------------------------------------------
== Optimized Logical Plan ==
Aggregate [currency#655], [currency#655, count(1) AS count#875L]
+- Project [currency#655]
+- Join Inner, (name#0 = name#653)
:- Project [name#0]
: +- Filter isnotnull(name#0)
: +- LogicalRDD [name#0, ticker#1, country#2, price#3L, currency#4], false
+- Project [name#653, currency#655]
+- Filter ((isnotnull(EPS#654) AND (cast(EPS#654 as int) > 10)) AND isnotnull(name#653))
+- LogicalRDD [name#653, EPS#654, currency#655], false
# Conversion to a Physical Plan -----------------------------------------------------------------
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[currency#655], functions=[count(1)], output=[currency#655, count#875L])
+- Exchange hashpartitioning(currency#655, 200), ENSURE_REQUIREMENTS, [id=#2557]
+- HashAggregate(keys=[currency#655], functions=[partial_count(1)], output=[currency#655, count#880L])
+- Project [currency#655]
+- SortMergeJoin [name#0], [name#653], Inner
:- Sort [name#0 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(name#0, 200), ENSURE_REQUIREMENTS, [id=#2549]
: +- Project [name#0]
: +- Filter isnotnull(name#0)
: +- Scan ExistingRDD[name#0,ticker#1,country#2,price#3L,currency#4]
+- Sort [name#653 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(name#653, 200), ENSURE_REQUIREMENTS, [id=#2550]
+- Project [name#653, currency#655]
+- Filter ((isnotnull(EPS#654) AND (cast(EPS#654 as int) > 10)) AND isnotnull(name#653))
+- Scan ExistingRDD[name#653,EPS#654,currency#655]
Tungsten's Features
Once a Physical Plan is selected, the Java bytecode that runs across the distributed environment is generated.
This process is called Code Generation.
Tungsten aims squarely at engine performance and provides the following (a small sketch follows the list):
(1) Optimized memory management
(2) Cache-aware computation
(3) Code generation
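As a rough sketch of (3): in PySpark 3.0+ you can pass mode="codegen" to explain() to print the actual Java code that whole-stage code generation produces for a query.
# Minimal sketch: print the Java source emitted by whole-stage codegen.
# Assumes PySpark 3.0+ (the `mode` argument) and the `query` defined above.
spark.sql(query).explain(mode="codegen")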
So far we have looked at the backend components Catalyst and Tungsten.
The reason to study them is that you need to know what happens behind the scenes when you write code; only by understanding that can you write more efficient programs.
In the next post, let's download real data and preprocess it with SQL statements.
Thanks for reading.