[Airflow] Integrating Airflow & Spark (1)

2022. 9. 17. 13:42 · 🛠 Data Engineering/Apache Airflow

 

 

 

 

 

This post covers how to integrate Airflow with Spark.

We will go as far as automating Spark jobs through Airflow.

Naturally, this assumes Airflow and pyspark are already set up.

 

 

 

* Note: this post is based on material from a course.

 

 

 

 

[Go to the pyspark setup post]

https://mengu.tistory.com/25?category=932924 

 

[Spark] Initial environment setup ft. fixing compatibility issues

This post covers the initial Spark environment setup. You might think you can simply install Spark, pyspark, Java, and so on and be done, but version compatibility problems can block you partway through. This post aims to point those out

mengu.tistory.com

 

 

 

Contents

📃 How Airflow x Spark integration works

📃 Setting up the integration

📃 mini project

 

 

 

 

How Airflow x Spark integration works

 

📌 How it works

 

Airflow itself can handle simple data tasks.

However, moving and transforming large volumes of data is best not done directly in Airflow, because Airflow is ultimately a workflow platform. So how do we get it to run Spark jobs?

 

 

 

 

The answer is to use a Spark operator provided for Airflow, such as SparkSubmitOperator.

Rather than having Airflow handle the data itself, I judged it more efficient to have it run a 'task file' in which the Spark job is already implemented. As a result, DAG tasks focus on launching Spark files instead of carrying out heavy logic themselves. The workflow becomes simpler, and each task can be managed compactly in its own file, which is convenient.
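To make the idea concrete, here is a minimal sketch of the kind of `spark-submit` command a DAG task ends up launching for a Spark file. It is only an illustration of the principle, not the provider's actual implementation; the helper name `build_spark_submit_cmd`, the file path, and the memory setting are hypothetical.

```python
from typing import Dict, List, Optional


def build_spark_submit_cmd(
    application: str,
    master: str = "local[*]",
    name: str = "airflow-spark-task",
    conf: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Build the argument list for a spark-submit call that runs one Spark file."""
    cmd = ["spark-submit", "--master", master, "--name", name]
    # Each Spark setting becomes its own "--conf key=value" pair.
    for key, value in sorted((conf or {}).items()):
        cmd += ["--conf", f"{key}={value}"]
    # The Spark application file comes after all options.
    cmd.append(application)
    return cmd


cmd = build_spark_submit_cmd(
    "/opt/jobs/count_trips.py",  # hypothetical path to a Spark task file
    conf={"spark.executor.memory": "2g"},  # hypothetical setting
)
print(" ".join(cmd))
```

The point of the sketch is that the Airflow side only needs to know which file to launch and with which options; all data processing logic stays inside the Spark file.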

 

 

 

 

Setting up the integration

 

📌 Install the required package

 

Install the provider package that connects Airflow and Spark.

pip install apache-airflow-providers-apache-spark
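After installing, it can be handy to confirm which versions ended up in the environment, since version mismatches are a common source of trouble. The sketch below uses only the standard library (Python 3.8+); the helper name `installed_version` is hypothetical.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a pip distribution, or None if it is absent."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# Report the packages this post relies on.
for dist in ("apache-airflow", "apache-airflow-providers-apache-spark", "pyspark"):
    print(f"{dist}: {installed_version(dist) or 'not installed'}")
```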

 

 

 

📌 Create the base file & Import

 

(1) Create a file in the dags folder inside your airflow directory.

 

 

(2) Import the packages inside the airflow_spark.py file.

from datetime import datetime
from airflow import DAG
# from airflow.providers.apache.spark.operators.spark_sql import SparkSqlOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

Spark SQL statements can also be run through SparkSqlOperator. However, because that approach keeps the query itself inside the Airflow DAG, it is not well suited to large or complex data processing. Instead, we will use SparkSubmitOperator to run a Spark file directly, as described above.

 

 

 

 

📌 Connection

 

For Airflow and Spark to work together, you need to create a Spark Connection. Airflow uses the specified Connection to connect to Spark and carry out its work.

 

(1) Start the web server

(base) mingu@DESKTOP-6EBUJPM:~/airflow$ airflow webserver
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Running the Gunicorn Server with:
Workers: 4 sync
Host: 0.0.0.0:8080
Timeout: 120
Logfiles: - -
Access Logformat:
=================================================================
[2022-09-15 13:18:24 +0900] [233] [INFO] Starting gunicorn 20.1.0
[2022-09-15 13:18:25 +0900] [233] [INFO] Listening at: http://0.0.0.0:8080 (233)
[2022-09-15 13:18:25 +0900] [233] [INFO] Using worker: sync
[2022-09-15 13:18:25 +0900] [234] [INFO] Booting worker with pid: 234
[2022-09-15 13:18:25 +0900] [235] [INFO] Booting worker with pid: 235
[2022-09-15 13:18:25 +0900] [236] [INFO] Booting worker with pid: 236
[2022-09-15 13:18:25 +0900] [237] [INFO] Booting worker with pid: 237

 

 

 

(2) Create a Connection

 

 

 

(3) Be sure to set the Connection Type to Spark!
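As an alternative to clicking through the web UI, Airflow can also read connections from environment variables named `AIRFLOW_CONN_<CONN_ID>`. The sketch below builds such a variable in JSON form, which assumes Airflow 2.3 or later; the connection id `spark_local`, the host value `local[*]`, and the helper name `spark_connection_env` are hypothetical choices for illustration.

```python
import json
import shlex
from typing import Optional, Tuple


def spark_connection_env(
    conn_id: str, host: str, port: Optional[int] = None
) -> Tuple[str, str]:
    """Return (variable name, JSON value) describing a Spark connection."""
    payload = {"conn_type": "spark", "host": host}
    if port is not None:
        payload["port"] = port
    # Airflow looks for variables named AIRFLOW_CONN_ plus the upper-cased connection id.
    return f"AIRFLOW_CONN_{conn_id.upper()}", json.dumps(payload)


name, value = spark_connection_env("spark_local", "local[*]")
# Print a shell line that could be added to the environment Airflow runs in.
print(f"export {name}={shlex.quote(value)}")
```

Note that a connection defined this way is not stored in the metadata database, so it would not be expected to appear in the Connections list of the web UI.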

 

 

 

With that, the integration is complete. From now on, when writing a DAG, you only need to use SparkSubmitOperator to run a Spark file. For a deeper understanding, we will work through a simple hands-on exercise.

 

 

 

See you in the next post!