"""Incrementally copy ``gaid`` values from a source MySQL table to a target one.

Rows are read from ``gaid_ios`` on the source server in ascending ``id``
order, upserted into ``gaid_ios`` on the target server, and the last source
``id`` handled is stored in the target's ``_sync_data`` table so that a later
run resumes after it.
"""
import asyncio

import aiomysql

# Module-level progress counters; main() updates offset, save_records and total.
offset = 0
save_records = 0
total = 0
read_done = False  # Never assigned anywhere in this file; kept as-is.

# Key under which the checkpoint is stored in ``_sync_data``.
SYNC_NAME = "lux_gaid_ios"
# Maximum number of rows read from the source per round trip.
BATCH_SIZE = 1000


async def get_total_count(conn, query, args=None):
    """Return ``COUNT(*)`` for the given FROM clause.

    Args:
        conn: Open aiomysql connection to run the query on.
        query: Text placed verbatim after ``SELECT COUNT(*) FROM``: a table
            name, optionally followed by a WHERE clause.  It is interpolated
            into the SQL string, so it must only ever be trusted,
            program-supplied text.
        args: Optional parameters bound to ``%s`` placeholders in ``query``.

    Returns:
        The first column of the single result row.
    """
    async with conn.cursor() as cursor:
        await cursor.execute(f"SELECT COUNT(*) FROM {query}", args)
        result = await cursor.fetchone()
    return result[0]


async def get_last_id(conn, name):
    """Return the stored ``lastId`` for ``name`` from ``_sync_data``.

    Returns 0 when no row exists for ``name``.
    """
    async with conn.cursor() as cursor:
        await cursor.execute(
            "SELECT lastId FROM _sync_data WHERE name = %s", (name,)
        )
        result = await cursor.fetchone()
    return result[0] if result else 0


async def update_last_id(conn, name, last_id):
    """Insert or update the ``lastId`` checkpoint for ``name`` and commit.

    Uses ``ON DUPLICATE KEY UPDATE``; this presumably relies on a unique key
    on ``_sync_data.name`` (the schema is not visible here -- confirm).
    """
    async with conn.cursor() as cursor:
        await cursor.execute(
            "INSERT INTO _sync_data (name, lastId) VALUES (%s, %s) "
            "ON DUPLICATE KEY UPDATE lastId = %s",
            (name, last_id, last_id)
        )
    await conn.commit()


async def get_datas(con, lastid, batch_size):
    """Fetch the next batch of rows from ``gaid_ios``.

    Args:
        con: Open aiomysql connection to the source database.
        lastid: Only rows with ``id`` strictly greater than this are read.
        batch_size: Maximum number of rows to return.

    Returns:
        Rows ordered by ``id``, each a dict with keys ``id`` and ``gaid``
        (a ``DictCursor`` is used).  Empty when nothing is left.
    """
    async with con.cursor(aiomysql.DictCursor) as cursor:
        await cursor.execute(
            "SELECT id, gaid FROM gaid_ios WHERE id > %s ORDER BY id LIMIT %s",
            (lastid, batch_size)
        )
        rows = await cursor.fetchall()
    return rows


async def save_datas(con, rows):
    """Upsert the ``gaid`` of every row into ``gaid_ios`` and commit.

    Each row is inserted with country ``'US'`` and ``created = now()``; on a
    duplicate key only ``country`` is set to ``'US'``.

    Args:
        con: Open aiomysql connection to the target database.
        rows: Iterable of mappings that each provide a ``'gaid'`` key.
    """
    async with con.cursor() as cursor:
        values = [(row['gaid'],) for row in rows]
        await cursor.executemany(
            "INSERT INTO gaid_ios (gaid, country, created) VALUES (%s, 'US', now()) "
            "on duplicate key update country='US'",
            values
        )
    await con.commit()


async def main():
    """Run the sync from the stored checkpoint until the source is exhausted.

    Updates the module-level ``total``, ``offset`` and ``save_records``
    counters and prints progress after every batch.
    """
    global offset, save_records, total

    # NOTE(security): database credentials are hard-coded in source.  They
    # are left unchanged here to preserve behaviour, but should be rotated
    # and loaded from configuration or the environment instead.
    source_config = {
        'host': '183.222.62.178',
        'port': 2400,
        'user': 'root',
        'password': 'Xwj5FhM8cTuEuXbS',
        'db': 'lux_ad_gaid',
        'charset': 'utf8mb4',
    }
    target_config = {
        'host': '183.222.62.53',
        'port': 54321,
        'user': 'root',
        'password': 'W3*Cry56f-^9_miq10',
        'db': 'secret',
        'charset': 'utf8mb4',
    }

    # The context managers release each connection and close each pool even
    # when an exception is raised.  Releasing the connections also matters on
    # the success path: Pool.wait_closed() waits for acquired connections to
    # be released, so holding them while closing the pool can block shutdown.
    async with aiomysql.create_pool(**source_config) as source_pool, \
            aiomysql.create_pool(**target_config) as target_pool:
        async with source_pool.acquire() as source_con, \
                target_pool.acquire() as target_con:
            total = await get_total_count(source_con, "gaid_ios")
            lastid = await get_last_id(target_con, SYNC_NAME)
            print(f"总计需要同步 {total} 条数据")

            while True:
                rows = await get_datas(source_con, lastid, BATCH_SIZE)
                if not rows:
                    print("数据读取完毕")
                    break

                await save_datas(target_con, rows)
                offset += len(rows)
                lastid = rows[-1]['id']
                # "<=" because the row with id == lastid was just synced;
                # the value is bound as a parameter rather than formatted
                # into the SQL text.
                save_records = await get_total_count(
                    source_con, "gaid_ios WHERE id <= %s", (lastid,)
                )
                # Persist the checkpoint only after the batch was committed.
                await update_last_id(target_con, SYNC_NAME, lastid)
                print(f"已同步 {save_records}/{total} 条数据,当前ID:{lastid}")


if __name__ == "__main__":
    asyncio.run(main())