安装zk集群
1.上传zk安装包
2.解压
1 |
|
2.config/server.properties
添加zk地址:zookeeper.connect=node-1:2181,node-2:2181,node-3:2181
修改broker.id(唯一的):broker.id=0
3.启动1
/bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
4.创建topic1
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic test
5.列出所有topic1
bin/kafka-topics.sh --list --zookeeper localhost:2181
6.向topic中写入数据1
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
7.消费数据1
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
8.查看指定topic的详情1
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
producer配置文件
#指定kafka节点列表,用于获取metadata,不必全部指定
metadata.broker.list=master:9092,work1:9092
指定分区处理类。默认kafka.producer.DefaultPartitioner,表通过key哈希到对应分区
#partitioner.class=kafka.producer.DefaultPartitioner
是否压缩,默认0表示不压缩,1表示用gzip压缩,2表示用snappy压缩。压缩后消息中会有头来指明消息压缩类型,故在消费者端消息解压是透明的无需指定。
compression.codec=none
指定序列化处理类
serializer.class=kafka.serializer.DefaultEncoder
如果要压缩消息,这里指定哪些topic要压缩消息,默认empty,表示不压缩。
#compressed.topics=
设置发送数据是否需要服务端的反馈,有三个值0,1,-1
0: producer不会等待broker发送ack
1: 当leader接收到消息之后发送ack
-1: 当所有的follower都同步消息成功后发送ack.
request.required.acks=0
在向producer发送ack之前,broker允许等待的最大时间 ,如果超时,broker将会向producer发送一个error ACK.意味着上一次消息因为某种原因未能成功(比如follower未能同步成功)
request.timeout.ms=10000
同步还是异步发送消息,默认“sync”表同步,”async”表异步。异步可以提高发送吞吐量,
#也意味着消息将会在本地buffer中,并适时批量发送,但是也可能导致丢失未发送过去的消息
producer.type=sync
在async模式下,当message被缓存的时间超过此值后,将会批量发送给broker,默认为5000ms
此值和batch.num.messages协同工作.
queue.buffering.max.ms = 5000
在async模式下,producer端允许buffer的最大消息量
无论如何,producer都无法尽快的将消息发送给broker,从而导致消息在producer端大量沉积
此时,如果消息的条数达到阀值,将会导致producer端阻塞或者消息被抛弃,默认为10000
queue.buffering.max.messages=20000
如果是异步,指定每次批量发送数据量,默认为200
batch.num.messages=500
当消息在producer端沉积的条数达到”queue.buffering.max.meesages”后
阻塞一定时间后,队列仍然没有enqueue(producer仍然没有发送出任何消息)
此时producer可以继续阻塞或者将消息抛弃,此timeout值用于控制”阻塞”的时间
-1: 无阻塞超时限制,消息不会被抛弃
0:立即清空队列,消息被抛弃
queue.enqueue.timeout.ms=-1
当producer接收到error ACK,或者没有接收到ACK时,允许消息重发的次数
因为broker并没有完整的机制来避免消息重复,所以当网络异常时(比如ACK丢失)
有可能导致broker接收到重复的消息,默认值为3.
message.send.max.retries=3
producer刷新topic metada的时间间隔,producer需要知道partition leader的位置,以及当前topic的情况
因此producer需要一个机制来获取最新的metadata,当producer遇到特定错误时,将会立即刷新
(比如topic失效,partition丢失,leader失效等),此外也可以通过此参数来配置额外的刷新机制,默认值600000
topic.metadata.refresh.interval.ms=60000
server配置文件
#broker的全局唯一编号,不能重复
broker.id=0
#用来监听链接的端口,producer或consumer将在此端口建立连接
port=9092
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接受套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/export/logs/kafka
#topic在当前broker上的分片个数
num.partitions=2
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#滚动生成新的segment文件的最大时间
log.roll.hours=168
#日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824
#周期性检查文件大小的时间
log.retention.check.interval.ms=300000
#日志清理是否打开
log.cleaner.enable=true
#broker需要使用zookeeper保存meta数据
zookeeper.connect=master:2181,work1:2181,work2:2181
#zookeeper链接超时时间
zookeeper.connection.timeout.ms=6000
#partion buffer中,消息的条数达到阈值,将触发flush到磁盘
log.flush.interval.messages=10000
#消息buffer的时间,达到阈值,将触发flush到磁盘
log.flush.interval.ms=3000
#删除topic需要server.properties中设置delete.topic.enable=true否则只是标记删除
delete.topic.enable=true
#此处的host.name为本机IP(重要),如果不改,则客户端会抛出:Producer connection to localhost:9092 unsuccessful 错误!
host.name=master
1.从TCP端口中读取数据1
2yum install -y nc
nc -lk 9000
#注意:要指定并行度,如在本地运行设置setMaster(“local[2]”),相当于启动两个线程,一个给receiver,一个给computer。
#如果是在集群中运行,必须要求集群中可用core数大于1
#提交spark-streaming任务1
bin/spark-submit --class cn.test.spark.streaming.TCPDemo /root/streaming-1.0.jar
2.结合flume
2.1 flume push方式
#首先启动spark-streaming应用程序1
bin/spark-submit --class cn.test.spark.streaming.FlumeWordCount /root/streaming-1.0.jar
#再启动flmue1
bin/flume-ng agent -n a1 -c conf/ -f conf/flume-push.conf -Dflume.root.logger=WARN,console
2.2 flume poll方式
#首先将下载好的spark-streaming-flume-sink_2.10-1.6.1.jar和scala-library-2.10.5.jar还有commons-lang3-3.3.2.jar三个包放入到flume的lib目录下
#启动flume1
bin/flume-ng agent -n a1 -c conf/ -f conf/flume-poll.conf -Dflume.root.logger=WARN,console
#再启动spark-streaming应用程序1
bin/spark-submit --class cn.test.spark.streaming.FlumePollWordCount /root/streaming-1.0.jar
3.结合kafka
#首先启动zk1
bin/kafka-server-start.sh config/server.properties
#创建topic1
bin/kafka-topics.sh --create --zookeeper 192.168.80.10:2181 --replication-factor 1 --partitions 1 --topic wordcount
#查看主题1
bin/kafka-topics.sh --list --zookeeper 192.168.80.10:2181
#启动一个生产者发送消息1
bin/kafka-console-producer.sh --broker-list 192.168.80.10:9092 --topic wordcount
#启动spark-streaming应用程序
1 | bin/spark-submit --class cn.test.spark.streaming.KafkaWordCount /root/streaming-1.0.jar 192.168.80.10:2181 group1 wordcount 1 |