1. CSV to Parquet

1.1 Default Initialization

import matplotlib.pyplot as plt
from pyspark.sql import SparkSession


spark = (
    SparkSession.builder.master("local[*]")
    .appName("Sales")
    .config("spark.ui.port", "4050")
    .getOrCreate()
)
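
If the session starts correctly, the Spark UI is served on the port configured above (4050). A quick sanity check on the live session:

# Confirm the session is up and locate the Spark UI.
print(spark.version)                # Spark version string
print(spark.sparkContext.uiWebUrl)  # e.g. http://localhost:4050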

1.2 Read CSV

from pyspark.sql import types as t

data = (
    spark.read.option("inferSchema", True)
    .option("delimiter", ",")
    .option("header", True)
    .csv("vgsales.csv")
)

# Cast the Year column to integer
data = data.withColumn("Year", data.Year.cast(t.IntegerType()))

# Register a temporary view for SQL queries
data.createOrReplaceTempView("sales")

print("n rows:", data.count())
data.show(3)
data.limit(3).toPandas()
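
inferSchema costs Spark an extra pass over the file. Supplying an explicit schema avoids that pass and removes the need for the cast above. The sketch below assumes a particular vgsales.csv header, so adjust the field list to the actual columns:

# Hypothetical explicit schema -- the column names here are an assumption
# and must match the CSV header exactly.
schema = t.StructType(
    [
        t.StructField("Rank", t.IntegerType()),
        t.StructField("Name", t.StringType()),
        t.StructField("Platform", t.StringType()),
        t.StructField("Year", t.IntegerType()),
        t.StructField("Genre", t.StringType()),
        t.StructField("Publisher", t.StringType()),
        t.StructField("NA_Sales", t.DoubleType()),
        t.StructField("EU_Sales", t.DoubleType()),
        t.StructField("JP_Sales", t.DoubleType()),
        t.StructField("Other_Sales", t.DoubleType()),
        t.StructField("Global_Sales", t.DoubleType()),
    ]
)

data_explicit = (
    spark.read.option("header", True)
    .option("delimiter", ",")
    .schema(schema)
    .csv("vgsales.csv")
)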

1.3 Missing Values

from pyspark.sql import functions as F

def display_missing_count(df):
    missing_df = df.select(
        [
            F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
            for c in df.columns
        ]
    ).toPandas()
    display(missing_df)


display_missing_count(data)

The following removes the missing values.

from functools import reduce

def filter_missing_values(df):
    return df.where(
        reduce(
            lambda a, b: a & b,
            [~F.isnan(col) & F.col(col).isNotNull() for col in df.columns],
        )
    )


data = filter_missing_values(data)
display_missing_count(data)
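
Spark also ships a built-in shortcut for this: na.drop removes rows with missing values in any column, so the custom filter above is mainly useful when you need per-column control. A minimal sketch:

# Built-in alternative: drop every row with a missing value in any column.
data_dropped = data.na.drop(how="any")
print("n rows after na.drop:", data_dropped.count())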

1.4 GroupBy

fig, ax = plt.subplots(1, figsize=(4, 3))
(
    data.groupBy("Genre")
    .count()
    .orderBy(F.col("count").desc())
    .toPandas()
    .plot(x="Genre", y="count", kind="bar", ax=ax)
)
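
groupBy also supports named aggregations; the sketch below sums Global_Sales per Genre instead of counting rows.

# Total global sales per genre, largest first.
genre_sales = (
    data.groupBy("Genre")
    .agg(F.sum("Global_Sales").alias("total_global_sales"))
    .orderBy(F.col("total_global_sales").desc())
)
genre_sales.show(5)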

1.5 SQL Query

query = """
SELECT *
FROM (SELECT *,
             RANK() OVER (PARTITION BY Year ORDER BY Global_Sales DESC) AS rank_sales
      FROM sales) t
WHERE t.rank_sales = 1
ORDER BY Year
"""
df = spark.sql(query).toPandas()
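
The same ranking can be expressed with the DataFrame API; a sketch using pyspark.sql.Window that should be equivalent to the SQL above:

from pyspark.sql import Window

# DataFrame-API equivalent of the ranking query: top seller per year.
w = Window.partitionBy("Year").orderBy(F.col("Global_Sales").desc())
top_per_year = (
    data.withColumn("rank_sales", F.rank().over(w))
    .where(F.col("rank_sales") == 1)
    .orderBy("Year")
)
top_per_year.show(5)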

1.6 Write to Parquet

A sales.parquet directory will be created.

# mode: overwrite or append (append adds new part files)
data.write.mode("overwrite").parquet("sales.parquet")
├── 01 CSV to Parquet.ipynb
└── sales.parquet
    ├── part-00000-3712a917-89eb-4b60-93f3-9f3973cabb0b-c000.snappy.parquet
    └── _SUCCESS
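
Reading the directory back restores the DataFrame with its schema intact; a quick round-trip check:

# Read the Parquet directory back; column types are preserved.
restored = spark.read.parquet("sales.parquet")
restored.printSchema()
print("n rows:", restored.count())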