音讯队列

RabbitMQ队列

第二我们在讲rabbitMQ在此以前我们要说一下python里的queue:2者干的事情是相同的,都是队列,用于传递音信

在python的queue中有七个三个是线程queue,贰个是进程queue(multiprocessing中的queue)。线程queue不能跨进度,用于多少个线程之间开始展览多少同步交互;进度queue只是用来父进度与子进程,或然同属于同意父进度下的八个子进度实行互相。也便是说假设是多个精光独立的次第,就算是python程序,也照例不可能用这几个进程queue来通讯。那假设我们有多少个独立的python程序,分属于八个经过,可能是python和任何语言

安装:windows下

率先需求安装 Erlang环境

官网: 

Windows版下载地址:

Linux版:     使用yum安装

 

接下来安装RabbitMQ了 

先是下载RabbitMQ 的Windows版本

下载地址:

安装pika:

前边设置过了pip,直接打开cmd,运营pip install pika

设置达成之后,实现贰个最简便易行的系列通信:

亚洲必赢官网 1

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先创立2个骨干的socket,然后建立2个管道,在管道中发音信,然后声美素佳儿个queue,起个体系的名字,之后真正的发消息(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要1运转就一贯运营下去,他不停收一条,永远在此地卡住。

在上头不管是produce依旧consume,里面都宣示了一个queue,这些是为啥吗?因为我们不精晓是顾客先开首运维照旧生产者先运转,那样一旦未有注明的话就会报错。

下边大家来看一下一对多,即2个劳动者对应七个买主:

率先我们运营二个买主,然后不断的用produce去发送数据,大家能够看到顾客是由此一种轮询的方式开始展览持续的收受多少,每一个顾客消费2个。

那么只要我们顾客收到了音信,然后处理那几个消息须要30分钟,在处理的经过中,消费者断电了宕机了,那消费者还未有拍卖完,大家设那一个职务大家务必处理完,那我们应该有1个认可的音信,说那些使命成功了依然是从未做到,所以自身的劳动者要肯定消费者是或不是把那些职责处理完了,消费者处理完之后要给那么些生产者服务器端发送一个认同音讯,生产者才会把那一个职务从新闻队列中剔除。固然未有处理完,消费者宕机了,未有给劳动者发送确认音讯,那就代表从未拍卖完,那我们看看rabbitMQ是怎么处理的

笔者们可以在消费者的callback中添加一个time.sleep()进行效仿宕机。callback是一个回调函数,只要事件一触发就会调用那个函数。函数执行完了就代表音讯处理完了,倘若函数未有处理完,那就证实。。。。

咱俩得以看出在消费者代码中的basic_consume()中有3个参数叫no_ack=True,那几个意思是那条音信是不是被处理完都不会发送确认新闻,壹般大家不加那几个参数,rabbitMQ默许就会给您设置成音讯处理完了就机关发送确认,大家以后把这些参数去掉,并且在callback中添加一句话运营:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

亚洲必赢官网 2亚洲必赢官网 3亚洲必赢官网 4

运作的结果正是,笔者先运营贰遍生产者,数据被消费者一接收到了,不过小编把消费者1宕机,甘休运作,那么消费者二就收到了音讯,即假若消费者绝非发送确认新闻,生产者就不会把新闻删除。

RabbitMQ新闻持久化:

我们得以扭转好多的音讯队列,那我们怎么查看音讯队列的情状呢:rabbitmqctl.bat
list_queues

亚洲必赢官网 5

今昔的情状是,信息队列中还有音信,不过服务器宕机了,那这些音信就丢了,那作者就供给这些音信强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在历次注明队列的时候增加三个durable参数(客户端和服务器端都要丰裕这些参数),

亚洲必赢官网 6

在那几个意况下,咱们把rabbitMQ服务重视启,发现只有队列名留下了,但是队列中的新闻并未了,那样我们还要求在劳动者basic_publish中添加二个参数:properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

诸如此类就能够使得消息持久化

今后是1个劳动者对应八个顾客,很公道的收发收发,不过实际的状态是,我们机器的安排是不均等的,有的配置是单核的1些配置是多核的,大概i7处理器处理肆条音讯的时候和其余的计算机处理1条音信的时间大多,那差的微型总计机那里就会积聚音信,而好的微处理器那里就会形成闲置,在具体中做运行的,大家会在负载均衡中装置权重,哪个人的配备高权重高,任务就多一些,然而在rabbitMQ中,大家只做了贰个简短的拍卖就能够兑现公道的消息分发,你有多大的能力就处理多少新闻

即:server端给客户端发送新闻的时候,先检查现在还有稍稍音讯,若是当前消息并没有处理完成,就不会发送给那么些消费者音信。假如当前的顾客绝非音讯就发送

这么些只必要在消费者端举办改动加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在变更1个consume2,在callback中sleep20秒来模拟

亚洲必赢官网 7亚洲必赢官网 8亚洲必赢官网 9

作者先运维七个produce,被consume接受,然后在开发银行1个,就被consumer二接受,不过因为consumer第22中学sleep20秒,处理慢,所以那时在开发银行produce,就又给了consume进行处理

 

python学习之RabbitMQ—–音讯队列,

RabbitMQ队列

前言:此次整治写壹篇关于rabbitMQ的博客,比较上壹篇redis,感觉rabbitMQ难度是增加不少。那篇博客会插入1些英文讲解,可是简单精晓的。rabbitMQ的下载与安装,请参见redis&rabbitMQ安装。

Publish\Subscrible(消息宣布\订阅)

前边都以一对①的出殡接收数据,那本人想一对多,想广播一样,生产者发送一个音信,全数顾客都吸收消息。那大家咋做啊?那一年大家即将用到exchange了

exchange在壹端收音信,在另壹端就把音信放进queue,exchange必须规范的知晓收到的新闻要怎么,是不是相应发到三个特定的queue依然发给许多queue,大概说把他扬弃,那个都被exchange的类别所定义

exchange在概念的时候是有项指标,以决定到底是那个queue符合条件,能够承受信息:

fanout:全部bind到此exchange的queue都基本上能用音信

direct:通过rounroutingKey和exchange决定的相当唯一的queue能够吸收新闻

topic:全部符合routingKey的routingKey所bind的queue尚可新闻

headers:通过headers来决定把消息发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

此处的exchange从前是空的,以往赋值log;在那里也从不注明queue,广播不要求写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在消费者那里大家有定义了3个queue,注意一下诠释中的内容。然而我们在发送端未有注脚queue,为啥发送端不必要接收端必要吗?在consume里有五个channel.queue_bind()函数,里面绑定了exchange转换器上,当然里面还索要三个queue_name

运维结果:

亚洲必赢官网 10亚洲必赢官网 11亚洲必赢官网 12亚洲必赢官网 13

就一定于收音机一样,实时播报,打开多少个顾客,生产者发送一条数据,然后三个买主同时接收到

RabbitMQ队列

率先大家在讲rabbitMQ此前大家要说一下python里的queue:二者干的事务是均等的,都以队列,用于传递音讯

在python的queue中有三个3个是线程queue,叁个是进程queue(multiprocessing中的queue)。线程queue不可能跨进程,用于三个线程之间开始展览数据同步交互;进度queue只是用于父进度与子进程,或然同属于同意父进度下的两个子进度实行交互。相当于说倘若是七个精光独立的主次,就算是python程序,也依然无法用这几个进度queue来通讯。那假使我们有多个单身的python程序,分属于七个进程,可能是python和其余语言

安装:windows下

音讯队列。先是需求设置 Erlang环境 官网: 
Windows版下载地址:
Linux版:     使用yum安装   然后安装RabbitMQ了  首先下载RabbitMQ
的Windows版本 下载地址:

安装pika:

事先安装过了pip,间接打开cmd,运维pip install pika

安装收尾之后,达成一个最简易的系列通讯:

亚洲必赢官网 14

producer:

 1 import pika
 2 
 3 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 #声明一个管道
 5 channel = connection.channel()
 6 
 7 #声明queue
 8 channel.queue_declare(queue = 'hello')
 9 #routing_key是queue的名字
10 channel.basic_publish(exchange='',
11                       routing_key='hello',#queue的名字
12                       body='Hello World!',
13                       )
14 print("[x] Send 'Hello World!'")
15 connection.close()

 

先创制三当中坚的socket,然后建立三个管道,在管道中发消息,然后评释一(Wissu)(Aptamil)个queue,起个系列的名字,之后真正的发新闻(basic_publish)

consumer:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 3 channel = connection.channel()
 4 
 5 channel.queue_declare(queue='hello')
 6 
 7 
 8 def callback(ch, method, properties, body):#回调函数
 9     print("---->",ch,method,properties)
10     print(" [x] Received %r" % body)
11 
12 channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
13                       queue='hello',
14                       no_ack=True
15                        )
16 
17 print(' [*] Waiting for messages. To exit press CTRL+C')
18 channel.start_consuming()

 

 start_consuming()只要一运转就直接运转下去,他频频收一条,永远在此地卡住。

