我大约是把kafka消费不到数据的特殊情况都经历了一遍了吧= =、
kafka消费不到数据的原因,首先检查配置之类的,如是否设置了group.id,对应的topic是否正确等等,这些不多说。
下面是我遇到的几种kafka消费不到数据的情况:
1.多分区,单例消费者的情况,只消费到一个分区,应多加几个消费者,不能用单例,直接subscribe的话,rebalance机制启动,手动的话如下
consumer.Assign(new List(){ new TopicPartitionOffset(new TopicPartition("topic", 1), Offset.Stored) });
2.长时间不消费导致 log.retention.hours或者 log.retention.minutes超时,清除log,Offset.Stored失效
解决办法一:
consumer.Assign(new List(){ new TopicPartitionOffset(new TopicPartition("topic", 1), new Offset(index)) });
此处的index为该分区当前的offset,要自己做存储然后手动配置,可测试用。
解决办法二:见问题三,同样解决方式 但是会从头开始消费新进来的数据
3.我一次加数据太多导致磁盘耗尽,kafka管理员帮我改到20G内存,但是仍然有一部分数据超出,分区offset靠前的数据被清除,导致再次消费不到。清除掉的数据无法再次被消费,但是还保存的数据可以消费到
解决办法:
consumer.Assign(new List(){ new TopicPartitionOffset(new TopicPartition("topic", 1), Offset.Beginning) });
或者在配置中加
auto.offset.reset=smallest //.NET 默认是largestauto.offset.reset=earliest//Java 默认是latest
关于该配置的测试,请看下面的链接