初识RabbitMQ体系之3

网上参考大神们的博客,本人做了三个RabbitMQ即时发音信的德姆o。

话不多说,直接上代码!

注:那份文书档案是本身和多少个朋友学习后一路形成的。

亚洲必赢官网 1

 

壹:搭建贰个缓解方案框架:RabbitMQ_Demo

目录

  • RabbitMQ 概念
  • exchange调换机机制
    • 怎么是沟通机
    • binding?
    • Direct Exchange交换机
    • Topic Exchange交换机
    • Fanout Exchange交换机
    • Header Exchange交换机
  • RabbitMQ 的 Hello – Demo(springboot实现)
  • RabbitMQ 的 Hello Demo(spring xml实现)
  • RabbitMQ 在生产条件下选择和出现的难点
    • Spring RabbitMQ 注解
    • 消息的 JSON 传输
    • 音讯持久化,断线重连,ACK。

1.引言

RabbitMQ——Rabbit Message
Queue的简写,但不可能单纯驾驭其为新闻队列,音信代理更确切。RabbitMQ
是叁个由 Erlang
语言开发的AMQP(高级新闻队列协议)的开源达成,其内部结构如下:

亚洲必赢官网 2

RabbitMQ作为三个新闻代理,主要和消息交际,负责接收并转化新闻。RabbitMQ提供了有限补助的新闻机制、跟踪机制和灵活的音讯路由,帮忙消息集群和分布式计划。适用于排队算法、秒杀活动、新闻分发、异步处理、数据同步、处理耗费时间职分、CQ凯雷德S等应用场景。

上面大家就来上学下RabbitMQ。

一.使用VS的NuGet安装包管理工具安装RabbitMQ.Client:

中间蕴蓄6个部分:

RabbitMQ 概念

RabbitMQ
即3个消息队列,重中之重是用来促成应用程序的异步息争耦,同时也能起到音信缓冲,新闻分发的效果。RabbitMQ使用的是AMQP协议,它是1种贰进制协议。私下认可运营端口
567二。

在 RabbitMQ 中,如下图结构:

亚洲必赢官网 3

rabbitmq

  • 左边 P 代表 生产者,也正是往 RabbitMQ 发音讯的次第。
  • 高级中学级正是 RabbitMQ,当中包蕴了 调换机 和 队列。
  • 右手 C 代表 消费者,也正是往 RabbitMQ 拿音信的程序。

那么,个中相比较主要的概念有 伍个,分别为:虚拟主机,交流机,队列,和绑定。

  • 虚拟主机:四个虚拟主机持有1组沟通机、队列和绑定。为啥须要多个虚拟主机呢?非常粗略,RabbitMQ在那之中,用户只可以在虚拟主机的粒度进行权力控制。
    由此,借使急需禁止A组访问B组的交流机/队列/绑定,必须为A和B分别创制3个虚拟主机。每个RabbitMQ服务器都有一个默许的虚拟主机“/”。
  • 交换机:Exchange 用于转载音讯,然则它不会做存储 ,假如未有Queue bind 到 Exchange 的话,它会一贯舍弃掉 Producer
    发送过来的音讯。

    • 此地有三个比较关键的定义:***路由键 ***
      。音信到沟通机的时候,交互机会转载到对应的系列中,那么到底转载到哪个队列,就要遵照该路由键。
  • 绑定:也正是调换机须求和队列相绑定,那之中如上海教室所示,是多对多的涉及。

二. 环境搭建

正文主要遵照Windows下行使Vs Code 基于.net
core实行demo演示。早先在此以前大家需求防微杜渐好以下条件。

  • 安装Erlang运营条件
    下载安装Erlang。
  • 安装RabbitMQ
    下载安装Windows版本的RabbitMQ。
  • 启动RabbitMQ Server
    点击Windows开首按钮,输入RabbitMQ找到RabbitMQ Comman Prompt,以管理人身份运营。
  • 次第执行以下命令运维RabbitMQ服务

    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start
    
  • 执行rabbitmqlctl status检查RabbitMQ状态

  • 安装管理平台插件
    执行rabbitmq-plugins enable rabbitmq_management即可成功安装,使用默许账号密码(guest/guest)登录即可。

亚洲必赢官网 4

一:RabbitMQ 公用类库项目

exchange调换机机制

3. Hello RabbitMQ

在开端从前大家先来精晓下新闻模型:
亚洲必赢官网 5
买主(consumer)订阅有些队列。生产者(producer)创造信息,然后公布到行列(queue)中,队列再将音讯发送到监听的主顾。

下边大家我们通过demo来了然RabbitMQ的着力用法。

 

2:1个劳动者控制台项目

怎么样是调换机