在下面不管是produce照旧consume,里面都宣称了二个queue,这一个是干吗呢?因为咱们不晓得是消费者先起首运转仍然生产者先运维,那样如若没有表明的话就会报错。

上面我们来看一下一对多,即3个劳动者对应多少个顾客:

率先大家运行一个顾客,然后不断的用produce去发送数据,大家得以看到顾客是通过一种轮询的章程举行不断的接受多少,各类顾客消费3个。

那正是说1旦大家顾客接受了音讯,然后处理这几个消息须求30分钟,在拍卖的长河中,消费者断电了宕机了,那消费者还未曾处理完,大家设这一个职责我们无法不处理完,那大家应当有一个承认的音讯,说那几个任务成功了只怕是从未有过成功,所以作者的生产者要肯定消费者是还是不是把那些职分处理完了,消费者处理完之后要给这几个生产者服务器端发送一个认同新闻,生产者才会把这么些任务从音信队列中除去。假诺未有拍卖完,消费者宕机了,未有给劳动者发送确认新闻,那就表示尚无处理完,这大家看看rabbitMQ是怎么处理的

大家得以在消费者的callback中添加二个time.sleep()进行效仿宕机。callback是2个回调函数,只要事件一触发就会调用这几个函数。函数执行完了就表示音讯处理完了,若是函数未有处理完,那就认证。。。。

大家得以见见在消费者代码中的basic_consume()中有二个参数叫no_ack=True,那一个意思是那条新闻是还是不是被处理完都不会发送确认音讯,1般大家不加那几个参数,rabbitMQ暗中认可就会给您设置成信息处理完了就自动发送确认,大家现在把这么些参数去掉,并且在callback中添加一句话运营:ch.basic_ack(delivery_tag=method.delivery_tag)(手动处理)

def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

 

亚洲必赢官网 15亚洲必赢官网 16亚洲必赢官网 17

运行的结果正是,作者先运营1次生产者,数据被消费者1收受到了,可是小编把顾客1宕机,甘休运维,那么消费者二就接收了音信,即假如消费者未有发送确认音信,生产者就不会把信息删除。

RabbitMQ消息持久化:

我们得以生成好多的消息队列,那我们怎么查看新闻队列的意况吗:rabbitmqctl.bat
list_queues

亚洲必赢官网 18

明日的状态是,音讯队列中还有音信,不过服务器宕机了,那这一个消息就丢了,那本身就需求这一个新闻强制的持久化:

channel.queue_declare(queue='hello2',durable=True)

 

在历次注解队列的时候拉长2个durable参数(客户端和服务器端都要增进那一个参数),

亚洲必赢官网 19

在这一个景况下,我们把rabbitMQ服务珍视启,发现只有队列名留下了,但是队列中的消息尚未了,这样大家还亟需在劳动者basic_publish中添加1个参数:properties

producer:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
#声明一个管道
channel = connection.channel()

#声明queue
channel.queue_declare(queue = 'hello2',durable=True)
#routing_key是queue的名字
channel.basic_publish(exchange='',
                      routing_key='hello2',
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode=2,#make message persistent
                      )
                      )
print("[x] Send 'Hello World!'")
connection.close()

 

如此那般就足以使得新闻持久化

近期是一个劳动者对应七个买主,很公道的收发收发,但是实际上的事态是,我们机器的铺排是不壹致的,有的配置是单核的局地配置是多核的,只怕i7处理器处理四条信息的时候和别的的微型总结机处理一条音信的年华基本上,那差的微处理器那里就会积聚音信,而好的处理器那里就会形成闲置,在实际中做运行的,大家会在负载均衡中装置权重,何人的布局高权重高,职务就多一点,然而在rabbitMQ中,我们只做了一个简易的处理就足以落成公道的新闻分发,你有多大的力量就处理多少消息

即:server端给客户端发送音信的时候,先反省今后还有多少消息,假诺当前新闻并未有处理完结,就不会发送给那个消费者消息。若是当前的消费者绝非音讯就发送

本条只须求在顾客端举办修改加代码:

import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello2',durable=True)


def callback(ch, method, properties, body):#回调函数
    print("---->",ch,method,properties)
    #time.sleep(30)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,#如果收到消息,就调用callback来处理消息
                      queue='hello2',
                      #no_ack=False
                       )

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

 

 大家在风谲云诡3个consume二,在callback中sleep20秒来模拟

亚洲必赢官网 20亚洲必赢官网 21亚洲必赢官网 22

本身先运营四个produce,被consume接受,然后在起步三个,就被consumer2接受,不过因为consumer第22中学sleep20秒,处理慢,所以此时在开发银行produce,就又给了consume举办拍卖

 

rabbitMQ是消息队列;想想在此之前的大家学过队列queue:threading
queue(线程queue,两个线程之间开始展览多少交互)、进程queue(父进度与子进度展开互相只怕同属于同1父进度下的八个子进度展开互动);要是多少个独立的次第,那么之间是不能够通过queue举行互动的,那时候我们就须求2当中间代理即rabbitMQ

