[SparkML] MLlib Concepts and Hands-On Code

2022. 5. 20. 14:08 · Data Engineering/Apache Spark

 

MLlib

Machine Learning Library

์ŠคํŒŒํฌ์˜ ์ปดํฌ๋„ŒํŠธ ์ค‘ ํ•˜๋‚˜๋กœ, ๋จธ์‹ ๋Ÿฌ๋‹ ํŒŒ์ดํ”„๋ผ์ธ ๊ฐœ๋ฐœ์„ ์‰ฝ๊ฒŒ ํ•˜๊ธฐ ์œ„ํ•ด ๊ฐœ๋ฐœ๋˜์—ˆ๋‹ค.

MLlib์—๋Š” ๋‹ค์‹œ ์•Œ๊ณ ๋ฆฌ์ฆ˜(Classification ๋“ฑ) + ํŒŒ์ดํ”„๋ผ์ธ(Training, Evaluation ๋“ฑ) + FE + Utils(Statistics ๋“ฑ)์˜ ์—ฌ๋Ÿฌ ์ปดํฌ๋„ŒํŠธ๊ฐ€ ์žˆ๋‹ค. MLlib์€ DataFrame ์œ„์—์„œ ๋™์ž‘ํ•˜๋ฉฐ, MLlib API๋ฅผ Spark ML์ด๋ผ๊ณ  ๋ถ€๋ฅธ๋‹ค.


์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„  MLlib์˜ ํ™œ์šฉ์„ ์•Œ๊ณ ๋ฆฌ์ฆ˜, ํŒŒ์ดํ”„๋ผ์ธ, FE, Utils์— ๊ฑธ์ณ์„œ ์ฐจ๊ทผ์ฐจ๊ทผ ์‚ดํŽด๋ณด๋„๋ก ํ•˜๊ฒ ๋‹ค.

MLlib ํฌ์ŠคํŒ…์—์„œ ์‚ฌ์šฉํ•  ๋ฐ์ดํ„ฐ๋Š” ์ด์ „ ํฌ์ŠคํŒ…์—์„œ ๋‹ค๋ค˜๋˜ 'ํƒ์‹œ ๋ฐ์ดํ„ฐ'์ด๋‹ค.

 

https://mengu.tistory.com/50?category=932924

 

[SparkSQL] Taxi Data Download/Preprocessing/Analysis feat. TLC

์ด์ „ ํฌ์ŠคํŒ…์—์„œ ๊ณต๋ถ€ํ•œ SparkSQL ์ง€์‹์„ ๋ฐ”ํƒ•์œผ๋กœ, ์‹ค์ œ Taxi ๋ฐ์ดํ„ฐ๋ฅผ ์ „์ฒ˜๋ฆฌํ•ด๋ณด์ž. * ์ „์ฒ˜๋ฆฌ๋ž€? ์ด์ƒ์น˜ ์ œ๊ฑฐ, ๊ทธ๋ฃนํ™” ๋“ฑ ๋ฐ์ดํ„ฐ ๋ถ„์„์ด ์šฉ์ดํ•˜๋„๋ก ๋ฐ์ดํ„ฐ๋ฅผ ๋ณ€ํ˜•ํ•˜๋Š” ๊ณผ์ •์„ ๋งํ•œ๋‹ค. TLC Trip Recor

mengu.tistory.com

 

์œ„์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„๋†“์ง€ ์•Š์•„๋„ ์ถฉ๋ถ„ํžˆ MLlib์€ ๊ฒฌํ•™ํ•  ์ˆ˜ ์žˆ์œผ๋‹ˆ ๋ฌด๋ฆฌํ•˜์ง€ ์•Š์•„๋„ ๋œ๋‹ค.

๊ฐ€๋ณด์ž.

 

 

"๋ณธ ํฌ์ŠคํŒ…์€ ํŒจ์ŠคํŠธ์บ ํผ์Šค์˜ ๊ฐ•์˜๋ฅผ ๋“ฃ๊ณ , ์ •๋ฆฌํ•œ ์ž๋ฃŒ์ž„์„ ๋ฐํž™๋‹ˆ๋‹ค."


Basic Settings

These are the basic settings. The required imports are done up front, and a SparkSession has been opened.

 

# font settings for matplotlib
from matplotlib import font_manager, rc
font_path = 'C:\\WINDOWS\\Fonts\\HBATANG.TTF'
font = font_manager.FontProperties(fname=font_path).get_name()
rc('font', family=font)

# basic settings
import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
from pyspark.sql import SparkSession

