关于 asyncio 执行 IO 密集型操作的不解
資深大佬 : firejoke 15
有一个读文件然后写数据库的操作,想尝试使用协程。
使用协程的:
async def parse_text(file_path: Path, context_qs: [asyncio.Queue]): ql = len(context_qs) i = 0 # 每一个 Queue 放 step 个数据就切换下一个 step = 2 with open(file_path, encoding="utf8") as f: for text in f: if i // step == ql: i = 0 context_q = context_qs[i // step] context = {} text = re.findall(r"d+", text) if text: context = {"解析然后组装成 dict"} await context_q.put(context) # 这里如果不 join ,会一直在这个 for 循环里不出去 await context_q.join() i = i + 1 else: await context_q.put("结束标记") return async def write_db(context_q: asyncio.Queue, model: ModelBase): async with AsyncSession() as session: while 1: context = await context_q.get() if context["结束标记"] == "end": return info, obj = None, None try: if context["info"]: info = await session.execute( select(InfoModel).filter( InfoModel.attr == context["info"] ) ) info = info.scalars().one_or_none() if not info: info = InfoModel(attr=context["info"]) session.add(info) if context["header"]: obj = await session.execute( select(model).filter( model.header == context["header"] ).options(selectinload(getattr(model, "info"))) ) obj = obj.scalars().one_or_none() if not obj: obj = model(header=context["header"]) session.add(obj) if obj or info: if info not in obj.info: obj.info.append(info) session.add(obj) await session.commit() except Exception as e: await session.rollback() raise e else: context_q.task_done() async def main(): # 每个读取文件并解析的方法对应 c_q_count 个写数据库的方法 c_q_count = 3 a_context_qs = [asyncio.Queue() for i in range(c_q_count)] b_context_qs = [asyncio.Queue() for i in range(c_q_count)] tasks = [ asyncio.create_task( parse_text(Path("a.txt"), a_context_qs) ), asyncio.create_task( parse_text(Path("b.txt"), b_context_qs) ), ] for i in range(c_q_count): tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel))) tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel))) await asyncio.gather(*tasks) if __name__ == '__main__': asyncio.run(main(), debug=settings.DEBUG)
不使用协程的:
def sync_read_file(): af = Path("a.txt").open(encoding="utf8") bf = Path("b.txt").open(encoding="utf8") with Session() as session: while 1: if af: try: text = af.readline() context = parse_text(text) sync_write_db(session, context, AModel) except IOError: af.close() af = None if bf: try: text = bf.readline() context = parse_text(text) sync_write_db(session, context, BModel) except IOError: bf.close() bf = None if not af and not bf: return def sync_write_db(session, context, model): info, obj = None, None try: if context["info"]: info = session.execute( select(Info).filter( Info.attr == context["info"] ) ) info = info.scalars().one_or_none() if not info: info = Info(attr=context["info"]) session.add(info) if context["header"]: obj = session.execute( select(model).filter(model.info == context["info"])) obj = obj.scalars().one_or_none() if not obj: obj = model(info=context["info"]) session.add(obj) if obj or info: if info not in obj.info: obj.info.append(info) session.add(obj) session.commit() except Exception as e: session.rollback() raise e if __name__ == '__main__': sync_read_file()
这个协程的方法,每秒每个表可以写 400 多行,改为同步单线程的还是每秒写 400 多行。
不知道是我协程的用法有问题?还是说有别的什么原因?
大佬有話說 (35)