rabbitmq的message
model实际上海消防息不直接发送到queue中,中间有一个exchange是做消息分发,producer甚至不知底新闻发送到那几个队列中去。因而,当exchange收到message时,必须精确驾驭该如何分发。是append到自然规则的queue,照旧append到几个queue中,还是被吐弃?这个规则都以通过exchagne的四种type去定义的。

The core idea in the messaging model in RabbitMQ is that the producer
never sends any messages directly to a queue. Actually, quite often
the producer doesn’t even know if a message will be delivered to any
queue at all.

Instead, the producer can only send messages to an exchange. An
exchange is a very simple thing. On one side it receives messages from
producers and the other side it pushes them to queues. The exchange
must know exactly what to do with a message it receives. Should it be
appended to a particular queue? Should it be appended to many queues?
Or should it get discarded. The rules for that are defined by the
exchange type.

exchange是八个音信的agent,每八个虚拟的host中都有定义。它的职分是把message路由到不相同的queue中。

三.一.音信的发送和吸收接纳

创办RabbitMQ文件夹,打开命令提醒符,分别创制多个控制台项目Send、Receive。

dotnet new console --name Send //创建发送端控制台应用
cd Send //进入Send目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

dotnet new console --name Receive //创建接收端控制台应用
cd Receive //进入Receive目录
dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包
dotnet restore //恢复包

咱俩先来添加音讯发送端逻辑:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构建byte消息数据包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 发送数据包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

初识RabbitMQ体系之3。再来完善新闻接收端逻辑:

//Receive.cs  省略部分代码
public static void Main()
{
    //1.实例化连接工厂
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立连接
    using (var connection = factory.CreateConnection())
    {
        //3. 创建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明队列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 构造消费者实例
            var consumer = new EventingBasicConsumer(channel);
            //6. 绑定消息接收后的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模拟耗时
                Console.WriteLine (" [x] Done");
            };
            //7. 启动消费者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先运转音信接收端,再运营音信发送端,结果如下图。

亚洲必赢官网 6

从上边包车型大巴代码中可以看出,发送端和消费端的代码前四步都以千篇1律的。重要的分别在于发送端调用channel.BasicPublish办法发送消息;而接收端供给实例化1个EventingBasicConsumer实例来拓展消息处理逻辑。其它一些内需留意的是:新闻接收端和出殡和埋葬端的队列名称(queue)必须保持一致,那里钦赐的队列名叫hello。

二.劳动者端代码:

三:七个买主要控制制台项目

binding?

exchange和queue通过routing-key关联,那两者之间的涉嫌是正是binding。如下图所示,X表示调换机,中黄代表队列,交流机通过八个routing-key去binding3个queue,routing-key有哪些效用吗?看Direct
exchange类型沟通机。

亚洲必赢官网 7

三.2. 循环调度

应用工作行列的裨益正是它亦可互为的拍卖队列。假设堆积了众多职务,大家只须求丰盛越来越多的劳重力(workers)就能够了。大家先运营五个接收端,等待音信接收,再开发银行三个殡葬端进行消息发送。

亚洲必赢官网 8

我们扩张运转二个消费端后的运作结果:

亚洲必赢官网 9

从图中可见,大家循环发送四条新闻,四个新闻接收端按梯次被循环分配。
暗许意况下,RabbitMQ将按梯次将每条消息发送给下2个顾客。平均每种顾客将赢得同样数量的新闻。那种分发音讯的办法叫做循环(round-robin)。

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Producter
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName="localhost",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31         static void Main(string[] args)
32         {
33             DirectExchangeSendMsg();
34             Console.WriteLine("按任意键退出程序!");
35             Console.ReadKey();
36         }
37         /// <summary>
38         /// 单点精确路由模式
39         /// </summary>
40         private static void DirectExchangeSendMsg()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     var props = channel.CreateBasicProperties();
51                     props.Persistent = true;
52                     Console.WriteLine("请输入需要发送的消息:");
53                     string vadata = Console.ReadLine();
54                     while (vadata != "exit")
55                     {
56                         var msgBody = Encoding.UTF8.GetBytes(vadata);
57                         channel.BasicPublish(exchange: ExchangeName, routingKey: QueueName, basicProperties: props, body: msgBody);
58                         Console.WriteLine(string.Format("发送时间:{0},发送完毕,输入exit退出消息发送", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")));
59                         vadata = Console.ReadLine();
60                     }
61                 }
62             }
63         }
64     }
65 }

体系协会如图:

Directed Exchange

路由键exchange,该交流机械收割到音信后会把消息发送到钦定routing-key的queue中。这音讯交流机是怎么知道的吧?其实,producer
deliver音信的时候会把routing-key add到 message
header中。routing-key只是三个messgae的attribute。

A direct exchange delivers messages to queues based on a message routing key. The routing key is a message attribute added into the message header by the producer. The routing key can be seen as an "address" that the exchange use to decide how to route the message. A message goes to the queue(s) whose binding key exactly matches the routing key of the message.