rabbitMQ是消息队列;想想以前的大家学过队列queue:threading
queue(线程queue,三个线程之间举行数量交互)、进度Queue(父进度与子进程展开互动恐怕同属于同1父进度下的三个子进程展开互动);假使三个独立的次序,那么之间是不能够因而queue举行交互的,这时候我们就须要四个中级代理即rabbitMQ.

有取舍的接受音讯(exchange_type = direct)

RabbitMQ还援助依据重点字发送,即:队列绑定关键字,发送者将数据遵照重大字发送到新闻exchange,exchange依据重点字判定应该将数据发送到钦定的行列

亚洲必赢官网 23

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

一发缜密的过滤(exchange_type=topic)

亚洲必赢官网 24

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

上述都以服务器端发消息,客户端收新闻,新闻流是单向的,那借使大家想要发一条命令给长途的客户端去履行,然后想让客户端执行的结果重回,则那种形式叫做rpc

RabbitMQ RPC

亚洲必赢官网 25

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是进入四个绿灯形式,未有新闻就等候音信,有音讯就收过来

self.connection.process_data_events()是三个非阻塞版的start_consuming,就是说发了1个事物给客户端,每过一点时光去反省有未有信息,要是未有音讯,能够去干其余事体

reply_to = self.callback_queue是用来接收反应队列的名字

corr_id =
str(uuid.uuid4()),correlation_id第2在客户端会通过uuid四生成,第一在劳务器端重返执行结果的时候也会传过来3个,所以说要是服务器端发过来的correlation_id与自个儿的id相同
,那么服务器端发出来的结果就必将是本人正好客户端发过去的授命的施行结果。未来就一个服务器端3个客户端,无所谓缺人不承认。今后客户端是非阻塞版的,大家得以不让它打字与印刷未有消息,而是进行新的下令,那样就两条新闻,不自然按梯次达成,那我们就供给去肯定各类再次回到的结果是哪个命令的施行结果。

完整的形式是那样的:生产者发了贰个指令给买主,不知道客户端哪天回来,依旧要去收结果的,然而它又不想进去阻塞格局,想每过壹段时间看这些音信收回来未有,如果音讯收回来了,就象征收完了。 

运营结果:

亚洲必赢官网 26亚洲必赢官网 27

服务器端开启,然后在运营客户端,客户端先是等待新闻的出殡,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

Publish\Subscrible(音信宣布\订阅)

前边都以1对一的出殡接收数据,那小编想1对多,想广播壹样,生产者发送多个音信,全部顾客都吸收消息。那我们如何是好啊?这年大家就要用到exchange了

exchange在1端收音讯,在另一端就把音信放进queue,exchange必须规范的驾驭收到的消息要怎么,是不是相应发到三个一定的queue如故发给许多queue,只怕说把她放任,那个都被exchange的花色所定义

exchange在概念的时候是有档次的,以控制到底是那些queue符合条件,基本上能用信息:

fanout:全体bind到此exchange的queue都足以承受音讯

direct:通过rounroutingKey和exchange决定的13分唯一的queue可以接收新闻

topic:全部符合routingKey的routingKey所bind的queue能够承受消息

headers:通过headers来控制把音信发给哪些queue

消息publisher:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 
 6 channel = connection.channel()
 7 
 8 channel.exchange_declare(exchange='log',type = 'fanout')
 9 
10 message = ' '.join(sys.argv[1:]) or 'info:Hello World!'
11 channel.basic_publish(exchange='logs',routing_key='',body=message)
12 print("[x] Send %r " % message)
13 connection.close()

 

此地的exchange在此以前是空的,今后赋值log;在此间也尚未声明queue,广播不要求写queue

 消息subscriber:

 1 import pika
 2 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 3 channel = connection.channel()
 4 channel.exchange_declare(exchange='logs',exchange_type='fanout')
 5 
 6 #exclusive唯一的,不指定queue名字,rabbit会随机分配一个名字
 7 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
 8 result = channel.queue_declare(exclusive=True)
 9 queue_name = result.method.queue
10 
11 channel.queue_bind(exchange='logs',queue=queue_name)
12 
13 print('[*] Waiting for logs,To exit press CTRL+C')
14 
15 def callback(ch,method,properties,body):
16     print("[X] %r" % body)
17 channel.basic_consume(callback,queue = queue_name,no_ack=True)
18 channel.start_consuming()

 

在顾客那里大家有定义了二个queue,注意一下申明中的内容。可是我们在发送端未有证明queue,为何发送端不须要接收端供给呢?在consume里有叁个channel.queue_bind()函数,里面绑定了exchange转换器上,当然里面还索要叁个queue_name

运行结果:

亚洲必赢官网 28亚洲必赢官网 29亚洲必赢官网 30亚洲必赢官网 31

就一定于收音机1样,实时播报,打开多个顾客,生产者发送一条数据,然后三个买主同时收到到

音信队列:

 

有取舍的吸收消息(exchange_type = direct)

RabbitMQ还匡助依照重大字发送,即:队列绑定关键字,发送者将数据根据重点字发送到音信exchange,exchange依据重要字判定应该将数据发送到内定的队列

亚洲必赢官网 32

publisher:

 1 import pika
 2 import sys
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
 7 
 8 severity = sys.argv[1] if len(sys.argv)>1 else 'info'
 9 message = ' '.join(sys.argv[2:]) or 'Hello World!'
10 channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
11 
12 print("[X] Send %r:%r" %(severity,message))
13 connection.close()

 

subscriber:

