[SparkSQL] UDF Concepts and Code

2022. 5. 8. 12:57

UDF

UDF stands for User-Defined Function: a function that you write yourself.

In the previous post, we looked at various built-in functions for processing DataFrames.

Spark SQL also lets you write a function that performs a task of your own, give it a name, and register it with Spark so that it can be called from queries.

Let's look at this in detail.

"This post is a summary of my notes from a FastCampus lecture."

Basic Setting

import os
import findspark

# Point findspark at the local Spark installation so that pyspark can be imported
findspark.init(os.environ.get("SPARK_HOME"))

import faulthandler
faulthandler.enable()  # print Python tracebacks on fatal errors such as segfaults

from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName("dataframe").getOrCreate()

# Data

stocks = [
    ('Google', 'GOOGL', 'USA', 2984, 'USD'), 
    ('Netflix', 'NFLX', 'USA', 645, 'USD'),
    ('Amazon', 'AMZN', 'USA', 3518, 'USD'),
    ('Tesla', 'TSLA', 'USA', 1222, 'USD'),
    ('Tencent', '0700', 'Hong Kong', 483, 'HKD'),
    ('Toyota', '7203', 'Japan', 2006, 'JPY'),
    ('Samsung', '005930', 'Korea', 70600, 'KRW'),
    ('Kakao', '035720', 'Korea', 125000, 'KRW'),
]


# Schema

stockSchema = ['name', 'ticker', 'country', 'price', 'currency']


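# createDataFrame(): column types are inferred from the data (price -> bigint)
df = spark.createDataFrame(data=stocks, schema=stockSchema)

# createOrReplaceTempView(): expose df to SQL under the name "stocks",
# which is the table name the queries below refer to
df.createOrReplaceTempView("stocks")


UDF

A function that the user defines directly (User-Defined Function).
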
1. spark.udf.register()

μ •μ˜λœ ν•¨μˆ˜λ₯Ό sparkμ—μ„œ μ“Έ 수 μžˆλ„λ‘ λ“±λ‘ν•œλ‹€.

spark.udf.register("ν•¨μˆ˜ 이름", ν•¨μˆ˜, returnν•  데이터 νƒ€μž…)

 

from pyspark.sql.types import LongType

def squared(n):
    return n * n

spark.udf.register("squared", squared, LongType())

Using it

# Square each price.
spark.sql("select name, squared(price) from stocks").show()


+-------+--------------+
|   name|squared(price)|
+-------+--------------+
| Google|       8904256|
|Netflix|        416025|
| Amazon|      12376324|
|  Tesla|       1493284|
|Tencent|        233289|
| Toyota|       4024036|
|Samsung|    4984360000|
|  Kakao|   15625000000|
+-------+--------------+

----

# Only show rows whose squared price is less than 1,000,000.
spark.sql("select name, squared(price) from stocks where squared(price) < 1000000").show()


+-------+--------------+
|   name|squared(price)|
+-------+--------------+
|Netflix|        416025|
|Tencent|        233289|
+-------+--------------+

2. Let's build a slightly more practical function.

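# Convert the currency code to its Korean name so it can be joined with the price.
def currency_ko(n):
    if n == 'USD':
        return 'λ‹¬λŸ¬'  # dollar
    elif n == 'KRW':
        return '원'    # won
    elif n == 'JPY':
        return 'μ—”'    # yen
    else:
        # Catch-all: every other code, including Tencent's 'HKD'
        # (Hong Kong dollar), is labelled 'μœ„μ•ˆ' (yuan)
        return 'μœ„μ•ˆ'

# No return type given, so Spark uses the default StringType
spark.udf.register("currency_ko", currency_ko)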


<function __main__.currency_ko(n)>
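The `if`/`elif` chain can also be written as a lookup table, which makes each code-to-name mapping explicit. In the sketch below, the names `CURRENCY_KO` and `currency_ko_map` are hypothetical: 'HKD' gets its own entry (ν™μ½©λ‹¬λŸ¬, Hong Kong dollar) instead of falling into the yuan catch-all, unknown codes are returned unchanged, and `None` (SQL NULL) stays `None`.

```python
from typing import Optional

# Hypothetical lookup table: ISO currency code -> Korean currency name
CURRENCY_KO = {
    'USD': 'λ‹¬λŸ¬',      # US dollar
    'KRW': '원',        # Korean won
    'JPY': 'μ—”',        # Japanese yen
    'CNY': 'μœ„μ•ˆ',      # Chinese yuan
    'HKD': 'ν™μ½©λ‹¬λŸ¬',  # Hong Kong dollar
}

def currency_ko_map(code: Optional[str]) -> Optional[str]:
    """Return the Korean name for a currency code; unknown codes pass through."""
    if code is None:
        return None  # keep SQL NULL as NULL
    return CURRENCY_KO.get(code, code)

print(currency_ko_map('HKD'))  # ν™μ½©λ‹¬λŸ¬
print(currency_ko_map('EUR'))  # EUR
```

It can be registered exactly like the original, e.g. `spark.udf.register("currency_ko", currency_ko_map)`, which again uses the default string return type.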

----

# Use SQL's CONCAT() function to join the price and the currency name.
spark.sql("select name, concat(price, currency_ko(currency)) as price from stocks").show()


+-------+--------+
|   name|   price|
+-------+--------+
| Google|2984λ‹¬λŸ¬|
|Netflix| 645λ‹¬λŸ¬|
| Amazon|3518λ‹¬λŸ¬|
|  Tesla|1222λ‹¬λŸ¬|
|Tencent| 483μœ„μ•ˆ|
| Toyota|  2006μ—”|
|Samsung| 70600원|
|  Kakao|125000원|
+-------+--------+

That covers UDFs.

In the next post, we'll look at how Spark SQL processes queries on the backend.

Thanks for reading.