PySpark for ETL
1. Installation
1. Installing PySpark
다운로드 받고 설치하면 됩니다.
이후 동일한 버젼의 (3.3.2) pyspark 를 설치합니다.
$ pip install pyspark==3.3.2
이후 .bashrc 또는 .bash_profile 등에 spark를 설치한 위치를 설정합니다.
copy & paste 하지말고, 아래 내용을 수정해서 사용합니다.
핵심은 파이썬 뭘 써도 상관없는데 pyspark 버젼은 동일해야 하며, 아래의 환경 설정이 제대로 설정되어 있어야 합니다.
export SPARK_HOME=/home/anderson/app/spark
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/bin:$SPARK_HOME/python:$PATH
2. Connection
2.1 Hive Connection (안되는듯 함)
사실 가장 자연스럽게 연결 가능한 것이 Hive입니다. Spark 와 찰떡궁합.
필요한 것은 .enableHiveSupport()
를 하면 되고, 그외 정보는 hive-site.xml
에 설정하면 됩니다.
from pyspark.sql import SparkSession
spark = (SparkSession
.builder
.appName("Python Spark SQL Hive integration example")
.enableHiveSupport()
.getOrCreate())
spark.sql("USE <database>")
spark.sql("SELECT * FROM <table>").show()
hive-site.xml 은 다음과 같이 설정합니다.
2.2 Presto Connection
아래와 같이 JDBC 로 Presto 에 연결하는 방법이 있을수 있습니다.
문제는 Spark에서 지원안하는 Presto Types들이 있는데, ARRAY, MAP, ROW 등이며, 이 경우 데이터를 가져오는데 에러가 날수 있습니다.
또한 실행시 테이블 전체를 가져오는 .option("dbtable", "shcema.table")
방식을 사용하던가 또는 ..
.option("query", sql)
을 사용하는 방법.. 둘중에 하나를 하면 됩니다.
from pyspark.sql import SparkSession
# SparkSession 생성
spark = (SparkSession.builder
.appName("Presto JDBC Example")
.config("spark.jars", "/path/to/presto-jdbc-350.jar")
.getOrCreate())
sql = '''
select user_id, name, age, height from db.users
'''
# Presto 테이블 읽기
df = (spark.read
.format("jdbc")
.option("url", "jdbc:presto://presto-databse-url.com:443/hive")
# .option('query', sql)
.option("dbtable", "your_schema.test_table")
.option("driver", "com.facebook.presto.jdbc.PrestoDriver")
.option("user", "user_id")
.option("password", "password")
.load())
3. Parquet
3.1 Loading & Sampling
- spark.sql.warehouse.dir: hive-warehouse 디렉트로에 실제 database 그리고 테이블이 들어감
- spark.driver.extraJavaOptions -Dderby.system.home: metastore_db 안에 데이터베이스 이름, 테이블등등의 데이터가 들어감
- spark.driver.memory: Driver 메모리 설정
- spark.executor.memory: Executor 메모리 설정
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.master("local[*]")
.appName("Spark Test")
.config("spark.ui.port", "4050")
.config('spark.driver.memory', '4G')
.config('spark.executor.memory', '16G')
.config('spark.sql.warehouse.dir', '/Users/anderson/hive-warehouse')
.config('spark.driver.extraJavaOptions', '-Dderby.system.home=/Users/anderson/metastore_db')
.enableHiveSupport()
.getOrCreate()
)
spark.sql('show databases').toPandas()
spark.sql('use anderson_db')
spark.sql('select current_database()').toPandas()
읽기
data = spark.read.parquet('./data')
print('Data Row Sie:', data.count())
data.sample(0.001).toPandas()
3.2 Temporary View Table
data.createOrReplaceTempView("users")
sql = """
SELECT * FROM users
WHERE salary >= 200000
"""
users = spark.sql(sql)
users.toPandas()
3.3 Create Hive Table
saveAsTable('테이블이름')
을 해줘야지 hive-warehouse 에 실제 데이터가 저장이 됨.save('저장 경로')
: 으로 저장하게 되면 그냥 해당 경로로 저장이 되며 hive-warehouse 에 저장이 되는게 아님.
spark.sql("CREATE DATABASE IF NOT EXISTS anderson")
spark.sql("USE anderson")
(
data.write.mode("overwrite")
.partitionBy("country")
.format("parquet")
.saveAsTable("users")
)
spark.sql("select * from anderson.users").sample(0.001).toPandas()
이렇게 생성하면 아래와 같은 구조로 테이블이 생성됩니다.
./spark-warehouse
└── anderson.db
└── users
├── country=%22Bonaire
│ └── part-00000-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
├── country=Afghanistan
│ ├── part-00000-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
│ ├── part-00001-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
│ ├── part-00002-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
│ ├── part-00003-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
│ └── part-00004-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
├── country=Aland Islands
│ └── part-00000-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
├── country=Albania
│ ├── part-00000-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
│ ├── part-00001-82e73039-7913-48a0-beef-c2317978be87.c000.snappy.parquet
3.4 Save as parquet files
(
data.write.mode("overwrite")
.partitionBy("country")
.format("parquet")
.save("users")
)
아래와 같은 형식으로 생성됩니다.
./users
├── country=%22Bonaire
│ └── part-00000-31286881-2130-4a6f-9183-d6e8eeaa1c77.c000.snappy.parquet
├── country=Afghanistan
│ ├── part-00000-31286881-2130-4a6f-9183-d6e8eeaa1c77.c000.snappy.parquet
│ ├── part-00001-31286881-2130-4a6f-9183-d6e8eeaa1c77.c000.snappy.parquet
│ ├── part-00002-31286881-2130-4a6f-9183-d6e8eeaa1c77.c000.snappy.parquet
│ ├── part-00003-31286881-2130-4a6f-9183-d6e8eeaa1c77.c000.snappy.parquet
│ └── part-00004-31286881-2130-4a6f-9183-d6e8eeaa1c77.c000.snappy.parquet
├── country=Aland Islands
│ └── part-00000-31286881-2130-4a6f-9183-d6e8eeaa1c77.c000.snappy.parquet
4. CSV to Parquet
4.1 Default Initialization
from matplotlib import pylab as plt
from pyspark.sql import SparkSession
spark = (
SparkSession.builder.master("local[*]")
.appName("Sales")
.config("spark.ui.port", "4050")
# .enableHiveSupport()
.getOrCreate()
)
4.2 Read CSV
from pyspark.sql import types as t
data = (
spark.read.option("inferSchema", True)
.option("delimiter", ",")
.option("header", True)
.csv("vgsales.csv")
)
# type casting
data = data.withColumn("Year", data.Year.cast(t.IntegerType()))
# Set Temporary Table
data.createOrReplaceTempView("sales")
print("n rows:", data.count())
data.show(3)
data.limit(3).toPandas()
4.3 Missing Values
from pyspark.sql import functions as F
def display_missing_count(df):
missing_df = df.select(
[
F.count(F.when(F.isnan(c) | F.col(c).isNull(), c)).alias(c)
for c in df.columns
]
).toPandas()
display(missing_df)
display_missing_count(data)
아래는 missing values 를 제거 합니다.
from functools import reduce
def filter_missing_values(df):
return df.where(
reduce(
lambda a, b: a & b,
[~F.isnan(col) & F.col(col).isNotNull() for col in df.columns],
)
)
data = filter_missing_values(data)
display_missing_count(data)
4.4 GroupBy
fig, ax = plt.subplots(1, figsize=(4, 3))
(
data.groupBy("Genre")
.count()
.orderBy(F.col("count").desc())
.toPandas()
.plot(x="Genre", y="count", kind="bar", ax=ax)
)
4.5 SQL Query
query = """
SELECT *
FROM (SELECT *,
RANK(Global_Sales) OVER (PARTITION BY Year ORDER BY Global_Sales) as rank_sales
FROM sales) t
WHERE t.rank_sales = 1
ORDER BY Year
"""
df = spark.sql(query).toPandas()
4.6 Write to Parquet
sales.parquet
디렉토리가 생성됩니다.
# mode: overwrite, append (새로운 파일이 만들어짐)
data.write.mode("overwrite").parquet("sales.parquet")
└── 01 CSV to Parquet.ipynb
├── part-00000-3712a917-89eb-4b60-93f3-9f3973cabb0b-c000.snappy.parquet
└── _SUCCESS