import pika
import sys
connect = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connect.channel()

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]#
if not severities:
    sys.stderr.write("Usage:%s [info] [warning] [error]\n" %sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

print('[*]Waiting for logs.To exit press CTRL+c')

def callback(ch,method,properties,body):
    print("[x] %r:%r"%(method.routing_key,body))

channel.basic_consume(callback,queue = queue_name,no_ack=True)
channel.start_consuming()

 

更进一步全面的过滤(exchange_type=topic)

亚洲必赢官网 33

 

publish:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         exchange_type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

 

subscriber:

 1 import pika
 2 import sys
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel = connection.channel()
 6 
 7 channel.exchange_declare(exchange='topic_logs',
 8                          exchange_type='topic')
 9 
10 result = channel.queue_declare(exclusive=True)
11 queue_name = result.method.queue
12 
13 binding_keys = sys.argv[1:]
14 if not binding_keys:
15     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
16     sys.exit(1)
17 
18 for binding_key in binding_keys:
19     channel.queue_bind(exchange='topic_logs',
20                        queue=queue_name,
21                        routing_key=binding_key)
22 
23 print(' [*] Waiting for logs. To exit press CTRL+C')
24 
25 
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28 
29 
30 channel.basic_consume(callback,
31                       queue=queue_name,
32                       no_ack=True)
33 
34 channel.start_consuming()

 

 

如上都以服务器端发新闻,客户端收信息,音信流是单向的,那倘使大家想要发一条命令给长途的客户端去执行,然后想让客户端执行的结果重回,则这种形式叫做rpc

RabbitMQ RPC

亚洲必赢官网 34

rpc server:

 1 import pika
 2 import time
 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel = connection.channel()
 5 
 6 channel.queue_declare(queue='rpc_queue')
 7 def fib(n):
 8     if n==0:
 9         return 0
10     elif n==1:
11         return 1
12     else:
13         return fib(n-1)+fib(n-2)
14 
15 def on_request(ch,method,props,body):
16     n = int(body)
17     print("[.] fib(%s)" %n)
18     response = fib(n)
19 
20     ch.basic_publish(exchange='',routing_key=props.reply_to,
21                      properties=pika.BasicProperties(correlation_id=props.correlation_id),
22                      body = str(response))
23     ch.basic_ack(delivery_tag=method.delivery_tag)25 channel.basic_consume(on_request,queue='rpc_queue')
26 
27 print("[x] Awaiting rpc requests")
28 channel.start_consuming()

 

 

rpc client:

 1 import pika
 2 import uuid,time
 3 class FibonacciRpcClient(object):
 4     def __init__(self):
 5         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 6 
 7         self.channel = self.connection.channel()
 8 
 9         result = self.channel.queue_declare(exclusive=True)
10         self.callback_queue =  result.method.queue
11 
12         self.channel.basic_consume(self.on_response,#回调函数,只要一收到消息就调用
13                                    no_ack=True,queue=self.callback_queue)
14 
15     def on_response(self,ch,method,props,body):
16         if self.corr_id == props.correlation_id:
17             self.response = body
18 
19     def call(self,n):
20         self.response = None
21         self.corr_id = str(uuid.uuid4())
22         self.channel.basic_publish(exchange='',routing_key='rpc_queue',
23                                    properties=pika.BasicProperties(
24                                        reply_to=self.callback_queue,
25                                        correlation_id=self.corr_id
26                                    ),
27                                    body=str(n),#传的消息,必须是字符串
28                                    )
29         while self.response is None:
30             self.connection.process_data_events()#非阻塞版的start_consuming
31             print("no message....")
32             time.sleep(0.5)
33         return int(self.response)
34 fibonacci_rpc = FibonacciRpcClient()
35 print("[x] Requesting fib(30)")
36 response = fibonacci_rpc.call(30)
37 print("[.] Got %r"%response)

 

之前的start_consuming是进入二个绿灯格局,未有音讯就等候新闻,有消息就收过来

self.connection.process_data_events()是几个非阻塞版的start_consuming,正是说发了三个事物给客户端,每过一点小时去检查有未有消息,假若未有新闻,能够去干别的事体

reply_to = self.callback_queue是用来接收反应队列的名字

corr_id =
str(uuid.uuid4()),correlation_id第3在客户端会通过uuid4生成,第2在服务器端重临执行结果的时候也会传过来二个,所以说如若服务器端发过来的correlation_id与团结的id相同
,那么服务器端发出来的结果就势必是自个儿正要客户端发过去的下令的进行结果。将来就贰个劳务器端3个客户端,无所谓缺人不肯定。今后客户端是非阻塞版的,大家能够不让它打字与印刷未有新闻,而是进行新的通令,那样就两条新闻,不必然按顺序达成,那大家就需求去肯定各类再次回到的结果是哪些命令的实践结果。

完整的方式是那样的:生产者发了二个命令给买主,不晓得客户端哪天回来,还是要去收结果的,可是它又不想进去阻塞方式,想每过1段时间看那几个新闻收回来未有,若是音讯收回来了,就象征收完了。 

运营结果:

亚洲必赢官网 35亚洲必赢官网 36

劳务器端开启,然后在开发银行客户端,客户端先是等待音讯的出殡和埋葬,然后做出反应,直到算出斐波那契

 

 

 

 

 

 

 

 

 

 

RabbitMQ队列
首先大家在讲rabbitMQ此前大家要说一下python里的queue:2者干的工作是平等的,都以队列,用于…

  • RabbitMQ
  • ZeroMQ
  • ActiveMQ
  • ………..

1、简单的rabbitMQ队列通讯

亚洲必赢官网 37

由上海教室能够,数据是头阵给exchange沟通器,exchage再发放相应队列。pika模块是python对rabbitMQ的API接口。接收端有多个回调函数,1接收到数量就调用该函数。一条音讯被三个消费者收到后,该信息就从队列删除。OK,驾驭上边的知识后,先来看望二个简易的rabbitMQ列队通讯。

send端:

 1 import pika
 2 #连上rabbitMQ
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()       #生成管道,在管道里跑不同的队列
 5 
 6 #声明queue
 7 channel.queue_declare(queue='hello1')
 8 
 9 #n RabbitMQ a message can never be sent directly to the queue,it always needs to go through an exchange.
10 #向队列里发数据
11 channel.basic_publish(exchange='',      #先把数据发给exchange交换器,exchage再发给相应队列
12                       routing_key='hello1', #向"hello'队列发数据
13                       body='HelloWorld!!')  #发的消息
14 print("[x]Sent'HelloWorld!'")
15 connection.close()

receive端:

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
 4 channel=connection.channel()
 5 
 6 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
 7 # We could avoid that if we were sure that the queue already exists. For example if send.py program
 8 # was run before. But we're not yet sure which program to run first. In such cases it's a good
 9 # practice to repeat declaring the queue in both programs.
10 channel.queue_declare(queue='hello1')#声明队列,保证程序不出错
11 
12 
13 def callback(ch,method,properties,body):
14     print("-->ch",ch)
15     print("-->method",method)
16     print("-->properties",properties)
17     print("[x] Received %r" % body)         #一条消息被一个消费者接收后,该消息就从队列删除
18 
19 
20 channel.basic_consume(callback,              #回调函数,一接收到消息就调用回调函数
21                       queue='hello1',
22                       no_ack=False)    #消费完毕后向服务端发送一个确认,默认为False
23 
24 print('[*] Waiting for messages.To exit press CTRL+C')
25 channel.start_consuming()

运转结果:(下边包车型客车代码对应自笔者写的注释相信是看得懂的~)

亚洲必赢官网 38亚洲必赢官网 39

rabbitMQ_1_send.py
 [x] Sent 'Hello World!'


rabbitMQ_2_receive.py
 [*] Waiting for messages. To exit press CTRL+C
-->ch <pika.adapters.blocking_connection.BlockingChannel object at 0x000000000250AEB8>
-->method <Basic.Deliver(['consumer_tag=ctag1.f9533f4c8c59473c8096817670ad69d6', 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=hello1'])>
-->properties <BasicProperties>
 [x] Received b'Hello World!!'

View Code

因此深切的测试,有以下多个意识:

  1. 先运行rabbitMQ_1_send.py发送数据,rabbitMQ_2_receive.py未运转。发现当receive运转时还能接收数据。
  2. 运行多个(eg:二个)接收数据的客户端,再运营发送端,客户端1接收多少,再运维发送端,客户端二收到多少,再运营发送端,客户端三收受数量。

RabbitMQ会私下认可把p发的新闻依次分发给各种消费者(c),跟负载均衡大约。

 

原理:

二、全英文ack

在看上边的例子,你会发现有一句代码no_ack=False(消费实现后向服务端发送一个确认,暗中认可为False),以自个儿丹麦语4级飘过的程度,看完上边关于ack的上书感觉写得很牛啊!!于是分享一下:

Doing a task can take a few seconds. You
may wonder what happens if one of the consumers starts a long task and
dies with it only partly done. With our current code once RabbitMQ
delivers message to the customer it immediately removes it from memory.
In this case, if you kill a worker we will lose the message it was just
processing. We’ll also lose all the messages that were dispatched to
this particular worker but were not yet handled.

But we don’t want to lose any tasks. If a
worker dies, we’d like the task to be delivered to another
worker.

In order to make sure a message is never
lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is
sent back from the consumer to tell RabbitMQ that a particular message
had been received, processed and that RabbitMQ is free to delete
it.

If a consumer dies (its channel is
closed, connection is closed, or TCP connection is lost) without sending
an ack, RabbitMQ will understand that a message wasn’t processed fully
and will re-queue it. If there are other consumers online at the same
time, it will then quickly redeliver it to another consumer. That way
you can be sure that no message is lost, even if the workers
occasionally die.

There aren’t any message timeouts;
RabbitMQ will redeliver the message when the consumer dies. It’s fine
even if processing a message takes a very, very long time.

Message
acknowledgments are turned on by default. In previous examples we
explicitly turned them off via the no_ack=True flag. It’s time to
remove this flag and send a proper acknowledgment from the worker, once
we’re done with a task.

Using this code we can be sure that even
if you kill a worker using CTRL+C while it was processing a message,
nothing will be lost. Soon after the worker dies all unacknowledged
messages will be redelivered.

自己把发送端和接收端分别比作生产者与顾客。生产者发送职务A,消费者收到职务A并处理,处理完后生产者将新闻队列中的职务A删除。现在我们相遇了叁个题材:假诺顾客收到职分A,但在拍卖的经过中陡然宕机了。而此刻生产者将音讯队列中的职务A删除。实际上任务A并未能如愿拍卖完,也正是丢失了职分/音信。为化解这一个难题,应使顾客接受职分并打响拍卖完后发送多少个ack到生产者亚洲必赢官网 ,!生产者收到ack后就领悟任务A已被成功拍卖,这时才从音信队列大校任务A删除,若是未有吸收ack,就必要把任务A发送给下二个主顾,直到义务A被成功拍卖。

 

亚洲必赢官网 40

3、音讯持久化

前边早已理解,生产者生产数量,消费者再开发银行是足以接收数据的。

然则,生产者生产数量,然后重启rabbitMQ,消费者是心有余而力不足接收数据。

eg:音信在传输进度中rabbitMQ服务器宕机了,会意识在此之前的新闻队列就不设有了,那时大家就要用到新闻持久化,消息持久化会让队列不趁早服务器宕机而消亡,会永远的保留下去。上边看下关于音讯持久化的英文讲解:

We have learned how to make sure that
even if the consumer dies, the task isn’t lost(by default, if wanna
disable  use no_ack=True). But our tasks will still be lost if RabbitMQ
server stops.

When RabbitMQ quits or crashes it will forget the
queues and messages unless you tell it not to. Two things are
required to make sure that messages aren’t lost: we need to mark both
the queue and messages as durable.

First, we
need to make sure that RabbitMQ will never lose our queue. In order to
do so, we need to declare it as durable:

      1 channel.queue_declare(queue=’hello’,
durable=True)

Although this command is correct by
itself, it won’t work in our setup. That’s because we’ve already defined
a queue called hello which is not durable. RabbitMQ doesn’t allow you to redefine an
existing queue with different parameters and will return an
error(会曝错) to any program that tries to do that. But there is
a quick workaround – let’s declare a queue with different name, for
exampletask_queue:

      1
channel.queue_declare(queue=’task_queue’, durable=True)

This queue_declare change needs to be
applied to both the producer and consumer code.

At that point we’re sure that
the task_queue queue won’t be lost even if RabbitMQ restarts. Now we
need to mark our messages as persistent –
by supplying a delivery_mode property with a value 2.

      1
channel.basic_publish(exchange=”,
      2
                      routing_key=”task_queue”,
      3
                      body=message,
      4
                      properties=pika.BasicProperties(
      5
                         delivery_mode = 2,      # make message
persistent
      6
                      ))

上边的英文对新闻持久化讲得很好。新闻持久化分为两步:

  • 持久化队列。通过代码落成持久化hello队列:channel.queue_declare(queue=’hello’,
    durable=True)
  • 持久化队列中的新闻。通过代码完毕:properties=pika.BasicProperties( delivery_mode = 2, )

此处有个点要专注下:

壹经您在代码中已实现持久化hello队列与队列中的新闻。那么您重启rabbitMQ后再也运转代码大概会爆错!

因为: RabbitMQ doesn’t allow you to
redefine an existing queue with different parameters and will return an
error.

为了消除那个难题,能够声爱他美(Aptamil)个与重启rabbitMQ此前差别的行列名(queue_name).

 

壹、安装和主导使用

四、音讯公平分发

一旦Rabbit只管按梯次把新闻发到各样消费者身上,不思索消费者负载的话,很只怕出现,一个机器配置不高的顾客那里堆积了广大音信处理不完,同时配备高的主顾却向来很自在。为解决此题材,能够在挨家挨户消费者端,配置perfetch=1,意思正是报告RabbitMQ在自小编这一个消费者当前音讯还没处理完的时候就无须再给小编发新音信了。

亚洲必赢官网 41

 

带新闻持久化+公平分发的完整代码

劳动者端:

亚洲必赢官网 42亚洲必赢官网 43

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.queue_declare(queue='task_queue', durable=True)  #队列持久化
 9  
10 message = ' '.join(sys.argv[1:]) or"Hello World!"
11 channel.basic_publish(exchange='',
12                       routing_key='task_queue',
13                       body=message,
14                       properties=pika.BasicProperties(
15                          delivery_mode = 2, # make message persistent消息持久化
16                       ))
17 print(" [x] Sent %r" % message)
18 connection.close()

View Code

消费者端:

亚洲必赢官网 44亚洲必赢官网 45

 1 #!/usr/bin/env python
 2 import pika
 3 import time
 4  
 5 connection =pika.BlockingConnection(pika.ConnectionParameters(
 6         host='localhost'))
 7 channel = connection.channel()
 8  
 9 channel.queue_declare(queue='task_queue', durable=True)
10 print(' [*] Waiting for messages. To exit press CTRL+C')
11  
12 def callback(ch, method, properties, body):
13     print(" [x] Received %r" % body)
14     time.sleep(body.count(b'.'))
15     print(" [x] Done")
16     ch.basic_ack(delivery_tag =method.delivery_tag)   
17  
18 channel.basic_qos(prefetch_count=1)
19 channel.basic_consume(callback,
20                       queue='task_queue')
21  
22 channel.start_consuming()

View Code

作者在运维方面程序时对消费者端里回调函数的一句代码(ch.basic_ack(delivery_tag
=method.delivery_tag))1贰分吸引。那句代码去掉消费者端也能1如既往收到新闻啊。那句代码有毛线用处??

生产者端新闻持久后,需求在顾客端加上(ch.basic_ack(delivery_tag
=method.delivery_tag)): 保障新闻被消费后,消费端发送二个ack,然后服务端从队列删除该音讯.

 

安装RabbitMQ服务
 

伍、音信揭露与订阅

在此以前的例证都基本都是一对一的音信发送和接到,即音信只好发送到钦定的queue里,但有点时候你想让你的音讯被抱有的queue收到,类似广播的效果,那时候就要用到exchange了。PS:有趣味的问询redis的揭橥与订阅,能够看看笔者写的博客python之redis。

An exchange is a very simple thing. On
one side it receives messages from producers and the other side it
pushes them to queues. The exchange must know exactly what to do with a
message it receives. Should it be appended to a particular queue? Should
it be appended to many queues? Or should it get discarded(丢弃). The
rules for that are defined by the exchange type.

Exchange在概念的时候是有项目标,以决定到底是何等Queue符合条件,能够吸收新闻

 

fanout: 全体bind到此exchange的queue都得以接到音信

direct: 通过routingKey和exchange决定的12分唯①的queue勉强可以音信

topic:全体符合routingKey(此时得以是二个表明式)的routingKey所bind的queue能够收到音讯

 

表明式符号表明: #表示1个或七个字符,*意味着任何字符
     
    例:#.a会匹配a.a,aa.a,aaa.a等
               
*.a会匹配a.a,b.a,c.a等
          
 注:使用RoutingKey为#,Exchange
Type为topic的时候一定于选取fanout

 

下面小编分别讲下fanout,direct,topic:

1、fanout

fanout: 全部bind到此exchange的queue都得以接受新闻

亚洲必赢官网 46

send端:

亚洲必赢官网 47亚洲必赢官网 48

 1 import pika
 2 import sys
 3 
 4 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 5 channel=connection.channel()
 6 
 7 channel.exchange_declare(exchange='logs',
 8                       type='fanout')
 9 
10 message=''.join(sys.argv[1:])or"info:HelloWorld!"
11 channel.basic_publish(exchange='logs',
12                       routing_key='',  #fanout的话为空(默认)
13                       body=message)
14 print("[x]Sent%r"%message)
15 connection.close()

View Code

receive端:

亚洲必赢官网 49亚洲必赢官网 50

 1 import pika
 2 
 3 connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 4 channel=connection.channel()
 5 
 6 channel.exchange_declare(exchange='logs',type='fanout')
 7 
 8 #不指定queue名字(为了收广播),rabbit会随机分配一个queue名字,
 9 #exclusive=True会在使用此queue的消费者断开后,自动将queue删除
10 result=channel.queue_declare(exclusive=True)
11 queue_name=result.method.queue
12 
13 #把声明的queue绑定到交换器exchange上
14 channel.queue_bind(exchange='logs',
15                 queue=queue_name)
16 
17 print('[*]Waitingforlogs.ToexitpressCTRL+C')
18 
19 def callback(ch,method,properties,body):
20     print("[x]%r"%body)
21 
22 
23 channel.basic_consume(callback,
24                       queue=queue_name,
25                       no_ack=True)
26 
27 channel.start_consuming()

View Code

有五个点要留意下:

  • fanout-广播,send端的routing_key=”, #fanout的话为空(暗中同意)

  • receive端有一句代码:result=channel.queue_declare(exclusive=True),功用:不指定queue名字(为了收广播),rabbitMQ会随机分配3个queue名字,exclusive=True会在行使此queue的主顾断开后,自动将queue删除。

 

二、有选拔的吸收新闻(exchange
type=direct)

RabbitMQ还帮衬依据重点字发送,即:队列绑定关键字,发送者将数据依据重大字发送到音讯exchange,exchange依据 关键字
判定应该将数据发送至钦点队列。

亚洲必赢官网 51

send端:

亚洲必赢官网 52亚洲必赢官网 53

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localh'))ost
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 severity = sys.argv[1] iflen(sys.argv) > 1 else 'info'
12 message = ' '.join(sys.argv[2:]) or'Hello World!'
13 channel.basic_publish(exchange='direct_logs',
14                       routing_key=severity, #关键字不为空,告知消息发送到哪里(info,error~)
15                       body=message)
16 print(" [x] Sent %r:%r" % (severity, message))
17 connection.close()

View Code

receive端:

亚洲必赢官网 54亚洲必赢官网 55

 1 import pika
 2 import sys
 3  
 4 connection =pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='direct_logs',
 9                          type='direct')
10  
11 result =channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 severities = sys.argv[1:]
15 if not severities:
16     sys.stderr.write("Usage: %s [info] [warning] [error]\n" %sys.argv[0])
17     sys.exit(1)
18  
19 for severity in severities:
20     channel.queue_bind(exchange='direct_logs',
21                        queue=queue_name,
22                        routing_key=severity)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" %(method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

实在最初阶小编看代码是一脸懵逼的~
下边是本身在cmd进行测试的截图(合作着截图看会不难精通些),二个send端,多个receive端(先起receive端,再起receive端):

send端:

亚洲必赢官网 56

receive端-1:

亚洲必赢官网 57

receive端-2:

亚洲必赢官网 58

 

三、更周详的新闻过滤topic(供参考)

Although using the direct exchange
improved our system, it still has limitations – it can’t do routing
based on multiple criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix tool, which routes logs based on both severity
(info/warn/crit…) and facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’.

感觉自笔者英文水准不高啊~,笔者相比着垃圾有道翻译,加上本人的明白,大约知道地点在讲怎么。

举例:
假设是系统的荒唐,就把消息发送到A,要是是MySQL的谬误,就把信息发送到B。但是对B来说,想达成接收MySQL的错误音信,能够用有取舍的接受信息(exchange type=direct),让主要字为error就贯彻了啊!今后B有个供给:不是兼具的错误音讯都吸收接纳,只接到钦定的一无是处。在某种消息再开始展览过滤,那正是更加细致的新闻过滤topic。

 

send端:

亚洲必赢官网 59亚洲必赢官网 60

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')  #类型为topic
10  
11 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
12 message = ' '.join(sys.argv[2:]) or 'Hello World!'
13 channel.basic_publish(exchange='topic_logs',
14                       routing_key=routing_key,
15                       body=message)
16 print(" [x] Sent %r:%r" % (routing_key, message))
17 connection.close()

View Code

receive端:

亚洲必赢官网 61亚洲必赢官网 62

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='topic_logs',
 9                          type='topic')
10  
11 result = channel.queue_declare(exclusive=True)
12 queue_name = result.method.queue
13  
14 binding_keys = sys.argv[1:]
15 if not binding_keys:
16     sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
17     sys.exit(1)
18  
19 for binding_key in binding_keys:
20     channel.queue_bind(exchange='topic_logs',
21                        queue=queue_name,
22                        routing_key=binding_key)
23  
24 print(' [*] Waiting for logs. To exit press CTRL+C')
25  
26 def callback(ch, method, properties, body):
27     print(" [x] %r:%r" % (method.routing_key, body))
28  
29 channel.basic_consume(callback,
30                       queue=queue_name,
31                       no_ack=True)
32  
33 channel.start_consuming()

View Code

 

 

python安装RabbitMQ模块

六、RPC(Remote Procedure Call)

帕杰罗PC的定义可看小编百度的(其实就就好像小编此前做的FTP,笔者从客户端发多个指令,服务端重回相关音讯):

亚洲必赢官网 63亚洲必赢官网 64

RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。

View Code

上边重点讲下陆风X八PC通讯,作者刚早先学挺难的,学完之后感觉奥迪Q伍PC通讯的思考很有启发性,代码的例证写得也很牛!!

亚洲必赢官网 65

client端发的音讯被server端接收后,server端会调用callback函数,执行职务后,还亟需把相应的新闻发送到client,不过server怎样将音讯发还给client?借使有四个client连接server,server又怎么明白是要发放哪个client??

GL450PC-server暗中同意监听rpc_queue.肯定不能够把要发给client端的音信发到rpc_queue吧(rpc_queue是监听client端发到server端的数额)。

理所当然的方案是server端另起多个queue,通过queue将音讯再次回到给对应client。但难题又来了,queue是server端起的,故client端肯定不精晓queue_name,连queue_name都不理解,client端接收毛线的多少??

消除措施:

客户端在发送指令的还要告诉服务端:职责履行完后,数据通过某队列重回结果。客户端监听该队列就OK了。

client端:

 1 import pika
 2 import uuid
 3 
 4 
 5 class FibonacciRpcClient(object):
 6     def __init__(self):
 7         self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
 8 
 9         self.channel = self.connection.channel()
10         #随机建立一个queue,为了监听返回的结果
11         result = self.channel.queue_declare(exclusive=True)
12         self.callback_queue = result.method.queue   ##队列名
13 
14         self.channel.basic_consume(self.on_response,  #一接收客户端发来的指令就调用回调函数on_response
15                                    no_ack=True,
16                                    queue=self.callback_queue)
17 
18     def on_response(self, ch, method, props, body):  #回调
19         #每条指令执行的速度可能不一样,指令1比指令2先发送,但可能指令2的执行结果比指令1先返回到客户端,
20         #此时如果没有下面的判断,客户端就会把指令2的结果误认为指令1执行的结果
21         if self.corr_id == props.correlation_id:
22             self.response = body
23 
24     def call(self, n):
25         self.response = None    ##指令执行后返回的消息
26         self.corr_id = str(uuid.uuid4())   ##可用来标识指令(顺序)
27         self.channel.basic_publish(exchange='',
28                                    routing_key='rpc_queue', #client发送指令,发到rpc_queue
29                                    properties=pika.BasicProperties(
30                                        reply_to=self.callback_queue, #将指令执行结果返回到reply_to队列
31                                        correlation_id=self.corr_id,
32                                    ),
33                                    body=str(n))
34         while self.response is None:
35             self.connection.process_data_events() #去queue接收数据(不阻塞)
36         return int(self.response)
37 
38 
39 fibonacci_rpc = FibonacciRpcClient()
40 
41 print(" [x] Requesting fib(30)")
42 response = fibonacci_rpc.call(30)
43 print(" [.] Got %r" % response)

server端:

 1 import pika
 2 import time
 3 
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5     host='localhost'))
 6 
 7 channel = connection.channel()
 8 
 9 channel.queue_declare(queue='rpc_queue')
