跳至主要內容
  • Hostloc 空間訪問刷分
  • 售賣場
  • 廣告位
  • 賣站?

4563博客

全新的繁體中文 WordPress 網站
  • 首頁
  • 关于 asyncio 执行 IO 密集型操作的不解
未分類
26 12 月 2021

关于 asyncio 执行 IO 密集型操作的不解

关于 asyncio 执行 IO 密集型操作的不解

資深大佬 : firejoke 13

有一个读文件然后写数据库的操作,想尝试使用协程。
使用协程的:

async def parse_text(file_path: Path, context_qs: [asyncio.Queue]):     ql = len(context_qs)     i = 0     # 每一个 Queue 放 step 个数据就切换下一个     step = 2     with open(file_path, encoding="utf8") as f:         for text in f:             if i // step == ql:                 i = 0             context_q = context_qs[i // step]             context = {}             text = re.findall(r"d+", text)             if text:                 context = {"解析然后组装成 dict"}                 await context_q.put(context)                 # 这里如果不 join ,会一直在这个 for 循环里不出去                 await context_q.join()                 i = i + 1         else:             await context_q.put("结束标记")             return   async def write_db(context_q: asyncio.Queue, model: ModelBase):     async with AsyncSession() as session:         while 1:             context = await context_q.get()             if context["结束标记"] == "end":                 return             info, obj = None, None             try:                 if context["info"]:                     info = await session.execute(                         select(InfoModel).filter(                             InfoModel.attr == context["info"]                         )                     )                     info = info.scalars().one_or_none()                     if not info:                         info = InfoModel(attr=context["info"])                         session.add(info)                 if context["header"]:                     obj = await session.execute(                         select(model).filter(                             model.header == context["header"]                         ).options(selectinload(getattr(model, "info")))                     )                     obj = obj.scalars().one_or_none()                     if not obj:                         obj = model(header=context["header"])                         session.add(obj)                 if obj or info:                     if info not in obj.info:                         obj.info.append(info)                         session.add(obj)                     await session.commit()             except Exception as e:                 await session.rollback()                 raise e             else:                 context_q.task_done()   async def main():  # 每个读取文件并解析的方法对应 c_q_count 个写数据库的方法     c_q_count = 3     a_context_qs = [asyncio.Queue() for i in range(c_q_count)]     b_context_qs = [asyncio.Queue() for i in range(c_q_count)]     tasks = [         asyncio.create_task(             parse_text(Path("a.txt"), a_context_qs)         ),         asyncio.create_task(             parse_text(Path("b.txt"), b_context_qs)         ),     ]     for i in range(c_q_count):         tasks.append(asyncio.create_task(write_db(a_context_qs[i], AModel)))         tasks.append(asyncio.create_task(write_db(b_context_qs[i], BModel)))     await asyncio.gather(*tasks)    if __name__ == '__main__':     asyncio.run(main(), debug=settings.DEBUG)  

不使用协程的:

def sync_read_file():     af = Path("a.txt").open(encoding="utf8")     bf = Path("b.txt").open(encoding="utf8")     with Session() as session:         while 1:             if af:                 try:                     text = af.readline()                     context = parse_text(text)                     sync_write_db(session, context, AModel)                 except IOError:                     af.close()                     af = None             if bf:                 try:                     text = bf.readline()                     context = parse_text(text)                     sync_write_db(session, context, BModel)                 except IOError:                     bf.close()                     bf = None             if not af and not bf:                 return   def sync_write_db(session, context, model):     info, obj = None, None     try:         if context["info"]:             info = session.execute(                 select(Info).filter(                     Info.attr == context["info"]                 )             )             info = info.scalars().one_or_none()             if not info:                 info = Info(attr=context["info"])                 session.add(info)         if context["header"]:             obj = session.execute(                 select(model).filter(model.info == context["info"]))             obj = obj.scalars().one_or_none()             if not obj:                 obj = model(info=context["info"])                 session.add(obj)         if obj or info:             if info not in obj.info:                 obj.info.append(info)                 session.add(obj)             session.commit()     except Exception as e:         session.rollback()         raise e   if __name__ == '__main__':     sync_read_file()  

这个协程的方法,每秒每个表可以写 400 多行,改为同步单线程的还是每秒写 400 多行。
不知道是我协程的用法有问题?还是说有别的什么原因?

