import pika import json import pymysql credentials = pika.PlainCredentials('admin', '123456') # mq用户名和密码 # 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。 connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.40.7',port = 5672,virtual_host = 'test',credentials = credentials)) channel = connection.channel() # 申明消息队列,消息在这个队列传递,如果不存在,则创建队列 channel.queue_declare(queue = 'ios-idfa', durable=True) db = pymysql.connect(host='192.168.40.214', port=4000, user='root', password='Xwj5FhM8cTuEuXbS', database='secret') def insert_to_db(idfa): # 连接到MySQL数据库 cur = db.cursor() cur.executemany("insert ignore into gaid_ios(gaid, country, created) value (%s, 'US', now())", idfa) db.commit() cur.close() # 定义一个回调函数来处理消息队列中的消息,这里是打印出来 def callback(ch, method, properties, body): jsStr = body.decode() data = json.loads(jsStr) idfas = [(it['idfa'],) for it in data] print(idfas) insert_to_db(idfas) ch.basic_ack(delivery_tag = method.delivery_tag) # 告诉rabbitmq,用callback来接收消息 channel.basic_consume('ios-idfa',callback) # 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理 channel.start_consuming()