10 
11 
12 def fib(n):
13     if n == 0:
14         return 0
15     elif n == 1:
16         return 1
17     else:
18         return fib(n - 1) + fib(n - 2)
19 
20 
21 def on_request(ch, method, props, body):
22     n = int(body)
23 
24     print(" [.] fib(%s)" % n)
25     response = fib(n)  #从客户端收到的消息
26 
27     ch.basic_publish(exchange='',   ##服务端发送返回的数据到props.reply_to队列(客户端发送指令时声明)
28                      routing_key=props.reply_to,  #correlation_id (随机数)每条指令都有随机独立的标识符
29                      properties=pika.BasicProperties(correlation_id= \
30                                                          props.correlation_id),
31                      body=str(response))
32     ch.basic_ack(delivery_tag=method.delivery_tag)  #客户端持久化
33 
34 
35 channel.basic_qos(prefetch_count=1)  #公平分发
36 channel.basic_consume(on_request,    #一接收到消息就调用on_request
37                       queue='rpc_queue')
38 
39 print(" [x] Awaiting RPC requests")
40 channel.start_consuming()

 

转载表明出处: 

pip install pika
or
easy_install pika
or
源码

https://pypi.python.org/pypi/pika

2、达成最简易的行列通讯

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)   #接受到消息后不返回ack,无论本地是否处理完消息都会在队列中消失
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

