UDF
UDF stands for User Defined Function, i.e. a function the user defines directly.
In the previous post, we looked at several functions for working with DataFrames.
In SparkSQL, you can define a function that performs whatever operation you need, register it with Spark, and then use it in queries.
Let's take a closer look.
"This post is a write-up of notes taken while following a FastCampus course."
Basic Setting
import os
import findspark
# Point PySpark at the local Spark installation
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
# Print a traceback if the Python worker crashes
faulthandler.enable()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName("dataframe").getOrCreate()
# Data
stocks = [
('Google', 'GOOGL', 'USA', 2984, 'USD'),
('Netflix', 'NFLX', 'USA', 645, 'USD'),
('Amazon', 'AMZN', 'USA', 3518, 'USD'),
('Tesla', 'TSLA', 'USA', 1222, 'USD'),
('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
('Toyota', '7203', 'Japan', 2006, 'JPY'),
('Samsung', '005930', 'Korea', 70600, 'KRW'),
('Kakao', '035720', 'Korea', 125000, 'KRW'),
]
# Schema
stockSchema = ['name', 'ticker', 'country', 'price', 'currency']
# createDataFrame()
df = spark.createDataFrame(data = stocks, schema=stockSchema)
# createOrReplaceTempView()
df.createOrReplaceTempView("stocks")
UDF
A User Defined Function is a function the user defines directly.
1. spark.udf.register()
Registers a defined function so that it can be used in Spark SQL.
spark.udf.register("function name", function, return data type)
from pyspark.sql.types import LongType
def squared(n):
    return n * n
spark.udf.register("squared", squared, LongType())
Usage
# Square the price.
spark.sql("select name, squared(price) from stocks").show()
+-------+--------------+
| name|squared(price)|
+-------+--------------+
| Google| 8904256|
|Netflix| 416025|
| Amazon| 12376324|
| Tesla| 1493284|
|Tencent| 233289|
| Toyota| 4024036|
|Samsung| 4984360000|
| Kakao| 15625000000|
+-------+--------------+
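For reference, the same Python function can also be wrapped with pyspark.sql.functions.udf and used directly through the DataFrame API instead of a SQL string. A minimal sketch (the name squared_udf is just for illustration):
from pyspark.sql.types import LongType
from pyspark.sql.functions import udf, col
# Wrap the plain Python function as a column expression returning LongType
squared_udf = udf(squared, LongType())
df.select(col('name'), squared_udf(col('price')).alias('squared_price')).show()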
----
# Show only the rows where the squared price does not exceed 1,000,000.
spark.sql("select name, squared(price) from stocks where squared(price) < 1000000").show()
+-------+--------------+
| name|squared(price)|
+-------+--------------+
|Netflix| 416025|
|Tencent| 233289|
+-------+--------------+
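The same filter can be written against the DataFrame API as well, since a UDF registered with spark.udf.register() is also visible inside expr()/selectExpr(). A minimal sketch:
from pyspark.sql.functions import expr
# Reuse the SQL-registered UDF from the DataFrame API via expr()
df.where(expr("squared(price) < 1000000")) \
  .selectExpr("name", "squared(price)") \
  .show()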
2. Let's write a slightly more practical function.
# Convert the currency code to its Korean name and concatenate it with the price.
def currency_ko(n):
    if n == 'USD':
        return '달러'
    elif n == 'KRW':
        return '원'
    elif n == 'JPY':
        return '엔'
    else:
        return '위안'
spark.udf.register("currency_ko", currency_ko)
<function __main__.currency_ko(n)>
----
# The SQL statement uses the CONCAT() function to join the price and the currency name.
spark.sql("select name, concat(price, currency_ko(currency)) as price from stocks").show()
+-------+--------+
|   name|   price|
+-------+--------+
| Google|2984달러|
|Netflix| 645달러|
| Amazon|3518달러|
|  Tesla|1222달러|
|Tencent| 483위안|
| Toyota|  2006엔|
|Samsung| 70600원|
|  Kakao|125000원|
+-------+--------+
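As a side note, when no return type is passed, spark.udf.register() falls back to the default, StringType. The same mapping could be written a bit more compactly and registered with an explicit type; a minimal sketch (currency_ko_v2 and currency_map are illustrative names, not from the original post):
from pyspark.sql.types import StringType
currency_map = {'USD': '달러', 'KRW': '원', 'JPY': '엔'}
def currency_ko_v2(n):
    # Any currency not in the map falls back to '위안', mirroring the original function
    return currency_map.get(n, '위안')
spark.udf.register("currency_ko_v2", currency_ko_v2, StringType())
spark.sql("select name, concat(price, currency_ko_v2(currency)) as price from stocks").show()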
That covers UDFs.
In the next post, let's look at SparkSQL's backend processing.
Thanks for reading.