跳至主要內容
  • Hostloc 空間訪問刷分
  • 售賣場
  • 廣告位
  • 賣站?

4563博客

全新的繁體中文 WordPress 網站
  • 首頁
  • [Kafka] 求助, 同一个服务如何组播消费 kafka 某个 topic 的消息呢?
未分類
31 8 月 2020

[Kafka] 求助, 同一个服务如何组播消费 kafka 某个 topic 的消息呢?

[Kafka] 求助, 同一个服务如何组播消费 kafka 某个 topic 的消息呢?

資深大佬 : BBCCBB 20

我现在用的是启动的时候动态生成 groupId, 比如 name + uuid 的方式

但是这样重启后就会导致原来的 consumerGroup 对应的实例都被销毁了.但 kafka 里依然存在原来的 consumerGroup, 监控上看已经被销毁的 consumerGroup 也会发现堆积越来越严重, 有谁知道正确的使用姿势吗??

不胜感激

大佬有話說 (31)

  • 資深大佬 : useben

    生成唯一 groupId, 存到文件, 启动时读文件, 有就用原来的, 没有再生成写到文件…

  • 主 資深大佬 : BBCCBB

    @useben 这个在固定机器上是可以的, 但我们这里 docker 镜像每次都不知道到哪个机器上了.

  • 資深大佬 : SingeeKing

    环境变量?

  • 資深大佬 : hustmisa

    首先 consumer 的使用业务需求是什么,新启动的 groupId 对启动期间的数据是否可以丢弃?
    如果可以丢弃 kafka 配置 retention 很短就可以了,这样不会堆积;如果不能丢弃,配置 retention 长一些这样重启换 groupId 也能续接数据(大概是你现在的方式),但是就不要频繁换 groupId 了啊。我是这么理解的不知道对不对

  • 主 資深大佬 : BBCCBB

    @SingeeKing 都是同一个服务, 不同实例,这个不方便给每个实例加环境变量, 哈哈

    @hustmisa 可以丢弃, 因为应用上有 ack 超时重试机制, 要命的是重启后老的 groupId 不会自动心跳超时消失, 会在监控上看到消息不断堆积. 其实我想实现的就是 rocketmq 的广播的功能.. 但我们使用 kafka.

  • 資深大佬 : mosesyou

    为什么用唯一或者固定的 groupId 不行

  • 主 資深大佬 : BBCCBB

    @mosesyou 这个业务场景需要同一个服务里的不同实例全都消费到每个 MQ

  • 資深大佬 : zardly666

    用 redis 做一个类似选 ID 的东西,服务启动份数等于 ID 份数。

    for (int i = 0; i < 启动份数; i++) {
    if (redisUtil.setnx( + i, “lockthing”,time )) {
    bucketConfig.setConsumeZsetBucketNum(i);
    log.info(“此实例的消费者为” + i);
    break;
    }
    }

    服务启动的时候,第一个服务拿到 consumerId+1 ;
    第二个服务拿到 consumerId+2 ;
    这样,就复用几个了吧。

  • 資深大佬 : zardly666

    代码没删干净,大概意思就是服务启动动态去拿自己所属的 consumer

  • 資深大佬 : wisej

    另一种思路,服务同一个 groupid,分发由服务自己来做(拿到服务其它实例的 ip )

    另外旧 cg 堆积会有什么负面影响么?除了消息会冗余地保存,直到 retention 设置的时间被清除

  • 資深大佬 : sonice

    想多了,consumerGroup 堆积能有多少,起停一次多一个,也不会有很多啊。这也不会导致 zk 性能降低啊

  • 資深大佬 : amwyyyy

    原来 consumerGroup 的堆积只是个数字,消息数据只有一份,不管你有几个 consumerGroup 。过期的 consumerGroup 会被清理掉。

  • 資深大佬 : kifile

    我的理解,题主的意思是因为 ConsumerGroup 的 GroupId 每次重启会重新生成一个新的,导致监控面板上出现了废弃的 groupId 的 Lag 不断增大的现象。

    如果重启时 Consumer 的 offset 没有什么意义,那就在重启新应用前,删除老的 ConsumerGroup,做一个这种策略不就好了?

  • 主 資深大佬 : BBCCBB

    @zardly666 这个倒是可以做, 类似 snowflake 算法 workid 的生成. 但相对较麻烦, 老哥但还有没有简单点的解决办法啊

  • 資深大佬 : mosesyou

    纯 docker 么,如果是 k8s 的话,用 statefulset,可以实现每个实例有固定递增编码 0,1,2….

  • 主 資深大佬 : BBCCBB

    @mosesyou 我们将 docker 镜像上传到云上, 然后后续的流程我得研究一下, 问一下我们负责这一块的同事, 如果可行的话这得确是一个好办法. 多谢.

  • 資深大佬 : yangbonis

    mq 不是本来就组播工作的?所有订阅都会收到。

  • 主 資深大佬 : BBCCBB

    @yangbonis 是需要同一个服务不同实例都收到, 你说的这个大概是不同服务.

  • 資深大佬 : j2gg0s

    @BBCCBB 瞎逼设计,每个实例根据消息在自己的内存里面做些什么工作吗?不能搞个 redis 或者 db ?

    然后,kafka 的监控看到堆积是没有什么大影响的,因为消息只存一份。
    如果你觉得不爽,可以在实例 shutdown 的时候了,把 consumergroup 注销掉?

  • 資深大佬 : j2gg0s

    @j2gg0s 每次重启,重头还是重新开始消费呢?

  • 資深大佬 : sampeng

    @j2gg0s 嗯。然后磁盘就爆了…

  • 主 資深大佬 : BBCCBB

    @j2gg0s 业务场景你都不清楚你瞎 bb 个毛.

  • 主 資深大佬 : BBCCBB

    im 推消息, 量小, 所以还不想做路由中心. 所以采用广播.

  • 資深大佬 : j2gg0s

    @sampeng kafka 的磁盘被爆炸了?

  • 資深大佬 : rockyou12

    我觉得最好用其它 mq,kafka 本来就不适合这种场景,你这业务看起来也不需要持久化,redis 的 sub/pub 可能都够了

  • 資深大佬 : lwldcr

    你这个问题 加一个预处理步骤就可以了吧。

    比如你一组应用有 10 个实例,那你提前分配好 groupId 名字,如 cg_1, cg_2,…, 然后存到一个地方:DB 、Redis 等

    然后每个应用实例启动时 去存储的地方请求分配一个 groupId,用这个 groupId 启动 kafka 消费服务不就完事了

  • 資深大佬 : JKeita

    固定 group id 每次启动清除 offset 怎样?

  • 資深大佬 : yty2012g

    固定 group id,每次设置 offset 到最新应该就可以满足

  • 資深大佬 : IamNotShady

    redis 的 pub/sub 不香吗?

  • 資深大佬 : timonwong

    不要用 High Level Consumer API 就完了,之前用 go 写了一个,也用到了线上一年,不过不保证无 bug

    https://github.com/imperfectgo/kafkasub

  • 資深大佬 : timonwong

    原理是手动维护 offset,如果程序不死 retry 的时候保持 offset,程序死了从最新的来,可以按照自己的需求来调整。

    不过有一点要注意的真的是 IM 的话,因为 kafka 的 partition reblance IO 相当大,可能造成非常大的 E2E 的 latency,这点要注意(虽然可以通过配置限制 IO 来绕过)。 总的来说,其实不适合 IM 这个场景

文章導覽

上一篇文章
下一篇文章

AD

其他操作

  • 登入
  • 訂閱網站內容的資訊提供
  • 訂閱留言的資訊提供
  • WordPress.org 台灣繁體中文

51la

4563博客

全新的繁體中文 WordPress 網站
返回頂端
本站採用 WordPress 建置 | 佈景主題採用 GretaThemes 所設計的 Memory
4563博客
  • Hostloc 空間訪問刷分
  • 售賣場
  • 廣告位
  • 賣站?
在這裡新增小工具