注:windows连linux上的rabbitMQ会产出报错,供给提供用户名密码

叁、RabbitMQ音讯分发轮询

先运维新闻生产者,然后再分别运营2个买主,通过生产者多发送几条音讯,你会发现,这几条新闻会被每种分配到各种消费者身上

亚洲必赢官网 66

 

在那种形式下,RabbitMQ会默许把p发的新闻公平的相继分发给各样消费者(c),跟负载均衡大概

亚洲必赢官网 67亚洲必赢官网 68

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

亚洲必赢官网 69亚洲必赢官网 70

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print(ch,method,properties)
    #ch:<pika.adapters.blocking_connection.BlockingChannel object at 0x002E6C90>    管道内存对象地址
    #methon:<Basic.Deliver(['consumer_tag=ctag1.03d155a851b146f19cee393ff1a7ae38',   #具体信息
            # 'delivery_tag=1', 'exchange=', 'redelivered=False', 'routing_key=lzl'])>
    #properties:<BasicProperties>
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

因此履行pubulish.py和consume.py能够达成地方的新闻公平分发,那假使c一收取音信随后宕机了,会并发哪些状态呢?rabbitMQ是何许处理的?今后我们模拟一下

亚洲必赢官网 71亚洲必赢官网 72

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

亚洲必赢官网 73亚洲必赢官网 74

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      no_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

