kafka压力、数据丢失测试

运行环境 | centos 7.0 | kafka_2.11-1.1.1 | opt/zookeeper-3.4.6 | flume-1.8.0 | nginx-openresty/1.13.6.2

本文主要通过Nginx做日志服务器 以POST请求生成日志(每次请求约为275字节,JSON格式),利用lua脚本变为相应格式,flume指定SourceTailDirSoucekafka Channel、配置ACK应答机制a1.channels.c1.kafka.producer.acks = -1失败重试次数a1.channels.c1.kafka.producer.retries = 3;kafka主题设定三个分区两个副本

主机名 备注 操作系统 运行服务 配置
master 控制节点 CentOs7 kafka、zookeeper、nginx、flume 1C 2GM
slave0 计算节点 CentOs7 kafka、zookeeper 1C 1.5GM
slave1 块存储节点 CentOs7 kafka、zookeeper 1C 1.5GM

注:C-核 M-内存

启动nginx

1
/usr/local/openresty/nginx/sbin/nginx

启动zookeeper

1
/opt/zookeeper-3.4.6/bin/zkServer.sh start

启动flume

1
bin/flume-ng agent -n a1 -c conf/ -f myconf/nginx-kafka-kill.conf -Dflume.root.logger=INFO,consoledate

启动kafka

1
/opt/kafka_2.11-1.1.1/bin/kafka-server-start.sh -daemon /opt/kafka_2.11-1.1.1/config/server.properties

以下为 IDEA 程序输出log数 与 Nginx采集到的log数、各个节点kafka消费到的log数对比

IDEA log Nginx log Master消费 slave0消费 slave1消费
1000 842 701 722 732
1500 1269 1041 1086 1111
2000 1727 1480 1493 1461
2500 2022 1665 1739 1655
3000 2315 1892 1983 2163
5000 4800 2924 2924 —-

期间杀掉master节点kafka进程,slave0节点停止消费(这里很疑惑)

查看主题详情

1
2
3
4
5
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testkill
Topic:testkill PartitionCount:3 ReplicationFactor:2 Configs:
Topic: testkill Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1
Topic: testkill Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: testkill Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2

IDEA程序中设定5000次请求,请求完后,启动kafka进程

恢复后查看节点详情

1
2
3
4
5
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testkill
Topic:testkill PartitionCount:3 ReplicationFactor:2 Configs:
Topic: testkill Partition: 0 Leader: 1 Replicas: 0,1 Isr: 1,0
Topic: testkill Partition: 1 Leader: 2 Replicas: 1,2 Isr: 2,1
Topic: testkill Partition: 2 Leader: 2 Replicas: 2,0 Isr: 2,0

kafka启动后,消费之前没有消费的数据,进本地磁盘各分区查找比对数据,均在

结论:初步认定kafka高可用,数据0丢失(杀掉master节点kafka进程,slave0节点停止消费这里有待验证),吞吐量2000条一下压力正常,随着时间推移数据量越大积压越大

kafka offset 维护方式

  • zookeeper维护

使用zookeeper来维护offset
kafka 0.9 以前的版本是将offset 存储在zookeeper上的,kafka在传输数据时,数据消费成功就会修改偏移量,这样就可以保证数据不会丢失而导致传输出错;但是这也存在一个问题:那就是每次消费数据时都要将数据的offset写入一次,效率比较低,而且zookeeper与kafka的offset变化确认也需要走网络IO,这样就会给offset的维护带来不稳定性和低效。

  • kafka自己维护offset

使用broker来维护offset
kafka 0.9 以后,offset的使用了内部的roker来管理,这样仅仅只需要broker,而不要zookeeper来维护,都是将topic提交给__consumer_offsets函数来执行。

  • 也可以将kafka偏移量存入redis中,利用redis事务性维护offset

kafka消息的位置

含义 名称
earlieastLeaderOffsets 存储在broker上的leader节点的最早的消息偏移量
consumerOffsets 消费者消费的消息偏移量位置

情况一:正常情况下,消费的消息偏移量应该大于broker上存储的最早的消息偏移量,即 A < B

情况二:如果A 依然小于 B,则仍可以正常消费

情况三:然而,当 A > B 时,则说明还没有被消费的消息已经被清除

此种情况会抛出 kafka.common.OffsetOutOfRangeException 异常。

在没有外部系统清除kafka消息的情况下,协调设置broker的最大保留大小 log.retention.bytes 和 最大保留时间log.retention.hours 等,来配合消费者端的读取消息。可以通过读取和监控消费者消费的offsets,来保证消息不会被意外清除。

本文结束感谢您的阅读,本文原创–支持原创
顺便打点赏吧~ 有问题请联系我--strivedeer@163.com