Default Exchange
那种是至极的Direct
Exchange,是rabbitmq内部私下认可的二个沟通机。该交流机的name是空字符串,全数queue都私下认可binding
到该沟通机上。全数binding到该交换机上的queue,routing-key都和queue的name1样。

三.三. 新闻确认

安份守己大家地方的demo,壹旦RabbitMQ将新闻发送到消费端,新闻就会立马从内部存款和储蓄器中移出,无论消费端是或不是处理达成。在那种景况下,音讯就会丢掉。

为了保证3个音信永远不会丢掉,RabbitMQ辅助音信确认(message
acknowledgments)
。当消费端接收消息还要处理完了后,会发送3个ack(音讯确认)实信号到RabbitMQ,RabbitMQ接收到这一个能量信号后,就足以去除掉这条已经处理的新闻任务。但固然消费端挂掉了(比如,通道关闭、连接丢失等)未有发送ack时域信号。RabbitMQ就会分晓有个别音讯尚未例行处理,RabbitMQ将会再一次将音信入队,即便有其余叁个费用端在线,就会快捷的重复发送到别的四个消费端。

RabbitMQ中绝非消息超时的概念,唯有当消费端关闭或奔溃时,RabbitMQ才会再也分发新闻。

微调下Receive中的代码逻辑:

 //5. 构造消费者实例
 var consumer = new EventingBasicConsumer(channel);
 //6. 绑定消息接收后的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模拟耗时
     Console.WriteLine(" [x] Done");
     // 7. 发送消息确认信号(手动消息确认)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 启动消费者
 //autoAck:true;自动进行消息确认,当消费端接收到消息后,就自动发送ack信号,不管消息是否正确处理完毕
 //autoAck:false;关闭自动消息确认,通过调用BasicAck方法手动进行消息确认
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

最首要改动的是将
autoAck:true修改为autoAck:fasle,以及在音信处理实现后手动调用BasicAck方法实行手动新闻确认。

亚洲必赢官网 10

从图中可见,音信发送端连接发送四条新闻,当中消费端1先被分配处理第2条音讯,消费端2被循环分配第3条音讯,第三条新闻由于并没有空余消费者依旧在队列中。
在开支端二未处理完第三条消息之前,手动中断(ctrl+c)。大家得以发现RabbitMQ在下三次分发时,会预先将被中止的信息分发给消费端一拍卖。

3.主顾端代码:

亚洲必赢官网 11

Topic Exchange

通配符调换机,exchange会把新闻发送到1个可能四个满意通配符规则的routing-key的queue。其中*表号匹配二个word,#杰出多少个word和路线,路径之间通过.隔开分离。如知足a.*.c的routing-key有a.hello.c;满足#.hello的routing-key有a.b.c.helo。

三.4. 音讯持久化

音讯确认确定保障了不畏消费端格外,音信也不会丢掉能够被再度分发处理。不过倘若RabbitMQ服务端卓殊,音讯依然会丢掉。除非大家钦点durable:true,不然当RabbitMQ退出或奔溃时,音讯将还是会丢掉。通过点名durable:true,并指定Persistent=true,来告诉RabbitMQ将消息持久化。

//send.cs
//4. 申明队列(指定durable:true,告知rabbitmq对消息进行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//将消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 构建byte消息数据包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 发送数据包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

