python RabbitMq的订阅
【test_RabbitMq1.py】
import pika import json credentials = pika.PlainCredentials('test', 'test') # mq用户名和密码 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '42.192.14.149',port = 5672,virtual_host = '/test',credentials = credentials)) channel=connection.channel() # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') for i in range(2000): message=json.dumps({'OrderId':"1000%s"%i}) # 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置 channel.basic_publish(exchange = 'python-test',routing_key = '',body = message, properties=pika.BasicProperties(delivery_mode = 2)) print(message) connection.close()
【test_RabbitMq2.py】
import pika credentials = pika.PlainCredentials('test', 'test') # mq用户名和密码 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '42.192.14.149',port = 5672,virtual_host = '/test',credentials = credentials)) channel = connection.channel() # 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除 result = channel.queue_declare('',exclusive=True) # 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储 channel.exchange_declare(exchange = 'python-test',durable = True, exchange_type='fanout') # 绑定exchange和队列 exchange 使我们能够确切地指定消息应该到哪个队列去 channel.queue_bind(exchange = 'python-test',queue = result.method.queue) # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): ch.basic_ack(delivery_tag = method.delivery_tag) print(body.decode()) channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉 auto_ack = False) channel.start_consuming()