[Airflow] Integrating and Using Airflow & Spark (2)

2022. 9. 19. 15:43ใ†๐Ÿ›  Data Engineering/Apache Airflow


์ €๋ฒˆ ํฌ์ŠคํŒ…์—์„  airflow์™€ spark์„ ์—ฐ๋™ํ•˜๋Š” ๋ฐฉ๋ฒ•์— ๋Œ€ํ•ด ์•Œ์•„๋ดค์Šต๋‹ˆ๋‹ค. ์ด๋ฒˆ ํฌ์ŠคํŒ…์—์„  ๊ฐ„๋‹จํ•œ ์‹ค์Šต์„ ๋ฐ”๋กœ ์ง„ํ–‰ํ•ด๋ณด๋„๋ก ํ•˜๊ฒ ์Šต๋‹ˆ๋‹ค.


* ๋ณธ ํฌ์ŠคํŒ…์€ ํ•ด๋‹น ๊ฐ•์˜๋ฅผ ์ฐธ๊ณ ํ•œ ๊ฒƒ์ž„์„ ๋ฐํž™๋‹ˆ๋‹ค.


[Go to the pyspark setup post]

https://mengu.tistory.com/25?category=932924 



[Go to the Airflow & Spark integration post (1)]

https://mengu.tistory.com/124



Table of Contents

๐Ÿ“ƒ How Airflow x Spark integration works

๐Ÿ“ƒ Setting up the integration

๐Ÿ“ƒ mini project


mini project

We'll run a simple hands-on exercise with the TLC trip data.

The pipeline we'll automate with Airflow is as follows:

 

(1) Data preprocessing

๋ฐ์ดํ„ฐ๋ฅผ ์•Œ๋งž๊ฒŒ ๊ฐ€๊ณตํ•ฉ๋‹ˆ๋‹ค.

 

(2) Data Analytics & Report

๊ฐ€๊ณต๋œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ”ํƒ•์œผ๋กœ ํ”„๋กœ๊ทธ๋žจ์ด ๋ถ„์„ํ•˜๊ณ , ๋ณด๊ณ ์„œ ํŒŒ์ผ์„ ์ƒ์„ฑํ•ฉ๋‹ˆ๋‹ค.


๐Ÿ“Œ ๋ฐ์ดํ„ฐ ๋‹ค์šด๋กœ๋“œํ•˜๊ธฐ

 

https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page


(1) ์‚ฌ์ดํŠธ์— ์ ‘์†ํ•ด์„œ 2021.01 ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค์šด๋กœ๋“œํ•˜์—ฌ์ค๋‹ˆ๋‹ค. ๋นจ๊ฐ„ ์‚ฌ๊ฐํ˜•์„ ๋‹ค์šด๋ฐ›์•„ ์ฃผ์„ธ์š”.


(2) ํŒŒ์ผ์„ ์ž์‹ ์ด ์›ํ•˜๋Š” ํด๋”์— ๋„ฃ์–ด์ค๋‹ˆ๋‹ค.


๐Ÿ“Œ Writing the Spark jobs

 

(1) preprocess.py

SparkSession์„ ๋ถˆ๋Ÿฌ์˜ค๊ณ , ํŒŒ์ผ์„ ๋ถˆ๋Ÿฌ์˜ต๋‹ˆ๋‹ค.

์—ฌ๋Ÿฌ ๋ณ€์ˆ˜๋“ค์— ๋Œ€ํ•ด, ์ด์ƒ์น˜๋“ค์„ ์ œ๊ฑฐํ•ด์ฃผ๊ณ  ๋‹ค์‹œ parquet ํŒŒ์ผ๋กœ ๊ตฌ์›Œ์ค๋‹ˆ๋‹ค.

# Locate the local Spark installation
import findspark
findspark.init()

from pyspark.sql import SparkSession


MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .getOrCreate()


trip_files = "path/to/your/data.parquet"
# Parquet files carry their own schema, so inferSchema/header options aren't needed
trips_df = spark.read.parquet(f"file:///{trip_files}")


trips_df.show(5)
trips_df.createOrReplaceTempView("trips")


query = """
SELECT 
    PULocationID as pickup_location_id,
    DOLocationID as dropoff_location_id,
    trip_miles,
    HOUR(Pickup_datetime) as pickup_time,
    DATE_FORMAT(TO_DATE(Pickup_datetime), 'EEEE') AS day_of_week,
    driver_pay
FROM
    trips
WHERE
    driver_pay  < 5000
    AND driver_pay  > 0
    AND trip_miles > 0
    AND trip_miles < 500
    AND TO_DATE(Pickup_datetime) >= '2021-01-01'
    AND TO_DATE(Pickup_datetime) < '2021-08-01'
"""
data_df = spark.sql(query)


data_dir = "path/to/output/folder"  # folder where the processed parquet files will land
data_df.write.format("parquet").mode('overwrite').save(f"{data_dir}/")
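
Before handing this script to Airflow, it's worth a quick local sanity check, e.g. (assuming spark-submit is on your PATH):

spark-submit preprocess.py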


(2) analytics.py

Once preprocessing is done, we load the processed files and start the analysis.

In this post we'll keep it simple: draw a few graphs and save them.

 

# Locate the local Spark installation
import findspark
findspark.init()

from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns


MAX_MEMORY = "10g"
spark = SparkSession.builder.appName("taxi-fare-prediction")\
                .config("spark.executor.memory", MAX_MEMORY)\
                .config("spark.driver.memory", MAX_MEMORY)\
                .getOrCreate()


data_dir = "/home/mingu/airflow/data-engineering/02-airflow/data/"
# Read the parquet files that preprocess.py wrote
data_df = spark.read.parquet(f"{data_dir}/")


# Register the DataFrame as a SQL-queryable temp view
data_df.createOrReplaceTempView("trips")

analytics ํŒŒ์ผ๋„ ์ฒ˜์Œ ๋ถ€๋ถ„์€ ๋˜‘๊ฐ™์Šต๋‹ˆ๋‹ค. ๋ฐ์ดํ„ฐ๋ฅผ ๋‹ค๋ฃจ๊ธฐ ์œ„ํ•ด createOrReplaceTempView()๋ฅผ ํ•ด์ค˜์•ผ ํ•ฉ๋‹ˆ๋‹ค.

์ด์ œ ๊ฐ์ข… ๊ทธ๋ž˜ํ”„๋ฅผ ๊ทธ๋ฆด ์‹œ๊ฐ„์ž…๋‹ˆ๋‹ค.

๊ทธ๋ž˜ํ”„๋ฅผ ๊ทธ๋ฆฌ๋ฉด ๋ฐ”๋กœ ์›ํ•˜๋Š” ํด๋”์— ์ €์žฅํ•ฉ๋‹ˆ๋‹ค.
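
One gotcha: matplotlib's savefig() does not create missing directories, so make sure the result folder exists before the first save. A minimal sketch:

import os

# savefig("result/...") fails if the folder doesn't exist yet
os.makedirs("result", exist_ok=True)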


(1) Scatter plot of distance vs. fare

# 1. Scatter plot of trip_miles vs. driver_pay
miles_pay = spark.sql("SELECT trip_miles, driver_pay FROM trips").toPandas()

fig, ax = plt.subplots(figsize=(16, 6))
plt.title('miles_pay', fontsize=30)
sns.scatterplot(
    x='trip_miles',
    y='driver_pay',
    data=miles_pay
)

plt.savefig("result/miles_pay.png")


(2) ์š”์ผ๋ณ„ ๊ฑฐ๋ฆฌ/์š”๊ธˆ ๊ทธ๋ž˜ํ”„

# 2. ์š”์ผ๋ณ„ ๊ฑฐ๋ฆฌ, ์š”๊ธˆ ๊ทธ๋ž˜ํ”„
weekday_df = spark.sql("SELECT day_of_week, MEAN(trip_miles) as distance_mean, MEAN(driver_pay) AS fare_mean from trips GROUP BY day_of_week").toPandas()

for i in ['distance_mean', 'fare_mean']:
    fig, ax = plt.subplots(figsize=(16,6))
    plt.title(i, fontsize = 30)
    sns.barplot(
        x = 'day_of_week',
        y = i,
        data = weekday_df
    )
    
    plt.savefig(f"result/{i}.png")
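
Note that GROUP BY returns the days in no particular order, so the bars can come out shuffled. A small sketch to impose calendar order before plotting, assuming the English day names produced by DATE_FORMAT(..., 'EEEE') (the same trick applies to the day-of-week chart in (3) below):

# Impose Monday..Sunday order on the pandas DataFrame before plotting
day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday',
             'Friday', 'Saturday', 'Sunday']
weekday_df['day_of_week'] = pd.Categorical(
    weekday_df['day_of_week'], categories=day_order, ordered=True)
weekday_df = weekday_df.sort_values('day_of_week')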


(3) ์š”์ผ๋ณ„ ํƒ์‹œ ์Šน๊ฐ ์ˆ˜ ๊ทธ๋ž˜ํ”„

# 3. ์š”์ผ๋ณ„ ํƒ์‹œ count ๊ทธ๋ž˜ํ”„
week_count = spark.sql("SELECT day_of_week, COUNT(day_of_week) as count FROM trips GROUP BY day_of_week").toPandas()

fig, ax = plt.subplots(figsize=(16,6))
plt.title('week_count', fontsize = 30)
sns.barplot(
    x = 'day_of_week',
    y = 'count',
    data = week_count
)

plt.savefig("result/week_count.png")


(4) Pickup counts by hour

# 4. Pickup count by hour of day; ORDER BY keeps the hours in sequence on the x-axis
time_count = spark.sql("SELECT pickup_time, COUNT(pickup_time) as count FROM trips GROUP BY pickup_time ORDER BY pickup_time").toPandas()

fig, ax = plt.subplots(figsize=(16,6))
plt.title('time_count', fontsize = 30)
sns.barplot(
    x = 'pickup_time',
    y = 'count',
    data = time_count
)


plt.savefig("result/time_count.png")


๐Ÿ“Œ Wiring it up with Airflow

 

Time to write the Airflow DAG.

We'll use SparkSubmitOperator() to run the preprocess.py and analytics.py files.


(1) Initial setup

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


default_args = {
    'start_date': datetime(2022, 9, 16)
}
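
Note that SparkSubmitOperator comes from the Spark provider package, which is not bundled with core Airflow. If the import fails, install it first:

pip install apache-airflow-providers-apache-spark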


(2) Defining the DAG

conn_id is the connection ID we created in the previous post. application takes the path of the file you want to run, and task_id is, as the name suggests, the task name that shows up in the DAG.
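
If you skipped that post, the connection can also be created from the CLI, roughly like this (assuming Spark runs in local mode):

airflow connections add spark_local --conn-type spark --conn-host "local[*]"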

Don't forget to set the task dependency!

with DAG(dag_id="spark-airflow-example",  # dag_id may only contain letters, digits, '-', '.' and '_'
         schedule_interval='@daily',
         default_args=default_args,
         tags=['spark'],
         catchup=False) as dag:

    # preprocess
    preprocess = SparkSubmitOperator(
        application="/home/mingu/airflow/blogs/preprocess.py",
        task_id="preprocess",
        conn_id="spark_local"
    )

    # analytics
    analytics = SparkSubmitOperator(
        application="/home/mingu/airflow/blogs/analytics.py",
        task_id="analytics",
        conn_id="spark_local"
    )

    preprocess >> analytics


(3) Full code

from datetime import datetime

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator


default_args = {
    'start_date': datetime(2022, 9, 16)
}


with DAG(dag_id="spark-airflow-example",
         schedule_interval='@daily',
         default_args=default_args,
         tags=['spark'],
         catchup=False) as dag:

    # preprocess
    preprocess = SparkSubmitOperator(
        application="/home/mingu/airflow/blogs/preprocess.py",
        task_id="preprocess",
        conn_id="spark_local"
    )

    # analytics
    analytics = SparkSubmitOperator(
        application="/home/mingu/airflow/blogs/analytics.py",
        task_id="analytics",
        conn_id="spark_local"
    )

    preprocess >> analytics


๐Ÿ“Œ Running it

 

First, open two terminals and run one of the following commands in each:

airflow webserver
airflow scheduler


Then open localhost:8080 in your browser!


Navigate to the spark-airflow-example DAG and trigger it.


์ž˜ ์ง„ํ–‰๋˜๋Š” ๊ฒƒ์„ ๋ณด์‹ค ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

๋˜ํ•œ ์ž‘์—…์ด ๋๋‚˜๋ฉด ์ด๋ ‡๊ฒŒ result ํด๋”์— ํŒŒ์ผ๋“ค์ด ์ƒ์„ฑ๋ฉ๋‹ˆ๋‹ค.


That wraps up integrating Airflow with Spark and putting the combination to use.

One caveat: analytics.py can occasionally hit an out-of-memory error. This happens because the distance/fare scatter data is by far the largest piece, and collecting it to the driver can exceed the available memory. You can fix it by shrinking the input file or by skipping the distance/fare scatter plot entirely; the other graphs aggregate the data first, so they are small and run without trouble.
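
As another option (a minimal sketch, assuming an approximate picture of the distribution is acceptable), you can downsample in Spark before collecting to pandas:

# Downsample before toPandas() so the driver only holds a fraction of the rows;
# fraction=0.1 is an arbitrary choice, tune it to your memory budget
miles_pay = spark.sql("SELECT trip_miles, driver_pay FROM trips") \
                 .sample(fraction=0.1, seed=42) \
                 .toPandas()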