基于Topic的消息发布与消费模式
创始人
2025-07-05 21:30:55
0

闲话

朋友们,好久不见,不知道你们最近怎样,但相信你们一定都挺好。已经有一段时间没有更新了,个中原因不好细说,但是归根结底也许是自己懒。这个不好,大家不要学。今天主要就是想分享一下关于消息处理机制的一些想法。

基本概念

1.Topic

同一个topic下消息的格式一致,例如topic为order-update-message消息的格式都是一个统一的OrderUpdateMessage的结构

2.key主键

同一主键下的消息列表具有顺序性,例如key为订单号order-0001的消息列表(Queue)下,可能包含的消息列表(Queue)如下:

OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="modifying", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

3.Group消费者组

同一个topic下同一个group下的消费者,对这个group下的消息队列进行抢占式消费。例如同一个消费者组group-1下的消费者consumer-1和消费者consumer-2,以及另外一个消费者组group-2下的消费者consumer-3,消息消费的结果可能如下:

// consumer-1消费的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)

// consumer-2消费的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

// consumer-3消费的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

Kafka的消息处理机制就是以这样的形式实现的。

4.优势 

生产者和消费者完全解耦,生产者无需关注是否有消费者在消费,消费者也无需知道生产者是否在生成新的消息。

生产者只关注消息是否成功的发送到消息处理中间件,消费者只关注能否从消息处理中间件消费到消息。

消费者可以按组消费,同组内的消费者进行抢占式消费。

RabbitMq中的优秀实践

1.RabbitMq消息处理机制

生产者讲带有指定RoutingKey的消息发送到对应的Exchange上,Exchange通过Binding定义的路由规格,将消息按照BindingKey分发到不同的Queue上,消费者从Queue拉取消息消费。

  • Exchange & RoutingKey & Topic:RoutingKey决定了消息会被发送到哪个Exchange上,这和topic是类似的概念。
  • Bind & BindingKey & Group:Exchange根据Binding定义的路由规格,将消息按照BindingKey分发到不同的Queue上,这里可以认为是对应了Group的概念。
  • Queue & Group:Queue则是维护了一个Group下的某个队列下的所有消息。

优秀实践

因此如果要以RabbitMq实现基于Topic和Group实现的消息生产和消费的机制,可以将消息定义成以下类似的结构:

// Exchange: {value="order-update", type="fanout"}
// binding1: {value="promotion-service", bindingKey="order.*.paid"}
// binding2: {value='inventory-service', bindingKey="order.*"}
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)

假设此时有promotion-service(1个实例)和inventory-service(2个实例)两个消费者消费消息,则对应的消息消费的结果可能是:

// inventory-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value=Queue('inventory-service'), bindingKey="order.*"}
// inventory-service实例1消费到的消息
OrderUpdateMessage(id="msg-0001", orderId = "order-0001", action="create", ...)
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)
// inventory-service实例2消费到的消息
OrderUpdateMessage(id="msg-0003", orderId = "order-0001", action="modified", ...)
OrderUpdateMessage(id="msg-0004", orderId = "order-0001", action="delivering", ...)
OrderUpdateMessage(id="msg-0005", orderId = "order-0001", action="delivered", ...)

// promotion-service
// Exchange: {value="order-update", type="fanout"}
// QueueBinding: {value="promotion-service", bindingKey="order.*.paid"}
// promotion-service实例1消费到的消息
OrderUpdateMessage(id="msg-0002", orderId = "order-0001", action="paid", ...)
OrderUpdateMessage(id="msg-0006", orderId = "order-0002", action="paid", ...)

总结

RabbitMQ的Exchange支持不同类型(Direct, Fanout, Topic, Headers),以及Binding可以对消息以更灵活的通配符的方式将消息分发到对应的Queue上,因此其消息处理机制更加灵活。

基于Topic的消息发布与消费模式,能够将消费者和生产者完全解耦,相对RabbitMQ中的所支持的灵活处理消息的方式,更加简单且易于理解,这也是Kafka的消息处理机制。

通过对比不同的中间件的消息处理机制也许能找到更好的实践方式。

相关内容

热门资讯

如何允许远程连接到MySQL数... [[277004]]【51CTO.com快译】默认情况下,MySQL服务器仅侦听来自localhos...
如何利用交换机和端口设置来管理... 在网络管理中,总是有些人让管理员头疼。下面我们就将介绍一下一个网管员利用交换机以及端口设置等来进行D...
施耐德电气数据中心整体解决方案... 近日,全球能效管理专家施耐德电气正式启动大型体验活动“能效中国行——2012卡车巡展”,作为该活动的...
20个非常棒的扁平设计免费资源 Apple设备的平面图标PSD免费平板UI 平板UI套件24平图标Freen平板UI套件PSD径向平...
德国电信门户网站可实时显示全球... 德国电信周三推出一个门户网站,直观地实时提供其安装在全球各地的传感器网络检测到的网络攻击状况。该网站...
为啥国人偏爱 Mybatis,... 关于 SQL 和 ORM 的争论,永远都不会终止,我也一直在思考这个问题。昨天又跟群里的小伙伴进行...
《非诚勿扰》红人闫凤娇被曝厕所... 【51CTO.com 综合消息360安全专家提醒说,“闫凤娇”、“非诚勿扰”已经被黑客盯上成为了“木...
2012年第四季度互联网状况报... [[71653]]  北京时间4月25日消息,据国外媒体报道,全球知名的云平台公司Akamai Te...