在consume.py的callback函数里扩充了time.sleep模拟函数处理,通过地点程序开始展览模拟发现,c壹收受到新闻后尚未处理完突然宕机,消息就从队列上海消防灭了,rabbitMQ把新闻删除掉了;假设程序供给讯息必要求处理完才能从队列里删除,那我们就必要对程序开始展览处理一下

亚洲必赢官网 75亚洲必赢官网 76

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl')    #声明queue队列

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!'
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

publish.py

亚洲必赢官网 77亚洲必赢官网 78

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl')

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    #time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

经过把consume.py接收端里的no_ack``=``True去掉之后并在callback函数里面添加ch.basic_ack(delivery_tag ``= method.delivery_tag,就能够完成音信不被拍卖完不可能在队列里清除

翻看音讯队列数:

亚洲必赢官网 79

4、新闻持久化

如果音讯在传输进度中rabbitMQ服务器宕机了,会发觉在此之前的消息队列就不设有了,这时大家就要用到新闻持久化,音信持久化会让队列不趁早服务器宕机而化为乌有,会永远的保留下来

发送端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl',durable=True)    #队列持久化

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     #消息持久化
                      )
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

接收端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

五、音讯公平分发

假诺Rabbit只管按梯次把音信发到种种消费者身上,不考虑消费者负载的话,很恐怕出现,三个机器配置不高的主顾那里堆积了过多新闻处理不完,同时布署高的消费者却平素很自在。为缓解此题材,能够在一一消费者端,配置perfetch=壹,意思便是报告RabbitMQ在我这些消费者当前音信还没处理完的时候就毫无再给作者发新消息了

