[Spark] SparkSQL Concepts and Code

2022. 5. 6. 13:32 · 🛠 Data Engineering/Apache Spark

 

 

SparkSQL

 

To handle structured data, Spark also lets you write SQL statements.

 

1. What is the difference between structured and unstructured data?

2. Why should we use structured data instead of raw RDDs?

3. How can we write SQL in Spark?

 

ํฌ์ŠคํŒ…์„ ์ฝ์–ด๊ฐ€๋ฉฐ ์งˆ๋ฌธ์„ ํ•ด์†Œํ•ด๋ณด์ž.

 

 

 

"๋ณธ ํฌ์ŠคํŒ…์€ ํŒจ์ŠคํŠธ์บ ํผ์Šค์˜ ๊ฐ•์˜๋ฅผ ๋“ฃ๊ณ , ์ •๋ฆฌํ•œ ์ž๋ฃŒ์ž„์„ ๋ฐํž™๋‹ˆ๋‹ค."


Structured Data vs Unstructured Data


Unstructured data: free form

- log files

- images

Exactly what it says: data in a free form. It could be an image, or a single string. Data like this gets cleaned up and converted into structured data.

 

 

Semi-structured data: rows and columns

- CSV

- JSON

- XML

Data that comes with basic rows and columns, but without any information about each value's type (is it a number? a string?). It only records that the data exists.

 

 

Structured data: rows and columns + schema

- databases (e.g. tables, dataframes)

On top of rows and columns, the types of the data are stored as well. Because it carries a schema (number, string, list, ...), it is far easier to optimize than the other forms of data.

 

 

 

* RDD๊ฐ€ ์•„๋‹Œ Structured Data๋ฅผ Spark์—์„œ ์‚ฌ์šฉํ•˜๋Š” ์ด์œ 

 

RDD๋Š” ๋ฐ์ดํ„ฐ์˜ ๊ตฌ์กฐ๋ฅผ ๋ชจ๋ฅด๊ธฐ ๋•Œ๋ฌธ์—, ์ตœ์ ํ™” ๋“ฑ ๋ฐ์ดํ„ฐ ๋‹ค๋ฃจ๊ธฐ๋ฅผ ๊ฐœ๋ฐœ์ž์—๊ฒŒ ๋ชจ๋‘ ์˜์กดํ•ด์•ผ ํ•œ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ์ „ ํฌ์ŠคํŒ…์—์„œ ์ด์•ผ๊ธฐํ–ˆ๋˜ ์„ฑ๋Šฅ ๋น„๊ต ๋ฌธ์ œ๋ฅผ ๋– ์˜ฌ๋ ค ๋ณด์ž. (1) filter()๋ฅผ ์ ์šฉํ•œ ๋‹ค์Œ, reduceByKey()๋ฅผ ํ•œ ๊ฒƒ๊ณผ (2) reduceByKey()๋ฅผ ์ ์šฉํ•œ ๋‹ค์Œ, filter()๋ฅผ ์ ์šฉํ•œ ๊ฒƒ์€ ์„ฑ๋Šฅ ์ฐจ์ด๊ฐ€ ์กด์žฌํ–ˆ๋‹ค. ์ด์ฒ˜๋Ÿผ RDD๋ฅผ ์ด์šฉํ•˜๋ฉด ๊ฐœ๋ฐœ์ž์˜ ์‹ค๋ ฅ์— ๋”ฐ๋ผ ์„ฑ๋Šฅ์ด ์ฒœ์ฐจ๋งŒ๋ณ„์ด ๋œ๋‹ค. 

ํ•˜์ง€๋งŒ Structured Data์˜ ๊ฒฝ์šฐ, ๊ตฌ์กฐ๋ฅผ ์ปดํ“จํ„ฐ๊ฐ€ ์•Œ๊ณ  ์žˆ๊ธฐ์— ์–ด๋–ค task๋ฅผ ์ˆ˜ํ–‰ํ• ์ง€๋งŒ ์ž…๋ ฅํ•˜๋ฉด ์ž๋™์œผ๋กœ ์ตœ์ ํ™”๊นŒ์ง€ ์ง„ํ–‰์‹œํ‚จ๋‹ค. ๊ฐœ๋ฐœ์ž๋Š” task๋งŒ ์ •์˜ ๋‚ด๋ฆฌ๋ฉด ๋˜๋Š” ๊ฒƒ์ด๋‹ค. ์ด๋Ÿฐ ๋‹ˆ์ฆˆ์—์„œ ํƒ„์ƒํ•œ ๊ฒƒ์ด SparkSQL์ด๋‹ค. 

SparkSQL์€ Structured Data๋ฅผ ๋‹ค๋ฃฐ ์ˆ˜ ์žˆ๊ฒŒ ํ•จ์œผ๋กœ์จ, Spark์˜ ์ด์šฉ ์„ฑ๋Šฅ์„ ์ƒํ–ฅ ํ‰์ค€ํ™”์‹œ์ผฐ๋‹ค. 


SparkSQL ๊ฐ„๋‹จ ์„ค๋ช…


SparkSQL์€ ์ŠคํŒŒํฌ ์œ„์— ๊ตฌํ˜„๋œ ํ•˜๋‚˜์˜ ํŒจํ‚ค์ง€๋‹ค. 

 

3๊ฐœ์˜ ์ฃผ์š” API๊ฐ€ ์žˆ๋Š”๋ฐ SQL, DataFrame, Datasets(pyspark ์‚ฌ์šฉ์ž๋Š” ์‹ ๊ฒฝ X)์ด๋‹ค.

์ฆ‰, SparkSQL์„ ์‚ฌ์šฉํ•˜๋ฉด SQL๋ฌธ์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋งŒ์งˆ ์ˆ˜ ์žˆ์œผ๋ฉฐ, DataFrame์„ ํ˜•์„ฑ ๋ฐ ์ „์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๋œป์ด๋‹ค.

 

2๊ฐœ์˜ ๋ฐฑ์—”๋“œ ์ปดํฌ๋„ŒํŠธ๊ฐ€ ์กด์žฌํ•˜๋Š”๋ฐ, Catalyst์™€ Tungsten์ด ๊ทธ๊ฒƒ์ด๋‹ค. ๊ธฐ๋Šฅ์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค. 

Catalyst - ์ฟผ๋ฆฌ ์ตœ์ ํ™” ์—”์ง„

Tungsten - ์‹œ๋ฆฌ์–ผ๋ผ์ด์ € (์šฉ๋Ÿ‰ ์ตœ์ ํ™”)

๋ฐฑ์—”๋“œ์— ๋Œ€ํ•œ ์ž์„ธํ•œ ๋‚ด์šฉ์€ ์ดํ›„ ํฌ์ŠคํŒ…์—์„œ ๋‹ค๋ฃจ๊ฒ ๋‹ค. 


SparkSQL ์‚ฌ์šฉํ•˜๊ธฐ


Import

 

import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))  # point findspark at the local Spark install
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()  # print a Python traceback if the JVM side crashes
from pyspark.sql import SparkSession

 

 

 

SparkCore์—์„  SparkContext๋ฅผ ์ด์šฉํ–ˆ๋‹ค๋ฉด, SparkSQL์—์„  SparkSession์„ ์ด์šฉํ•œ๋‹ค.

 

# SparkSession์„ ๋จผ์ € ์ž…๋ ฅ
# local ํ™˜๊ฒฝ์—์„œ ๋Œ๋ฆฌ๋Š” ๊ฒฝ์šฐ, 'local'์ด๋ผ๊ณ  ์„ค์ •
# appName()์€ ์ž์‹ ์ด ์›ํ•˜๋Š” ์ด๋ฆ„์œผ๋กœ ํ•˜๋ฉด ๋œ๋‹ค.
# SparkSession์€ SparkContext์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ 1ํšŒ ๋ถˆ๋Ÿฌ์˜ฌ ์ˆ˜ ์žˆ๋‹ค. 

spark = SparkSession.builder.master('local').appName("sql-learn").getOrCreate()

 

 

 

Build a DataFrame with the createDataFrame() function.

 

# Data

stocks = [
    ('Google', 'GOOGL', 'USA', 2984, 'USD'), 
    ('Netflix', 'NFLX', 'USA', 645, 'USD'),
    ('Amazon', 'AMZN', 'USA', 3518, 'USD'),
    ('Tesla', 'TSLA', 'USA', 1222, 'USD'),
    ('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
    ('Toyota', '7203', 'Japan', 2006, 'JPY'),
    ('Samsung', '005930', 'Korea', 70600, 'KRW'),
    ('Kakao', '035720', 'Korea', 125000, 'KRW'),
]


# Schema

stockSchema = ['name', 'ticker', 'country', 'price', 'currency']


# createDataFrame()
df = spark.createDataFrame(data=stocks, schema=stockSchema)

 

df.dtypes


[('name', 'string'),
 ('ticker', 'string'),
 ('country', 'string'),
 ('price', 'bigint'),
 ('currency', 'string')]

----

df.show()


+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+


Using createOrReplaceTempView()

This function registers a DataFrame as a temp view, so that it can be queried and preprocessed with SQL.

 

# dataframe์„ ๋จผ์ € ์ž…๋ ฅ
# ํ•จ์ˆ˜ ์ž…๋ ฅ
# createOrReplaceTempView() ํ•จ์ˆ˜ ์•ˆ์ชฝ์—๋Š” SQL๋ฌธ ์“ธ ๋•Œ ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ์–ด๋–ค ์ด๋ฆ„์œผ๋กœ ์‚ฌ์šฉํ•  ๊ฒƒ์ธ์ง€
# ์ •์˜ํ•œ๋‹ค. 

df.createOrReplaceTempView('stocks')

 

 

 

SQL๋ฌธ์œผ๋กœ ์ž์œ ๋กญ๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ค„๋ณด์ž.

 

spark.sql("select name from stocks").show()


+-------+
|   name|
+-------+
| Google|
|Netflix|
| Amazon|
|  Tesla|
|Tencent|
| Toyota|
|Samsung|
|  Kakao|
+-------+


----


spark.sql("select name, price from stocks where country = 'Korea'").show()


+-------+------+
|   name| price|
+-------+------+
|Samsung| 70600|
|  Kakao|125000|
+-------+------+


----


spark.sql("select name, price from stocks order by price asc").show()


+-------+------+
|   name| price|
+-------+------+
|Tencent|   483|
|Netflix|   645|
|  Tesla|  1222|
| Toyota|  2006|
| Google|  2984|
| Amazon|  3518|
|Samsung| 70600|
|  Kakao|125000|
+-------+------+


----


spark.sql("select count(price) from stocks where country in ('Korea', 'USA')").show()


+------------+
|count(price)|
+------------+
|           6|
+------------+


๋‹ค๋ฅธ ๋ฐ์ดํ„ฐ๋ฅผ Join ํ•ด๋ณด์ž.

 

earnings = [
    ('Google', 27.99, 'USD'), 
    ('Netflix', 2.56, 'USD'),
    ('Amazon', 6.12, 'USD'),
    ('Tesla', 1.86, 'USD'),
    ('Tencent', 11.01, 'HKD'),
    ('Toyota', 224.82, 'JPY'),
    ('Samsung', 1780., 'KRW'),
    ('Kakao', 705., 'KRW')
]


from pyspark.sql.types import StringType, FloatType, StructType, StructField
# Each column's DataType can be specified explicitly in the schema

earningsSchema = StructType([
    StructField("name", StringType(), True),
    StructField("EPS", FloatType(), True),  # numeric, so PER can be computed below
    StructField("currency", StringType(), True)
])


df_earning = spark.createDataFrame(data=earnings, schema=earningsSchema)
df_earning.createOrReplaceTempView('earnings')

 

 

 

SQL๋ฌธ์„ ์“ฐ์ง€ ์•Š๊ณ ๋„, ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃฐ ์ˆœ ์žˆ๋‹ค. ํ•˜์ง€๋งŒ SQL๋ฌธ์„ ํ†ตํ•ด ๋” ๋งŽ์ด ๋‹ค๋ฃฌ๋‹ค.

 

df_earning.select("*").show()


+-------+------+--------+
|   name|   EPS|currency|
+-------+------+--------+
| Google| 27.99|     USD|
|Netflix|  2.56|     USD|
| Amazon|  6.12|     USD|
|  Tesla|  1.86|     USD|
|Tencent| 11.01|     HKD|
| Toyota|224.82|     JPY|
|Samsung|1780.0|     KRW|
|  Kakao| 705.0|     KRW|
+-------+------+--------+

 

 

 

Let's JOIN the tables and look at the data from more angles!

 

spark.sql("select * from stocks join earnings on stocks.name = earnings.name").show()


+-------+------+---------+------+--------+-------+------+--------+
|   name|ticker|  country| price|currency|   name|   EPS|currency|
+-------+------+---------+------+--------+-------+------+--------+
| Amazon|  AMZN|      USA|  3518|     USD| Amazon|  6.12|     USD|
| Google| GOOGL|      USA|  2984|     USD| Google| 27.99|     USD|
|  Kakao|035720|    Korea|125000|     KRW|  Kakao| 705.0|     KRW|
|Netflix|  NFLX|      USA|   645|     USD|Netflix|  2.56|     USD|
|Samsung|005930|    Korea| 70600|     KRW|Samsung|1780.0|     KRW|
|Tencent|  0700|Hong Kong|   483|     HKD|Tencent| 11.01|     HKD|
|  Tesla|  TSLA|      USA|  1222|     USD|  Tesla|  1.86|     USD|
| Toyota|  7203|    Japan|  2006|     JPY| Toyota|224.82|     JPY|
+-------+------+---------+------+--------+-------+------+--------+


----

# PER = Price / EPS
spark.sql("select stocks.name, (stocks.price/earnings.EPS) AS PER from stocks join earnings on stocks.name = earnings.name").show()


+-------+------------------+
|   name|               PER|
+-------+------------------+
| Amazon| 574.8366013071895|
| Google|106.60950339406932|
|  Kakao| 177.3049645390071|
|Netflix|        251.953125|
|Samsung|39.662921348314605|
|Tencent|43.869209809264305|
|  Tesla|  656.989247311828|
| Toyota| 8.922693710523975|
+-------+------------------+
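
As a sanity check on the PER column, the formula in the comment can be reproduced by hand for one row of the join (Samsung: price 70600, EPS 1780.0):

```python
# PER = price / EPS, checked against the Samsung row of the join result
price, eps = 70600, 1780.0
per = price / eps
print(per)  # 39.662921348314605, matching the table above
```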


์ง€๊ธˆ๊นŒ์ง€ SparkSQL์„ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ๋ฒ•์„ ๊ฐ„๋žตํ•˜๊ฒŒ ์•Œ์•„๋ณด์•˜๋‹ค.

๋‹ค์Œ ํฌ์ŠคํŒ…์—์„  DataFrame์„ ๋” ์ž์„ธํ•˜๊ฒŒ ๋‹ค๋ค„๋ณด๋„๋ก ํ•˜๊ฒ ๋‹ค.

์ˆ˜๊ณ ํ–ˆ์Šต๋‹ˆ๋‹ค!