将音信标记为持久性不可能一心保险音讯不会丢掉。纵然它告诉RabbitMQ将消息保存到磁盘,不过当RabbitMQ接受消息还要还不曾保存时​​,依旧有二个十分的短的岁月窗口。RabbitMQ
只怕只是将新闻保存到了缓存中,并未将其写入到磁盘上。持久化是无法肯定保险的,不过对于3个大致职务队列来说已经足够。若是须求确认保证音讯队列的持久化,能够应用publisher
confirms.

 1 using System;
 2 using System.Collections.Generic;
 3 using System.Linq;
 4 using System.Text;
 5 using System.Threading.Tasks;
 6 using RabbitMQ.Client;
 7 
 8 namespace RabbitMQ.Consumer
 9 {
10     class Program
11     {
12         /// <summary>
13         /// 连接配置
14         /// </summary>
15         private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
16         {
17             HostName = "127.0.0.1",
18             UserName = "guest",
19             Password = "guest",
20             Port = 5672,
21             //VirtualHost = "JentVirtualHost"
22         };
23         /// <summary>
24         /// 路由名称
25         /// </summary>
26         const string ExchangeName = "Jent.Exchange";
27         /// <summary>
28         /// 队列名称
29         /// </summary>
30         const string QueueName = "Jent.Queue";
31 
32         static void Main(string[] args)
33         {
34             DirectAcceptExchange();
35 
36             Console.WriteLine("输入任意值退出程序!");
37             Console.ReadKey();
38         }
39 
40         private static void DirectAcceptExchange()
41         {
42             using (IConnection conn = rabbitMqFactory.CreateConnection())
43             {
44                 using (IModel channel = conn.CreateModel())
45                 {
46                     channel.ExchangeDeclare(ExchangeName, "direct", durable: true, autoDelete: false, arguments: null);
47                     channel.QueueDeclare(QueueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
48                     channel.QueueBind(QueueName, ExchangeName, routingKey: QueueName);
49 
50                     while (true)
51                     {
52                         BasicGetResult msgResponse = channel.BasicGet(QueueName, autoAck: false);
53                         if (msgResponse != null)
54                         {
55                             var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
56                             Console.WriteLine(string.Format("接收时间:{0},消息内容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
57                         }
58                         //System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
59                     }
60                 }
61             }
62         }
63     }
64 }

 

Fanout Exchange

扇形沟通机,该调换机会把音讯发送到全体binding到该交流机上的queue。那种是publisher/subcribe方式。用来做广播最佳。
拥有该exchagne上钦命的routing-key都会被ignore掉。

The fanout copies and routes a received message to all queues that are
bound to it regardless of routing keys or pattern matching as with
direct and topic exchanges. Keys provided will simply be ignored.

三.伍. 公道分发

RabbitMQ的新闻分发私下认可遵照消费端的多寡,按梯次循环分发。那样仅是确定保证了消费端被平均分发新闻的数目,但却不经意了消费端的闲忙情状。那就恐怕出现有些消费端直接处理耗费时间任务处于阻塞状态,有些消费端直接处理一般任务处于空置状态,而只是它们分配的任务数量1样。

亚洲必赢官网 12

但我们得以因而channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
设置prefetchCount : 1来告诉RabbitMQ,在未收到消费端的音信确认时,不再分发音信,也就保障了当消费端处于艰巨景色时,不再分配职责。

//Receive.cs
//4. 申明队列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

那儿你要求留意的是假若具有的消费端都地处劳碌景观,你的种类或许会被塞满。你供给留意那或多或少,要么添加更加多的消费端,要么使用此外策略。

4.先后结果:

二:开发以前,必要引用RabbitMQ包

Header Exchange

安装header attribute参数类型的交流机。

4. Exchange

细心的您只怕发现上边的demo,生产者和顾客直接是透过壹致队列名称举办相称衔接的。消费者订阅某些队列,生产者创造音信公布到行列中,队列再将新闻转载到订阅的主顾。那样就会有三个局限性,即消费者3次只能发送音信到某多个种类。

那消费者哪些才能发送新闻到多个音讯队列呢?
RabbitMQ提供了Exchange,它好像于路由器的功用,它用来对新闻进行路由,将消息发送到几个连串上。Exchange1方面从劳动者接收音信,另1方面将音信推送到行列。但exchange必须精通什么样处理接收到的新闻,是将其附加到一定队列照旧外加到七个连串,照旧直接忽略。而这么些规则由exchange
type定义,exchange的规律如下图所示。
亚洲必赢官网 13

周边的exchange type 有以下两种:

  • direct(鲜明的路由规则:消费端绑定的种类名称必须和音信发布时钦定的路由名称相同)
  • topic (方式匹配的路由规则:帮忙通配符)
  • fanout (音讯广播,将信息分发到exchange上绑定的具备队列上)

下边大家就来挨家挨户那介绍它们的用法。

亚洲必赢官网 14

设置相应的Nuget包,或然下载相关dll也能够,可是提出在线安装nuget,更便于

RabbitMQ 的 Hello Demo

安装就背着了,提议遵照官方文书档案上做。先贴代码,稍后解释,代码如下:

布置 交流机,队列,调换机与队列的绑定,新闻监视容器:

@Configuration
@Data
public class RabbitMQConfig {

    final static String queueName = "spring-boot";

    @Bean
    Queue queue() {
        return new Queue(queueName, false);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("spring-boot-exchange");
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    Receiver receiver() {
        return new Receiver();
    }
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
}

安插接收新闻者(即消费者):

public class Receiver {

    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
        latch.countDown();
    }

    public CountDownLatch getLatch() {
        return latch;
    }
}

布局发送音信者(即生产者):

@RestController
public class Test {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping(value = "/test/{abc}",method = RequestMethod.GET)
    public String test(@PathVariable(value = "abc") String abc){
        rabbitTemplate.convertAndSend("spring-boot", abc + " from RabbitMQ!");
        return  "abc";
    }
}

如上便可实现一个简短的 RabbitMQ
德姆o,具体代码在:点这里

那正是说,那里,分为八个部分分析:发音信,调换机队列,收新闻。

  • 对此发送消息:大家1般能够运用 RabbitTemplate,那个是 Spring
    封装给了我们,便于我们发送新闻,我们调用
    rabbitTemplate.convertAndSend("spring-boot", xxx); 即可发送消息。
  • 对于沟通机队列:如上代码,大家须要配备交流机
    TopicExchange,配置队列 Queue,并且配备他们中间的绑定 Binding
  • 对于收受消息:首先要求创制叁个音信监听容器,然后把咱们的接受者注册到该容器中,那样,队列中有新闻,那么就会调用接收者的附和的不二秘籍。如上代码
    container.setMessageListener(listenerAdapter);
    在那之中,MessageListenerAdapter 能够当作是
    大家接收者的2个卷入类,new MessageListenerAdapter(receiver, "receiveMessage");
    指明了一旦有消息来,那么调用接收者哪个方法实行拍卖。

4.1 fanout

针对由表及里的盘算,大家先来精晓下fanout的广播路由体制。fanout的路由机制如下图,即发送到
fanout 类型exchange的新闻都会散发到具备绑定该exchange的行列上去。

亚洲必赢官网 15

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到指定exchange,fanout类型无需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

买主示例代码:

//申明fanout类型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到指定fanout类型exchange,无需指定路由键
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

 

检索:RabbitMQ.Client  
安装新型版即可,不明白怎么设置nuget,请移步 

RabbitMQ 的 Hello Demo(spring xml实现)

spring
xml格局贯彻RabbitMQ不难,可读性较好,配置简单,配置和完结如下所示。

上文已经讲述了rabbitmq的安排,xml格局经过properites文件存放用户配置消息:

mq.host=127.0.0.1
mq.username=guest
mq.password=guest
mq.port=5672

配置application-mq.xml配置文件,表明连接、调换机、queue以及consumer监听。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
     http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
    http://www.springframework.org/schema/rabbit
    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd" >
    <description>rabbitmq 连接服务配置</description>

    <!-- 连接配置 -->
    <context:property-placeholder location="classpath:mq.properties" />
    <rabbit:connection-factory id="connectionFactory" host="${mq.host}" username="${mq.username}" password="${mq.password}" port="${mq.port}"/>
    <rabbit:admin connection-factory="connectionFactory"/>
    <!-- spring template声明-->
    <rabbit:template exchange="amqpExchange" id="amqpTemplate"  connection-factory="connectionFactory" />

    <!--申明queue-->
    <rabbit:queue id="test_queue_key" name="test_queue_key" durable="true" auto-delete="false" exclusive="false" />
    <!--申明exchange交换机并绑定queue-->
    <rabbit:direct-exchange name="amqpExchange" durable="true" auto-delete="false" id="amqpExchange">
        <rabbit:bindings>
            <rabbit:binding queue="test_queue_key" key="test_queue_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>


    <!--consumer配置监听-->
    <bean id="reveiver" class="com.demo.mq.receive.Reveiver" />
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
        <rabbit:listener queues="test_queue_key" ref="reveiver" method="receiveMessage"/>
    </rabbit:listener-container>
</beans>

上述代码中,引入properties文件就不多说了。

<rabbit:connection-factory>标签注脚创设connection的factory工厂。

<rabbit-template>声明spring
template,和上文spring中使用template一样。template可声明exchange。

<rabbit:queue>声称3个queue并安装queue的布局项,间接看标签属性就足以明白queue的安插项。

<rabbit:direct-exchange>申明交流机并绑定queue。

<rabbit:listener-container>申明监听container并安插consumer和监听routing-key。

剩余就大概了,application-context.xml中把rabbitmq配置import进去。

<?xml version="1.0" encoding="UTF-8"?>
<beans
        xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:task="http://www.springframework.org/schema/task"
        xmlns:context="http://www.springframework.org/schema/context"
        xmlns:aop="http://www.springframework.org/schema/aop"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd
       http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd">

    <context:component-scan base-package="com.demo.**" />
    <import resource="application-mq.xml" />
</beans>

Producer达成,发送消息仍旧使用template的convertAndSend() deliver音信。

@Service
public class Producer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    private final static Logger logger = LoggerFactory.getLogger(Producer.class);

    public void sendDataToQueue(String queueKey, Object object) {
        try {
            amqpTemplate.convertAndSend(queueKey, object);
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("exeception={}",e);
        }

    }
}

配置consumer

package com.demo.mq.receive;

import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;

@Service
public class Reveiver {
    private CountDownLatch latch = new CountDownLatch(1);

    public void receiveMessage(String message) {
        System.out.println("reveice msg=" + message.toString());
        latch.countDown();
    }
}

测试deliver消息

Controller
@RequestMapping("/demo/")
public class TestController {
    private final static Logger logger = LoggerFactory.getLogger(TestController.class);
    @Resource
    private Producer producer;


    @RequestMapping("/test/{msg}")
    public String send(@PathVariable("msg") String msg){
        logger.info("#TestController.send#abc={msg}", msg);
        System.out.println("msg="+msg);
        producer.sendDataToQueue("test_queue_key",msg);
        return "index";
    }
}

4.2. direct

direct相对于fanout就属于完全匹配、单播的情势,路由体制如下图,即队列名称和音信发送时钦定的路由完全相配时,新闻才会发送到钦点队列上。
亚洲必赢官网 16

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到direct类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

消费者示例代码:

//申明direct类型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//绑定队列到direct类型exchange,需指定路由键routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

注:在率先步事先,你必要安装RabbitMQ客户端,可从

安装好今后,起先激动的写代码啦!

RabbitMQ 在生产条件下选择和出现的难点

在生养条件中,由于 Spring 对 RabbitMQ
提供了有个别方便人民群众的注释,所以首先能够行使那几个评释。例如:

  • @EnableRabbit:@EnableRabbit 和 @Configuration
    注脚在3个类中结合使用,假如此类能够回到贰个RabbitListenerContainerFactory 类型的
    bean,那么就一定于能够把该终端(消费端)和 RabbitMQ
    进行连续。Ps:(生成端不是经过 RabbitListenerContainerFactory 来和
    RabbitMQ 连接,而是通过 RabbitTemplate )
  • @RabbitListener:当对应的类别中有音信的时候,该表明修饰下的主意会被执行。
  • @RabbitHandler:接收者能够监听七个类别,不相同的队列音信的档次可能分化,该表明能够使得不相同的新闻让区别措施来响应。

实际那么些申明的运用,能够参照那里的代码:点这里

首先,生产环境下的 RabbitMQ
大概不会在劳动者只怕消费者本机上,所以须要再一次定义
ConnectionFactory,即:

@Bean
ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
    connectionFactory.setUsername(userName);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost(vhost);
    return connectionFactory;
}

此处,能够重新安装须要连接的 RabbitMQ 的
ip,端口,虚拟主机,用户名,密码。

然后,能够先从生产端怀恋,生产端必要延续 RabbitMQ,那么能够因此RabbitTemplate 实行连接。 Ps:(RabbitTemplate
用于生产端发送消息到交流机中),如下代码:

@Bean(name="myTemplate")
RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate(connectionFactory);
    template.setMessageConverter(integrationEventMessageConverter());
    template.setExchange(exchangeName);
    return template;
}

