[SparkSQL] How Catalyst and Tungsten Work

2022. 5. 9. 12:26 · 🛠 Data Engineering/Apache Spark

Spark Backend: Catalyst, Tungsten

To run a query, Spark relies on the two engines above.

Catalyst optimizes the query itself, while Tungsten optimizes resource usage (memory and CPU) at the RDD level.

The overall process looks like this:

[Figure: SparkSQL backend]

The Spark component stack looks like this:

[Figure: Spark component stack]

As the diagram shows, Catalyst is the core module that lets SQL and DataFrames work with structured data.

So what exactly does Catalyst do? Let's take a closer look.

"๋ณธ ํฌ์ŠคํŒ…์€ ํŒจ์ŠคํŠธ์บ ํผ์Šค์˜ ๊ฐ•์˜๋ฅผ ๋“ฃ๊ณ , ์ •๋ฆฌํ•œ ์ž๋ฃŒ์ž„์„ ๋ฐํž™๋‹ˆ๋‹ค."

What Catalyst Does


Logical Plan -> Physical Plan

 

What is a Logical Plan? It is an abstraction of all the transformation steps to be performed. It defines how the data should be transformed, but not where or how that work actually runs.

 

What is a Physical Plan? It defines how the logical plan will actually run on the cluster. Catalyst generates candidate execution strategies, evaluates them with a cost model, and picks the one expected to perform best.
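To make the cost-model idea concrete, here is a toy sketch in plain Python, not Spark code: one logical join with two candidate physical strategies, each given an invented cost, and the cheapest one is selected. The `Relation` class, the cost formulas, and the `n_executors` parameter are assumptions for illustration. The 10 MB default only mirrors Spark's `spark.sql.autoBroadcastJoinThreshold` setting; Spark's real join selection is more involved than this arithmetic.

```python
from dataclasses import dataclass
from typing import Iterator, Tuple

MB = 1024 * 1024


@dataclass
class Relation:
    name: str
    size_bytes: int  # estimated size of this input (an assumed number fed to the toy)


def candidate_plans(left: Relation, right: Relation, n_executors: int,
                    broadcast_threshold: int) -> Iterator[Tuple[str, int]]:
    """Yield (physical strategy, estimated cost) pairs for one logical Join."""
    small = min(left.size_bytes, right.size_bytes)
    large = max(left.size_bytes, right.size_bytes)
    # Broadcast hash join: copy the small side to every executor and scan the
    # large side once. Only considered when the small side fits the threshold.
    if small <= broadcast_threshold:
        yield "BroadcastHashJoin", small * n_executors + large
    # Sort-merge join: shuffle and sort both sides (invented factor of 3).
    yield "SortMergeJoin", 3 * (left.size_bytes + right.size_bytes)


def choose_physical_plan(left: Relation, right: Relation, n_executors: int = 4,
                         broadcast_threshold: int = 10 * MB) -> Tuple[str, int]:
    """Return the cheapest candidate according to the toy cost model."""
    return min(candidate_plans(left, right, n_executors, broadcast_threshold),
               key=lambda plan: plan[1])


print(choose_physical_plan(Relation("stocks", 1_000), Relation("earnings", 500)))
# ('BroadcastHashJoin', 3000)
```

With many executors, broadcasting even a table under the threshold can cost more than a shuffle in this model (for example 10 MB and 9 MB inputs with `n_executors=100`), so the toy then falls back to `SortMergeJoin`.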

 

 

 

Stages

 

Stage 1, Analysis: resolve the relations referenced by the DataFrame and check the name and type of each column.

Stage 2, Logical plan optimization: (1) constant folding: evaluate expressions made only of constants at compile time; (2) predicate pushdown: join then filter becomes filter then join; (3) projection pruning: read only the columns the computation needs.

Stage 3, Physical planning: convert the optimized logical plan into a plan Spark can execute.

Stage 4, Code generation: compile the optimized physical plan into Java bytecode.
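The first rule in stage 2, evaluating constant expressions ahead of time, is usually called constant folding (Catalyst has an optimizer rule named `ConstantFolding`). Below is a toy version in plain Python, not Catalyst's implementation: an expression tree built from hypothetical `Lit`, `Col`, and `BinOp` classes is folded bottom-up, so `EPS > 2 * 5` becomes `EPS > 10` before any row is read.

```python
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class Lit:
    value: object  # a constant such as 2 or 10.5


@dataclass(frozen=True)
class Col:
    name: str  # a column reference, unknown until rows are read


@dataclass(frozen=True)
class BinOp:
    op: str  # one of "+", "*", ">"
    left: "Expr"
    right: "Expr"


Expr = Union[Lit, Col, BinOp]

OPS = {
    "+": lambda a, b: a + b,
    "*": lambda a, b: a * b,
    ">": lambda a, b: a > b,
}


def constant_fold(expr: Expr) -> Expr:
    """Fold bottom-up: any operator whose inputs are all literals is replaced
    by one literal, computed once at planning time instead of once per row."""
    if isinstance(expr, BinOp):
        left = constant_fold(expr.left)
        right = constant_fold(expr.right)
        if isinstance(left, Lit) and isinstance(right, Lit):
            return Lit(OPS[expr.op](left.value, right.value))
        return BinOp(expr.op, left, right)
    return expr  # literals and column references stay as they are


# WHERE EPS > 2 * 5
predicate = BinOp(">", Col("EPS"), BinOp("*", Lit(2), Lit(5)))
print(constant_fold(predicate))
# BinOp(op='>', left=Col(name='EPS'), right=Lit(value=10))
```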

Let's check what Catalyst does directly in code.

Basic Setting

 

import os
import findspark

# Locate the Spark installation (SPARK_HOME) so that pyspark can be imported
findspark.init(os.environ.get("SPARK_HOME"))

import faulthandler
faulthandler.enable()  # print Python tracebacks if the process crashes

from pyspark.sql import SparkSession

# Local SparkSession: the entry point for the DataFrame and SQL APIs
spark = SparkSession.builder.master('local').appName("dataframe").getOrCreate()

# Sample data: (name, ticker, country, price, currency)

stocks = [
    ('Google', 'GOOGL', 'USA', 2984, 'USD'), 
    ('Netflix', 'NFLX', 'USA', 645, 'USD'),
    ('Amazon', 'AMZN', 'USA', 3518, 'USD'),
    ('Tesla', 'TSLA', 'USA', 1222, 'USD'),
    ('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
    ('Toyota', '7203', 'Japan', 2006, 'JPY'),
    ('Samsung', '005930', 'Korea', 70600, 'KRW'),
    ('Kakao', '035720', 'Korea', 125000, 'KRW'),
]


# Schema as a list of column names; Spark infers the column types from the data

stockSchema = ['name', 'ticker', 'country', 'price', 'currency']


# createDataFrame(), then register the DataFrame as the temp view 'stocks' for SQL
df = spark.createDataFrame(data = stocks, schema=stockSchema)
df.createOrReplaceTempView('stocks')


# Sample data 2: (name, EPS, currency)

earnings = [
    ('Google', 27.99, 'USD'), 
    ('Netflix', 2.56, 'USD'),
    ('Amazon', 6.12, 'USD'),
    ('Tesla', 1.86, 'USD'),
    ('Tencent', 11.01, 'HKD'),
    ('Toyota', 224.82, 'JPY'),
    ('Samsung', 1780., 'KRW'),
    ('Kakao', 705., 'KRW')
]


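from pyspark.sql.types import StringType, FloatType, StructType, StructField
# A schema can also specify each column's data type explicitly.

earningsSchema = StructType([
    StructField("name", StringType(), True),
    # EPS is declared as a string here (FloatType would be the natural choice),
    # so the float values are stored as strings. This is why the analyzed plan
    # below contains cast(EPS#654 as int) when EPS is compared with 10.
    StructField("EPS", StringType(), True),
    StructField("currency", StringType(), True)
])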


df_earning = spark.createDataFrame(data=earnings, schema=earningsSchema)
df_earning.createOrReplaceTempView('earnings')

 

spark.sql("select * from stocks join earnings on stocks.name = earnings.name").show()


+-------+------+---------+------+--------+-------+------+--------+
|   name|ticker|  country| price|currency|   name|   EPS|currency|
+-------+------+---------+------+--------+-------+------+--------+
| Amazon|  AMZN|      USA|  3518|     USD| Amazon|  6.12|     USD|
| Google| GOOGL|      USA|  2984|     USD| Google| 27.99|     USD|
|  Kakao|035720|    Korea|125000|     KRW|  Kakao| 705.0|     KRW|
|Netflix|  NFLX|      USA|   645|     USD|Netflix|  2.56|     USD|
|Samsung|005930|    Korea| 70600|     KRW|Samsung|1780.0|     KRW|
|Tencent|  0700|Hong Kong|   483|     HKD|Tencent| 11.01|     HKD|
|  Tesla|  TSLA|      USA|  1222|     USD|  Tesla|  1.86|     USD|
| Toyota|  7203|    Japan|  2006|     JPY| Toyota|224.82|     JPY|
+-------+------+---------+------+--------+-------+------+--------+

Logical Plan Optimization

Let's use the following query to see how the logical plan gets optimized.

 

query = '''
SELECT
    earnings.currency,
    count(*) AS count
FROM
    stocks
    LEFT JOIN earnings
        ON stocks.name = earnings.name
WHERE
    earnings.EPS > 10
GROUP BY
    earnings.currency
'''

spark.sql(query).show()


+--------+-----+
|currency|count|
+--------+-----+
|     KRW|    2|
|     JPY|    1|
|     HKD|    1|
|     USD|    1|
+--------+-----+

The original plan is:

(1) Scan -> (2) Join -> (3) Filter -> (4) Project -> (5) Aggregate

 

Catalyst optimizes that plan into the following:

(1) Scan -> (2) Filter -> (3) Join -> (4) Project -> (5) Aggregate
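The reordering above is predicate pushdown. Here is a toy sketch of that rewrite over a hypothetical plan tree (`Scan`, `Filter`, and `Join` are illustrative classes, not Catalyst's): a filter sitting on a join is moved into the join input that produces every column it reads. The toy treats every join as an inner join. Pushing a filter into the right side of a LEFT JOIN is not valid in general; it works for this query because Catalyst also turns the LEFT JOIN into an inner join (`Join Inner` in the optimized plan).

```python
from dataclasses import dataclass
from typing import FrozenSet, Union


@dataclass(frozen=True)
class Scan:
    table: str
    columns: FrozenSet[str]  # columns this relation produces


@dataclass(frozen=True)
class Filter:
    condition: str              # predicate text, opaque to this toy
    references: FrozenSet[str]  # columns the predicate reads
    child: "Plan"


@dataclass(frozen=True)
class Join:  # treated as an INNER join throughout this toy
    left: "Plan"
    right: "Plan"


Plan = Union[Scan, Filter, Join]


def output_columns(plan: Plan) -> FrozenSet[str]:
    """Columns produced by a plan node."""
    if isinstance(plan, Scan):
        return plan.columns
    if isinstance(plan, Filter):
        return output_columns(plan.child)
    return output_columns(plan.left) | output_columns(plan.right)


def push_down_predicates(plan: Plan) -> Plan:
    """Move a Filter that sits on a Join into the join input that produces
    every column the filter reads, so fewer rows reach the join."""
    if isinstance(plan, Filter) and isinstance(plan.child, Join):
        join = plan.child
        if plan.references <= output_columns(join.left):
            pushed = Filter(plan.condition, plan.references, join.left)
            return Join(push_down_predicates(pushed), push_down_predicates(join.right))
        if plan.references <= output_columns(join.right):
            pushed = Filter(plan.condition, plan.references, join.right)
            return Join(push_down_predicates(join.left), push_down_predicates(pushed))
    # Otherwise keep the node and keep rewriting its children.
    if isinstance(plan, Filter):
        return Filter(plan.condition, plan.references, push_down_predicates(plan.child))
    if isinstance(plan, Join):
        return Join(push_down_predicates(plan.left), push_down_predicates(plan.right))
    return plan


stocks = Scan("stocks", frozenset({"stocks.name", "stocks.ticker", "stocks.country",
                                   "stocks.price", "stocks.currency"}))
earnings = Scan("earnings", frozenset({"earnings.name", "earnings.EPS", "earnings.currency"}))

# Join, then Filter (the order written in the query)
before = Filter("earnings.EPS > 10", frozenset({"earnings.EPS"}), Join(stocks, earnings))
after = push_down_predicates(before)
print(type(after).__name__, type(after.right).__name__)  # Join Filter
```

A predicate that reads columns from both sides (say `stocks.price > earnings.EPS`) has no single input to move into, so the toy leaves it above the join.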

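Let's use explain() to examine Catalyst's work in detail. With True (extended mode), it prints the parsed, analyzed, and optimized logical plans along with the physical plan.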

 

spark.sql(query).explain(True)



# Parsed logical plan: table and column names are not resolved yet ------------------

== Parsed Logical Plan ==
'Aggregate ['earnings.currency], ['earnings.currency, 'count(1) AS count#875]
+- 'Filter ('earnings.EPS > 10)
   +- 'Join LeftOuter, ('stocks.name = 'earnings.name)
      :- 'UnresolvedRelation [stocks], [], false
      +- 'UnresolvedRelation [earnings], [], false

 

# Analyzed logical plan: names are resolved to typed attributes (name#0, EPS#654, ...) ----

== Analyzed Logical Plan ==
currency: string, count: bigint
Aggregate [currency#655], [currency#655, count(1) AS count#875L]
+- Filter (cast(EPS#654 as int) > 10)
   +- Join LeftOuter, (name#0 = name#653)
      :- SubqueryAlias stocks
      :  +- View (`stocks`, [name#0,ticker#1,country#2,price#3L,currency#4])
      :     +- LogicalRDD [name#0, ticker#1, country#2, price#3L, currency#4], false
      +- SubqueryAlias earnings
         +- View (`earnings`, [name#653,EPS#654,currency#655])
            +- LogicalRDD [name#653, EPS#654, currency#655], false

 

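# Optimized logical plan: the filter moves below the join, unused columns are pruned, and the LEFT JOIN becomes an inner join (unmatched rows would have a null EPS and fail the WHERE clause anyway) ----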

== Optimized Logical Plan ==
Aggregate [currency#655], [currency#655, count(1) AS count#875L]
+- Project [currency#655]
   +- Join Inner, (name#0 = name#653)
      :- Project [name#0]
      :  +- Filter isnotnull(name#0)
      :     +- LogicalRDD [name#0, ticker#1, country#2, price#3L, currency#4], false
      +- Project [name#653, currency#655]
         +- Filter ((isnotnull(EPS#654) AND (cast(EPS#654 as int) > 10)) AND isnotnull(name#653))
            +- LogicalRDD [name#653, EPS#654, currency#655], false

 

# Converted to a physical plan (Exchange = a shuffle between stages) -------------------

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[currency#655], functions=[count(1)], output=[currency#655, count#875L])
   +- Exchange hashpartitioning(currency#655, 200), ENSURE_REQUIREMENTS, [id=#2557]
      +- HashAggregate(keys=[currency#655], functions=[partial_count(1)], output=[currency#655, count#880L])
         +- Project [currency#655]
            +- SortMergeJoin [name#0], [name#653], Inner
               :- Sort [name#0 ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(name#0, 200), ENSURE_REQUIREMENTS, [id=#2549]
               :     +- Project [name#0]
               :        +- Filter isnotnull(name#0)
               :           +- Scan ExistingRDD[name#0,ticker#1,country#2,price#3L,currency#4]
               +- Sort [name#653 ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(name#653, 200), ENSURE_REQUIREMENTS, [id=#2550]
                     +- Project [name#653, currency#655]
                        +- Filter ((isnotnull(EPS#654) AND (cast(EPS#654 as int) > 10)) AND isnotnull(name#653))
                           +- Scan ExistingRDD[name#653,EPS#654,currency#655]
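The `Project [name#0]` directly above the `stocks` relation in the optimized plan is projection pruning at work: of the five `stocks` columns, only `name` (the join key) is needed. Below is a toy sketch of that bookkeeping in plain Python. It assumes we already know which columns each operator above the scans reads; the `prune_columns` helper and the flat list of operators are illustrative simplifications, whereas Catalyst's `ColumnPruning` rule works on the plan tree itself.

```python
from typing import Dict, List, Set


def prune_columns(operators: List[Set[str]],
                  scans: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Toy projection pruning.

    operators: the columns read by each operator above the scans
               (aggregate keys, join keys, filter predicates, ...).
    scans:     table name -> every column that table can produce.
    Returns table name -> only the columns some operator reads,
    in the table's original column order.
    """
    required: Set[str] = set().union(*operators)
    return {table: [c for c in cols if c in required] for table, cols in scans.items()}


operators = [
    {"earnings.currency"},             # Aggregate: GROUP BY earnings.currency, count(*)
    {"stocks.name", "earnings.name"},  # Join: stocks.name = earnings.name
    {"earnings.EPS"},                  # Filter: earnings.EPS > 10
]
scans = {
    "stocks": ["stocks.name", "stocks.ticker", "stocks.country", "stocks.price", "stocks.currency"],
    "earnings": ["earnings.name", "earnings.EPS", "earnings.currency"],
}
print(prune_columns(operators, scans))
# {'stocks': ['stocks.name'], 'earnings': ['earnings.name', 'earnings.EPS', 'earnings.currency']}
```

The result lines up with the plan: `stocks` contributes only `name`, while `earnings` must supply `EPS` for the filter plus `name` and `currency` for the join and the aggregate.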

What Tungsten Does


Once a physical plan has been selected, Spark generates the Java bytecode that runs across the distributed environment.

This process is called code generation.
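The core idea of code generation is to emit one specialized function for several fused operators, instead of calling a generic operator for every step on every row. Spark generates Java source and compiles it at runtime with the Janino compiler; the sketch below only imitates the idea in plain Python by generating and `exec`-ing source for a fused Filter + Project. `generate_filter_project`, `compile_generated`, and the dict-based rows are hypothetical. In PySpark 3.0+, `df.explain(mode="codegen")` prints the Java code Spark actually generates.

```python
from typing import Callable, Dict, List, Sequence, Tuple

Row = Dict[str, object]


def generate_filter_project(predicate: str, output_columns: Sequence[str]) -> str:
    """Emit Python source for ONE function that fuses Filter and Project
    into a single loop (the predicate is a Python expression over `row`)."""
    projection = ", ".join(f"row[{col!r}]" for col in output_columns)
    return (
        "def fused(rows):\n"
        "    out = []\n"
        "    for row in rows:\n"
        f"        if {predicate}:\n"
        f"            out.append(({projection},))\n"
        "    return out\n"
    )


def compile_generated(source: str) -> Callable[[List[Row]], List[Tuple]]:
    """Compile the generated source at runtime and return the fused function."""
    namespace: Dict[str, object] = {}
    exec(compile(source, "<generated>", "exec"), namespace)
    return namespace["fused"]


source = generate_filter_project("row['EPS'] > 10", ["name", "currency"])
print(source)
fused = compile_generated(source)

earnings = [
    {"name": "Google", "EPS": 27.99, "currency": "USD"},
    {"name": "Netflix", "EPS": 2.56, "currency": "USD"},
    {"name": "Toyota", "EPS": 224.82, "currency": "JPY"},
]
print(fused(earnings))  # [('Google', 'USD'), ('Toyota', 'JPY')]
```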

 

Tungsten aims to improve the performance of the execution engine and provides the following:

 

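(1) Memory management: data is kept in a compact binary format and memory is managed explicitly, reducing JVM object overhead and garbage collection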

(2) Cache-aware computation: algorithms and data structures designed to make better use of the CPU cache

(3) Code generation: code generated at runtime for the specific query
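For the memory-management point, Spark's `UnsafeRow` lays out a row as one contiguous block of bytes: a null bitmap, one 8-byte slot per field, and a variable-length region. Compared with one JVM object per value, this avoids object overhead and keeps a row's fields adjacent in memory, which also helps the CPU cache. The toy below imitates that layout with Python's `struct` module; `encode_row`, `read_field`, the type names, and the padding rule are simplifications assumed for illustration, not Spark's exact format.

```python
import struct
from typing import Optional, Sequence

# Toy row layout, loosely modeled on Spark's UnsafeRow:
#   [8-byte null bitmap][one 8-byte slot per field][variable-length region]
# "long"/"double" values live directly in their slot; a "string" slot holds
# (offset << 32) | length, pointing at UTF-8 bytes in the variable-length region.


def encode_row(values: Sequence[Optional[object]], schema: Sequence[str]) -> bytes:
    n = len(schema)
    if len(values) != n or n > 64:
        raise ValueError("need one value per field and at most 64 fields")
    fixed_size = 8 + 8 * n  # null bitmap word + fixed-width slots
    null_bits = 0
    slots = bytearray(8 * n)
    var = bytearray()
    for i, (value, dtype) in enumerate(zip(values, schema)):
        if value is None:
            null_bits |= 1 << i
        elif dtype == "long":
            struct.pack_into("<q", slots, 8 * i, value)
        elif dtype == "double":
            struct.pack_into("<d", slots, 8 * i, value)
        elif dtype == "string":
            data = value.encode("utf-8")
            offset = fixed_size + len(var)
            struct.pack_into("<Q", slots, 8 * i, (offset << 32) | len(data))
            var += data + b"\x00" * (-len(data) % 8)  # pad to an 8-byte boundary
        else:
            raise ValueError(f"unsupported type: {dtype}")
    return struct.pack("<Q", null_bits) + bytes(slots) + bytes(var)


def read_field(row: bytes, i: int, dtype: str) -> Optional[object]:
    """Read field i straight from the bytes, without decoding the whole row."""
    (null_bits,) = struct.unpack_from("<Q", row, 0)
    if (null_bits >> i) & 1:
        return None
    slot = 8 + 8 * i
    if dtype == "long":
        return struct.unpack_from("<q", row, slot)[0]
    if dtype == "double":
        return struct.unpack_from("<d", row, slot)[0]
    (packed,) = struct.unpack_from("<Q", row, slot)
    offset, length = packed >> 32, packed & 0xFFFFFFFF
    return row[offset:offset + length].decode("utf-8")


schema = ["string", "string", "string", "long", "string"]
row = encode_row(["Samsung", "005930", "Korea", 70600, "KRW"], schema)
print(len(row), read_field(row, 0, "string"), read_field(row, 3, "long"))  # 80 Samsung 70600
```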

So far, we have looked at Spark's two backend components, Catalyst and Tungsten.

The reason to study them is that, when writing code, you need to be able to picture how it will run behind the scenes. Understanding them is what lets you write more efficient programs.

In the next post, we'll download a real dataset and preprocess it with SQL.

Thanks for following along.