[SparkSQL] Working with DataFrames


DataFrame

SparkSQL์—์„œ ๋‹ค๋ฃจ๋Š” Structured Data๋กœ ์•„์ฃผ ์ฃผ์š” ๊ฐœ๋…์ด๋‹ค.

๊ธฐ๋ณธ์ ์œผ๋กœ Lazy Execution, ๋ถ„์‚ฐ, Immutable์ด๋ž€ RDD์˜ ์žฅ์ ์„ ๊ฐ€์ง๊ณผ ๋™์‹œ์—

๊ตฌ์กฐํ™”(Structured)๋˜์–ด ์žˆ์–ด ์ž๋™ ์ตœ์ ํ™”๊นŒ์ง€ ๊ฐ€๋Šฅํ•˜๋‹ค.

CSV, JSON, Hive ๋“ฑ์œผ๋กœ ์ฝ๊ฑฐ๋‚˜ ๋ณ€ํ™˜๋„ ๊ฐ€๋Šฅํ•˜๋‹ค.
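For instance, loading data from a file is a one-liner through the reader API. The snippet below is only a sketch: it assumes a SparkSession named spark (created in the basic setup section below), and the file paths are hypothetical.

# Hypothetical paths; header/inferSchema tell Spark to use the first row as
# column names and to guess the column types.
df_csv = spark.read.csv("stocks.csv", header=True, inferSchema=True)
df_json = spark.read.json("stocks.json")

# Writing back out works the same way through the writer API.
df_csv.write.mode("overwrite").parquet("stocks_parquet")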

Let's work with DataFrames in earnest.

 

 

"This post is a write-up of my notes from a Fast Campus lecture."

Basic Setup

 

import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))  # point findspark at the local Spark installation
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()  # print a traceback if the Python worker crashes
from pyspark.sql import SparkSession

# Entry point for SparkSQL: a SparkSession running in local mode
spark = SparkSession.builder.master('local').appName("dataframe").getOrCreate()

# Data

stocks = [
    ('Google', 'GOOGL', 'USA', 2984, 'USD'), 
    ('Netflix', 'NFLX', 'USA', 645, 'USD'),
    ('Amazon', 'AMZN', 'USA', 3518, 'USD'),
    ('Tesla', 'TSLA', 'USA', 1222, 'USD'),
    ('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
    ('Toyota', '7203', 'Japan', 2006, 'JPY'),
    ('Samsung', '005930', 'Korea', 70600, 'KRW'),
    ('Kakao', '035720', 'Korea', 125000, 'KRW'),
]


# Schema: only column names are listed, so the column types are inferred from the data

stockSchema = ['name', 'ticker', 'country', 'price', 'currency']


# createDataFrame() turns the local list into a distributed DataFrame
df = spark.createDataFrame(data=stocks, schema=stockSchema)


Basic DataFrame Functions in SparkSQL


1. df.dtypes

๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์˜ ์นผ๋Ÿผ๊ณผ ํƒ€์ž…์„ ๋ณด์—ฌ์ค€๋‹ค.

 

df.dtypes


[('name', 'string'),
 ('ticker', 'string'),
 ('country', 'string'),
 ('price', 'bigint'),
 ('currency', 'string')]


2. df.show()

๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์˜ ์ „์ฒด ํ˜•ํƒœ๋ฅผ ๋ณด์—ฌ์ค€๋‹ค.

 

df.show()


+-------+------+---------+------+--------+
|   name|ticker|  country| price|currency|
+-------+------+---------+------+--------+
| Google| GOOGL|      USA|  2984|     USD|
|Netflix|  NFLX|      USA|   645|     USD|
| Amazon|  AMZN|      USA|  3518|     USD|
|  Tesla|  TSLA|      USA|  1222|     USD|
|Tencent|  0700|Hong Kong|   483|     HKD|
| Toyota|  7203|    Japan|  2006|     JPY|
|Samsung|005930|    Korea| 70600|     KRW|
|  Kakao|035720|    Korea|125000|     KRW|
+-------+------+---------+------+--------+
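show() also takes optional arguments; the small variant below (a sketch) limits the number of rows and keeps long values from being truncated.

# Print only the first 3 rows and do not truncate long cell values.
df.show(3, truncate=False)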


3. df.printSchema()

๋ฐ์ดํ„ฐ ํ”„๋ ˆ์ž„์˜ ์Šคํ‚ค๋งˆ๋ฅผ ๋ณผ ์ˆ˜ ์žˆ๋‹ค.

 

df.printSchema()


root
 |-- name: string (nullable = true)
 |-- ticker: string (nullable = true)
 |-- country: string (nullable = true)
 |-- price: long (nullable = true)
 |-- currency: string (nullable = true)
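The schema above was inferred from the Python values, which is why the integer prices came out as long (bigint). If inference is not what you want, a schema can also be declared explicitly; a sketch for illustration:

from pyspark.sql.types import StructType, StructField, StringType, LongType

explicitSchema = StructType([
    StructField('name', StringType(), True),      # True = nullable
    StructField('ticker', StringType(), True),
    StructField('country', StringType(), True),
    StructField('price', LongType(), True),
    StructField('currency', StringType(), True),
])

df_typed = spark.createDataFrame(data=stocks, schema=explicitSchema)
df_typed.printSchema()  # same structure as above, but the types are now fixed by hand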


4. df.select()

Extracts the desired columns or values from the DataFrame.

 

df.select('name', 'currency').collect()


[Row(name='Google', currency='USD'),
 Row(name='Netflix', currency='USD'),
 Row(name='Amazon', currency='USD'),
 Row(name='Tesla', currency='USD'),
 Row(name='Tencent', currency='HKD'),
 Row(name='Toyota', currency='JPY'),
 Row(name='Samsung', currency='KRW'),
 Row(name='Kakao', currency='KRW')]
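select() also accepts column expressions, so derived columns can be computed on the fly. A small sketch (the 10% markup is an arbitrary example):

# Derive a new column from price; alias() names the resulting column.
df.select(df.name, (df.price * 1.1).alias('price_plus_10pct')).show()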


5. df.agg()

Short for aggregate: it collapses the data into summary values such as means or counts.

 

df.agg({'price':'mean'}).collect()


[Row(avg(price)=25807.25)]

----

# import the SQL functions module and use it in the aggregation
from pyspark.sql import functions as F
df.agg(F.count(df.currency)).collect()


[Row(count(currency)=8)]
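Several aggregate expressions can also be passed to agg() at once (a sketch; output omitted here):

# Minimum, maximum, and average price in a single pass.
df.agg(F.min(df.price), F.max(df.price), F.avg(df.price)).collect()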


6. df.groupBy()

Groups the data by the specified column(s).

 

# group by currency, then take the maximum price within each group
df.groupBy('currency').agg({'price':'max'}).collect()


[Row(currency='KRW', max(price)=125000),
 Row(currency='JPY', max(price)=2006),
 Row(currency='HKD', max(price)=483),
 Row(currency='USD', max(price)=3518)]
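A grouped DataFrame also supports the shorthand count(), and agg() can combine several statistics per group. A sketch of both variants:

# Number of listed stocks per country.
df.groupBy('country').count().collect()

# Maximum and average price per currency in one pass.
df.groupBy('currency').agg(F.max('price'), F.avg('price')).collect()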


7. df.join()

๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„์„ ๋‹ค๋ฅธ ๋ฐ์ดํ„ฐ์™€ ํ•ฉ์น˜๋Š” ์ž‘์—…์ด๋‹ค.

 

# ๋ฐ์ดํ„ฐํ”„๋ ˆ์ž„ joinํ•˜๊ธฐ
df.join(df_earning, 'name').collect()


[Row(name='Amazon', ticker='AMZN', country='USA', price=3518, currency='USD', EPS='6.12', currency='USD'),
 Row(name='Google', ticker='GOOGL', country='USA', price=2984, currency='USD', EPS='27.99', currency='USD'),
 Row(name='Kakao', ticker='035720', country='Korea', price=125000, currency='KRW', EPS='705.0', currency='KRW'),
 Row(name='Netflix', ticker='NFLX', country='USA', price=645, currency='USD', EPS='2.56', currency='USD'),
 Row(name='Samsung', ticker='005930', country='Korea', price=70600, currency='KRW', EPS='1780.0', currency='KRW'),
 Row(name='Tencent', ticker='0700', country='Hong Kong', price=483, currency='HKD', EPS='11.01', currency='HKD'),
 Row(name='Tesla', ticker='TSLA', country='USA', price=1222, currency='USD', EPS='1.86', currency='USD'),
 Row(name='Toyota', ticker='7203', country='Japan', price=2006, currency='JPY', EPS='224.82', currency='JPY')]
 
----

df.join(df_earning, 'name').select(df.name, df_earning.EPS).collect()


[Row(name='Amazon', EPS='6.12'),
 Row(name='Google', EPS='27.99'),
 Row(name='Kakao', EPS='705.0'),
 Row(name='Netflix', EPS='2.56'),
 Row(name='Samsung', EPS='1780.0'),
 Row(name='Tencent', EPS='11.01'),
 Row(name='Tesla', EPS='1.86'),
 Row(name='Toyota', EPS='224.82')]


8. Combining the operations freely

 

# use where() to keep only rows with EPS above 5, then sort by price with orderBy()
df.join(df_earning, 'name').where(df_earning.EPS > 5).orderBy(df.price).collect()


[Row(name='Tencent', ticker='0700', country='Hong Kong', price=483, currency='HKD', EPS='11.01', currency='HKD'),
 Row(name='Toyota', ticker='7203', country='Japan', price=2006, currency='JPY', EPS='224.82', currency='JPY'),
 Row(name='Google', ticker='GOOGL', country='USA', price=2984, currency='USD', EPS='27.99', currency='USD'),
 Row(name='Amazon', ticker='AMZN', country='USA', price=3518, currency='USD', EPS='6.12', currency='USD'),
 Row(name='Samsung', ticker='005930', country='Korea', price=70600, currency='KRW', EPS='1780.0', currency='KRW'),
 Row(name='Kakao', ticker='035720', country='Korea', price=125000, currency='KRW', EPS='705.0', currency='KRW')]
 
----

# group by country, then summarize with the average price
df.join(df_earning, 'name').groupBy(df.country).agg(F.mean(df.price)).collect()


[Row(country='Hong Kong', avg(price)=483.0),
 Row(country='USA', avg(price)=2092.25),
 Row(country='Japan', avg(price)=2006.0),
 Row(country='Korea', avg(price)=97800.0)]
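One caveat: EPS is stored as a string, so it is safer to cast it to a numeric type before comparing or computing with it. A sketch using cast() (EPS_num is a column name introduced here for illustration):

# Add a numeric copy of EPS and repeat the filter on it.
joined = df.join(df_earning, 'name')
joined = joined.withColumn('EPS_num', joined.EPS.cast('float'))

joined.where(joined.EPS_num > 5).orderBy(joined.price).select('name', 'EPS_num').collect()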


That wraps up how to work with DataFrames in SparkSQL.

In the next post, let's look at user-defined functions (UDFs).

Thanks for reading.