Python 异步任务如何检测运行时间
資深大佬 : fangwenxue 4
async def run_task(self, task_name, task_fun): self.logger.info(f'{task_name}') begin_time = int(time.time()) self.task_state = False # t = threading.Thread(target=self.check_task, args=(begin_time, task_name)) # t.start() # t.join() try: await getattr(self, task_fun)() await asyncio.sleep(1) self.logger.warning(f'{task_name} -> DONE.') self.task_state = True except Exception as e: self.logger.error(e) self.task_state = True finally: await self.close_page() await asyncio.sleep(1) def check_task(self, begin_time, task_name): while True: if self.task_state: return True now_time = int(time.time()) n = now_time - begin_time if n > 120: self.logger.warning(f'{task_name} runtime {n}s.') return False time.sleep(1)
- 想要实现的是 task_fun 执行时间超过 N 秒直接退出 run_task
- 自己想了笨方法, 写了个线程检测运行时间,但不知道怎么抛异常退出 task_fun
- 有没有更好的实现?
大佬有話說 (3)