DataFrame
A DataFrame is the core abstraction for the structured data handled by SparkSQL.
It keeps the strengths of RDDs (lazy execution, distribution, immutability) while being structured,
which makes automatic query optimization possible.
It can be read from and converted to formats such as CSV, JSON, and Hive.
Let's dig into DataFrames properly.
"๋ณธ ํฌ์คํ ์ ํจ์คํธ์บ ํผ์ค์ ๊ฐ์๋ฅผ ๋ฃ๊ณ , ์ ๋ฆฌํ ์๋ฃ์์ ๋ฐํ๋๋ค."
Basic Setup
import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))  # point findspark at the local Spark installation
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()  # dump a Python traceback if the process crashes hard
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName("dataframe").getOrCreate()
# Data
stocks = [
('Google', 'GOOGL', 'USA', 2984, 'USD'),
('Netflix', 'NFLX', 'USA', 645, 'USD'),
('Amazon', 'AMZN', 'USA', 3518, 'USD'),
('Tesla', 'TSLA', 'USA', 1222, 'USD'),
('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
('Toyota', '7203', 'Japan', 2006, 'JPY'),
('Samsung', '005930', 'Korea', 70600, 'KRW'),
('Kakao', '035720', 'Korea', 125000, 'KRW'),
]
# Schema
stockSchema = ['name', 'ticker', 'country', 'price', 'currency']
# createDataFrame()
df = spark.createDataFrame(data = stocks, schema=stockSchema)
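A DataFrame created this way can also round-trip through the file formats mentioned above. A minimal sketch, assuming a writable /tmp directory (the paths are illustrative, not from the original setup):
# write the stocks DataFrame out, then read it back (illustrative paths)
df.write.mode('overwrite').option('header', True).csv('/tmp/stocks_csv')
df_from_csv = spark.read.csv('/tmp/stocks_csv', header=True, inferSchema=True)
df.write.mode('overwrite').json('/tmp/stocks_json')
df_from_json = spark.read.json('/tmp/stocks_json')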
Basic DataFrame Functions in SparkSQL
1. df.dtypes
Shows the DataFrame's columns and their data types.
df.dtypes
[('name', 'string'),
('ticker', 'string'),
('country', 'string'),
('price', 'bigint'),
('currency', 'string')]
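Since dtypes returns plain Python tuples, it can be used to pick out columns by type; a small sketch:
# collect the names of all non-string columns (here, just 'price')
numeric_cols = [name for name, dtype in df.dtypes if dtype != 'string']
print(numeric_cols)  # ['price']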
2. df.show()
Displays the DataFrame's rows in tabular form (the first 20 rows by default).
df.show()
+-------+------+---------+------+--------+
| name|ticker| country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL| USA| 2984| USD|
|Netflix| NFLX| USA| 645| USD|
| Amazon| AMZN| USA| 3518| USD|
| Tesla| TSLA| USA| 1222| USD|
|Tencent| 0700|Hong Kong| 483| HKD|
| Toyota| 7203| Japan| 2006| JPY|
|Samsung|005930| Korea| 70600| KRW|
| Kakao|035720| Korea|125000| KRW|
+-------+------+---------+------+--------+
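show() also accepts arguments that control how much is printed; for example:
df.show(3)               # only the first 3 rows
df.show(truncate=False)  # print full cell values instead of truncating at 20 characters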
3. df.printSchema()
Prints the DataFrame's schema as a tree.
df.printSchema()
root
|-- name: string (nullable = true)
|-- ticker: string (nullable = true)
|-- country: string (nullable = true)
|-- price: long (nullable = true)
|-- currency: string (nullable = true)
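The list-of-names schema used above makes Spark infer each column's type. The schema can instead be declared explicitly with StructType; a minimal sketch:
from pyspark.sql.types import StructType, StructField, StringType, LongType
explicitSchema = StructType([
    StructField('name', StringType(), True),
    StructField('ticker', StringType(), True),
    StructField('country', StringType(), True),
    StructField('price', LongType(), True),
    StructField('currency', StringType(), True),
])
df_typed = spark.createDataFrame(data=stocks, schema=explicitSchema)
df_typed.printSchema()  # price is long by declaration rather than inference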
4. df.select()
Extracts the desired columns or data from the DataFrame.
df.select('name', 'currency').collect()
[Row(name='Google', currency='USD'),
Row(name='Netflix', currency='USD'),
Row(name='Amazon', currency='USD'),
Row(name='Tesla', currency='USD'),
Row(name='Tencent', currency='HKD'),
Row(name='Toyota', currency='JPY'),
Row(name='Samsung', currency='KRW'),
Row(name='Kakao', currency='KRW')]
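select() accepts column expressions as well as plain names, so derived columns can be computed on the fly; for example:
from pyspark.sql import functions as F
# derive a new column from price; alias() gives it a readable name
df.select('name', (F.col('price') * 2).alias('double_price')).show()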
5. df.agg()
Short for aggregate: an operation that combines rows into summary values.
df.agg({'price':'mean'}).collect()
[Row(avg(price)=25807.25)]
----
# import the SQL functions module and use it for aggregation
from pyspark.sql import functions as F
df.agg(F.count(df.currency)).collect()
[Row(count(currency)=8)]
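Several aggregates can also be computed in a single pass by passing multiple expressions:
df.agg(F.min(df.price), F.max(df.price), F.avg(df.price)).collect()
# [Row(min(price)=483, max(price)=125000, avg(price)=25807.25)]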
6. df.groupBy()
Groups the data by the specified column(s).
# group by currency, then pick the maximum price within each group
df.groupBy('currency').agg({'price':'max'}).collect()
[Row(currency='KRW', max(price)=125000),
Row(currency='JPY', max(price)=2006),
Row(currency='HKD', max(price)=483),
Row(currency='USD', max(price)=3518)]
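groupBy() likewise accepts several aggregate expressions at once; for example:
# per-currency count plus price range
df.groupBy('currency').agg(
    F.count('*').alias('n'),
    F.min('price').alias('min_price'),
    F.max('price').alias('max_price'),
).show()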
7. df.join()
An operation that joins the DataFrame with other data; the examples below use a second DataFrame, df_earning, sketched next.
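The construction of df_earning is not shown in this post; here is a minimal sketch reconstructed from the join output that follows (EPS kept as strings to match the quoted values):
# earnings-per-share data; values taken from the join output below
earnings = [
    ('Google', '27.99', 'USD'),
    ('Netflix', '2.56', 'USD'),
    ('Amazon', '6.12', 'USD'),
    ('Tesla', '1.86', 'USD'),
    ('Tencent', '11.01', 'HKD'),
    ('Toyota', '224.82', 'JPY'),
    ('Samsung', '1780.0', 'KRW'),
    ('Kakao', '705.0', 'KRW'),
]
earningsSchema = ['name', 'EPS', 'currency']
df_earning = spark.createDataFrame(data=earnings, schema=earningsSchema)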
# join the two DataFrames on the name column
df.join(df_earning, 'name').collect()
[Row(name='Amazon', ticker='AMZN', country='USA', price=3518, currency='USD', EPS='6.12', currency='USD'),
Row(name='Google', ticker='GOOGL', country='USA', price=2984, currency='USD', EPS='27.99', currency='USD'),
Row(name='Kakao', ticker='035720', country='Korea', price=125000, currency='KRW', EPS='705.0', currency='KRW'),
Row(name='Netflix', ticker='NFLX', country='USA', price=645, currency='USD', EPS='2.56', currency='USD'),
Row(name='Samsung', ticker='005930', country='Korea', price=70600, currency='KRW', EPS='1780.0', currency='KRW'),
Row(name='Tencent', ticker='0700', country='Hong Kong', price=483, currency='HKD', EPS='11.01', currency='HKD'),
Row(name='Tesla', ticker='TSLA', country='USA', price=1222, currency='USD', EPS='1.86', currency='USD'),
Row(name='Toyota', ticker='7203', country='Japan', price=2006, currency='JPY', EPS='224.82', currency='JPY')]
----
df.join(df_earning, 'name').select(df.name, df_earning.EPS).collect()
[Row(name='Amazon', EPS='6.12'),
Row(name='Google', EPS='27.99'),
Row(name='Kakao', EPS='705.0'),
Row(name='Netflix', EPS='2.56'),
Row(name='Samsung', EPS='1780.0'),
Row(name='Tencent', EPS='11.01'),
Row(name='Tesla', EPS='1.86'),
Row(name='Toyota', EPS='224.82')]
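join() performs an inner join by default; other join types can be requested through the how parameter:
# keep every row of df even when there is no matching earnings row
df.join(df_earning, on='name', how='left').show()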
8. Putting It All Together
# filter on EPS with a where() clause, then sort the rows with orderBy()
df.join(df_earning, 'name').where(df_earning.EPS > 5).orderBy(df.price).collect()
[Row(name='Tencent', ticker='0700', country='Hong Kong', price=483, currency='HKD', EPS='11.01', currency='HKD'),
Row(name='Toyota', ticker='7203', country='Japan', price=2006, currency='JPY', EPS='224.82', currency='JPY'),
Row(name='Google', ticker='GOOGL', country='USA', price=2984, currency='USD', EPS='27.99', currency='USD'),
Row(name='Amazon', ticker='AMZN', country='USA', price=3518, currency='USD', EPS='6.12', currency='USD'),
Row(name='Samsung', ticker='005930', country='Korea', price=70600, currency='KRW', EPS='1780.0', currency='KRW'),
Row(name='Kakao', ticker='035720', country='Korea', price=125000, currency='KRW', EPS='705.0', currency='KRW')]
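orderBy() sorts in ascending order by default; descending order takes an explicit flag:
# most expensive stocks first
df.join(df_earning, 'name').where(df_earning.EPS > 5).orderBy(df.price.desc()).collect()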
----
# group by country, then summarize the data with the average price
df.join(df_earning, 'name').groupBy(df.country).agg(F.mean(df.price)).collect()
[Row(country='Hong Kong', avg(price)=483.0),
Row(country='USA', avg(price)=2092.25),
Row(country='Japan', avg(price)=2006.0),
Row(country='Korea', avg(price)=97800.0)]
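Since EPS was stored as strings, it needs a cast before it can be averaged the same way; a sketch:
# cast EPS to double, then average it per country
df.join(df_earning, 'name') \
  .groupBy(df.country) \
  .agg(F.avg(F.col('EPS').cast('double')).alias('avg_eps')) \
  .collect()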
We have now seen how to work with DataFrames in SparkSQL.
In the next post, we will look at user-defined functions (UDFs).
Thanks for reading.