大佬有話說 (35)

  • 資深大佬 : long2ice

    试试 aiofiles 之类的,你的文件 IO 还是同步的

  • 資深大佬 : Trim21

    with open(file_path, encoding=”utf8″) as f:
    for text in f:
    这两行都是阻塞的

  • 主 資深大佬 : firejoke

    @long2ice #1
    @Trim21 #2
    文件这里只是读取,然后放进队列里,这也会导致阻塞吗?

  • 資深大佬 : Trim21

    @firejoke #3 同步 io 会阻塞掉整个事件循环。

  • 主 資深大佬 : firejoke

    @Trim21 #4 所以,也会导致如果不主动用队列的 join 阻塞住,就不会跳到其他 await 的地方?

  • 資深大佬 : Nitroethane

    @firejoke 如果读取文件速度比较慢,而且文件比较大的话影响应该比较明显

  • 主 資深大佬 : firejoke

    @Nitroethane #6 每行数据小于 1kb ,而且是用的 for ,这里相当于一个生成器

  • 資深大佬 : Trim21

    @firejoke #5 不是,你的代码中仅仅会阻塞在 open 和 for text in f 这两行。在等待这两行底层的同步 io 完成的时间里是不会运行其他 task 的。

  • 主 資深大佬 : firejoke

    @Trim21 #8 我改成了 asyncfiles ,然后把队列的 join 去掉了,这次成功跳到了其他 await 的位置,确实如你所说,感谢!
    但测试发现,虽然没了 io 的阻塞,但写入速度还是没太大变化,他每读一行,切到其他 task ,和我之前没读一行,join 住,就执行流程来说,是不是没差?

  • 資深大佬 : Trim21

    我没仔细看完整的代码,只是看到一开始就有同步阻塞的问题就回复了。

  • 資深大佬 : locoz

    目测是正则导致的阻塞…有一说一你这种情况不太适合用 asyncio ,或者说不太适合没有包上隐式多进程的 asyncio ,毕竟不是纯粹的 IO 操作。然后文件操作方面 aiofiles 实际背后也是靠线程池跑的,这一点需要注意一下,有时候可能会导致踩坑。

  • 資深大佬 : documentzhangx66

    先监视一下设备性能极限。
    iostat -x -m -d 1

  • 資深大佬 : LeeReamond

    大概看了一眼上说的应该没问题,并非所有类型的任务都能通过异步加速,你要做好心理准备。另外 aiofiles 的实现其实很丑陋。。上说是线程池跑的,我有点忘记具体情况了,只记得以前读源码的印象是很丑陋。。

  • 資深大佬 : Contextualist

    看上去没有明显的问题,不过对于任何为了改进性能的重写建议还是先 profile 一下,看看瓶颈到底出在哪个调用上。
    然后异步文件 IO 不是为了提升性能(降低平均延迟)的,而是为了降低尾延迟的,参见: https://trio.readthedocs.io/en/stable/reference-io.html#background-why-is-async-file-i-o-useful-the-answer-may-surprise-you

  • 資深大佬 : 2i2Re2PLMaDnghL

    (我会尝试先把所有信息读进内存然后 timeit 数据库部分,看瓶颈是不是文件

  • 資深大佬 : lesismal

    不是说给函数加上异步就是一切都异步了:
    1. 异步的函数 A
    2. A 内部调用 B C D ,B C D 有任意同步阻塞的行为,A 也一样跟着阻塞
    py 的性能痛点远不只是 asyncio 就能解决的了的,how about trying golang -_-

  • 主 資深大佬 : firejoke

    @locoz #11 我也感觉似乎没发挥出 asyncio 的优势,每一条数据都不超过 1kb ,所以可能除了数据库操作稍微耗时长一点,其他地方等待的很少,所以和单线程的性能差不多?另外请教一下,“没有包上隐式多进程” 具体是指什么呢?

  • 主 資深大佬 : firejoke

    @documentzhangx66 #12 设备性能应该没问题,12 核 24 线程,64G 内存,磁盘读取速度也没有跑满,IO 读写也不是特别高。

  • 主 資深大佬 : firejoke

    @LeeReamond #13 嗯,我昨天也想了一下,如果每一步阻塞住的操作实际上都很快,那 asyncio 其实发挥不出切换等待的优势。

  • 資深大佬 : locoz

    @firejoke #17 建议用调试工具或者排除法看看具体是哪里拖慢了,单看代码和前面的讨论我感觉是正则部分导致的。
    前面没讲清楚,“包上隐式多进程的 asyncio”指的是把多进程和协程结合,开一堆子进程然后每个子进程一个 eventloop ,因为之前有看到过一个专门的库把这部分操作给隐式处理了,使用起来两三行搞定,不需要自己写进程管理部分。然后一些框架其实也会隐式地做这种结合处理来提高效率。

  • 主 資深大佬 : firejoke

    @Contextualist #14 看文档的意思,是说用异步文件 IO ,在从内存读取时反倒会变慢,在从磁盘读取的时候会加快,在不同环境下其结果是不可预测的。那我如果单独用一个进程读取文件到内存,然后另一个进程从内存读取然后再操作,应该可以绕开这个问题。

  • 主 資深大佬 : firejoke

    @locoz #20 我昨天最后也是改成用多进程了,一个进程专门读文件,然后放进队列,其他子进程从队列读,然后操作数据库,那看来我思路没跑偏。还有其他的解法吗?多进程和协程的结合,一般都是以多进程为主吗?

  • 資深大佬 : Contextualist

    又看了一下你贴出来文件的部分,你是不是就两个大文件(就是说不是大量小文件),那文件 IO 就基本不可能是你的瓶颈,你看到磁盘读取没跑满很有可能是你下游的处理速度没跟上。
    多进程和协程,感觉你自己也总结出来了。协程得用在有长时间等待系统调用 (syscall) 的地方(比如网络、子进程、定时任务)。CPU 密集的操作得用多线程或多进程,但在 Python 里有 GIL ,就只能用多进程。

  • 主 資深大佬 : firejoke

    @Contextualist #23 是的,就是两个大文件,所以我也觉得文件 IO 不是我这里的瓶颈,协程在这个场景中没体现出他的优势,我已经改成了多进程了。

  • 資深大佬 : Contextualist

    我对数据库不熟,不过我猜对于很多数据库并发写是不会有性能提升的,用单线程就可以了,但你可能需要 batch / bulk 操作,用来一次性插入数十条、数百条数据,而不是一次插入一条。

  • 資深大佬 : O5oz6z3

    虽然不懂,看完上感觉原因之一在于 asyncio 的上限就是单线程,而单线程吞吐量不如多线程?

  • 主 資深大佬 : firejoke

    @Contextualist #25 对欸!资源是消耗在每一条查询和写入的操作上,如果批量写,就可以降低写入频率,至于查询,我已经在查询字段上加了索引,我改一下试试。感谢~
    然后我看到你之前提到的 trio ,看他的文档像是涉及到异步操作的都有涉及,感觉非常不错啊。

  • 主 資深大佬 : firejoke

    @O5oz6z3 #26 不是,当不存在较长的 io 等待的时候,协程和单线程没差。

  • 資深大佬 : yufpga

    大概看了下, 这瓶颈显然不是在 parse_text 中的文件读,就算再怎么阻塞,读写本地文件也不至于到每秒才 400 行的程度. 而在 write_db 中, 出现好几处 await 的地方, 这些地方可都是要同步等待结果返回的呀. 一个很好容易验证的方法就是把 write_db 中的 await 用 await asyncio.sleep 替换掉, 尝试不同的 sleep 时间. 实际上上面的问题在于每一次 while 1 的循环循环是同步的, 你必须要先处理完队列中的前一条数据, 才能继续处理下一条数据. 所以处理也很简单, 把每一次的循环异步化掉.

  • 資深大佬 : hustlibraco

    用“`async for“`替代“`for“`可以吗?

  • 主 資深大佬 : firejoke

    @hustlibraco #30 换成异步文件读,就可以换成 async for 了。

  • 主 資深大佬 : firejoke

    @yufpga #29 我看日志里,我同时开了好多个 task ,这个 task 的循环里 await query 或 add 或 commit ,就会跳到另一个 task 的循环里的 query 或 add 或 commit 。

  • 資深大佬 : yufpga

    @firejoke 是我看差了, 我以为只有一个 queue, 而你的代码里是两个 context, 各自 3 个 queue, 也就是总共 6 个 queue, 对应 6 个 write_db 的 task. 当遇到 await 的时候, 确实是会跳转到别的 task 里面执行. 确实比较奇怪,但我仍然觉得瓶颈不大可能在 parse_text, 你可以试着记录一下队列写入数据的速率, 如果这个速率也在 400/s 左右, 那说明确实有可能是 parse_text 慢了

  • 資深大佬 : ohayoo

    @firejoke 老哥可以分享下多进程版本的代码吗?

  • 主 資深大佬 : firejoke

    @ohayoo #34 还在调试多进程和协程的组合,后面会贴一下的。

文章導覽

上一篇文章
下一篇文章

AD

其他操作

  • 登入
  • 訂閱網站內容的資訊提供
  • 訂閱留言的資訊提供
  • WordPress.org 台灣繁體中文

51la

4563博客

全新的繁體中文 WordPress 網站
返回頂端
本站採用 WordPress 建置 | 佈景主題採用 GretaThemes 所設計的 Memory
4563博客
  • Hostloc 空間訪問刷分
  • 售賣場
  • 廣告位
  • 賣站?
在這裡新增小工具