暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

RabbitMQ-Routing(路由模式)

go技术沙龙 2021-02-07
1332

应用场景

上一篇文章中讲解了RabbitMQ中的广播模式,一个消息生产之后,可以让任何与此交换机绑定的Queue接收到,但是现在又有了另外一种场景,发短信的消息只能由发短信的消费者收到,改如何做到呢?

方案

看下如下代码,将Queue与Exchange方式;

err = ch.QueueBind(
q.Name, // queue name
"", // binding key
"order", // exchange
false,
nil)

Copy
复制
  • 如上代码可以指定一个binding key,意味着将此routing key与当前交换机绑定,需要注意的一点是在fanout交换机类型会忽略此binding key的配置,因此在routing模式中需要使用direct交换机类型。

Direct Exchange

1,在发布订阅模式中使用的交换机类型是fanout类型,此类型的交换机会无脑的把消息分发给所有与此交换机绑定的队列。

2,direct交换机就不同了,此交换机消息分发必须依赖于routing key与binding key的匹配关系。

The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.

官网解释给出的解释如上,大家可以阅读理解一下,下面说一下我的理解:

  • 消息在发送的时候指定routing key

  • 消息队列在与交换机绑定的时候会设置binding key

  • 如果消息中的routing key 与队列与交换机绑定时设置的binding key相等,此队列就会收到此消息。

3,在前面Hello World案例与Work Queue案例中在没有指定交换机情况下如何分发消息的呢?

  • 默认交换机类型direct

  • 队列在声明的时候如果没有指定交换机,会使用默认的direct交换机,翻译有时总是很苍白,直接看下源码中的说明:

    Every queue declared gets a default binding to the empty exchange "" which has
    the type "direct" with the routing key matching the queue's name. With this
    default binding, it is possible to publish messages that route directly to
    this queue by publishing to "" with the routing key of the queue name.

    QueueDeclare("alerts", true, false, false, false, nil)
    Publish("", "alerts", false, false, Publishing{Body: []byte("...")})

    Delivery Exchange Key Queue
    -----------------------------------------------
    key: alerts -> "" -> alerts -> alerts

    Copy
    复制

    源码中解释的意思是,每个队列在声明的时候,都会默认和一个名字为空的交换机绑定,并且绑定时的binding key是当前队列的名称,并且在发送消息的时候消息的routing key也需要是当前队列名称。

4,还有一点需要提一下,如果在绑定的时候多个消费者的队列与同一个binding key绑定,此时在多个消费者之间类似于fanout交换机类型的消息广播模式。

其实这块也不难理解,在使用fanout交换机实现的发布订阅模式中,不设置binding key,和此处设定的binding key都一样其实是一个道理,多个队列都接收到了同样的消息。

上面介绍了路由工作模式,接下来就看下如何实现:

Sending

  • 连接rabbitmq
  • 创建channel

以上两步不做具体展开

  • 定义交换机
    err = ch.ExchangeDeclare(
    "order", // name
    "direct", // type
    true, // durable
    false, // auto-deleted
    false, // internal
    false, // no-wait
    nil, // arguments
    )
    if err!=nil{
    log.Fatalf("%s: %s", "Failed to open a channel", err)
    }

    Copy
    复制
  • 发送消息
    err = ch.Publish(
    "order", // exchange
    "test_key", // routing key
    false, // mandatory
    false, // immediate
    amqp.Publishing{
    ContentType: "text/plain",
    Body: []byte(fmt.Sprintf("routing key info log:%v",i)),
    })
    if err!=nil{
    log.Fatalf("%s: %s", "Failed to publish msg", err)
    }

    Copy
    复制

注意:

1,交换机类型direct

2,发送消息时设置routing key

Receiving

  • 连接rabbitmq
  • 创建channel

以上两步不做具体展开

  • 定义交换机
    err = ch.ExchangeDeclare(
    "order", // name
    "direct", // type
    true, // durable
    false, // auto-deleted
    false, // internal
    false, // no-wait
    nil, // arguments
    )
    if err!=nil{
    log.Fatalf("%s: %s", "declare exchange error", err)
    }

    Copy
    复制
  • 定义Queue
    q, err := ch.QueueDeclare(
    "", // name
    false, // durable
    false, // delete when unused
    true, // exclusive
    false, // no-wait
    nil, // arguments
    )

    Copy
    复制

    此处定义一个Temporary queues,并指定exclusive=true

  • 绑定队列
    err = ch.QueueBind(
    q.Name, // queue name
    "info", // binding key
    "order", // exchange
    false,
    nil)

    Copy
    复制

    注意:

    1,此处的binding key要和生产者中发送消息的routing key保持一致。

    2,此处可以绑定多个不同的消息队列,bind key 不要重复,如果重复就类似于发布订阅模式。

  • 消费消息
    msgs, err := ch.Consume(
    q.Name, // queue
    "", // consumer
    true, // auto ack
    false, // exclusive
    false, // no local
    false, // no wait
    nil, // args
    )
    log.Printf("Failed to register a consumer %v",err)
    forever := make(chan bool)

    go func() {
    for d := range msgs {
    log.Printf(" [x] %s", d.Body)
    }
    }()

    log.Printf(" [*] Waiting for logs. To exit press CTRL+C")
    <-forever

    Copy
    复制

总结

1,路由模式使用的交换机类型为direct,其实可以把之前Hello World案例,工作队列案例,都理解为路由模式,只是这里显式指定路由key,之前隐式指定路由key为队列名字

2,还有一个问题,在发布订阅模式/路由模式中使用的队列都为Temporary queues,并指定exclusive=true。

3,针对于发布订阅模式,以及这里的路由模式,如何实现类似于之前工作队列那种高效的消息处理能力呢?

针对于这个问题,只要理解Binding Key Routing Key决定消息分发到那个队列中,具体如何消费就取决单个消息队列绑定了几个消费者。

如何理解上面这句话呢?

1,可以对比下Hello World案例与工作队列案例,这两个案例中Routing Key 与Binding Key都是采用默认的队列名,为啥工作队列可以实现轮询消费呢?根本原因在于工作队列中一个队列中绑定了多个消费者。

4,针对于第3条的总结,自然就可以实现:

  • 发布/订阅模式实现消息的高效处理,

  • 路由模式中针对单个路由key如何实现消息的高效消费

针对于以上问题的解决方案即是:

  • 匿名队列转为命名队列

  • Routing Key与Binding Key保持一致

以上两条即可以实现任何模式下消息的的高效处理


文章转载自go技术沙龙,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论