2022. 5. 20. 14:08 | Data Engineering/Apache Spark
MLlib
Machine Learning Library
MLlib is one of Spark's components, built to make developing machine-learning pipelines easy.
MLlib bundles several component groups: algorithms (Classification, etc.), pipelines (Training, Evaluation, etc.), feature engineering (FE), and utilities (Statistics, etc.). MLlib operates on DataFrames, and its DataFrame-based API is called Spark ML.
In this post, we will walk through how to use MLlib step by step, covering algorithms, pipelines, feature engineering, and utilities.
The data used throughout the MLlib posts is the taxi data covered in an earlier post:
https://mengu.tistory.com/50?category=932924
You can follow the MLlib tour well enough without downloading the data, so don't feel obliged to.
Let's go.
"This post is based on my notes from a FastCampus lecture."
Basic Settings
Here is the basic setup: the required imports are done up front, and a SparkSession is opened.
# font settings
from matplotlib import font_manager, rc
font_path = 'C:\\WINDOWS\\Fonts\\HBATANG.TTF'
font = font_manager.FontProperties(fname=font_path).get_name()
rc('font', family=font)
# basic settings
import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
from pyspark.sql import SparkSession
MAX_MEMORY = "5g"
spark = SparkSession.builder.master('local').appName("taxi-fare-prediction")\
.config("spark.executor.memory", MAX_MEMORY)\
.config("spark.driver.memory", MAX_MEMORY).getOrCreate()
# data file paths
zone_data = "C:/DE study/data-engineering/01-spark/data/taxi_zone_lookup.csv"
trip_files = "C:/DE study/data-engineering/01-spark/data/trips/*"
# load the data
trips_df = spark.read.csv(f"file:///{trip_files}", inferSchema=True, header=True)
zone_df = spark.read.csv(f"file:///{zone_data}", inferSchema=True, header=True)
# data schemas
trips_df.printSchema()
zone_df.printSchema()
# register temp views with createOrReplaceTempView()
trips_df.createOrReplaceTempView("trips")
zone_df.createOrReplaceTempView("zone")
root
|-- VendorID: integer (nullable = true)
|-- tpep_pickup_datetime: string (nullable = true)
|-- tpep_dropoff_datetime: string (nullable = true)
|-- passenger_count: integer (nullable = true)
|-- trip_distance: double (nullable = true)
|-- RatecodeID: integer (nullable = true)
|-- store_and_fwd_flag: string (nullable = true)
|-- PULocationID: integer (nullable = true)
|-- DOLocationID: integer (nullable = true)
|-- payment_type: integer (nullable = true)
|-- fare_amount: double (nullable = true)
|-- extra: double (nullable = true)
|-- mta_tax: double (nullable = true)
|-- tip_amount: double (nullable = true)
|-- tolls_amount: double (nullable = true)
|-- improvement_surcharge: double (nullable = true)
|-- total_amount: double (nullable = true)
|-- congestion_surcharge: double (nullable = true)
root
|-- LocationID: integer (nullable = true)
|-- Borough: string (nullable = true)
|-- Zone: string (nullable = true)
|-- service_zone: string (nullable = true)
DataFrame
The data first needs to be pulled in as a DataFrame. We also keep only the input and output columns we want and filter out everything else.
query = '''
SELECT
trip_distance,
total_amount
FROM
trips
WHERE
total_amount < 5000
AND total_amount > 0
AND trip_distance > 0
AND trip_distance < 500
AND passenger_count < 4
AND TO_DATE(tpep_pickup_datetime) >= '2021-01-01'
AND TO_DATE(tpep_pickup_datetime) < '2021-08-01'
'''
# keep only the data we want
data_df = spark.sql(query)
data_df.createOrReplaceTempView('data')
data_df.show()
+-------------+------------+
|trip_distance|total_amount|
+-------------+------------+
| 16.5| 70.07|
| 1.13| 11.16|
| 2.68| 18.59|
| 12.4| 43.8|
| 9.7| 32.3|
| 9.3| 43.67|
| 9.58| 46.1|
| 16.2| 45.3|
| 3.58| 19.3|
| 0.91| 14.8|
| 2.57| 12.8|
| 0.4| 5.3|
| 3.26| 17.3|
| 13.41| 47.25|
| 18.3| 61.42|
| 1.53| 14.16|
| 2.0| 11.8|
| 16.6| 54.96|
| 15.5| 56.25|
| 1.3| 16.8|
+-------------+------------+
only showing top 20 rows
Split the data into training and test sets up front.
# split the data 8:2
train_df, test_df = data_df.randomSplit([0.8, 0.2], seed=1)
print(f'---> {train_df.count()}')
print(f'---> {test_df.count()}')
---> 10500253
---> 2625787
Transformer
A Transformer converts data into a format a model can learn from. It takes a DataFrame and produces a new DataFrame, usually by adding one or more columns. Typical examples:
- Data Normalization
- Tokenization
- One-hot encoding, etc.
To feed the model we use VectorAssembler, which packs the input features into a single Vector column so the model can train on them easily. Looking at vtrain_df, a new column appears: features is the vectorized trip_distance.
from pyspark.ml.feature import VectorAssembler
vassembler = VectorAssembler(inputCols=["trip_distance"], outputCol="features")
vtrain_df = vassembler.transform(train_df)
vtrain_df.show()
+-------------+------------+--------+
|trip_distance|total_amount|features|
+-------------+------------+--------+
| 0.01| 3.05| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
| 0.01| 3.3| [0.01]|
+-------------+------------+--------+
only showing top 20 rows
Estimator
An Estimator is a component that abstracts the model training process. It exposes a fit() function and covers a range of machine-learning algorithms: regression, classification, clustering, and so on. fit() takes a DataFrame and returns a Model, and the Model makes predictions through its transform() function.
# load the estimator
from pyspark.ml.regression import LinearRegression

lr = LinearRegression(
    maxIter=50,
    labelCol='total_amount',
    featuresCol='features')

# fit() / train
model = lr.fit(vtrain_df)
Vectorize the test dataset and run inference with transform():
# vectorize the test dataset
vectorassembler = VectorAssembler(inputCols=['trip_distance'], outputCol='features')
vtest_df = vectorassembler.transform(test_df)
# inference
prediction = model.transform(vtest_df)
prediction.show()
+-------------+------------+--------+-----------------+
|trip_distance|total_amount|features| prediction|
+-------------+------------+--------+-----------------+
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.3| [0.01]|9.430440745312902|
| 0.01| 3.8| [0.01]|9.430440745312902|
| 0.01| 3.8| [0.01]|9.430440745312902|
| 0.01| 3.8| [0.01]|9.430440745312902|
| 0.01| 3.8| [0.01]|9.430440745312902|
| 0.01| 3.8| [0.01]|9.430440745312902|
| 0.01| 3.8| [0.01]|9.430440745312902|
+-------------+------------+--------+-----------------+
only showing top 20 rows
Using the functions built into the Model, we can check model performance at a glance.
# evaluate the model
print(model.summary.r2)
print(model.summary.rootMeanSquaredError)
0.7648633777017714
6.30781413196623
If you have worked with machine learning before, you will find that TensorFlow and Spark are used in very similar ways.
The difference is that, unlike plain Pandas, Spark pays off most with a distributed, parallel approach, so keep that distinction in mind.
In the next posts we will go deeper into Transformers and cover Pipelines in detail.