运行环境 | centos 7.0 | kafka_2.11-1.1.1 | opt/zookeeper-3.4.6 | flume-1.8.0 | nginx-openresty/1.13.6.2
本文主要通过Nginx做日志服务器 以POST请求生成日志(每次请求约为275字节,JSON格式),利用lua脚本变为相应格式,flume指定Source
为TailDirSouce
、kafka Channel
、配置ACK应答机制a1.channels.c1.kafka.producer.acks = -1
失败重试次数a1.channels.c1.kafka.producer.retries = 3
;kafka主题设定三个分区两个副本
主机名 | 备注 | 操作系统 | 运行服务 | 配置 |
---|---|---|---|---|
master | 控制节点 | CentOs7 | kafka、zookeeper、nginx、flume | 1C 2GM |
slave0 | 计算节点 | CentOs7 | kafka、zookeeper | 1C 1.5GM |
slave1 | 块存储节点 | CentOs7 | kafka、zookeeper | 1C 1.5GM |
注:C-核 M-内存
启动nginx
1 | /usr/local/openresty/nginx/sbin/nginx |
启动zookeeper
1 | /opt/zookeeper-3.4.6/bin/zkServer.sh start |
启动flume
1 | bin/flume-ng agent -n a1 -c conf/ -f myconf/nginx-kafka-kill.conf -Dflume.root.logger=INFO,consoledate |
启动kafka
1 | /opt/kafka_2.11-1.1.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.11-1.1.1/config/server.properties |
以下为 IDEA 程序输出log数 与 Nginx采集到的log数、各个节点kafka消费到的log数对比
IDEA log | Nginx log | Master消费 | slave0消费 | slave1消费 |
---|---|---|---|---|
1000 | 842 | 701 | 722 | 732 |
1500 | 1269 | 1041 | 1086 1111 | |
2000 | 1727 | 1480 | 1493 | 1461 |
2500 | 2022 | 1665 | 1739 | 1655 |
3000 | 2315 | 1892 | 1983 | 2163 |
5000 | 4800 | 2924 | 2924 | —- |
期间杀掉master节点kafka进程,slave0节点停止消费(这里很疑惑)
查看主题详情1
2
3
4
5bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testkill
Topic:testkill PartitionCount:3 ReplicationFactor:2 Configs:
Topic: testkill Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
Topic: testkill Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: testkill Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2
IDEA程序中设定5000次请求,请求完后,启动kafka进程
恢复后查看节点详情1
2
3
4
5bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testkill
Topic:testkill PartitionCount:3 ReplicationFactor:2 Configs:
Topic: testkill Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: testkill Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: testkill Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0
kafka启动后,消费之前没有消费的数据,进本地磁盘各分区查找比对数据,均在
结论:初步认定kafka高可用,数据0丢失(杀掉master节点kafka进程,slave0节点停止消费这里有待验证),吞吐量2000条一下压力正常,随着时间推移数据量越大积压越大
kafka offset 维护方式
- zookeeper维护
使用zookeeper来维护offset
kafka 0.9 以前的版本是将offset 存储在zookeeper上的,kafka在传输数据时,数据消费成功就会修改偏移量,这样就可以保证数据不会丢失而导致传输出错;但是这也存在一个问题:那就是每次消费数据时都要将数据的offset写入一次,效率比较低,而且zookeeper与kafka的offset变化确认也需要走网络IO,这样就会给offset的维护带来不稳定性和低效。
- kafka自己维护offset
使用broker来维护offset
kafka 0.9 以后,offset的使用了内部的roker来管理,这样仅仅只需要broker,而不要zookeeper来维护,都是将topic提交给__consumer_offsets函数来执行。
- 也可以将kafka偏移量存入redis中,利用redis事务性维护offset
kafka消息的位置
含义 | 名称 |
---|---|
earlieastLeaderOffsets | 存储在broker上的leader节点的最早的消息偏移量 |
consumerOffsets | 消费者消费的消息偏移量位置 |
情况一:正常情况下,消费的消息偏移量应该大于broker上存储的最早的消息偏移量,即 A < B
情况二:如果A 依然小于 B,则仍可以正常消费
情况三:然而,当 A > B 时,则说明还没有被消费的消息已经被清除
此种情况会抛出 kafka.common.OffsetOutOfRangeException 异常。
在没有外部系统清除kafka消息的情况下,协调设置broker的最大保留大小 log.retention.bytes 和 最大保留时间log.retention.hours 等,来配合消费者端的读取消息。可以通过读取和监控消费者消费的offsets,来保证消息不会被意外清除。