在该代码中,new RabbitTemplate(connectionFactory);
设置了生产端连接到RabbitMQ,template.setMessageConverter(integrationEventMessageConverter());
设置了 生产端发送给调换机的音信是以怎么样格式的,在
integrationEventMessageConverter() 代码中:

public MessageConverter integrationEventMessageConverter() {
    Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter();
    return messageConverter;
}

如上 Jackson2JsonMessageConverter 指明了 JSON。上述代码的尾声
template.setExchange(exchangeName); 指明了
要把劳动者要把音讯发送到哪个沟通机上。

有了上述,那么,大家即可使用
rabbitTemplate.convertAndSend("spring-boot", xxx); 发送音讯,xxx
代表任意档次,因为上述的设置会帮我们把这么些品种转化成 JSON 传输。

进而,生产端发送大家说过了,那么未来得以看看消费端:

对此消费端,大家得以只创立
SimpleRabbitListenerContainerFactory,它能够帮大家转变
RabbitListenerContainer,然后大家再利用 @RabbitListener
内定接收者收到音信时处理的点子。

@Bean(name="myListenContainer")
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setMessageConverter(integrationEventMessageConverter());
    factory.setConnectionFactory(connectionFactory());
    return factory;
}

这其中
factory.setMessageConverter(integrationEventMessageConverter());
钦定了作者们承受消息的时候,以 JSON
传输的音讯能够转换到对应的类别传入到点子中。例如:

