Cassandra + Spark
1. Installation
1.1 Installing Cassandra DB + CQLSH
- [https://cassandra.apache.org//download.html](https://cassandra.apache.org//download.html) 들어갑니다.
- Latest GA Version 을 다운로드 받습니다.
아래와 같이 설치 가능합니다.
$ wget https://dlcdn.apache.org/cassandra/4.1.5/apache-cassandra-4.1.5-bin.tar.gz
$ tar -zxvf apache-cassandra-4.1.5-bin.tar.gz
# 원하는 장소로 이동
$ mv apache-cassandra-4.1.5 ~/apps/
# 이후 .bashrc (ubuntu) 또는 .bash_profile (mac) 을 설정합니다.
$ vi ~/.bashrc
다음과 같이 내용을 (수정 필요) .bashrc 또는 .bash_profile 에 넣습니다.
# Cassandra
CASSANDRA_HOME=/home/anderson/apps/apache-cassandra-4.1.5
export PATH=$PATH:$CASSANDRA_HOME/bin
Cassandra 실행도 시켜봅니다. in
$ cassandra -f
버젼 확인및 접속
$ cqlsh --version
cqlsh 6.1.0
# 접속
$ cqlsh localhost 9042
1.2 Installing only CQLSH
아래와 같이 하면 된다고 하는데, 저는 안되서 그냥 위에꺼 전체 설치 했습니다.
$ pip install cqlsh
2. Cassandra Quick Reference
2.1 Node Status
현재 노드 상태 확인
$ nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
-- Address Load Tokens Owns (effective) Host ID Rack
UN 127.0.0.1 205.33 KiB 16 100.0% 36cbcf5a-4753-4491-8fec-2dd168613512 rack1
그외
- nodetool info : 각종 시스템 정보
- nodetool ring: 노드가 소유한 토큰의 분포 확인 가능
2.2 Basic CQL
Show databases / tables 같은거
# 기본적인 것
desc keyspaces
desc tables
만약 CQLSH 버젼이 높고 -> 서버는 3.11.5 같은 낮은 버젼일 경우 desc 가 작동을 안합니다.
이 경우에는 다음과 같은 명령어로 가능합니다.
SELECT keyspace_name, table_name FROM system_schema.tables;
KeySpace or Table 생성
-- KeySpace 생성
CREATE KEYSPACE IF NOT EXISTS my_keyspace
WITH replication = {
'class': 'SimpleStrategy',
'replication_factor': 1};
- SimpleStrategy: 단일 데이터센터 내에서 데이터 복제. 여러개 데이터 센터의 경우 NetworkTopologyStrategy 사용
- Replication Factor: 복제본 갯수. 3이라면 클러스터내의 노드중 3개에 데이터가 복제
NetworkTopologyStrategy 사용시 다음과 같이 생성 가능. dc1에 3개 복제하고, dc2에 2개 복제
CREATE KEYSPACE IF NOT EXISTS example_keyspace
WITH replication = {
'class': 'NetworkTopologyStrategy',
'dc1': 3,
'dc2': 2
};
2.3 JSON Select & Insert
특이하게도, JSON 형식으로 출력하거나, json을 insert 할 수도 있습니다.
SELECT JSON * FROM keyspace.table;
여기서 출력한 것을 복사한후, insert 할수도 있습니다.
INSERT INTO keyspace.table JSON
`<json string here>`;
3. Java + Spark Example
3.1 Gradle
- com.datastax.spark:spark-cassandra-connector_2.12:3.4.1: Spaprk 에서 Cassandra 접속 가능
- com.datastax.oss:java-driver-core:4.17.0: 다이렉트로 Cassandra DB 에 접속 가능 / Spark 없어도 됨
dependencies {
implementation "com.github.jnr:jnr-posix:3.1.15"
implementation 'joda-time:joda-time:2.12.7'
implementation group: 'org.projectlombok', name: 'lombok', version: '1.18.34'
implementation 'org.apache.spark:spark-core_2.12:3.4.1'
implementation 'org.apache.spark:spark-sql_2.12:3.4.1'
implementation 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.1'
// implementation 'com.datastax.oss:java-driver-core:4.17.0'
testImplementation platform('org.junit:junit-bom:5.10.0')
testImplementation 'org.junit.jupiter:junit-jupiter'
}
3.2 Spark Setup + Adding Data to Cassandra
@BeforeEach
public void setup() {
SparkConf conf = new SparkConf()
.setAppName("Local Spark Example")
.setMaster("local[2]")
// .set("spark.cassandra.auth.username", "user_id")
// .set("spark.cassandra.auth.password", "password")
// .set("spark.cassandra.input.throughputMBPerSec", "1")
.set("spark.cassandra.connection.host", "127.0.0.1");
spark = SparkSession.builder()
.config(conf)
.getOrCreate();
addTestData();
}
protected void addTestData() {
try (CqlSession session = CqlSession.builder()
.addContactEndPoint(
new DefaultEndPoint(new InetSocketAddress("localhost", 9042)))
.withLocalDatacenter("datacenter1")
// .withAuthCredentials("your_username", "your_password") // 사용자 인증 정보 추가
.build()) {
String createKeySpace = "CREATE KEYSPACE IF NOT EXISTS my_keyspace "
+ "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};";
String createTable =
"CREATE TABLE IF NOT EXISTS my_keyspace.users ("
+ "uid UUID PRIMARY KEY, "
+ "name text, "
+ "age int, "
+ "married boolean,"
+ "created_at timestamp);";
System.out.println(createTable);
session.execute(createKeySpace);
session.execute(createTable);
}
// 데이터 생성
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("uid", DataTypes.StringType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("age", DataTypes.IntegerType, false),
DataTypes.createStructField("married", DataTypes.BooleanType, false),
DataTypes.createStructField("created_at", DataTypes.TimestampType, false)
});
Timestamp timestamp = new Timestamp(new Date().getTime());
Dataset<Row> userData = spark.createDataFrame(Arrays.asList(
RowFactory.create(UUID.randomUUID().toString(), "Anderson", 40, true, timestamp),
RowFactory.create(UUID.randomUUID().toString(), "Alice", 25, false, timestamp),
RowFactory.create(UUID.randomUUID().toString(), "Yoona", 21, false, timestamp)
), schema);
userData.write()
.format("org.apache.spark.sql.cassandra")
.mode(SaveMode.Append)
.option("keyspace", "my_keyspace")
.option("table", "users")
.save();
}
3.3 Spark 로 데이터 다 가져오기
public void readAllTable() {
// 방법 1
// Spark 에서 전체 데이터를 다 가져오기.
Dataset<Row> df = spark.read()
.format("org.apache.spark.sql.cassandra")
.option("keyspace", "my_keyspace")
.option("table", "users")
.load();
assertTrue(df.count() >= 3);
Row andersonRow = df.filter("name = 'Anderson'").first();
assertEquals(40, (int) andersonRow.getAs("age"));
assertEquals(true, andersonRow.getAs("married"));
df.show();
}
3.4 Spark Cassandra Connector 사용 - 1번
버젼에 따라서 이게 될수도 있고, 2번이 될수도 있음. 회사에서는 해당 1번은 안되고, 2번이 됐는데, 내 컴퓨터에서는 그 반대.
public void readThroughCassandraConnector1() {
CassandraTableScanJavaRDD<CassandraRow> rdd =
javaFunctions(spark.sparkContext())
.cassandraTable("my_keyspace", "users")
.select(column("uid"),
column("name"),
column("age"),
column("married"),
column("created_at").as("createdAt"),
writeTime("name").as("writetime"));
JavaRDD<Row> javaRdd = rdd.map(row -> {
return RowFactory.create(
row.getString("uid"),
row.getString("name"),
row.getInt("age"),
row.getBoolean("married"),
new Timestamp(row.getLong("createdAt")),
row.getLong("writetime"));
});
StructType schema = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("uid", DataTypes.StringType, false),
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("age", DataTypes.IntegerType, false),
DataTypes.createStructField("married", DataTypes.BooleanType, false),
DataTypes.createStructField("createdAt", DataTypes.TimestampType, false),
DataTypes.createStructField("writetime", DataTypes.LongType, false)
});
Dataset<Row> dataset = spark.createDataFrame(javaRdd, schema);
dataset.show();
System.out.println(dataset);
}
3.5 Spark Cassandra Connector 사용 - 2번
public void readThroughCassandraConnector2() {
// Spark Cassandra Connector를 사용해서, 좀더 자세한 정보를 가져오는 방법
// 회사에서는 됐는데, 지금 여기서는 안됨. select 에서 empty 가 나옴.
CassandraTableScanJavaRDD<DataBean> rdd = javaFunctions(spark.sparkContext())
.cassandraTable("my_keyspace", "users", mapRowTo(DataBean.class))
.select(column("uid"),
column("name"),
column("age"),
column("married"),
column("created_at").as("createdAt"),
writeTime("name").as("writetime"));
JavaRDD<Row> javaRdd = rdd.map(row -> {
return RowFactory.create(
row.getUid(),
row.getName(),
row.getAge(),
row.getMarried(),
row.getCreatedAt(),
row.getWritetime()
);
});
Dataset<Row> dataset = spark.createDataFrame(javaRdd, DataBean.class);
dataset.show();
}
DataBean.java
package ai.incredible.cassandra;
import lombok.Data;
import lombok.ToString;
import java.sql.Timestamp;
@Data
@ToString
public class DataBean {
protected String uid;
protected String name;
protected Integer age;
protected Boolean married;
protected Timestamp createdAt;
protected Long writetime;
}
3.1 CQL Session 으로 Direct Connection
// CQL 로 direct 접속을 해서 데이터를 가져옵니다.
// 해당 방법은 spark.read() 를 사용하는 것이 아니며, 이를 spark 에서 사용시에
// driver 에서 바로 가져오는 것이기 때문에 distributed loading 이 되는 것이 아닙니다.
// Spark 에서 쓰는 것 보다는 따로 CQL 로 접속해야 할때 사용하면 좋은 방법입니다.
try (CqlSession session = CqlSession.builder()
.addContactEndPoint(
new DefaultEndPoint(new InetSocketAddress("localhost", 9042)))
.withLocalDatacenter("datacenter1")
// .withAuthCredentials("your_username", "your_password") // 사용자 인증 정보 추가
.build()) {
// 중요한점! ALLOW FILTERING 에 끝에 들어갔음.
// Cassandra 에서는 WHERE statement 가 연산량이 많은듯 함.
// 그래서 WHERE 사용시 반드시 뒤에 ALLOW FILTERING 써줘야 함
// 또한 setPageSize 를 통해서 한번에 얼마나 가져올지를 정함
String query = "SELECT name, age, WRITETIME(name) as created_at "
+ "FROM my_keyspace.users WHERE name='Anderson' ALLOW FILTERING;";
ResultSet resultSet = session.execute(SimpleStatement.builder(query)
.setPageSize(5).build());
List<Row> rows = new ArrayList<>();
do {
for (com.datastax.oss.driver.api.core.cql.Row cassandraRow : resultSet) {
rows.add(RowFactory.create(
cassandraRow.getString("name"),
cassandraRow.getInt("age"),
new Timestamp(cassandraRow.getLong("created_at") / 1000)
));
}
} while (!resultSet.isFullyFetched());
StructType schema2 = DataTypes.createStructType(new StructField[] {
DataTypes.createStructField("name", DataTypes.StringType, false),
DataTypes.createStructField("age", DataTypes.IntegerType, false),
DataTypes.createStructField("created_at", DataTypes.TimestampType, false)
});
Dataset<Row> df2 = spark.createDataFrame(rows, schema2);
df2.show();