什么是Spark Streaming
Spark Streaming类似于Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。
Spark Streaming有哪些优点?
1 易用
2 容错
3 易整合到Spark体系
Spark与Storm的对比
DStream
1.什么是DStream
Discretized Stream是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,对数据的操作也是按照RDD为单位来进行的,计算过程由Spark engine来完成
2.DStream相关操作
DStream上的原语与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
Transformations on DStreams
Transformation Meaning
1 |
|
特殊的Transformations
1.UpdateStateByKey Operation
UpdateStateByKey原语用于记录历史记录,上文中Word Count示例中就用到了该特性。若不用UpdateStateByKey来更新状态,那么每次数据进来后分析完成后,结果输出后将不在保存
2.Transform Operation
Transform原语允许DStream上执行任意的RDD-to-RDD函数。通过该函数可以方便的扩展Spark API。此外,MLlib(机器学习)以及Graphx也是通过本函数来进行结合的。
3.Window Operations
Window Operations有点类似于Storm中的State,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的允许状态
Output Operations on DStreams
1 |
|
1.用Spark Streaming实现实时WordCount
安装并启动生成者
首先在一台Linux(ip:192.168.100.23)上用yum安装nc工具
yum install -y nc
启动一个服务端并监听9999端口
nc -lk 7777
2.编写Spark Streaming程序1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
object NetworkWordCount {
def main(args: Array[String]) {
//设置日志级别
LoggerLevel.setStreamingLogLevels()
//创建SparkConf并设置为本地模式运行
//注意local[2]代表开两个线程
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
//设置DStream批次时间间隔为2秒
val ssc = new StreamingContext(conf, Seconds(2))
//通过网络读取数据
val lines = ssc.socketTextStream("192.168.10.101", 9999)
//将读到的数据用空格切成单词
val words = lines.flatMap(_.split(" "))
//将单词和1组成一个pair
val pairs = words.map(word => (word, 1))
//按单词进行分组求相同单词出现的次数
val wordCounts = pairs.reduceByKey(_ + _)
//打印结果到控制台
wordCounts.print()
//开始计算
ssc.start()
//等待停止
ssc.awaitTermination()
}
启动Spark Streaming程序:由于使用的是本地模式”local[2]”所以可以直接在本地运行该程序
注意:要指定并行度,如在本地运行设置setMaster(“local[2]”),相当于启动两个线程,一个给receiver,一个给computer。如果是在集群中运行,必须要求集群中可用core数大于1
4.在生产者端命令行中输入单词
5.在控制台中查看结果
单词实时累加1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
object NetworkUpdateStateWordCount {
/**
* String : 单词 hello
* Seq[Int] :单词在当前批次出现的次数
* Option[Int] : 历史结果
*/
val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
//iter.flatMap(it=>Some(it._2.sum + it._3.getOrElse(0)).map(x=>(it._1,x)))
iter.flatMap{case(x,y,z)=>Some(y.sum + z.getOrElse(0)).map(m=>(x, m))}
}
def main(args: Array[String]) {
LoggerLevel.setStreamingLogLevels()
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkUpdateStateWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
//做checkpoint 写入共享存储中
ssc.checkpoint("c://aaa")
val lines = ssc.socketTextStream("192.168.10.100", 9999)
//reduceByKey 结果不累加
//val result = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
//updateStateByKey结果可以累加需要传入一个自定义的累加函数:updateFunc
val results = lines.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
results.print()
ssc.start()
ssc.awaitTermination()
}
Spark Streaming整合Kafka完成网站点击流实时统计
1.安装并配置zk
2.安装并配置Kafka
3.启动zk
4.启动Kafka
5.创建topic1
2bin/kafka-topics.sh --create --zookeeper node1.test.cn:2181,node2.test.cn:2181 \
--replication-factor 3 --partitions 3 --topic urlcount
6.编写Spark Streaming应用程序
1 | object UrlCount { |