Spark Streaming
- Introduction - {:.} Installing SBT - {:.} Maven & SBT Configuration
- Word Count Tutorial - {:.} Spark Master & Slave - {:.} build.sbt - {:.} Scala Code - {:.} Compile & Submit - {:.} Result

Introduction
Spark Streaming은 Core Spark의 extension으로서, 여러 sources로 부터 데이터를 받아서 처리할 수 있도록 도와줍니다. Kafka, Flume, Twitter, ZeroMQ, Kinesis, 또는 TCP Sockets 등등으로 부터 받을 수 있습니다.
내부적으로는 다음과 같이 데이터를 받아서 처리하게 됩니다.
Spark Streaming은 데이터 스트림을 받은후, 데이터를 Batches로 나누게 됩니다.
이후 Spark Engine은 배치를 처리하게 됩니다.
Installing SBT
Maven & SBT Configuration
Maven
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.2.0</version>
</dependency>
SBT
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.2.0"
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka_2.10 |
Flume | spark-streaming-flume_2.10 |
Kinesis | spark-streaming-kinesis-asl_2.10 [Amazon Software License] |
spark-streaming-twitter_2.10 | |
ZeroMQ | spark-streaming-zeromq_2.10 |
MQTT | spark-streaming-mqtt_2.10 |
Word Count Tutorial
Spark Master & Slave
start-master.sh -h hostname
start-slave -h hostname
hostname을 0.0.0.0으로 쓰면 에러가 날수 있습니다. (String으로 쓸것)
Name | Port | Description |
---|---|---|
Spark Master | 8081 | Spark Master 주소를 볼수 있습니다. |
Spark Web Interface | 4040 | |
History Server | 18080 |
build.sbt
name := "WordCount"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.5.2"
libraryDependencies += "junit" % "junit" % "4.10"
Scala Code
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object StreamingTutorial {
def main(args: Array[String]): Unit = {
// Spark Configuration
val conf = new SparkConf()
conf.setAppName("anderson-streaming-tutorial")
conf.setMaster("spark://hostname:7077")
conf.set("spark.ui.conf", "4045")
// Streaming Context
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream
val lines = ssc.socketTextStream("hostname", 9099)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
- 에러 발생시, 0.0.0.0으로 되어 있는 hostname을 String이 들어간 FQDN으로 바꿔줍니다.
Compile & Submit
sbt package
spark-submit --class StreamingTutorial --master spark://sf-dev:7077 target/scala-2.10/wordcount_2.10-1.0.jar
다른 shell화면을 띄우고 확인을 합니다.
nc -l sf-dev 9099
Result
-------------------------------------------
Time: 1469389644000 ms
-------------------------------------------
(b,2)
(hello,1)
(apple,2)
(a,5)
(hi,2)
(c,2)