2022. 5. 8. 12:57 · Data Engineering/Apache Spark

UDF
User-Defined Function: that is, a function defined by the user.
In the previous post we looked at the various functions for processing DataFrames.
In SparkSQL you can also write your own function that performs some operation, give it a name, register it with Spark, and then call it in queries.
Let's look at this in detail.
"This post is a summary of material from a Fast Campus lecture."
Basic Setting
import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName("dataframe").getOrCreate()
# Data
stocks = [
('Google', 'GOOGL', 'USA', 2984, 'USD'),
('Netflix', 'NFLX', 'USA', 645, 'USD'),
('Amazon', 'AMZN', 'USA', 3518, 'USD'),
('Tesla', 'TSLA', 'USA', 1222, 'USD'),
('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
('Toyota', '7203', 'Japan', 2006, 'JPY'),
('Samsung', '005930', 'Korea', 70600, 'KRW'),
('Kakao', '035720', 'Korea', 125000, 'KRW'),
]
# Schema
stockSchema = ['name', 'ticker', 'country', 'price', 'currency']
# createDataFrame()
df = spark.createDataFrame(data = stocks, schema=stockSchema)
# createOrReplaceTempView(): register df as the temp view "stocks", which the SQL queries below refer to
df.createOrReplaceTempView("stocks")
UDF
A User-Defined Function: a function that the user defines directly.
1. spark.udf.register()
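Registers a defined function so that it can be called from Spark, including in SQL queries.
spark.udf.register("function name", function, return data type)
The return data type is optional; when it is omitted, the UDF's return type defaults to StringType.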
from pyspark.sql.types import LongType

def squared(n):
    return n * n

spark.udf.register("squared", squared, LongType())
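Two details of `spark.udf.register()` are worth spelling out. First, a SQL NULL reaches a Python UDF as `None`. Because `None * None` raises a `TypeError` in the Python worker, the `squared` above would make the query fail on a null price. Second, the return type can be given as a DDL type string such as `"bigint"` instead of `LongType()`. Below is a minimal null-safe sketch that assumes the `spark` session created above. The names `squared_safe` and `register_squared_safe` are hypothetical.

```python
def squared_safe(n):
    """Square n, passing SQL NULL (None in Python) through unchanged."""
    if n is None:
        return None
    return n * n


def register_squared_safe(spark):
    """Register squared_safe for SQL use; 'bigint' is the DDL name of LongType."""
    return spark.udf.register("squared_safe", squared_safe, "bigint")


# Pure-Python check of the function body (no Spark needed):
print(squared_safe(645))   # 416025
print(squared_safe(None))  # None
```

After calling `register_squared_safe(spark)`, a query such as `select name, squared_safe(price) from stocks` behaves like the original one, except that it returns NULL for a null price instead of failing.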
Usage
# Square the price.
spark.sql("select name, squared(price) from stocks").show()
+-------+--------------+
|   name|squared(price)|
+-------+--------------+
| Google|       8904256|
|Netflix|        416025|
| Amazon|      12376324|
|  Tesla|       1493284|
|Tencent|        233289|
| Toyota|       4024036|
|Samsung|    4984360000|
|  Kakao|   15625000000|
+-------+--------------+
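Why register the UDF with `LongType()` rather than `IntegerType()`? `IntegerType` is a 32-bit signed integer with a maximum of 2,147,483,647, and two of the squared prices exceed that. The quick pure-Python check below uses the prices from the sample data. The constant names are just for illustration.

```python
# Prices copied from the `stocks` sample data above.
prices = {
    "Google": 2984, "Netflix": 645, "Amazon": 3518, "Tesla": 1222,
    "Tencent": 483, "Toyota": 2006, "Samsung": 70600, "Kakao": 125000,
}

INT32_MAX = 2**31 - 1  # largest value Spark's IntegerType can hold
LONG_MAX = 2**63 - 1   # largest value Spark's LongType can hold

squares = {name: p * p for name, p in prices.items()}
too_big_for_int = [name for name, sq in squares.items() if sq > INT32_MAX]
print(too_big_for_int)  # ['Samsung', 'Kakao']
print(all(sq <= LONG_MAX for sq in squares.values()))  # True
```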
----
# Only output rows whose squared price is less than 1,000,000.
spark.sql("select name, squared(price) from stocks where squared(price) < 1000000").show()
+-------+--------------+
|   name|squared(price)|
+-------+--------------+
|Netflix|        416025|
|Tencent|        233289|
+-------+--------------+
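Squaring is monotonic for non-negative numbers, so `squared(price) < 1000000` is equivalent to `price < 1000` as long as prices are non-negative (an assumption that holds for this sample data). This particular filter therefore does not need the UDF at all. The sketch below checks the equivalence on the sample prices and then shows a DataFrame-API version built only from built-in column expressions. `cheap_stocks` is a hypothetical helper that takes the `df` created above and requires PySpark when called.

```python
prices = {
    "Google": 2984, "Netflix": 645, "Amazon": 3518, "Tesla": 1222,
    "Tencent": 483, "Toyota": 2006, "Samsung": 70600, "Kakao": 125000,
}

# For non-negative n: n * n < 1_000_000  <=>  n < 1_000 (since 1_000 ** 2 == 1_000_000).
via_square = [name for name, p in prices.items() if p * p < 1_000_000]
via_price = [name for name, p in prices.items() if p < 1_000]
print(via_square)  # ['Netflix', 'Tencent']


def cheap_stocks(df):
    """Same rows as the SQL query above, using built-in column expressions only."""
    from pyspark.sql import functions as F  # imported lazily; requires PySpark

    price = F.col("price")
    return df.where(price < 1000).select("name", (price * price).alias("squared_price"))
```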
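2. Let's build a slightly more practical function.
# Convert the currency code to its Korean name and append it to the price.
def currency_ko(n):
    if n == 'USD':
        return '달러'  # dollar
    elif n == 'KRW':
        return '원'    # won
    elif n == 'JPY':
        return '엔'    # yen
    else:
        return '위안'  # yuan (fallback for every other code, including HKD)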
spark.udf.register("currency_ko", currency_ko)
<function __main__.currency_ko(n)>
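The `if`/`elif` chain can also be written as a dictionary lookup, which is easier to extend with new currencies. This sketch reproduces the original behavior exactly, including the fallback. Every code not in the table becomes `위안`: that covers `HKD` here, even though HKD is really the Hong Kong dollar, and also `None` for a SQL NULL. `CURRENCY_KO` is a hypothetical name. Because no return type was passed to `spark.udf.register()` above, the registered UDF returns StringType, which matches what this function produces.

```python
# Korean names for currency codes: dollar, won, yen.
CURRENCY_KO = {"USD": "달러", "KRW": "원", "JPY": "엔"}


def currency_ko(code):
    """Dictionary-based equivalent of the if/elif version above."""
    # Anything not listed (e.g. 'HKD', or None for a SQL NULL) falls back to '위안' (yuan),
    # matching the original else branch.
    return CURRENCY_KO.get(code, "위안")


print([currency_ko(c) for c in ["USD", "KRW", "JPY", "HKD"]])  # ['달러', '원', '엔', '위안']
```

It is registered the same way as before: `spark.udf.register("currency_ko", currency_ko)`.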
----
# Use SQL's CONCAT() function to join the price and the Korean currency name.
spark.sql("select name, concat(price, currency_ko(currency)) as price from stocks").show()
+-------+--------+
|   name|   price|
+-------+--------+
| Google|2984달러|
|Netflix| 645달러|
| Amazon|3518달러|
|  Tesla|1222달러|
|Tencent| 483위안|
| Toyota|  2006엔|
|Samsung| 70600원|
|  Kakao|125000원|
+-------+--------+
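A Python UDF runs outside the JVM. Spark has to send each row to a Python worker and back, and the optimizer cannot see inside the function. When the logic can be expressed with built-in functions, that is usually the faster option. Below is a sketch of the same output using `when`/`otherwise` and `concat`, assuming the `df` created above. `price_with_currency` is a hypothetical helper that requires PySpark when called. The plain-Python part only recomputes a few of the expected strings from the sample data.

```python
def price_with_currency(df):
    """Built-in-only version of: select name, concat(price, currency_ko(currency)) as price."""
    from pyspark.sql import functions as F  # imported lazily; requires PySpark

    currency = F.col("currency")
    label = (
        F.when(currency == "USD", "달러")
        .when(currency == "KRW", "원")
        .when(currency == "JPY", "엔")
        .otherwise("위안")  # same fallback as the UDF, including NULL currencies
    )
    return df.select("name", F.concat(F.col("price").cast("string"), label).alias("price"))


# Expected `price` strings for some sample rows, recomputed in plain Python.
rows = [("Google", 2984, "USD"), ("Tencent", 483, "HKD"),
        ("Toyota", 2006, "JPY"), ("Kakao", 125000, "KRW")]
labels = {"USD": "달러", "KRW": "원", "JPY": "엔"}
expected = [f"{price}{labels.get(cur, '위안')}" for _, price, cur in rows]
print(expected)  # ['2984달러', '483위안', '2006엔', '125000원']
```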
That covers UDFs.
In the next post, we'll look at how SparkSQL processes queries in the backend.
Thanks for reading.