亚洲必赢官网 80

channel.basic_qos(prefetch_count=1)

带消息持久化+公平分发

亚洲必赢官网 81亚洲必赢官网 82

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()      #声明一个管道(管道内发消息)

channel.queue_declare(queue='lzl',durable=True)    #队列持久化

channel.basic_publish(exchange='',
                      routing_key='lzl',  #routing_key 就是queue名
                      body='Hello World!',
                      properties=pika.BasicProperties(
                          delivery_mode = 2     #消息持久化
                      )
)
print("Sent 'Hello,World!'")
connection.close()      #关闭

pubulish.py

亚洲必赢官网 83亚洲必赢官网 84

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika,time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel = connection.channel()

channel.queue_declare(queue='lzl',durable=True)

def callback(ch,method,properties,body):
    print("->>",ch,method,properties)
    time.sleep(15)              # 模拟处理时间
    print("Received %r"%body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,     #如果收到消息,就调用callback函数处理消息
                      queue="lzl",
                      )
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()   #开始收消息

consume.py

6、Publish\Subscribe(新闻发布\订阅) 

前边的例子都基本都是1对一的音讯发送和收取,即新闻只可以发送到钦赐的queue里,但多少时候你想让你的音讯被有着的Queue收到,类似广播的功用,那时候就要用到exchange了,

An exchange is a very simple thing. On one
side it receives messages from producers and the other side it pushes
them to queues. The exchange must know exactly what to do with a message
it receives. Should it be appended to a particular queue? Should it be
appended to many queues? Or should it get discarded. The rules for that
are defined by the exchange type.

Exchange在概念的时候是有档次的,以控制到底是何等Queue符合条件,能够吸收接纳音信

fanout: 全数bind到此exchange的queue都能够收起音讯
direct: 通过routingKey和exchange决定的丰富唯一的queue能够接收音讯
topic:全部符合routingKey(此时得以是三个表明式)的routingKey所bind的queue能够吸收音讯

headers: 通过headers
来决定把新闻发给哪些queue

表明式符号表明:#代表一个或八个字符,*表示任何字符

     
 例:#.a会匹配a.a,aa.a,aaa.a等
           
*.a会匹配a.a,b.a,c.a等
注:使用RoutingKey为#,Exchange
Type为topic的时候一定于采纳fanout 

一fanout接到全体广播:广播表示近来新闻是实时的,假设未有多少个消费者在经受消息,音信就会放弃,在此间消费者的no_ack已经无用,因为fanout不会管你处理音信停止未有,发过的新闻不会重发,记住广播是实时的

亚洲必赢官网 85

 

亚洲必赢官网 86亚洲必赢官网 87

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',   #广播不用声明queue
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

publish.py

亚洲必赢官网 88亚洲必赢官网 89

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)  # 不指定queue名字,rabbit会随机分配一个名字,
                                                # exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue

channel.queue_bind(exchange='logs',         # 绑定转发器,收转发器上面的数据
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()

consume.py

二有采用的收纳音讯 direct:
 同fanout壹样,
no_ack在此要设置为True,不然队列里多少不会清空(纵然也不会重发)**

RabbitMQ还帮忙根据重点字发送,即:队列绑定关键字,发送者将数据依据首要字发送到新闻exchange,exchange根据关键字 判定应该将数据发送至钦赐队列

亚洲必赢官网 90

 

亚洲必赢官网 91亚洲必赢官网 92

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

publish.py

亚洲必赢官网 93亚洲必赢官网 94

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)

for severity in severities:
    channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key=severity)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py

③更加细致的音讯过滤 topic:

Although using the direct exchange improved our system, it still has
limitations – it can’t do routing based on multiple
criteria.

In our logging system we might want to
subscribe to not only logs based on severity, but also based on the
source which emitted the log. You might know this concept from
the syslog unix
tool, which routes logs based on both severity (info/warn/crit…) and
facility (auth/cron/kern…).

That would give us a lot of flexibility –
we may want to listen to just critical errors coming from ‘cron’ but
also all logs from ‘kern’

亚洲必赢官网 95

亚洲必赢官网 96亚洲必赢官网 97

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
                      routing_key=routing_key,
                      body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

publish.py

亚洲必赢官网 98亚洲必赢官网 99

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',
                         type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',
                       queue=queue_name,
                       routing_key=binding_key)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

consume.py

 

SportagePC(Remote procedure
call )双向通讯

To illustrate how an RPC service could be
used we’re going to create a simple client class. It’s going to expose a
method named call which sends an RPC request and
blocks until the answer is received:

亚洲必赢官网 100

rpc client:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian

import pika
import uuid,time


class FibonacciRpcClient(object):
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))

        self.channel = self.connection.channel()

        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        self.channel.basic_consume(self.on_response, #只要收到消息就执行on_response
                                   no_ack=True,     #不用ack确认
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        if self.corr_id == props.correlation_id:    #验证码核对
            self.response = body


    def call(self, n):
        self.response = None
        self.corr_id = str(uuid.uuid4())
        print(self.corr_id)
        self.channel.basic_publish(exchange='',
                                   routing_key='rpc_queue',
                                   properties=pika.BasicProperties(
                                       reply_to=self.callback_queue,    #发送返回信息的队列name
                                       correlation_id=self.corr_id,     #发送uuid 相当于验证码
                                   ),
                                   body=str(n))
        while self.response is None:
            self.connection.process_data_events()   #非阻塞版的start_consuming
            print("no messages")
            time.sleep(0.5)     #测试
        return int(self.response)


fibonacci_rpc = FibonacciRpcClient()    #实例化
print(" [x] Requesting fib(30)")
response = fibonacci_rpc.call(30)       #执行call方法
print(" [.] Got %r" % response)

rpc server:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
#-Author-Lian
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))

channel = connection.channel()

channel.queue_declare(queue='rpc_queue')


def fib(n):
    if n == 0:
        return 0
    elif n == 1:
        return 1
    else:
        return fib(n - 1) + fib(n - 2)


def on_request(ch, method, props, body):
    n = int(body)

    print(" [.] fib(%s)" % n)
    response = fib(n)

    ch.basic_publish(exchange='',
                     routing_key=props.reply_to,    #回信息队列名
                     properties=pika.BasicProperties(correlation_id=
                                                         props.correlation_id),
                     body=str(response))
    ch.basic_ack(delivery_tag=method.delivery_tag)


#channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request,
                      queue='rpc_queue')

print(" [x] Awaiting RPC requests")
channel.start_consuming()

 

网站地图xml地图