应用场景
上一篇文章中讲解了RabbitMQ中的广播模式,一个消息生产之后,可以让任何与此交换机绑定的Queue接收到,但是现在又有了另外一种场景,发短信的消息只能由发短信的消费者收到,改如何做到呢?
方案
看下如下代码,将Queue与Exchange方式;
err = ch.QueueBind(
q.Name, // queue name
"", // binding key
"order", // exchange
false,
nil)
Copy复制
如上代码可以指定一个binding key,意味着将此routing key与当前交换机绑定,需要注意的一点是在fanout交换机类型会忽略此binding key的配置,因此在routing模式中需要使用direct交换机类型。
Direct Exchange
1,在发布订阅模式中使用的交换机类型是fanout类型,此类型的交换机会无脑的把消息分发给所有与此交换机绑定的队列。
2,direct交换机就不同了,此交换机消息分发必须依赖于routing key与binding key的匹配关系。
The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
官网解释给出的解释如上,大家可以阅读理解一下,下面说一下我的理解:
消息在发送的时候指定routing key
消息队列在与交换机绑定的时候会设置binding key
如果消息中的routing key 与队列与交换机绑定时设置的binding key相等,此队列就会收到此消息。
3,在前面Hello World案例与Work Queue案例中在没有指定交换机情况下如何分发消息的呢?
默认交换机类型direct
队列在声明的时候如果没有指定交换机,会使用默认的direct交换机,翻译有时总是很苍白,直接看下源码中的说明:
Every queue declared gets a default binding to the empty exchange "" which has
the type "direct" with the routing key matching the queue's name. With this
default binding, it is possible to publish messages that route directly to
this queue by publishing to "" with the routing key of the queue name.
QueueDeclare("alerts", true, false, false, false, nil)
Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
Delivery Exchange Key Queue
-----------------------------------------------
key: alerts -> "" -> alerts -> alerts
Copy复制源码中解释的意思是,每个队列在声明的时候,都会默认和一个名字为空的交换机绑定,并且绑定时的binding key是当前队列的名称,并且在发送消息的时候消息的routing key也需要是当前队列名称。
4,还有一点需要提一下,如果在绑定的时候多个消费者的队列与同一个binding key绑定,此时在多个消费者之间类似于fanout交换机类型的消息广播模式。
其实这块也不难理解,在使用fanout交换机实现的发布订阅模式中,不设置binding key,和此处设定的binding key都一样其实是一个道理,多个队列都接收到了同样的消息。
上面介绍了路由工作模式,接下来就看下如何实现:
Sending
连接rabbitmq
创建channel
以上两步不做具体展开
定义交换机
err = ch.ExchangeDeclare(
"order", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err!=nil{
log.Fatalf("%s: %s", "Failed to open a channel", err)
}
Copy复制发送消息
err = ch.Publish(
"order", // exchange
"test_key", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(fmt.Sprintf("routing key info log:%v",i)),
})
if err!=nil{
log.Fatalf("%s: %s", "Failed to publish msg", err)
}
Copy复制
注意:
1,交换机类型direct
2,发送消息时设置routing key
Receiving
连接rabbitmq
创建channel
以上两步不做具体展开
定义交换机
err = ch.ExchangeDeclare(
"order", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err!=nil{
log.Fatalf("%s: %s", "declare exchange error", err)
}
Copy复制定义Queue
q, err := ch.QueueDeclare(
"", // name
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
Copy复制此处定义一个Temporary queues,并指定exclusive=true
绑定队列
err = ch.QueueBind(
q.Name, // queue name
"info", // binding key
"order", // exchange
false,
nil)
Copy复制注意:
1,此处的binding key要和生产者中发送消息的routing key保持一致。
2,此处可以绑定多个不同的消息队列,bind key 不要重复,如果重复就类似于发布订阅模式。
消费消息
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
log.Printf("Failed to register a consumer %v",err)
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf(" [x] %s", d.Body)
}
}()
log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
<-forever
Copy复制
总结
1,路由模式使用的交换机类型为direct,其实可以把之前Hello World案例,工作队列案例,都理解为路由模式,只是这里显式指定路由key,之前隐式指定路由key为队列名字
2,还有一个问题,在发布订阅模式/路由模式中使用的队列都为Temporary queues,并指定exclusive=true。
3,针对于发布订阅模式,以及这里的路由模式,如何实现类似于之前工作队列那种高效的消息处理能力呢?
针对于这个问题,只要理解Binding Key Routing Key决定消息分发到那个队列中,具体如何消费就取决单个消息队列绑定了几个消费者。
如何理解上面这句话呢?
1,可以对比下Hello World案例与工作队列案例,这两个案例中Routing Key 与Binding Key都是采用默认的队列名,为啥工作队列可以实现轮询消费呢?根本原因在于工作队列中一个队列中绑定了多个消费者。
4,针对于第3条的总结,自然就可以实现:
发布/订阅模式实现消息的高效处理,
路由模式中针对单个路由key如何实现消息的高效消费
针对于以上问题的解决方案即是:
匿名队列转为命名队列
Routing Key与Binding Key保持一致
以上两条即可以实现任何模式下消息的的高效处理