@Slf4j
@Component
@RabbitListener(containerFactory = "helloRabbitListenerContainer",queues = "spring-boot")
public class Receiver {
    @RabbitHandler
    public void receiveTeacher(Teacher teacher) {
        log.info("##### = {}",teacher);
    }
}

恐怕现身的标题:

4.3. topic

topic是direct的晋级版,是1种方式相配的路由机制。它支持使用三种通配符来展开方式相称:符号#和符号*。其中*合作三个单词,
#则代表相称0个或两个单词,单词之间用.细分。如下图所示。
亚洲必赢官网 17

生产者示例代码:

// 生成随机队列名称
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名称
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//发布到topic类型exchange,必须指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

买主示例代码:

//申明topic类型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明随机队列名称
var queuename = channel.QueueDeclare ().QueueName;
//绑定队列到topic类型exchange,需指定路由键routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

        但是RabbitMQ又是正视于Erlang
OTP平台,所以,安装RabbitMQ从前,要求先从

 

音信持久化

在生养条件中,我们要求思虑万一劳动者挂了,消费者挂了,大概 rabbitmq
挂了什么样。1般的话,假使劳动者挂了照旧消费者挂了,其实是未有影响,因为音讯就在队列之中。那么万一rabbitmq
挂了,在此之前在队列之中的新闻咋办,其实能够做音信持久化,RabbitMQ
会把音讯保存在磁盘上。