MAX_MEMORY = "5g"
spark = SparkSession.builder.master('local').appName("taxi-fare-prediction")\
.config("spark.executor.memory", MAX_MEMORY)\
.config("spark.driver.memory", MAX_MEMORY).getOrCreate()

# ๋ฐ์ดํ„ฐ๊ฐ€ ์žˆ๋Š” ํŒŒ์ผ
zone_data = "C:/DE study/data-engineering/01-spark/data/taxi_zone_lookup.csv"
trip_files = "C:/DE study/data-engineering/01-spark/data/trips/*"

# ๋ฐ์ดํ„ฐ ๋กœ๋“œ
trips_df = spark.read.csv(f"file:///{trip_files}", inferSchema = True, header = True)
zone_df = spark.read.csv(f"file:///{zone_data}", inferSchema = True, header = True)

# ๋ฐ์ดํ„ฐ ์Šคํ‚ค๋งˆ
trips_df.printSchema()
zone_df.printSchema()

# ๋ฐ์ดํ„ฐ createOrReplaceTempView()
trips_df.createOrReplaceTempView("trips")
zone_df.createOrReplaceTempView("zone")


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)


DataFrame


๋ฐ์ดํ„ฐ๋ฅผ ์ผ๋‹จ DataFrame ํ˜•์‹์œผ๋กœ ๊ฐ€์ ธ์™€์•ผ ํ•œ๋‹ค. ๋˜ํ•œ ์›ํ•˜๋Š” Input๊ณผ Output์„ ๋‚จ๊ธฐ๊ณ ๋Š” ์ œ๊ฑฐํ•ด๋‘ฌ์•ผ ํ•œ๋‹ค.

 

 

query = '''
SELECT
    trip_distance,
    total_amount
FROM
    trips
WHERE
    total_amount < 5000
    AND total_amount > 0
    AND trip_distance > 0
    AND trip_distance < 500
    AND passenger_count < 4
    AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
    AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
'''

# keep only the data we want
data_df = spark.sql(query)
data_df.createOrReplaceTempView('data')


data_df.show()


+-------------+------------+
|trip_distance|total_amount|
+-------------+------------+
|         16.5|       70.07|
|         1.13|       11.16|
|         2.68|       18.59|
|         12.4|        43.8|
|          9.7|        32.3|
|          9.3|       43.67|
|         9.58|        46.1|
|         16.2|        45.3|
|         3.58|        19.3|
|         0.91|        14.8|
|         2.57|        12.8|
|          0.4|         5.3|
|         3.26|        17.3|
|        13.41|       47.25|
|         18.3|       61.42|
|         1.53|       14.16|
|          2.0|        11.8|
|         16.6|       54.96|
|         15.5|       56.25|
|          1.3|        16.8|
+-------------+------------+
only showing top 20 rows

 

 

 

ํ›ˆ๋ จ ๋ฐ์ดํ„ฐ์…‹๊ณผ ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์…‹์„ ๋ฏธ๋ฆฌ ๋‚˜๋ˆˆ๋‹ค.

 

# split the data 8:2
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)


print(f'---> {train_df.count()}')
print(f'---> {test_df.count()}')


---> 10500253
---> 2625787


Transformer


๋ฐ์ดํ„ฐ๋ฅผ ํ•™์Šต์ด ๊ฐ€๋Šฅํ•œ ํฌ๋ฉง์œผ๋กœ ๋ฐ”๊พธ๋Š” ์—ญํ• ์„ ํ•œ๋‹ค. DataFrame์„ ๋ฐ›์•„ ์ƒˆ๋กœ์šด DataFrame์„ ๋งŒ๋“œ๋Š”๋ฐ, ๋ณดํ†ต ํ•˜๋‚˜ ์ด์ƒ์˜ column์„ ๋”ํ•˜๊ฒŒ ๋œ๋‹ค.

- Data Normalization

- Tokenization

- One-hot encoding, etc.

 

 

 

๋ชจ๋ธ์— ์ ์šฉํ•˜๊ธฐ ์œ„ํ•ด VectorAssembler๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. ๋ชจ๋ธ์— ๋“ค์–ด๊ฐˆ features๋“ค์„ Vector๋กœ ์••์ถ•ํ•ด์„œ ์‰ฝ๊ฒŒ ๋Ÿฌ๋‹ํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•œ๋‹ค. vtrain_df๋ฅผ ๋ณด๋ฉด, ์ƒˆ๋กœ์šด ์นผ๋Ÿผ์ด ๋“ฑ์žฅํ–ˆ๊ณ , features๋Š” trip_distance๋ฅผ ๋ฒกํ„ฐํ™”ํ•œ ๊ฒƒ์ž„์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋‹ค.

 

