In the previous post, we looked at how to integrate Airflow with Spark. In this post, let's jump straight into a simple hands-on exercise.
* This post was written with reference to a course.
[PySpark setup guide]
https://mengu.tistory.com/25?category=932924
[Airflow & Spark integration guide]
Table of Contents
📌 How the Airflow x Spark integration works
📌 Setting up the integration
📌 Mini project
Mini project
We'll do a simple hands-on exercise with the TLC trip record data.
The pipeline we'll automate with Airflow is as follows:
(1) Data preprocessing
Shape the raw data into the form we need.
(2) Data analytics & report
From the processed data, the program runs the analysis and produces report files.
📌 Downloading the data
https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
(1) Go to the site above and download the January 2021 trip data.
(2) Put the file in a folder of your choice.
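Before writing any Spark code, it can help to sanity-check the download. Here is a minimal sketch that inspects the parquet schema without loading the whole file; it assumes pyarrow is installed, and the file name below is only an example, so use whatever you actually downloaded:
# Quick schema check on the downloaded parquet without loading it fully
# (the file name is an example; use the file you downloaded)
import pyarrow.parquet as pq

schema = pq.read_schema("fhvhv_tripdata_2021-01.parquet")
print(schema.names)  # expect columns like PULocationID, DOLocationID, trip_miles, driver_pay, ...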
📌 Writing the Spark jobs
(1) preprocess.py
Create a SparkSession and load the file.
For several columns, filter out the outliers, then write the result back out as parquet.
# Locate the local Spark installation
import findspark
findspark.init()

# Import what we need
from pyspark.sql import SparkSession

MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
    .config("spark.executor.memory", MAX_MEMORY)\
    .config("spark.driver.memory", MAX_MEMORY)\
    .getOrCreate()

# Path to the parquet file you downloaded
trip_files = "path/to/your/data.parquet"
# Parquet files carry their own schema, so the CSV-only inferSchema/header options are not needed here
trips_df = spark.read.parquet(f"file:///{trip_files}")
trips_df.show(5)
trips_df.createOrReplaceTempView("trips")
query = """
SELECT
PULocationID as pickup_location_id,
DOLocationID as dropoff_location_id,
trip_miles,
HOUR(Pickup_datetime) as pickup_time,
DATE_FORMAT(TO_DATE(Pickup_datetime), 'EEEE') AS day_of_week,
driver_pay
FROM
trips
WHERE
driver_pay < 5000
AND driver_pay > 0
AND trip_miles > 0
AND trip_miles < 500
AND TO_DATE(Pickup_datetime) >= '2021-01-01'
AND TO_DATE(Pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)
data_dir = "path/to/output/folder"  # folder where the cleaned data will be written
data_df.write.format("parquet").mode('overwrite').save(f"{data_dir}/")
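Before wiring this into Airflow, it's worth running the job once by hand; if the submit completes and the output folder contains parquet part files, the Airflow task should behave the same way. Assuming spark-submit is on your PATH:
spark-submit preprocess.py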
(2) analytics.py
Once preprocessing is done, load the cleaned files and start the analysis.
In this post we'll keep it simple: draw a few graphs and save them.
# Locate the local Spark installation
import findspark
findspark.init()

from pyspark.sql import SparkSession
import matplotlib.pyplot as plt
import seaborn as sns

MAX_MEMORY = "10g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
    .config("spark.executor.memory", MAX_MEMORY)\
    .config("spark.driver.memory", MAX_MEMORY)\
    .getOrCreate()

# Must be the same folder preprocess.py saved its output to
data_dir = "/home/mingu/airflow/data-engineering/02-airflow/data/"
data_df = spark.read.parquet(f"{data_dir}/")

# Register the DataFrame as a temp view so we can query it with SQL
data_df.createOrReplaceTempView("trips")
The analytics file starts out the same way as preprocess.py. To work with the data in SQL, you have to call createOrReplaceTempView() here as well.
Now it's time to draw some graphs.
Each graph is saved to a folder of your choice as soon as it's drawn.
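One practical detail: plt.savefig() raises an error if the target directory doesn't exist, so if you save into a relative result/ folder as below, it's safest to create it near the top of analytics.py:
# Create the output folder for the graphs if it doesn't exist yet
import os
os.makedirs("result", exist_ok=True)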
(1) Scatter plot of distance vs. fare
# 1. Scatter plot of trip_miles against driver_pay
miles_pay = spark.sql("SELECT trip_miles, driver_pay FROM trips").toPandas()

fig, ax = plt.subplots(figsize=(16, 6))
plt.title('miles_pay', fontsize=30)
sns.scatterplot(
    x='trip_miles',
    y='driver_pay',
    data=miles_pay
)
plt.savefig("result/miles_pay.png")
(2) Average distance and fare by day of week
# 2. Mean distance and mean fare per day of week
weekday_df = spark.sql("SELECT day_of_week, MEAN(trip_miles) AS distance_mean, MEAN(driver_pay) AS fare_mean FROM trips GROUP BY day_of_week").toPandas()

for i in ['distance_mean', 'fare_mean']:
    fig, ax = plt.subplots(figsize=(16, 6))
    plt.title(i, fontsize=30)
    sns.barplot(
        x='day_of_week',
        y=i,
        data=weekday_df
    )
    plt.savefig(f"result/{i}.png")
(3) Trip count by day of week
# 3. Number of trips per day of week
week_count = spark.sql("SELECT day_of_week, COUNT(day_of_week) AS count FROM trips GROUP BY day_of_week").toPandas()

fig, ax = plt.subplots(figsize=(16, 6))
plt.title('week_count', fontsize=30)
sns.barplot(
    x='day_of_week',
    y='count',
    data=week_count
)
plt.savefig("result/week_count.png")
(4) Trip count by pickup hour
# 4. Number of trips per pickup hour
time_count = spark.sql("SELECT pickup_time, COUNT(pickup_time) AS count FROM trips GROUP BY pickup_time").toPandas()

fig, ax = plt.subplots(figsize=(16, 6))
plt.title('time_count', fontsize=30)
sns.barplot(
    x='pickup_time',
    y='count',
    data=time_count
)
plt.savefig("result/time_count.png")
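The three bar-chart blocks above are nearly identical, so if you extend the report it may be cleaner to factor them into a small helper inside analytics.py, reusing the SparkSession already created there. A minimal sketch; the helper name and signature are my own, not from the original code:
# Hypothetical helper: run a GROUP BY query and save a bar chart of the result
def save_bar_chart(query, x, y, out_path):
    pdf = spark.sql(query).toPandas()
    fig, ax = plt.subplots(figsize=(16, 6))
    plt.title(y, fontsize=30)
    sns.barplot(x=x, y=y, data=pdf, ax=ax)
    fig.savefig(out_path)
    plt.close(fig)  # free the figure's memory between charts

save_bar_chart(
    "SELECT day_of_week, COUNT(*) AS count FROM trips GROUP BY day_of_week",
    x="day_of_week", y="count", out_path="result/week_count.png",
)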
📌 Wiring it up with Airflow
Now it's time to write the Airflow DAG.
We'll use SparkSubmitOperator() to run the preprocess.py and analytics.py files.
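SparkSubmitOperator lives in the Spark provider package, so if you haven't installed it yet (assuming Airflow 2.x and pip):
pip install apache-airflow-providers-apache-spark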
(1) Initial setup
from datetime import datetime

from airflow import DAG
# from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'start_date': datetime(2022, 9, 16)
}
(2) Defining the DAG
conn_id is the connection id we created in the previous post. For application, pass the path of the file you want spark-submit to run. task_id is, as the name suggests, the name of the task as it will appear in the DAG.
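If you skipped the previous post, one way to create that connection from the CLI (assuming Airflow 2.x, with Spark running in local mode) is:
airflow connections add 'spark_local' --conn-type 'spark' --conn-host 'local[*]'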
Don't forget to declare the dependency between the tasks!
with DAG(dag_id="spark-airflow-example",
         schedule_interval='@daily',
         default_args=default_args,
         tags=['spark'],
         catchup=False) as dag:

    # preprocess
    preprocess = SparkSubmitOperator(
        application="/home/mingu/airflow/blogs/preprocess.py",
        task_id="preprocess",
        conn_id="spark_local"
    )

    # analytics
    analytics = SparkSubmitOperator(
        application="/home/mingu/airflow/blogs/analytics.py",
        task_id="analytics",
        conn_id="spark_local"
    )

    preprocess >> analytics
(3) Full code
from datetime import datetime

from airflow import DAG
# from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

default_args = {
    'start_date': datetime(2022, 9, 16)
}

with DAG(dag_id="spark-airflow-example",
         schedule_interval='@daily',
         default_args=default_args,
         tags=['spark'],
         catchup=False) as dag:

    # preprocess
    preprocess = SparkSubmitOperator(
        application="/home/mingu/airflow/blogs/preprocess.py",
        task_id="preprocess",
        conn_id="spark_local"
    )

    # analytics
    analytics = SparkSubmitOperator(
        application="/home/mingu/airflow/blogs/analytics.py",
        task_id="analytics",
        conn_id="spark_local"
    )

    preprocess >> analytics
📌 Running it
First, open two terminals and run one of the following commands in each:
airflow webserver
airflow scheduler
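Before triggering anything from the UI, you can also smoke-test a single task from the CLI without the scheduler (assuming Airflow 2.x; the date is just an example execution date):
airflow tasks test spark-airflow-example preprocess 2022-09-16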
Then open localhost:8080 in your browser.
Navigate to the spark-airflow-example DAG and trigger it.
You can watch the tasks progress and complete.
When the run finishes, the graph files show up in the result folder.
That's it for integrating Airflow with Spark and putting the combination to work.
One caveat: running analytics.py can sometimes cause an out-of-memory error. This happens because the distance-vs-fare scatter data is very large once it's collected to the driver, and it can exceed the available memory. You can work around it by shrinking the input file, or by skipping the distance/fare scatter plot entirely. The other graphs aggregate the data first, so they are small and run fine!
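Another option, if you want to keep the scatter plot, is to downsample the rows before collecting them to the driver; the shape of the distribution survives a heavy sample. A minimal sketch inside analytics.py (the 1% fraction is an arbitrary choice of mine):
# Downsample before toPandas() so the driver only collects ~1% of the rows
miles_pay = (
    spark.sql("SELECT trip_miles, driver_pay FROM trips")
    .sample(fraction=0.01, seed=42)
    .toPandas()
)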