做法是能够先从 Connection 对象中得到3个 Channel
信道对象,然后再能够经过该对象设置 音信持久化。

5. RPC

奥迪Q5PC——Remote Procedure Call,远程进程调用。
那RabbitMQ如何开始展览远程调用呢?示意图如下:
亚洲必赢官网 18
先是步,首假如开始展览长途调用的客户端须求钦定接收远程回调的种类,并表明消费者监听此行列。
其次步,远程调用的服务端除了要表明消费端接收远程调用请求外,还要将结果发送到客户端用来监听的结果的行列中去。

长距离调用客户端:

 //申明唯一guid用来标识此次发送的远程调用请求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要监听的回调队列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回调队列
 properties.CorrelationId = correlationId;//指定消息唯一标识
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //发布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //创建消费者用于处理消息回调(远程调用返回结果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //仅当消息回调的ID与发送的ID一致时,说明远程调用结果正确返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

远程调用服务端:

//申明队列接收远程调用请求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//请求处理逻辑
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //从请求的参数中获取请求的唯一标识,在消息回传时同样绑定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //将远程调用结果发送到客户端监听的队列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手动发回消息确认
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

       
关于那有的的内容,推荐阅读:

3:达成RabbitMQ基本收发新闻的意义

劳动者或然消费者断线重连

此地 Spring 有活动重连机制。

6. 总结

依据上面的demo和对两种分化exchange路由体制的上学,我们发现RabbitMQ首假诺涉及到以下几其中央概念:

  1. Publisher:生产者,音信的发送方。
  2. Connection:互联网连接。
  3. Channel:信道,多路复用连接中的一条独立的双向数据流通道。
  4. Exchange:调换器(路由器),负责新闻的路由到对应队列。
  5. Binding:队列与沟通器间的涉及绑定。消费者将关心的行列绑定到钦定沟通器上,以便Exchange能准确分发音信到钦点队列。
  6. Queue:队列,信息的缓冲存储区。
  7. Virtual
    Host:虚拟主机,虚拟主机提供财富的逻辑分组和分手。包蕴连接,沟通,队列,绑定,用户权限,策略等。
  8. Broker:音信队列的服务器实体。
  9. Consumer:消费者,音信的接收方。

此次作为入门就讲到那里,下次我们来教学下EventBus +
RabbitMQ
什么样兑现事件的分发。

参考资料:
RabbitMQ Tutorials
Demo路径——RabbitMQ

 

1:在RabbitMQ_Lib类库中新建类:MyRabbitMQ.cs

ACK 确认机制

每种Consumer恐怕须要一段时间才能处理完收到的数据。如若在那些进度中,Consumer出错了,分外退出了,而数据还尚未拍卖到位,那么
分外不幸,那段数据就丢掉了。因为大家使用no-ack的法子开始展览确认,也正是说,每一趟Consumer接到数据后,而随就是还是不是处理完毕,RabbitMQ Server会立即把这么些Message标记为成功,然后从queue中删除了。

假如3个Consumer极度退出了,它处理的多寡可见被其它的Consumer处理,那样数据在那种情状下就不会丢掉了(注意是这种意况下)。
为了保证数据不被遗失,RabbitMQ帮助音信确认机制,即acknowledgments。为了保险数据能被正确处理而不仅是被Consumer收到,那么大家无法应用no-ack。而应当是在拍卖完数据后发送ack。

在拍卖数量后发送的ack,就是报告RabbitMQ数据现已被接收,处理完了,RabbitMQ能够去安全的删减它了。
假若Consumer退出领悟则未有发送ack,那么RabbitMQ就会把那一个Message发送到下一个Consumer。那样就确定保证了在Consumer分外退出的情形下数据也不会丢掉。

  此德姆o只是‘direct’格局的音信发送接收方式。

亚洲必赢官网 19亚洲必赢官网 20

个人对 RabbitMQ ACK 的一对疑云,求助:点这里

public class MyRabbitMQ
    {
        //连接工厂
        private ConnectionFactory factory { get; set; } = new ConnectionFactory
        {
            HostName = "localhost",
            Port = 5672,
            UserName = "guest",
            Password = "guest"
        };

        private IConnection connection { get; set; }

        private IModel channel { get; set; }

        //生产方 构造函数
        public MyRabbitMQ(string exchangeName = "",string exchangeType = ExchangeType.Direct)
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个转发器
            channel.ExchangeDeclare(exchangeName, exchangeType);
        }

        //消费方 构造函数
        public MyRabbitMQ(string exchangeName = "", string queueName = "", string routingKey = "")
        {
            //创建一个连接
            connection = factory.CreateConnection();
            channel = connection.CreateModel();

            //创建一个队列
            channel.QueueDeclare(queueName, true, false, false);

            //队列绑定
            channel.QueueBind(queueName, exchangeName, routingKey);
        }

        public void SendMessage(string message="", string exchangeName = "", string routingKey = "")
        {
            channel.BasicPublish(exchangeName, routingKey, null, Encoding.UTF8.GetBytes(message));
        }

        public QueueingBasicConsumer ReceiveMessage(string queueName = "")
        {
            //EventingBasicConsumer
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queueName, true, consumer);
            return consumer;
        }
    }

