[SparkSQL] Downloading, Preprocessing, and Analyzing Taxi Data feat. TLC

2022. 5. 10. 11:32 · 🛠 Data Engineering/Apache Spark

 

 

 

 

Building on the SparkSQL knowledge covered in the previous post, let's preprocess real taxi data.

* What is preprocessing? It is the process of transforming data, for example by removing outliers or grouping values, so that it is easier to analyze.

 

 

First, let's download the data from TLC Trip Record Data. The TLC (New York City Taxi and Limousine Commission) site is a very useful collection of New York City taxi trip records.

[https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page]

 

"This post is a summary of notes taken while following a Fast Campus lecture."

 

 

 

 

 

Data Download


1) Main page

 

 

 

 

2) Click and download the Yellow Taxi Trip Records (CSV) for January through July 2021

 

 

 

 

3) Download the Taxi Zone Lookup Table (CSV)

 

 

 

 

4) Create a data folder in your Spark working directory (C:\<your working path>\data), then create a trips folder inside the data folder

 

 

 

5) Put the Lookup Table (CSV) file in the data folder, and put the Yellow Taxi Trip Records (CSV) files in the trips folder

C:\<your working path>\data

C:\<your working path>\data\trips
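Steps 4 and 5 can also be prepared from Python. The following is a minimal sketch using only the standard library. `make_data_dirs` is a hypothetical helper name, and the base path is whatever working directory you chose; neither is fixed by this post.

```python
from pathlib import Path


def make_data_dirs(base_dir):
    """Create <base_dir>/data and <base_dir>/data/trips if they are missing."""
    data_dir = Path(base_dir) / "data"
    trips_dir = data_dir / "trips"
    # parents=True also creates data/; exist_ok=True makes reruns harmless
    trips_dir.mkdir(parents=True, exist_ok=True)
    return data_dir, trips_dir
```

After calling `make_data_dirs(".")` from the working directory, the lookup CSV goes into the first returned folder and the monthly trip CSVs into the second.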

 

 

 

The file setup for preprocessing is now done. Next, let's set up Spark and start preprocessing the data.

 

 

 

 

 

Basic Settings


# Set the matplotlib font
from matplotlib import font_manager, rc
font_path = 'path to the font you want to use'
font = font_manager.FontProperties(fname=font_path).get_name()
rc('font', family=font)

 

import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local').appName("taxi-analysis").getOrCreate()


# Data file locations
# 'trips/*' loads every file inside the trips folder.
zone_data = "C:/<Spark working path>/data/taxi_zone_lookup.csv"
trip_files = "C:/<Spark working path>/data/trips/*"


# Load the data
trips_df = spark.read.csv(f"file:///{trip_files}", inferSchema = True, header = True)
zone_df = spark.read.csv(f"file:///{zone_data}", inferSchema = True, header = True)

 

Inspect the data schemas and apply createOrReplaceTempView()

 

# Data schemas
trips_df.printSchema()
zone_df.printSchema()


root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: string (nullable = true)
 |-- tpep_dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

----

# Register the DataFrames as temp views with createOrReplaceTempView()
trips_df.createOrReplaceTempView("trips")
zone_df.createOrReplaceTempView("zone")

 

 

 

[trips_df column descriptions]

1. VendorID: code indicating the TPEP provider that supplied the record

2. tpep_pickup_datetime: pickup date and time

3. tpep_dropoff_datetime: drop-off date and time

4. passenger_count: number of passengers

5. trip_distance: trip distance (miles)

6. PULocationID: pickup location (ID)

7. DOLocationID: drop-off location (ID)

8. payment_type: payment method

9. fare_amount: fare
10. extra: extra charges
11. tip_amount: tip
12. tolls_amount: tolls
13. total_amount: total cost

 

 

[zone_df column descriptions]

1. LocationID: location ID
2. Borough: the broad district (borough)
3. Zone: the neighborhood-level zone within the borough

 

 

 

 

 

 

Data Preprocessing


1) Keep only the useful columns

I kept only the columns needed for analysis (pickup and drop-off times, locations, payment method, passenger count, total cost, and so on) and dropped the rest.

 

# Keep only the useful columns.
query = '''
SELECT
    t.VendorID,
    TO_DATE(t.tpep_pickup_datetime) AS pickup_date,
    HOUR(t.tpep_pickup_datetime) AS pickup_time,
    TO_DATE(t.tpep_dropoff_datetime) AS dropoff_date,
    HOUR(t.tpep_dropoff_datetime) AS dropoff_time,
    t.passenger_count,
    t.trip_distance,
    t.payment_type,
    t.fare_amount,
    t.tip_amount,
    t.tolls_amount,
    t.total_amount,
    pz.Zone as pzone,
    dz.Zone as dzone
FROM
    trips t
    LEFT JOIN 
        zone pz
    ON
        t.PULocationID == pz.LocationID
    LEFT JOIN 
        zone dz
    ON
        t.DOLocationID == dz.LocationID
'''
taxi_df = spark.sql(query)
taxi_df.createOrReplaceTempView("taxi")

spark.sql('select * from taxi').show(5)


+--------+-----------+-----------+------------+------------+---------------+-------------+------------+-----------+----------+------------+------------+-----------------+--------------+
|VendorID|pickup_date|pickup_time|dropoff_date|dropoff_time|passenger_count|trip_distance|payment_type|fare_amount|tip_amount|tolls_amount|total_amount|            pzone|         dzone|
+--------+-----------+-----------+------------+------------+---------------+-------------+------------+-----------+----------+------------+------------+-----------------+--------------+
|       2| 2021-03-01|          0|  2021-03-01|           0|              1|          0.0|           2|        3.0|       0.0|         0.0|         4.3|               NV|            NV|
|       2| 2021-03-01|          0|  2021-03-01|           0|              1|          0.0|           2|        2.5|       0.0|         0.0|         3.8|   Manhattanville|Manhattanville|
|       2| 2021-03-01|          0|  2021-03-01|           0|              1|          0.0|           2|        3.5|       0.0|         0.0|         4.8|   Manhattanville|Manhattanville|
|       1| 2021-03-01|          0|  2021-03-01|           0|              0|         16.5|           1|       51.0|     11.65|        6.12|       70.07|LaGuardia Airport|            NA|
|       2| 2021-03-01|          0|  2021-03-01|           0|              1|         1.13|           1|        5.5|      1.86|         0.0|       11.16|     East Chelsea|            NV|
+--------+-----------+-----------+------------+------------+---------------+-------------+------------+-----------+----------+------------+------------+-----------------+--------------+
only showing top 5 rows
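The query joins the `zone` lookup table twice, once per location ID, so that pickup and drop-off each get their own zone name. The sketch below reproduces only that join shape on tiny made-up tables. It uses Python's built-in sqlite3 rather than Spark so that it runs anywhere; the sample zones and IDs are invented for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE zone (LocationID INTEGER, Zone TEXT);
    CREATE TABLE trips (PULocationID INTEGER, DOLocationID INTEGER);
    INSERT INTO zone VALUES (1, 'Zone A'), (2, 'Zone B');
    -- the last trip ends at an ID that is missing from the lookup table
    INSERT INTO trips VALUES (1, 2), (2, 2), (1, 99);
""")

rows = conn.execute("""
    SELECT pz.Zone AS pzone, dz.Zone AS dzone
    FROM trips t
    LEFT JOIN zone pz ON t.PULocationID = pz.LocationID
    LEFT JOIN zone dz ON t.DOLocationID = dz.LocationID
    ORDER BY t.rowid
""").fetchall()

for pzone, dzone in rows:
    print(pzone, "->", dzone)
```

Because both joins are LEFT JOINs, a trip whose location ID has no match in the lookup table is kept, with an empty zone name, rather than dropped.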

 

As even the first 5 rows show, the data is rather dirty: trip_distance is 0 in several rows, and some zones appear as NV or NA. Using it for analysis as-is would almost certainly lead to odd conclusions. Let's clean it up.

 

 

 

 

2) Find the dirty parts of the data.

# step 1. Only the 2021.01 through 2021.07 files were downloaded, so data outside that range is removed.
# Why is there data from 2002 in here...?

spark.sql('select pickup_date from taxi order by pickup_date').show()


+-----------+
|pickup_date|
+-----------+
| 2002-12-31|
| 2003-01-05|
| 2004-04-04|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
| 2008-12-31|
+-----------+
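One way to express the step 1 check is a half-open date range. This sketch applies plain Python dates to values printed in this post. The range bounds are an assumption based on the January to July 2021 files downloaded earlier.

```python
from datetime import date

START = date(2021, 1, 1)   # first day covered by the downloaded files
END = date(2021, 8, 1)     # exclusive upper bound: the day after 2021-07-31


def in_range(day_string):
    """Return True if an ISO 'YYYY-MM-DD' string falls in [START, END)."""
    return START <= date.fromisoformat(day_string) < END


samples = ["2002-12-31", "2003-01-05", "2004-04-04", "2008-12-31", "2021-03-01"]
kept = [d for d in samples if in_range(d)]
print(kept)
```

An exclusive upper bound avoids having to remember how many days the last month has.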

 

# step 2 - look for outliers in the various columns
# Could a yellow taxi really carry 9 passengers?
for i in ['passenger_count', 'trip_distance', 'fare_amount', 'tip_amount', 'tolls_amount', 'total_amount']:
    print(f'{i} summary -----------------------------------')
    taxi_df.select(i).describe().show()


passenger_count summary -----------------------------------
+-------+------------------+
|summary|   passenger_count|
+-------+------------------+
|  count|          14166672|
|   mean|1.4253783104458126|
| stddev|  1.04432704905968|
|    min|                 0|
|    max|                 9|
+-------+------------------+

trip_distance summary -----------------------------------
+-------+-----------------+
|summary|    trip_distance|
+-------+-----------------+
|  count|         15000700|
|   mean|6.628629402627818|
| stddev|671.7293482115828|
|    min|              0.0|
|    max|        332541.19|
+-------+-----------------+

fare_amount summary -----------------------------------
+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|          15000700|
|   mean| 12.89269334830367|
| stddev|145.54843567115813|
|    min|            -643.5|
|    max|         398466.38|
+-------+------------------+

tip_amount summary -----------------------------------
+-------+-----------------+
|summary|       tip_amount|
+-------+-----------------+
|  count|         15000700|
|   mean|2.146797558780939|
| stddev|2.610914434555077|
|    min|          -333.32|
|    max|          1140.44|
+-------+-----------------+

tolls_amount summary -----------------------------------
+-------+-------------------+
|summary|       tolls_amount|
+-------+-------------------+
|  count|           15000700|
|   mean|0.31795104561765897|
| stddev| 1.6542914124457562|
|    min|             -38.02|
|    max|             956.55|
+-------+-------------------+

total_amount summary -----------------------------------
+-------+-----------------+
|summary|     total_amount|
+-------+-----------------+
|  count|         15000700|
|   mean|18.75545205708744|
| stddev|145.7442452805979|
|    min|           -647.8|
|    max|         398469.2|
+-------+-----------------+
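For reference, the five rows that `describe()` prints can be reproduced by hand on a small list. The sketch below uses Python's statistics module on made-up numbers. It assumes the `stddev` row is the sample standard deviation, which is what `statistics.stdev` computes, and that missing values are simply left out of every statistic.

```python
import statistics


def describe(values):
    """Return count, mean, stddev, min, and max, skipping None like a null."""
    present = [v for v in values if v is not None]
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample standard deviation (n - 1)
        "min": min(present),
        "max": max(present),
    }


summary = describe([1, 1, 2, None, 9])  # made-up passenger counts
for name, value in summary.items():
    print(name, value)
```

Skipping missing values would also explain why passenger_count reports a smaller count (14166672) than the other columns (15000700) in the output above.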

 

A closer look reveals some amusing values. The maximum trip_distance is 332541 miles (535172.863 km). Given that one trip around the Earth is about 40,000 km, how many times did this taxi circle the globe? That is clearly impossible, so the row has to be removed. There is plenty of other data that should be removed as well. Skim the summaries and think about which values need preprocessing.
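The arithmetic behind that sanity check, as a quick sketch. The conversion factor (1 mile = 1.609344 km) and the Earth's equatorial circumference (about 40,075 km) are standard reference values, not figures from the dataset.

```python
KM_PER_MILE = 1.609344            # definition of the international mile
EARTH_CIRCUMFERENCE_KM = 40_075   # approximate equatorial circumference

max_trip_miles = 332541.19        # max trip_distance from the summary above
max_trip_km = max_trip_miles * KM_PER_MILE
laps = max_trip_km / EARTH_CIRCUMFERENCE_KM

print(f"{max_trip_km:,.0f} km, about {laps:.1f} times around the Earth")
```

That works out to a little over 13 laps around the globe for a single taxi ride, which is why the value can safely be treated as a recording error.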

 

 

passenger_count: 9 passengers is too many. Limit it to at most 5.
trip_distance: must be greater than 0; cap the max at 100 miles.
fare_amount: must be greater than 0; cap the max at 100.
tip_amount: must be greater than 0; cap the max at 50.
tolls_amount: must be greater than 0; cap the max at 10.
total_amount: must be greater than 0; cap the max at 1000.
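The criteria above can be written down as a small rule table. This is only a sketch in plain Python with a hypothetical `is_valid_trip` helper; the actual cleaning in this post is done in SQL. The bounds are copied from the list, treating each maximum as inclusive (an assumption), and the sample row is the fifth row of the `show(5)` output earlier.

```python
# (exclusive lower bound, inclusive upper bound) per column, taken from the list;
# None means the list does not state a bound on that side
BOUNDS = {
    "passenger_count": (None, 5),
    "trip_distance": (0, 100),
    "fare_amount": (0, 100),
    "tip_amount": (0, 50),
    "tolls_amount": (0, 10),
    "total_amount": (0, 1000),
}


def is_valid_trip(trip):
    """Return True if every listed column of `trip` (a dict) is within bounds."""
    for column, (low, high) in BOUNDS.items():
        value = trip.get(column)
        if value is None:
            return False  # a missing value cannot satisfy the rule
        if low is not None and value <= low:
            return False
        if high is not None and value > high:
            return False
    return True


sample = {"passenger_count": 1, "trip_distance": 1.13, "fare_amount": 5.5,
          "tip_amount": 1.86, "tolls_amount": 0.0, "total_amount": 11.16}
print(is_valid_trip(sample))
```

Read literally, "greater than 0" rejects this otherwise ordinary trip because its toll is 0.0. For tips and tolls, a lower bound of "not negative" may therefore be the more practical reading.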

 

 

 

3) Data Cleaning

 

# data cleaning
# fare, tip, and tolls will be handled later, when building the df.
# Note: this query caps total_amount at 2000 and does not filter out trip_distance = 0.
query = '''
SELECT
    *
FROM
    taxi t
WHERE
    t.total_amount < 2000
    AND t.total_amount > 0
    AND t.trip_distance < 100
    AND t.passenger_count < 6
    AND t.pickup_date >= '2021-01-01'
    AND t.pickup_date < '2021-08-01'
    AND t.dropoff_date >= '2021-01-01'
    AND t.dropoff_date < '2021-08-03'
'''

df_c = spark.sql(query)
df_c.createOrReplaceTempView('cleaned')

 

The data was preprocessed with a SQL statement.

Let's check that the preprocessing worked as intended.

 

# Check the preprocessing result
spark.sql('SELECT pickup_date FROM cleaned ORDER BY pickup_date').show()


+-----------+
|pickup_date|
+-----------+
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
| 2021-01-01|
+-----------+

 

We can confirm that the data now starts at 2021-01-01.

Let's check the summary statistics of the other columns.

 

# step 2 - confirm that the outliers in the various columns are gone
for i in ['passenger_count', 'trip_distance', 'fare_amount', 'tip_amount', 'tolls_amount', 'total_amount']:
    print(f'{i} summary -----------------------------------')
    df_c.select(i).describe().show()


passenger_count summary -----------------------------------
+-------+------------------+
|summary|   passenger_count|
+-------+------------------+
|  count|          13855011|
|   mean|1.3471861552473687|
| stddev|0.8633321134239649|
|    min|                 0|
|    max|                 5|
+-------+------------------+

trip_distance summary -----------------------------------
+-------+------------------+
|summary|     trip_distance|
+-------+------------------+
|  count|          13855011|
|   mean|2.8443242809406217|
| stddev|3.6296305328849514|
|    min|               0.0|
|    max|             99.96|
+-------+------------------+

fare_amount summary -----------------------------------
+-------+------------------+
|summary|       fare_amount|
+-------+------------------+
|  count|          13855011|
|   mean|12.173092585057173|
| stddev|10.921311212928543|
|    min|              -0.8|
|    max|            1320.0|
+-------+------------------+

tip_amount summary -----------------------------------
+-------+------------------+
|summary|        tip_amount|
+-------+------------------+
|  count|          13855011|
|   mean| 2.189352297880318|
| stddev|2.5818332665863704|
|    min|               0.0|
|    max|             700.0|
+-------+------------------+

tolls_amount summary -----------------------------------
+-------+-------------------+
|summary|       tolls_amount|
+-------+-------------------+
|  count|           13855011|
|   mean|0.27130274598821397|
| stddev|  1.540763209706977|
|    min|                0.0|
|    max|             956.55|
+-------+-------------------+

total_amount summary -----------------------------------
+-------+------------------+
|summary|      total_amount|
+-------+------------------+
|  count|          13855011|
|   mean| 18.08681097854774|
| stddev|13.216750985022102|
|    min|              0.01|
|    max|            1320.8|
+-------+------------------+

 

Columns that were not filtered directly, such as fare and tolls, still contain extreme values (tolls_amount still peaks at 956.55), but passenger_count and trip_distance have clearly changed. Now let's run a simple analysis on the cleaned data.

 

 

 

 

4) Simple analysis 1. Counting trips per day of week

The SQL function DATE_FORMAT() converts the date into a day-of-week value. The trips are then grouped by day of week and counted.

 

import matplotlib.pyplot as plt
import seaborn as sns

# 1. Count trips per day of week

query = '''
SELECT
    DATE_FORMAT(c.pickup_date, 'EEEE') AS day_of_week,
    COUNT(*) AS trips
FROM
    cleaned c
GROUP BY
    day_of_week
'''
weekday_df = spark.sql(query).toPandas()


# Draw the plot
fig, ax = plt.subplots(figsize=(16,6))
sns.barplot(
    x = 'day_of_week',
    y = 'trips',
    data = weekday_df
)
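`GROUP BY` does not guarantee any row order, so the bars may come out in an arbitrary weekday order. Below is a minimal sketch of putting weekday names into calendar order with the standard library. The counts are placeholders, not results from the dataset, and the English day names assume the default locale.

```python
import calendar
from datetime import date

# Monday..Sunday, the full English names that the 'EEEE' pattern is expected to produce
WEEKDAYS = list(calendar.day_name)

# placeholder counts in the kind of arbitrary order a GROUP BY may return
trips_by_day = {"Thursday": 3, "Monday": 5, "Sunday": 2, "Friday": 4}

ordered = sorted(trips_by_day.items(), key=lambda item: WEEKDAYS.index(item[0]))
print(ordered)

# the same kind of name can be derived from a date in plain Python
print(date(2021, 1, 1).strftime("%A"))
```

The same `WEEKDAYS` list could be passed to seaborn's `barplot` through its `order` argument to fix the order of the bars.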

 

 

 

 

 

4) Simple analysis 2. Payment method statistics

The payment code is converted into a payment-method string, and the rows are grouped by payment method to count the trips.

 

# 2. Simple payment-method statistics
payment_type_to_string = {
    1: "Credit Card",
    2: "Cash",
    3: "No Charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided Trip",
}


# Define the UDF
def parse_payment_type(payment_type):
    # Fall back to "Unknown" for null or unmapped codes instead of raising KeyError
    return payment_type_to_string.get(payment_type, "Unknown")

spark.udf.register('parse_payment_type', parse_payment_type)


# Build the query
query = '''
SELECT
    parse_payment_type(payment_type) as payment,
    COUNT(*) AS trips
FROM
    cleaned c
GROUP BY
    payment_type
'''

df_payment = spark.sql(query).toPandas()

----

display(df_payment)


	 payment	     trips
0	 Credit Card	 10534838
1	 No Charge	     59181
2	 Dispute	     23856
3	 Cash	         3237135
4	 Unknown	     1
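Raw counts are easier to compare as shares. Here is a small sketch in plain Python that computes each payment method's percentage from the counts in the table above:

```python
trips = {
    "Credit Card": 10534838,
    "No Charge": 59181,
    "Dispute": 23856,
    "Cash": 3237135,
    "Unknown": 1,
}

total = sum(trips.values())
shares = {name: 100 * count / total for name, count in trips.items()}

# print the largest share first
for name, share in sorted(shares.items(), key=lambda item: -item[1]):
    print(f"{name:<12}{share:6.2f}%")
```

The counts add up to 13,855,011, the same as the row count of the cleaned data, and credit cards account for roughly three quarters of all trips.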

----

# Plot the trips by payment method
fig, ax = plt.subplots(figsize=(16,6))
plt.title('payment', fontsize = 30)
sns.barplot(
    x = 'payment',
    y = 'trips',
    data = df_payment
)

 

 

 

 

So far we have preprocessed the TLC Trip Record Data and run a simple analysis on it.

I hope you get comfortable with SparkSQL by experimenting with the data yourself.

I plan to post a more detailed analysis of the TLC data in the Analytics section.

Thanks for reading.