跳至主要內容
  • Hostloc 空間訪問刷分
  • 售賣場
  • 廣告位
  • 賣站?

4563博客

全新的繁體中文 WordPress 網站
  • 首頁
  • [asyncio] 多线程中子线程使用主线程 event loop,这样是否与直接用主线程去进行事件循环无差别
未分類
14 2 月 2021

[asyncio] 多线程中子线程使用主线程 event loop,这样是否与直接用主线程去进行事件循环无差别

[asyncio] 多线程中子线程使用主线程 event loop,这样是否与直接用主线程去进行事件循环无差别

資深大佬 : opengo 4

使用concurrent.futures创建线程池然后使用asyncio的run_in_executor方法去利用线程池执行某异步任务,具体代码如下:

# -*- coding: utf-8 -*- import asyncio import concurrent.futures import threading import time import logging  from datetime import datetime   logger = logging.getLogger(__file__) logger.setLevel('INFO')   async def run_task(val):     logger.info(f"run_task Owning thread - {threading.currentThread().native_id} {threading.currentThread().getName()}")     await asyncio.sleep(val)     return datetime.now()   def thread_task(loop, val):     logger.info(f"thread_task Start thread - {threading.currentThread().ident} {threading.currentThread().getName()}")      v = asyncio.run_coroutine_threadsafe(run_task(val), loop)     result = v.result()      logger.info(f"thread - {threading.currentThread().ident}  {threading.currentThread().getName()} done ==== {result}")   async def main(loop):     result = [0.1, 0.2, 0.3, 0.4, 0.5]     logger.info(f"main thread :: {threading.currentThread().ident} {threading.currentThread().getName()}")      with concurrent.futures.ThreadPoolExecutor() as pool:         blocking_tasks = [             loop.run_in_executor(                 pool, thread_task, loop, val             ) for val in result         ]         await asyncio.gather(*blocking_tasks)   if __name__ == '__main__':      loop = asyncio.get_event_loop()     try:         loop.run_until_complete(main(loop))     finally:         loop.close()  

使用run_in_executor调用thread_task方法,再在子线程中使用 asyncio.run_coroutine_threadsafe 方法去调用真正的异步任务,而实际上源码中 asyncio.run_coroutine_threadsafe 方法通过ensure_future 创建了一个future 添加到主线程的 event loop 中并绑定当前线程 future,子线程的异步任务在主线程中被循环,循环完成后再从子线程中去获取结果

感觉上不如直接在主线程中使用asyncio.gather 或者 asyncio.as_completed 去并发执行这些任务,另一种办法是每个子线程再设置一个 loop 去进行事件循环,但是实际测试中这几种方案性能相差并不多

# 使用 asyncio.gather 或 asyncio.as_completed task_list = []          for val in result:         task_list.append(asyncio.create_task(run_task(val)))              await asyncio.gather(*task_list)      for f in asyncio.as_completed(task_list, loop=loop):         results = await f   # 设置新的 loop def thread_task(val):      logger.info(f"thread_task Start thread - {threading.currentThread().ident} {threading.currentThread().getName()}")      loop = asyncio.new_event_loop()     try:         asyncio.set_event_loop(loop)         v = loop.run_until_complete(asyncio.gather(run_task(val)))         logger.info(f"thread - {threading.currentThread().ident}  {threading.currentThread().getName()} done ==== {result}")     finally:         loop.close()  

大家有什么好的多线程+协程的实现方案吗,这里对应的场景是同时处理多个文件的 io 任务

大佬有話說 (4)

  • 資深大佬 : liuxingdeyu

    没太搞明白,线程加协程的意义

  • 主 資深大佬 : opengo

    @liuxingdeyu 可以多线程去进行网络请求,文件 IO,这里我想利用多线程去处理文件 IO,利用协程去提高性能,多进程+协程的方式面对大量文件 IO 操作性能更好,但是资源开销也更大,所以想尝试多线程+协程的最优方案

  • 資深大佬 : linw1995

    应该等同,但会更慢

  • 主 資深大佬 : opengo

    @linw1995 确实不如单线程+协程,应该是线程池化+上下文切换消耗的

文章導覽

上一篇文章
下一篇文章

AD

其他操作

  • 登入
  • 訂閱網站內容的資訊提供
  • 訂閱留言的資訊提供
  • WordPress.org 台灣繁體中文

51la

4563博客

全新的繁體中文 WordPress 網站
返回頂端
本站採用 WordPress 建置 | 佈景主題採用 GretaThemes 所設計的 Memory
4563博客
  • Hostloc 空間訪問刷分
  • 售賣場
  • 廣告位
  • 賣站?
在這裡新增小工具