2022. 5. 12. 22:08ใ๐งช Data Science/Analytics
๋น์ ์ NEWYORK์์ ํ์ ๊ธฐ์ฌ๋ก ์ผํ๊ณ ์๋ค.
ํ์๋ Yellow Taxi์ด๋ค.
ํ์ ๊ธฐ์ฌ๋ก ์์กดํ๊ธฐ ์ํด์ ๋ค์๊ณผ ๊ฐ์ ๋ ธ๋ ฅ์ด ํ์ํ๋ค.
1) ๊ทผ๋ฌด ์๊ฐ ๋์, ์๋์ ์ต๋ํ ๋ง์ด ํ์ธ ๊ฒ (์๋)
2) ๋ง์ ์๊ธ + ํ๋ถํ ํ (์๊ธ)
3) ํ๊ธ ๊ฒฐ์ ๋ ์๋์ ํฌํจ์ํค์ง ์๊ณ ์ฌ์ฉํ๊ธฐ (์ธ๊ธ ์ ์ฝ)
๋ ๊ฐ์ง๋ ๋จ์ํ ๋ ธ๋ ฅ๋ง์ผ๋ก ์ด๋ค์ง์ง ์๋๋ค.
๋น ๋ฐ์ดํฐ๋ฅผ ์ด์ฉํด ์๋์ด ๋ง์ ์ฅ์์ ์๊ฐ์ ํน์ ํ๊ณ , ๋ฏธ๋ฆฌ ๋๊ธฐํ๊ณ ์์ด์ผ ํ๋ค.
๋ํ ์๋์ด ํน์ ์ฅ์๋ฅผ ์๊ตฌํด๋, ๊ทธ๊ณณ์ด ๋ฐ์ดํฐ ์ ์๋์ด ๋ง์ ์ฅ์๊ฐ ์๋๋ผ๋ฉด ์ฉ๊ธฐ ์๊ฒ Pass ํ ํ์๋ ์๋ค.
๊ทธ๋ ๋ค๋ฉด ์ง๊ธ๋ถํฐ ๋ฐ์ดํฐ๋ฅผ ์ดํด๋ณด๋ฉฐ ์ ๋ต์ ๊ตฌ์ถํด๋ณด์.
[Data]
๋ฐ์ดํฐ๋ SparkSQL ํฌ์คํ ์์ ์ ์ฒ๋ฆฌํ Yellow Taxi ๋ฐ์ดํฐ(cleaned)๋ฅผ ์ฌ์ฉํ๋๋ก ํ๊ฒ ๋ค.
NEWYORK Yellow Taxi์ ์ดํ์ 2021.01~2021.07๊น์ง ๋ชจ๋ ๋ชจ์๋์ ๋ฐ์ดํฐ๋ค.
[Yellow Taxi Data: https://mengu.tistory.com/50]
Basic Setting
from matplotlib import font_manager, rc
font_path = 'C:\\WINDOWS\\Fonts\\HBATANG.TTF'
font = font_manager.FontProperties(fname=font_path).get_name()
rc('font', family=font)
import os
import findspark
findspark.init(os.environ.get("SPARK_HOME"))
import pyspark
from pyspark import SparkConf, SparkContext
import pandas as pd
import faulthandler
faulthandler.enable()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master('local').appName("taxi-analysis").getOrCreate()
# ๋ฐ์ดํฐ๊ฐ ์๋ ํ์ผ
zone_data = "C:/DE study/data-engineering/01-spark/data/taxi_zone_lookup.csv"
trip_files = "C:/DE study/data-engineering/01-spark/data/trips/*"
# ๋ฐ์ดํฐ ๋ก๋
trips_df = spark.read.csv(f"file:///{trip_files}", inferSchema = True, header = True)
zone_df = spark.read.csv(f"file:///{zone_data}", inferSchema = True, header = True)
# ๋ฐ์ดํฐ ์คํค๋ง
trips_df.printSchema()
zone_df.printSchema()
# ๋ฐ์ดํฐ createOrReplaceTempView()
trips_df.createOrReplaceTempView("trips")
zone_df.createOrReplaceTempView("zone")
# ์ธ๋งํ column๋ง ๋ชจ์๋์.
query = '''
SELECT
t.VendorID,
TO_DATE(t.tpep_pickup_datetime) AS pickup_date,
HOUR(t.tpep_pickup_datetime) AS pickup_time,
TO_DATE(t.tpep_dropoff_datetime) AS dropoff_date,
HOUR(t.tpep_dropoff_datetime) AS dropoff_time,
t.passenger_count,
t.trip_distance,
t.payment_type,
t.fare_amount,
t.tip_amount,
t.tolls_amount,
t.total_amount,
pz.Zone as pzone,
dz.Zone as dzone
FROM
trips t
LEFT JOIN
zone pz
ON
t.PULocationID == pz.LocationID
LEFT JOIN
zone dz
ON
t.DOLocationID == dz.LocationID
'''
taxi_df = spark.sql(query)
taxi_df.createOrReplaceTempView("taxi")
spark.sql('select * from taxi').show(5)
+--------+-----------+-----------+------------+------------+---------------+-------------+------------+-----------+----------+------------+------------+-----------------+--------------+
|VendorID|pickup_date|pickup_time|dropoff_date|dropoff_time|passenger_count|trip_distance|payment_type|fare_amount|tip_amount|tolls_amount|total_amount| pzone| dzone|
+--------+-----------+-----------+------------+------------+---------------+-------------+------------+-----------+----------+------------+------------+-----------------+--------------+
| 2| 2021-03-01| 0| 2021-03-01| 0| 1| 0.0| 2| 3.0| 0.0| 0.0| 4.3| NV| NV|
| 2| 2021-03-01| 0| 2021-03-01| 0| 1| 0.0| 2| 2.5| 0.0| 0.0| 3.8| Manhattanville|Manhattanville|
| 2| 2021-03-01| 0| 2021-03-01| 0| 1| 0.0| 2| 3.5| 0.0| 0.0| 4.8| Manhattanville|Manhattanville|
| 1| 2021-03-01| 0| 2021-03-01| 0| 0| 16.5| 1| 51.0| 11.65| 6.12| 70.07|LaGuardia Airport| NA|
| 2| 2021-03-01| 0| 2021-03-01| 0| 1| 1.13| 1| 5.5| 1.86| 0.0| 11.16| East Chelsea| NV|
+--------+-----------+-----------+------------+------------+---------------+-------------+------------+-----------+----------+------------+------------+-----------------+--------------+
only showing top 5 rows
# cleaning -------------------------------------------------------------------
# data clearning
# fare, tip, tolls ์ ๋ํ ๊ฒ์ ๋์ค์ df ๋ง๋ค๋ ์ ์ฒ๋ฆฌํด์ฃผ์.
query = '''
SELECT
*
FROM
taxi t
WHERE
t.total_amount < 2000
AND t.total_amount > 0
AND t.trip_distance < 100
AND t.passenger_count < 6
AND t.pickup_date >= '2021-01-01'
AND t.pickup_date < '2021-08-01'
AND t.dropoff_date >= '2021-01-01'
AND t.dropoff_date < '2021-08-03'
'''
df_c = spark.sql(query)
df_c.createOrReplaceTempView('cleaned')
์ ๋ต 1. ์๋์ ์ต๋ํ ๋ง์ด ํ์ธ ๊ฒ
์๋์ ๋ง์ด ํ์ฐ๋ ค๋ฉด ์ด๋ป๊ฒ ํด์ผํ ๊น?
์๋์ด ๋ง์ ์ฅ์, ์๊ฐ์ ๋๊ธฐํ๊ณ ์์ด์ผ ํ๋ค. ๋ํ์ฌ ์๋์ด Pick ํ ์ฅ์์๋ ์๋์ด ๋ง์์ผ ํ๋ค. ๊ทธ๋์ผ ๋ค์ ์๋์ ๋ฐ๊ธฐ๊ฐ ์์ํด์ง๊ธฐ ๋๋ฌธ์ด๋ค. Pick up์ด ๋ง์ด ๋ฐ์ํ๋ ์ฅ์์ ์๊ฐ์ ์์๋ด๊ณ , ํ๋ฃจ์ ๊ทผ๋ฌด ๊ฒฝ๋ก๋ฅผ ์ด๋ป๊ฒ ์ก์์ผ ํ ์ง ๊ณํ์ ์ง ๋ณด์.
A. Pick up์ด ๋ง์ด ๋ฐ์ํ๋ ์ฅ์ Top 5
# cleaned ๋ฐ์ดํฐ์์ ์ํ๋ ๋ฐ์ดํฐ๋ง ์ถ์ถํ๊ธฐ
query = '''
SELECT
c.pzone,
COUNT(*) AS trips
FROM
cleaned c
GROUP BY
c.pzone
'''
df_pzone = spark.sql(query).toPandas()
# pick-up Top 5
pzone = df_pzone[df_pzone['trips'] > 30000].sort_values('trips', ascending=False)
pzone.head()
1,2๋ฑ์ Upper East Side South, Upper East Side North์ผ๋ก ๋์ ํฉ์น trips ์๊ฐ ๋ฌด๋ ค 144๋ง์ด๋ค. 3๋ฑ์ Midtown, 4๋ฑ์ Penn Station ๋ถ๊ทผ, 5๋ฑ์ ๋ง์ปจ ์คํ์ด๋ค. ๊ฐ๊ฐ์ ์ฅ์๋ฅผ ์ง๋์ ํ์ํ๋ค.
๊ฐ๊ฐ ์ฅ์๋ค์ ํน์ง์ ๋์ดํด๋ณด์๋ฉด ๋ค์๊ณผ ๊ฐ๋ค.
1-2๋ฑ. Upper East Side South/North: ํ๊ฒฉ ๋์ ์ฃผํ๊ฐ. ๋ถ์ ์ธต ๊ฑฐ์ฃผ๋ฏผ๋ค์ด ์์ฒญ ๋ง๋ค. ๊ณ ๊ธ ๋ ์คํ ๋, ๋งค๋์จ๊ฐ์ ๋ช ํ ๋งค์ฅ์ผ๋ก ์ ๋ช ํ๋ค. ์ฌ๊ธฐ์ ํ์๋ฅผ ํ๋ค๋ฉด Tip์ ๋ง์ด ๋ฐ์ ์ ์์ง ์์๊น๋ผ๊ณ ๊ฐ์ค์ ์ธ์๋์.
3๋ฑ. Midtown Center: ๋ด์์์ ์๋ 4๊ฐ์ Center๋ก ๋ค์ํ ์ฌ๊ฐ์ ์์ ๋ค์ ๋๋ฆด ์ ์๋ค.
4๋ฑ. Penn Station/Madison Sq West: ๋งค๋์จ ์คํ์ด ๊ฐ๋ ์๋๋ฅผ ์ง๋๋ ์ฃผ์ ๋์ ๋ฐ ๋์ ๊ฐ ๋์์ฒ ๋์ ์ค์ฌ์ง. ์๋น๊ณผ ์์ ์ด ๋น์ฐํ ์ฆ๋นํด ์๋ค.
5๋ฑ. Lincoln Square East: Upper East Side์ ์๋ ๊ด์ฅ๊ณผ ์ฃผ๋ณ ์ง์ญ์ ์ด๋ฆ์ด๋ค. ๊ด๊ด ๋ช ์๋ก ๋ค์ํ ์์ ๊ณผ ๋ฏธ์ ๊ด, ์์ ๊ด๋ค์ด ์์นํด ์๋ค.
๋น์ ์ด ๋ด์์ ํ์ ๊ธฐ์ฌ๋ผ๋ฉด, ์์ ์ง์ญ์ ๋์๋ค๋ ์ผ ๋ถ๊ทํ ์๋๋ค์ ํ์ธ ์ ์์ ๊ฒ์ด๋ค.
ํ์ง๋ง ์ด ์ง์ญ๋ค์ ๊ตํต์ด ๋งํ ์คํ๋ ค ๋นํจ์จ์ ์ด๋ํ ์ ์๋ค. ๋ฐ๋ผ์ ๋นํจ์จ์ ๊ฐ์ํ ๋งํ ๊ฐ์น๊ฐ ์๋์ง ํ์ธํ๊ณ ์ ๊ทผํด์ผ ํ ๊ฒ์ด๋ค.
B. Pick up์ด ๋ง์ด ๋ฐ์ํ๋ ์๊ฐ๋
# ์๊ฐ๋๋ณ trips ๋น๋
query = '''
SELECT
c.pickup_time,
COUNT(*) AS trips
FROM
cleaned c
GROUP BY
c.pickup_time
'''
df_time = spark.sql(query).toPandas()
# ์๊ฐ๋๋ณ ์ถํด๊ทผ plot ๊ทธ๋ฆฌ๊ธฐ
fig, ax = plt.subplots(figsize=(16,6))
plt.title('trips count by time', fontsize = 30)
sns.barplot(
x = 'pickup_time',
y = 'trips',
data = df_time
)
์๋ฒฝ 1์-5์๊น์ง ํ์ Pick-up์ด ํ์ ํ ๋ฎ๋ค. ๋ฐ๋ผ์ ํ ์ฆ์ ๋ถ์ฌ์ ์ด์์ ํด์ผ ํ๋ค. ํ์ง๋ง ๊ตณ์ด ์๋ฒฝ์ 1์๊ฐ ์ด์ ํ๋ ๊ฒ๋ณด๋ค ๋ฎ์ 2 ์๊ฐ ํ๋ ๊ฒ์ด ๊ฑด๊ฐ, ์ด์ต ์ธก๋ฉด์์ ์ข์ ์ ์๋ค.
Trips ๋น๋๋ 6์๋ถํฐ ๊ณ์ํด์ ์ฌ๋ผ๊ฐ๋ค๊ฐ 18์์ Peak๋ฅผ ์ฐ๊ณ ๋ด๋ ค๊ฐ๋ค. 14์-18์ ์ฌ์ด์๋ ๋ฌด์กฐ๊ฑด ์ด์ ๋๋ฅผ ์ก๊ณ ์์ด์ผ ํ๋ค๋ ๊ฒฐ๋ก ์ด ๋์จ๋ค. ๋ง์ฝ Top 5 ์ฅ์๋ค์ด ๊ฐ๊ฐ ๋ค๋ฅธ ์๊ฐ๋ ํน์ฑ์ ๊ฐ์ง๊ณ ์๋ค๋ฉด ์ด๋จ๊น? Upper East Side๋ ์์นจ์ Peak๋ฅผ ์ฐ๊ณ , Penn Station์ ์ ๋ ์ Peak๋ฅผ ์ฐ๋๋ค๋ฉด, ๋น์ ์ ์์นจ์ Upper East Side๋ก ์ถ๊ทผํ๊ณ Penn Station์์ ํด๊ทผํ๋ฉด ๋ ๊ฒ์ด๋ค.
C. Top 5 ์ฅ์๋ณ ์๊ฐ๋ Trips ๋น๋์
# query
query = '''
SELECT
pickup_time,
COUNT(*) AS trips
FROM
c
GROUP BY
pickup_time
'''
# ์ฅ์๋ณ ์๊ฐ๋๋ณ ํน์ง
for i in ['Upper East Side South', 'Upper East Side North', 'Midtown Center', 'Penn Station/Madison Sq West', 'Lincoln Square East']:
c = df_c[df_c['pzone'] == i]
c.createOrReplaceTempView('c')
df_time = spark.sql(query).toPandas()
# A. ์๊ฐ๋๋ณ ์ถํด๊ทผ plot ๊ทธ๋ฆฌ๊ธฐ
fig, ax = plt.subplots(figsize=(16,6))
plt.title(f'trips count at {i}', fontsize = 30)
sns.barplot(
x = 'pickup_time',
y = 'trips',
data = df_time
)
[1๋ฑ Upper East Side South]
ํน์ง: ์์นจ์ trips ์ ์กฐ. 14์์ ์ต๊ณ ์ ์ ์ฐ๊ณ ํ๋ฝํ๋ค.
[2๋ฑ Upper East Side North]
ํน์ง: 8์๋ถํฐ trips๊ฐ ํ๋ฐ. 15์์ ์ ์ ์ ์ฐ๊ณ ํ๋ฝ.
[3๋ฑ Midtown Center]
ํน์ : ์ค์ ์๋ ๋ง์ด ์ ์กฐํ๋ค. ์ ๋ 16-19์์ ํ๋ฐํ๋ค.
[4๋ฑ Penn Station/Madison Sq West]
ํน์ง: ์์นจ 6์์๋ ํ๋ฐํ๋ค. ์ถ/ํด๊ทผ ์ํฅ ๋ง์ด ๋ฐ๋๋ค. 18์์ ์ ์ .
[5๋ฑ Lincoln Square East]
ํน์ง: 8-9์๋ถํฐ ํ๋ฐ. 14์-18์๊น์ง ์ค์ํ trips ๋น๋๋ฅผ ๋ณด์ธ๋ค.
Pick up์ด ์ ์ผ ๋ง์ด ๋ฐ์ํ๋ ์ง์ญ Top 5๋ฅผ ์์๋ณด๊ณ , ๊ฐ ์ฅ์๋ณ ์๊ฐ๋ ์ํฉ์ ์ดํด๋ณด์๋ค.
์ด๋ก์จ '์๋์ ๋ง์ด ๋ฐ๊ธฐ ์ํ' ๋น์ ์ ๊ทผ๋ฌด ๊ฒฝ๋ก๊ฐ ์ผ์ถ ์ง์๋ค.
[Penn Station -> Upper East Side]
์์นจ์ ์ถ๊ทผํ๋ฉด, ๋จผ์ Penn Station์ผ๋ก ๋ฌ๋ ค๊ฐ์ ์ถ๊ทผ์๋ค์ Pick up ํด์ผ ํ๋ค. Penn Station ์ง์ญ์ ๋ค๋ฅธ ์ง์ญ์ ๋นํด ์์นจ๋ถํฐ ์ด์ฉ์๊ฐ ์๋์ ์ผ๋ก ๋ง์ด ๋๋ฌธ์ด๋ค.
๊ทธ๋ฌ๋ค๊ฐ ์ฌ์ฌ ์ ์ฌ์๊ฐ์ด ๋ค๊ฐ์ค๋ฉด, ๋ค๋ฅธ ์ง์ญ์์ ๊ฐ๋จํ๊ฒ ๋ฐฅ์ ๋จน๋๋ค. ์๊ณค์ฆ์ด ๋ชฐ๋ ค์ฌ ๋ ์ฆ์, Upper East Side์์ ์ ๋
๊น์ง ๋์๋ค๋
์ผ ํ๋ค.
ํ์ง๋ง ์ค์ ์ํฉ์์ ๋ค๋ฅผ ์ ์๋ค. ๊ตํต์ด ๋งํ์ ์คํ๋ ค ์๋์ ๋ชป ๋ฐ๊ฑฐ๋, ๊ฒฝ์์(ํ์ ๊ณต๊ธ์)๊ฐ ๋๋ฌด ๋ง์ ์คํ๋ ค ์๋ ์ก๊ธฐ๊ฐ ๋น ๋ฏํ ์ ์๋ค. ๋ฐ์ดํฐ๋ฅผ ์ด์ฉํ๋ ์ค์ ์ํฉ๊ณผ ์ ์กฐํํ์ฌ ์ ๋ต์ ์ง์ผํ ๊ฒ์ด๋ค.
์ง๊ธ๊น์ง ์๋์ ๋ง์ด ๋ฐ๋ ์ ๋ต์ ๋ฐ์ดํฐ๋ฅผ ํตํด ์ง๋ณด์๋ค.
๋ค์ ํฌ์คํ ์์ ํ๊ณผ ์๊ธ, ๊ฑฐ๋ฆฌ๋ฅผ ๋ฐํ์ผ๋ก ์ ๋ต์ ๋ ์ธ์ธํ๊ฒ ์ง ๋ณด๋๋ก ํ๊ฒ ๋ค.