32 lines
1.4 KiB
Python
32 lines
1.4 KiB
Python
import pika
|
||
import json
|
||
import pymysql
|
||
|
||
credentials = pika.PlainCredentials('admin', '123456') # mq用户名和密码
|
||
# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。
|
||
connection = pika.BlockingConnection(pika.ConnectionParameters(host = '192.168.40.7',port = 5672,virtual_host = 'test',credentials = credentials))
|
||
|
||
channel = connection.channel()
|
||
# 申明消息队列,消息在这个队列传递,如果不存在,则创建队列
|
||
channel.queue_declare(queue = 'ios-idfa', durable=True)
|
||
db = pymysql.connect(host='192.168.40.214', port=4000, user='root', password='Xwj5FhM8cTuEuXbS', database='secret')
|
||
def insert_to_db(idfa):
|
||
# 连接到MySQL数据库
|
||
cur = db.cursor()
|
||
cur.executemany("insert ignore into gaid_ios(gaid, country, created) value (%s, 'US', now())", idfa)
|
||
db.commit()
|
||
cur.close()
|
||
|
||
# 定义一个回调函数来处理消息队列中的消息,这里是打印出来
|
||
def callback(ch, method, properties, body):
|
||
jsStr = body.decode()
|
||
data = json.loads(jsStr)
|
||
idfas = [(it['idfa'],) for it in data]
|
||
print(idfas)
|
||
insert_to_db(idfas)
|
||
ch.basic_ack(delivery_tag = method.delivery_tag)
|
||
|
||
# 告诉rabbitmq,用callback来接收消息
|
||
channel.basic_consume('ios-idfa',callback)
|
||
# 开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理
|
||
channel.start_consuming() |