Java Spark for ETL
1. Installation
1.1 Spark Installation
https://spark.apache.org/downloads.html 링크에 들어가서 spark-3.4.1-bin-hadoop3.tgz 를 눌러서 다운로드 받습니다.
$ tar -zxvf spark-3.4.1-bin-hadoop3.tgz
$ mv spark-3.4.1-bin-hadoop3 ~/app/
$ ln -s ~/app/spark-3.4.1-bin-hadoop3 ~/app/spark
.bashrc 파일을 열어서 다음을 추가 합니다.
export SPARK_HOME=/home/anderson/app/spark
export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9.7-src.zip:$PYTHONPATH
export PATH=$SPARK_HOME/bin:$SPARK_HOME/python:$PATH
2. Java Spark ETL
2.1 Gradle Configuration
build.gradle 파일에 다음의 스파크 라이브러리를 추가 합니다.
중요한건 spark-hive_2.13
을 추가해야지 enableHiveSupport() 함수를 사용할 수 있습니다.
dependencies {
implementation group: 'org.apache.spark', name: 'spark-sql_2.13', version: '3.3.1'
implementation group: 'org.apache.spark', name: 'spark-hive_2.13', version: '3.4.1'
}
2.2 Parquet in Hive SQL
2.2.1 초기화
다음과 같이 spark session을 초기화 합니다.
.enableHiveSupport() 를 해야지 SQL사용이 가능합니다.
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
<생략>
SparkSession spark = SparkSession.builder()
.master("local[2]")
.appName("Spark Example")
.config("spark.ui.port", "4050")
.config("spark.driver.memory", "2G")
.config("spark.executor.memory", "4G")
// .config("spark.sql.warehouse.dir", "/Users/anderson/hive-warehouse")
// .config("spark.driver.extraJavaOptions", '-Dderby.system.home=/Users/anderson/metastore_db")
.enableHiveSupport()
.getOrCreate();
2.2.2 간단한 SQL
간단한 SQL은 다음과 처리 가능합니다.
// .show() 를 사용하면 결과를 standard output 으로 출력
spark.sql("show databases").show();
spark.sql("use default");
spark.sql("select current_database()");
+---------+
|namespace|
+---------+
| default|
+---------+
2.2.3 Parquet 파일 불러오기
Parquet Data 를 불러오는 것은 다음과 같이 합니다.
// /data 디렉토리안에 parquet 파일들이 존재함
String dataPath = String.valueOf(this.getClass().getResource("/data"));
Dataset<Row> data = spark.read().parquet(dataPath);
data.show(5);
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
| registration_dttm| id|first_name|last_name| email|gender| ip_address| cc| country|birthdate| salary| title|comments|
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
|2016-02-03 16:55:29| 1| Amanda| Jordan| ajordan0@com.com|Female| 1.197.201.2|6759521864920116| Indonesia| 3/8/1971| 49756.53| Internal Auditor| 1E+02|
|2016-02-04 02:04:03| 2| Albert| Freeman| afreeman1@is.gd| Male|218.111.175.34| | Canada|1/16/1968|150280.17| Accountant IV| |
|2016-02-03 10:09:31| 3| Evelyn| Morgan|emorgan2@altervis...|Female| 7.161.136.94|6767119071901597| Russia| 2/1/1960|144972.51| Structural Engineer| |
|2016-02-03 09:36:21| 4| Denise| Riley| driley3@gmpg.org|Female| 140.35.109.83|3576031598965625| China| 4/8/1997| 90263.05|Senior Cost Accou...| |
|2016-02-03 14:05:31| 5| Carlos| Burns|cburns4@miitbeian...| |169.113.235.40|5602256255204850|South Africa| | null| | |
+-------------------+---+----------+---------+--------------------+------+--------------+----------------+------------+---------+---------+--------------------+--------+
only showing top 5 rows
2.2.3 Hive Temporary Table 사용
Temporary Table 을 만들고 SQL처리는 다음과 같이 합니다.
// SQL 을 사용할수 있도록 만듬
data.createOrReplaceTempView("users");
Dataset<Row> countryData = spark.sql("SELECT country, count(*) FROM users GROUP BY country");
countryData.show();
+--------------------+--------+
| country|count(1)|
+--------------------+--------+
| Russia| 310|
| Paraguay| 8|
<생략>
| Afghanistan| 21|
+--------------------+--------+
only showing top 20 rows
2.2.3 Dataset - Filter 함수
아래코드처럼 column 을 지정해서 데이터를 가져올 수 있습니다.
Dataset<Row> countries = data
.filter(data.col("country")
.equalTo("Japan"));
countries.show();
또는 lambda 함수를 써서 만들수도 있습니다.
Dataset<Row> salaries = data.filter((Row row) -> {
Double salary = row.getAs("salary");
if (salary == null) {
return false;
}
return salary > 200_000.;
});
3. Spark must-know things
RDD, DataFrame, and DataSet
RDD | DataFrame | Dataset | |
---|---|---|---|
Spark Version | 1.0 | 1.3 | 1.6 |
Year | 2011 | 2013 | 2015 |
Scheme | No | Yes | Yes |
API Level | Low | High | High |
RDD
- Resilient Distributed Dataset
- 중요 포인트는 transformation methods (map, filter, reduce 함수) 리턴은 다시 동일한 RDD 형태를 리턴한다.
- collect() 또는 saveAsObjectFile() 같은 함수가 실행되기 전까지 실제 실행되는 것은 아니다.
rdd.filter(_.age > 21)
DataFrame
- 1.3에서 제공되기 시작했고, 속도 그리고 scalability 가 주요 포인트이다.
- scheme 그리고 serialization을 통해서 노드 사이의 데이터 통신을 더 빠르게 하였다.
- 주로 Scala 에서 많이 활용되며, Java는 제한적인 부분들이 있다.
- RDD가 low level 로서 모든 데이터를 처리 가능하다면, DataFrame 은 tabular dataset 만 처리 가능하다
- 약점은 compile-time type safety 이다. (여러차례의 transformations 그리고 aggregations 거치다 보면 에러날 확률이 높다)
// runtime때 문제 나기 쉽다
df.filter("age > 21")
DataSet
- 1.6에서 도입되었으며, Encoder 를 도입하여, JVM objects 그리고 Spark 내부의 binary format 에서의 translate 를 담당한다.
- Java, Scala 둘다 동작 잘 됨
- type safety 가 더 좋아졌음으로, DataFrame 에서 나오는 문제는 해결되었지만, Type Casting 에 대한 코드를 더 작성해야 한다.
- 솔까.. 귀찮음.
// runtime 때 문제 날수 있는 부분을 해결한다
dataset.filter(_.age < 21)