[Kafka] 求助, 同一个服务如何组播消费 kafka 某个 topic 的消息呢?
我现在用的是启动的时候动态生成 groupId, 比如 name + uuid 的方式
但是这样重启后就会导致原来的 consumerGroup 对应的实例都被销毁了.但 kafka 里依然存在原来的 consumerGroup, 监控上看已经被销毁的 consumerGroup 也会发现堆积越来越严重, 有谁知道正确的使用姿势吗??
不胜感激
我现在用的是启动的时候动态生成 groupId, 比如 name + uuid 的方式
但是这样重启后就会导致原来的 consumerGroup 对应的实例都被销毁了.但 kafka 里依然存在原来的 consumerGroup, 监控上看已经被销毁的 consumerGroup 也会发现堆积越来越严重, 有谁知道正确的使用姿势吗??
不胜感激
@hustmisa 可以丢弃, 因为应用上有 ack 超时重试机制, 要命的是重启后老的 groupId 不会自动心跳超时消失, 会在监控上看到消息不断堆积. 其实我想实现的就是 rocketmq 的广播的功能.. 但我们使用 kafka.
for (int i = 0; i < 启动份数; i++) {
if (redisUtil.setnx( + i, “lockthing”,time )) {
bucketConfig.setConsumeZsetBucketNum(i);
log.info(“此实例的消费者为” + i);
break;
}
}
服务启动的时候,第一个服务拿到 consumerId+1 ;
第二个服务拿到 consumerId+2 ;
这样,就复用几个了吧。
另外旧 cg 堆积会有什么负面影响么?除了消息会冗余地保存,直到 retention 设置的时间被清除
如果重启时 Consumer 的 offset 没有什么意义,那就在重启新应用前,删除老的 ConsumerGroup,做一个这种策略不就好了?
然后,kafka 的监控看到堆积是没有什么大影响的,因为消息只存一份。
如果你觉得不爽,可以在实例 shutdown 的时候了,把 consumergroup 注销掉?
比如你一组应用有 10 个实例,那你提前分配好 groupId 名字,如 cg_1, cg_2,…, 然后存到一个地方:DB 、Redis 等
然后每个应用实例启动时 去存储的地方请求分配一个 groupId,用这个 groupId 启动 kafka 消费服务不就完事了
https://github.com/imperfectgo/kafkasub
不过有一点要注意的真的是 IM 的话,因为 kafka 的 partition reblance IO 相当大,可能造成非常大的 E2E 的 latency,这点要注意(虽然可以通过配置限制 IO 来绕过)。 总的来说,其实不适合 IM 这个场景