from pyspark.ml.feature import VectorAssembler
vassembler = VectorAssembler(inputCols=["trip_distance"], outputCol="features")
vtrain_df = vassembler.transform(train_df)
vtrain_df.show()


+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
|         0.01|        3.05|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
|         0.01|         3.3|  [0.01]|
+-------------+------------+--------+
only showing top 20 rows


Estimator


๋ชจ๋ธ์˜ ํ•™์Šต๊ณผ์ •์„ ์ถ”์ƒํ™”ํ•œ ์ปดํฌ๋„ŒํŠธ๋‹ค. fit() ํ•จ์ˆ˜๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ์œผ๋ฉฐ, ํšŒ๊ท€/๋ถ„๋ฅ˜/๊ตฐ์ง‘ ๋“ฑ ๋‹ค์–‘ํ•œ ๋จธ์‹ ๋Ÿฌ๋‹ ์•Œ๊ณ ๋ฆฌ์ฆ˜์„ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋„๋ก ํ•œ๋‹ค. fit()์€ DataFrame์„ ๋ฐ›์•„ Model์„ ๋ฐ˜ํ™˜ํ•˜๊ณ , Model์€ transform() ํ•จ์ˆ˜๋ฅผ ํ†ตํ•ด ์˜ˆ์ธกํ•œ๋‹ค.

 

# ์ปดํฌ๋„ŒํŠธ ๋ถˆ๋Ÿฌ์˜ค๊ธฐ
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(
    maxIter=50,
    labelCol = 'total_amount',
    featuresCol = 'features')
    
    
# fit() / ํ•™์Šต
model = lr.fit(vtrain_df)

 

 

 

ํ…Œ์ŠคํŠธ ๋ฐ์ดํ„ฐ์…‹์„ ๋ฒกํ„ฐํ™”ํ•˜๊ณ , transform() ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•˜์—ฌ ์ถ”๋ก ํ•˜๊ธฐ

 

# vectorize the test set
vectorassembler = VectorAssembler(inputCols=['trip_distance'], outputCol='features')
vtest_df = vectorassembler.transform(test_df)


# inference
prediction = model.transform(vtest_df)
prediction.show()


+-------------+------------+--------+-----------------+
|trip_distance|total_amount|features|       prediction|
+-------------+------------+--------+-----------------+
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.3|  [0.01]|9.430440745312902|
|         0.01|         3.8|  [0.01]|9.430440745312902|
|         0.01|         3.8|  [0.01]|9.430440745312902|
|         0.01|         3.8|  [0.01]|9.430440745312902|
|         0.01|         3.8|  [0.01]|9.430440745312902|
|         0.01|         3.8|  [0.01]|9.430440745312902|
|         0.01|         3.8|  [0.01]|9.430440745312902|
+-------------+------------+--------+-----------------+
only showing top 20 rows

 

 

 

Functions built into the Model make it easy to get a quick read on its performance.

 

# ๋ชจ๋ธ์˜ ์„ฑ๋Šฅ์„ ํ‰๊ฐ€ํ•˜์ž.
print(model.summary.r2)
print(model.summary.rootMeanSquaredError)


0.7648633777017714
6.30781413196623

 

 

 

๋จธ์‹ ๋Ÿฌ๋‹์„ ๋‹ค๋ค„๋ณธ ์‚ฌ๋žŒ์ด๋ผ๋ฉด, Tensorflow๋‚˜ Spark๋‚˜ ํ™œ์šฉ ๋ฐฉ์‹์€ ์•„์ฃผ ๋น„์Šทํ•˜๋‹ค๊ณ  ๋Š๋‚„ ๊ฒƒ์ด๋‹ค.

๋‹ค๋งŒ, ์ผ๋ฐ˜ Pandas๋ฅผ ์“ฐ๋Š” ๊ฒƒ๊ณผ ๋‹ฌ๋ฆฌ Spark๋ฅผ ์ผ์„ ๋•Œ์˜ ๋ถ„์‚ฐ์ /๋ณ‘๋ ฌ์  ์ ‘๊ทผ์ด ๋” ํšจ์šฉ์„ฑ์ด ๋†’๊ธฐ์— '๋‹ค๋ฆ„'์„ ์ธ์ง€ํ•˜๊ธธ ๋ฐ”๋ž€๋‹ค.

๊ทธ๋Ÿผ ๋‹ค์Œ ํฌ์ŠคํŒ…์—์„  Transform์„ ์ข€ ๋” ์‹ฌํ™”ํ•˜๊ณ , Pipeline์„ ์ž์„ธํžˆ ๋‹ค๋ค„๋ณด๋„๋ก ํ•˜๊ฒ ๋‹ค.