总结

  1. RabbitMQ 效率:异步,解耦,缓冲,消息分发。
  2. RabbitMQ 首要分为二个部分,生产者,调换机和队列,消费者。
  3. 亟需注意音信持久化,指标为了预防 RabbitMQ 宕机;考虑 ACK
    机制,目标为了假使消费者对音信的处理失利了,那么继续要怎么处理。

View Code

写在终极

  1. 亚洲必赢官网 ,写出来,说出去才领悟对不对,知道不对才能改进,改良了才能成才。
  2. 在技能方面,希望大家眼里都容不得沙子。若是有不规则的地方大概要求革新的地点希望得以建议,10分谢谢。

能够看看,首先有三个构造函数,三个是给劳动者选择,2个是给买主选拔,注意参数有所差别,可以寓素不相识产者与顾客关切的点是不雷同的。

无论是生产或然消费,都以一个客户端,都亟待创建二个RabbitMQ连接并创设三个channel,才得以开始展览连锁的操作,那几个操作都以由channel发起的,那样说应该比较白话了。

布局生产者的时候,重假如创立二个转载器,转载器的名字及项目供给定义,

转载器常用类型包罗三种:direct、fanout、topic,

那二种档次那里说的更清楚:

本例子中是以topic为例子的

 

2:MQ_Producter项目中发送新闻(生产者中发送音信)

亚洲必赢官网 21亚洲必赢官网 22

class Program
    {
        static void Main(string[] args)
        {
            string exchangeName = "07281616_exchange_topic";
            string routingkeya = "0728.a.c.routingkey";
            string routingkeyb = "0728.b.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, ExchangeType.Topic);
            for (int i = 0; i < 3600; i++)
            {
                System.Threading.Thread.Sleep(1000);
                if (i % 2 == 0)
                {
                    var message = $"{routingkeyb} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeyb}");
                    myMQ.SendMessage(message, exchangeName, routingkeyb);
                }
                else
                {
                    var message = $"{routingkeya} -- {DateTime.Now.ToLongTimeString()}";
                    Console.WriteLine($"auto send: {message}  for {routingkeya}");
                    myMQ.SendMessage(message, exchangeName, routingkeya);
                }

            }
        }
    }

View Code

此处发送了3600次,每秒发3遍音讯,奇数和偶数发送的路由规则分化等,会有五个不等的客户端来选择,那样有利于大家测试音信是还是不是被分发到了不一致的体系上

 

叁:两个顾客项目展开新闻的接受

顾客一:

亚洲必赢官网 23亚洲必赢官网 24

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281616_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody}  (only for {routingRule})");
            }
        }
    }

View Code

 

买主2:

亚洲必赢官网 25亚洲必赢官网 26

class Program
    {
        static void Main(string[] args)
        {
            string queueName = "07281626_queue";
            string exchangeName = "07281616_exchange_topic";
            var routingRule = "0728.a.*.routingkey";
            MyRabbitMQ myMQ = new MyRabbitMQ(exchangeName, queueName, routingRule);
            var consumer = myMQ.ReceiveMessage(queueName);
            while (true)
            {
                //BasicConsume 方法是可阻塞的,比较好
                var msgResponse = consumer.Queue.Dequeue();
                //这种方法不好,没有阻塞等待
                //var msgResponse = channel.BasicGet("zzs_queue", true);
                var msgBody = Encoding.UTF8.GetString(msgResponse.Body);
                Console.WriteLine($"Received: {msgBody} (only for {routingRule})");
            }
        }
    }

View Code

 

三个顾客分别接收差异队列上的新闻

 

4:运行!

先编写翻译一下,到bin目录下先运营生产者,在运行八个买主

亚洲必赢官网 27

 

也足以先关掉消费端,过7秒再关闭生产端,在web
管理界面能够看看明天有一个连串里有音讯,二个三条三个四条

亚洲必赢官网 28

 

 四:总结

全部例子的具有代码都在此间了,代码里相关心释也很明亮,是自小编自个儿完结的首先个RabbitMQ收发功效,实际应用中毫无疑问能够有许多扩张,新手们有狐疑或许本人精通的有畸形的地方,烦请评论处建议哈,大家共同进步!

网站地图xml地图