2022. 5. 6. 13:32ใ๐ Data Engineering/Apache Spark
SparkSQL
Structured Data๋ฅผ ๋ค๋ฃจ๊ธฐ ์ํด Spark์์๋ SQL๋ฌธ์ ์ธ ์ ์๊ฒ ํ๋ค.
1. Structured Data์ Unstructured Data์ ์ฐจ์ด์ ์ ๋ฌด์์ด๊ณ
2. ์ RDD๊ฐ ์๋ Structured Data๋ฅผ ์จ์ผ ํ๋ฉฐ
3. Spark์์ SQL๋ฌธ์ ์ด๋ป๊ฒ ์ธ ์ ์์๊น?
ํฌ์คํ ์ ์ฝ์ด๊ฐ๋ฉฐ ์ง๋ฌธ์ ํด์ํด๋ณด์.
"๋ณธ ํฌ์คํ ์ ํจ์คํธ์บ ํผ์ค์ ๊ฐ์๋ฅผ ๋ฃ๊ณ , ์ ๋ฆฌํ ์๋ฃ์์ ๋ฐํ๋๋ค."
Structured Data vs Unstructured Data
Unstructured Data : free form
- ๋ก๊ทธ ํ์ผ
- ์ด๋ฏธ์ง
๋ง ๊ทธ๋๋ก free form. ์์ ๋ก์ด ํ์์ ๋ฐ์ดํฐ๋ค. ์ด๋ฏธ์ง๊ฐ ๋ ์๋ ์๊ณ , ํ๋์ ์คํธ๋ง์ด ๋ ์ ์๋ค. ์ด๋ฐ ๋ฐ์ดํฐ๋ค์ ์ ์ ํ์ฌ Structured Data๋ก ๋ณํํ๋ค.
Semi Structured Data : ํ๊ณผ ์ด
- CSV
- JSON
- XML
๊ธฐ๋ณธ์ ์ธ ํ๊ณผ ์ด์ ๊ฐ์ถฐ์ ธ ์๋ ํํ์ ๋ฐ์ดํฐ๋ค. ํ์ง๋ง ๋ฐ์ดํฐ์ ํ์ ์ด ์ซ์์ธ์ง, ๋ฌธ์์ธ์ง ๋ฑ์ ์ ๋ณด๋ ๋ด๊ณ ์๋ ์๋๋ค. ๋ฑ ๋ฐ์ดํฐ์ ์กด์ฌ๋ง ๋ณด์ฌ์ฃผ๋ ํํ๋ค.
Structured Data : ํ๊ณผ ์ด + ์คํค๋ง
- ๋ฐ์ดํฐ ๋ฒ ์ด์ค (ex. table ํํ, dataframe)
ํ๊ณผ ์ด์ ๋๋ถ์ด, ๋ฐ์ดํฐ์ ํ์ ๊น์ง ์ ์ฅํด๋ ํํ๋ค. ์ซ์, ๋ฌธ์, ๋ฆฌ์คํธ ๋ฑ Schema๋ฅผ ๋ด๊ณ ์์ด, ๋ค๋ฅธ ๋ฐ์ดํฐ ํํ์ ๋นํด ์ต์ ํ์ ์ ๋ฆฌํ๋ค.
* RDD๊ฐ ์๋ Structured Data๋ฅผ Spark์์ ์ฌ์ฉํ๋ ์ด์
RDD๋ ๋ฐ์ดํฐ์ ๊ตฌ์กฐ๋ฅผ ๋ชจ๋ฅด๊ธฐ ๋๋ฌธ์, ์ต์ ํ ๋ฑ ๋ฐ์ดํฐ ๋ค๋ฃจ๊ธฐ๋ฅผ ๊ฐ๋ฐ์์๊ฒ ๋ชจ๋ ์์กดํด์ผ ํ๋ค. ์๋ฅผ ๋ค์ด, ์ ํฌ์คํ ์์ ์ด์ผ๊ธฐํ๋ ์ฑ๋ฅ ๋น๊ต ๋ฌธ์ ๋ฅผ ๋ ์ฌ๋ ค ๋ณด์. (1) filter()๋ฅผ ์ ์ฉํ ๋ค์, reduceByKey()๋ฅผ ํ ๊ฒ๊ณผ (2) reduceByKey()๋ฅผ ์ ์ฉํ ๋ค์, filter()๋ฅผ ์ ์ฉํ ๊ฒ์ ์ฑ๋ฅ ์ฐจ์ด๊ฐ ์กด์ฌํ๋ค. ์ด์ฒ๋ผ RDD๋ฅผ ์ด์ฉํ๋ฉด ๊ฐ๋ฐ์์ ์ค๋ ฅ์ ๋ฐ๋ผ ์ฑ๋ฅ์ด ์ฒ์ฐจ๋ง๋ณ์ด ๋๋ค.
ํ์ง๋ง Structured Data์ ๊ฒฝ์ฐ, ๊ตฌ์กฐ๋ฅผ ์ปดํจํฐ๊ฐ ์๊ณ ์๊ธฐ์ ์ด๋ค task๋ฅผ ์ํํ ์ง๋ง ์ ๋ ฅํ๋ฉด ์๋์ผ๋ก ์ต์ ํ๊น์ง ์งํ์ํจ๋ค. ๊ฐ๋ฐ์๋ task๋ง ์ ์ ๋ด๋ฆฌ๋ฉด ๋๋ ๊ฒ์ด๋ค. ์ด๋ฐ ๋์ฆ์์ ํ์ํ ๊ฒ์ด SparkSQL์ด๋ค.
SparkSQL์ Structured Data๋ฅผ ๋ค๋ฃฐ ์ ์๊ฒ ํจ์ผ๋ก์จ, Spark์ ์ด์ฉ ์ฑ๋ฅ์ ์ํฅ ํ์คํ์์ผฐ๋ค.
SparkSQL ๊ฐ๋จ ์ค๋ช
SparkSQL์ ์คํํฌ ์์ ๊ตฌํ๋ ํ๋์ ํจํค์ง๋ค.
3๊ฐ์ ์ฃผ์ API๊ฐ ์๋๋ฐ SQL, DataFrame, Datasets(pyspark ์ฌ์ฉ์๋ ์ ๊ฒฝ X)์ด๋ค.
์ฆ, SparkSQL์ ์ฌ์ฉํ๋ฉด SQL๋ฌธ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ง์ง ์ ์์ผ๋ฉฐ, DataFrame์ ํ์ฑ ๋ฐ ์ ์ฒ๋ฆฌํ ์ ์๋ค๋ ๋ป์ด๋ค.
2๊ฐ์ ๋ฐฑ์๋ ์ปดํฌ๋ํธ๊ฐ ์กด์ฌํ๋๋ฐ, Catalyst์ Tungsten์ด ๊ทธ๊ฒ์ด๋ค. ๊ธฐ๋ฅ์ ๋ค์๊ณผ ๊ฐ๋ค.
Catalyst - ์ฟผ๋ฆฌ ์ต์ ํ ์์ง
Tungsten - ์๋ฆฌ์ผ๋ผ์ด์ (์ฉ๋ ์ต์ ํ)
๋ฐฑ์๋์ ๋ํ ์์ธํ ๋ด์ฉ์ ์ดํ ํฌ์คํ ์์ ๋ค๋ฃจ๊ฒ ๋ค.
SparkSQL ์ฌ์ฉํ๊ธฐ
Import
import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
from pyspark.sql import SparkSession
SparkCore์์ SparkContext๋ฅผ ์ด์ฉํ๋ค๋ฉด, SparkSQL์์ SparkSession์ ์ด์ฉํ๋ค.
# SparkSession์ ๋จผ์ ์
๋ ฅ
# local ํ๊ฒฝ์์ ๋๋ฆฌ๋ ๊ฒฝ์ฐ, 'local'์ด๋ผ๊ณ ์ค์
# appName()์ ์์ ์ด ์ํ๋ ์ด๋ฆ์ผ๋ก ํ๋ฉด ๋๋ค.
# SparkSession์ SparkContext์ ๋ง์ฐฌ๊ฐ์ง๋ก 1ํ ๋ถ๋ฌ์ฌ ์ ์๋ค.
spark = SparkSession.builder.master('local').appName("sql-learn").getOrCreate()
createDataFrame() ํจ์๋ฅผ ์ด์ฉํ์ฌ DataFrame์ ํ์ฑํ๋ค.
# Data
stocks = [
('Google', 'GOOGL', 'USA', 2984, 'USD'),
('Netflix', 'NFLX', 'USA', 645, 'USD'),
('Amazon', 'AMZN', 'USA', 3518, 'USD'),
('Tesla', 'TSLA', 'USA', 1222, 'USD'),
('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
('Toyota', '7203', 'Japan', 2006, 'JPY'),
('Samsung', '005930', 'Korea', 70600, 'KRW'),
('Kakao', '035720', 'Korea', 125000, 'KRW'),
]
# Schema
stockSchema = ['name', 'ticker', 'country', 'price', 'currency']
# createDataFrame()
df = spark.createDataFrame(data = stocks, schema=stockSchema)
df.dtypes
[('name', 'string'),
('ticker', 'string'),
('country', 'string'),
('price', 'bigint'),
('currency', 'string')]
----
df.show()
+-------+------+---------+------+--------+
| name|ticker| country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL| USA| 2984| USD|
|Netflix| NFLX| USA| 645| USD|
| Amazon| AMZN| USA| 3518| USD|
| Tesla| TSLA| USA| 1222| USD|
|Tencent| 0700|Hong Kong| 483| HKD|
| Toyota| 7203| Japan| 2006| JPY|
|Samsung|005930| Korea| 70600| KRW|
| Kakao|035720| Korea|125000| KRW|
+-------+------+---------+------+--------+
createOrReplaceTempView() ์ด์ฉ
์ด ํจ์๋ฅผ ํตํด DataFrame์ SQL๋ฌธ์ผ๋ก ๋ถ๋ฌ์ค๊ณ , ์ ์ฒ๋ฆฌํ ์ ์๋ค.
# dataframe์ ๋จผ์ ์
๋ ฅ
# ํจ์ ์
๋ ฅ
# createOrReplaceTempView() ํจ์ ์์ชฝ์๋ SQL๋ฌธ ์ธ ๋ ๋ฐ์ดํฐํ๋ ์์ ์ด๋ค ์ด๋ฆ์ผ๋ก ์ฌ์ฉํ ๊ฒ์ธ์ง
# ์ ์ํ๋ค.
df.createOrReplaceTempView('stocks')
SQL๋ฌธ์ผ๋ก ์์ ๋กญ๊ฒ ๋ฐ์ดํฐ๋ฅผ ๋ค๋ค๋ณด์.
spark.sql("select name from stocks").show()
+--------------+
|(price > 2000)|
+--------------+
| true|
| false|
| true|
| false|
| false|
| true|
| true|
| true|
+--------------+
----
spark.sql("select name, price from stocks where country = 'Korea'").show()
+-------+------+
| name| price|
+-------+------+
|Samsung| 70600|
| Kakao|125000|
+-------+------+
----
spark.sql("select name, price from stocks order by price asc").show()
+-------+------+
| name| price|
+-------+------+
|Tencent| 483|
|Netflix| 645|
| Tesla| 1222|
| Toyota| 2006|
| Google| 2984|
| Amazon| 3518|
|Samsung| 70600|
| Kakao|125000|
+-------+------+
----
spark.sql("select count(price) from stocks where country in ('Korea', 'USA')").show()
+------------+
|count(price)|
+------------+
| 6|
+------------+
๋ค๋ฅธ ๋ฐ์ดํฐ๋ฅผ Join ํด๋ณด์.
earnings = [
('Google', 27.99, 'USD'),
('Netflix', 2.56, 'USD'),
('Amazon', 6.12, 'USD'),
('Tesla', 1.86, 'USD'),
('Tencent', 11.01, 'HKD'),
('Toyota', 224.82, 'JPY'),
('Samsung', 1780., 'KRW'),
('Kakao', 705., 'KRW')
]
from pyspark.sql.types import StringType, FloatType, StructType, StructField
# Schema์์ DataType์ ์ง์ ์ง์ ํด์ค ์ ์๋ค.
earningsSchema = StructType([
StructField("name", StringType(), True),
StructField("EPS", StringType(), True),
StructField("currency", StringType(), True)
])
df_earning = spark.createDataFrame(data=earnings, schema=earningsSchema)
df_earning.createOrReplaceTempView('earnings')
SQL๋ฌธ์ ์ฐ์ง ์๊ณ ๋, ๋ฐ์ดํฐ๋ฅผ ๋ค๋ฃฐ ์ ์๋ค. ํ์ง๋ง SQL๋ฌธ์ ํตํด ๋ ๋ง์ด ๋ค๋ฃฌ๋ค.
df_earning.select("*").show()
+-------+------+--------+
| name| EPS|currency|
+-------+------+--------+
| Google| 27.99| USD|
|Netflix| 2.56| USD|
| Amazon| 6.12| USD|
| Tesla| 1.86| USD|
|Tencent| 11.01| HKD|
| Toyota|224.82| JPY|
|Samsung|1780.0| KRW|
| Kakao| 705.0| KRW|
+-------+------+--------+
JOIN ํ์ฌ ๋ฐ์ดํฐ๋ฅผ ๋ ๋ค์ํ ๊ฐ๋์์ ์ดํด๋ณด์!
spark.sql("select * from stocks join earnings on stocks.name = earnings.name").show()
+-------+------+---------+------+--------+-------+------+--------+
| name|ticker| country| price|currency| name| EPS|currency|
+-------+------+---------+------+--------+-------+------+--------+
| Amazon| AMZN| USA| 3518| USD| Amazon| 6.12| USD|
| Google| GOOGL| USA| 2984| USD| Google| 27.99| USD|
| Kakao|035720| Korea|125000| KRW| Kakao| 705.0| KRW|
|Netflix| NFLX| USA| 645| USD|Netflix| 2.56| USD|
|Samsung|005930| Korea| 70600| KRW|Samsung|1780.0| KRW|
|Tencent| 0700|Hong Kong| 483| HKD|Tencent| 11.01| HKD|
| Tesla| TSLA| USA| 1222| USD| Tesla| 1.86| USD|
| Toyota| 7203| Japan| 2006| JPY| Toyota|224.82| JPY|
+-------+------+---------+------+--------+-------+------+--------+
----
# PER = Price / EPS
spark.sql("select stocks.name, (stocks.price/earnings.EPS) AS PER from stocks join earnings on stocks.name = earnings.name").show()
+-------+------------------+
| name| PER|
+-------+------------------+
| Amazon| 574.8366013071895|
| Google|106.60950339406932|
| Kakao| 177.3049645390071|
|Netflix| 251.953125|
|Samsung|39.662921348314605|
|Tencent|43.869209809264305|
| Tesla| 656.989247311828|
| Toyota| 8.922693710523975|
+-------+------------------+
์ง๊ธ๊น์ง SparkSQL์ ์ฌ์ฉํ๋ ๋ฐฉ๋ฒ์ ๊ฐ๋ตํ๊ฒ ์์๋ณด์๋ค.
๋ค์ ํฌ์คํ ์์ DataFrame์ ๋ ์์ธํ๊ฒ ๋ค๋ค๋ณด๋๋ก ํ๊ฒ ๋ค.
์๊ณ ํ์ต๋๋ค!
'๐ Data Engineering > Apache Spark' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[SparkSQL] UDF ๊ฐ๋ ๋ฐ ์ฝ๋ (0) | 2022.05.08 |
---|---|
[SparkSQL] DataFrame ๋ค๋ฃจ๊ธฐ (0) | 2022.05.07 |
[Spark] Reduction ๊ฐ๋ ๋ฐ ์ฝ๋ (0) | 2022.05.01 |
[Spark] Spark ์๋ ์ต์ ํ, Cashe() & Persist() (0) | 2022.05.01 |
[Spark] Transformations & Actions ํจ์ (0) | 2022.05.01 |