diff --git a/python/py.md b/python/py.md index bac26fc..b5c3c6d 100644 --- a/python/py.md +++ b/python/py.md @@ -129,6 +129,16 @@ - [Creating Tasks](#creating-tasks) - [`asyncio.create_task(coro, *, name=None, context=None)`](#asynciocreate_taskcoro--namenone-contextnone) - [Task Cancellation](#task-cancellation) + - [Task Groups](#task-groups) + - [`asyncio.TaskGroup`](#asynciotaskgroup) + - [`taskgroup.create_task(coro, *, name=None, context=None)`](#taskgroupcreate_taskcoro--namenone-contextnone) + - [CancelledError](#cancellederror) + - [terminate a taskgroup](#terminate-a-taskgroup) + - [Sleeping](#sleeping) + - [`async asyncio.sleep(delay, result=None)`](#async-asynciosleepdelay-resultnone) + - [并发运行任务](#并发运行任务) + - [`awaitable asyncio.gather(*aws, return_exceptions=False)`](#awaitable-asynciogatheraws-return_exceptionsfalse) + - [return\_exceptions](#return_exceptions) # Python @@ -2196,3 +2206,146 @@ asyncio中存在部分组件允许结构化的并发,例如`asyncio.TaskGroup` 但是,如果真的需要对`asyncio.CancelledError`进行suppress,其在吞异常之后还需要调用`uncancel`来取消cancellation state。 +### Task Groups +task groups整合了`task creation API`和`wait for all tasks in the group to finish`两方面,使异步代码的编写变得更加便捷。 + +#### `asyncio.TaskGroup` +该类是一个asynchronous context manager,其中持有了一组tasks,task可以通过`create_task`被添加到group中。当context manager退出时,会等待所有的tasks执行完成。 + +##### `taskgroup.create_task(coro, *, name=None, context=None)` +该方法调用会在group中创建一个task,该方法的签名和`asyncio.create_task`方法的一致。 + +> 如果该task group是非活跃的,那么将会对给定的coro进行关闭。 + +task group的使用示例如下所示: +```py +async def main(): + async with asyncio.TaskGroup() as tg: + task1 = tg.create_task(some_coro(...)) + task2 = tg.create_task(another_coro(...)) + print(f"Both tasks have completed now: {task1.result()}, {task2.result()}") +``` +`async with`语句会等待所有位于task group中的tasks都执行完成,但是,`在等待的过程中仍然可以向task group中添加新的task`。 +> 可以将task group传递给coroutinue,并且在corotinue中调用`tg.create_task`向task group中添加task + +一旦task group中所有的task执行完成并且`async with`block退出,那么将无法向task group中添加新的task。 + +##### CancelledError +一旦task group中任一task抛出`非CancelledError的异常`,位于task group中的其他task都会被cancelled,后续任何task都无法被添加到task group中。 + +并且,`除了添加到task group的tasks之外,包含`async with`代码块的`main task`也会被标记为cancelled。` + +task group抛出非`CancelledError`会导致在下一个await表达式时抛出`CancelledError`异常,但是,`CancelledError`并不会抛出`async with block`,故而在async with block外无需关注async with是否会抛出`CancelledError`。 + +当所有task都完成后,任何抛出`非CancelledError类似异常`的异常都会被整合到`ExceptionGroup`/`BaseExceptionGroup`中,并被后续抛出。 + +有两种异常将会被特殊对待:`KeyboardInterrupt`和`SystemExit`,对于这两种异常,task group仍然会取消其他task并且等待其余tasks,但是原始的`KeyboardInterrupt`和`SystemExit`异常仍然会被抛出。(抛出的是异常的原值,而不是ExceptionGroup)。 + +对于async with body中抛出的异常,其处理和task抛出异常一致。 + +#### terminate a taskgroup +在python标准库中,并不原生支持对task group的终止。但是,可以通过向task group中`添加一个抛出未处理异常的task`来实现,示例如下: +```py +import asyncio +from asyncio import TaskGroup + + class TerminateTaskGroup(Exception): + """Exception raised to terminate a task group.""" + + async def force_terminate_task_group(): + """Used to force termination of a task group.""" + raise TerminateTaskGroup() + + async def job(task_id, sleep_time): + print(f'Task {task_id}: start') + await asyncio.sleep(sleep_time) + print(f'Task {task_id}: done') + + async def main(): + try: + async with TaskGroup() as group: + # spawn some tasks + group.create_task(job(1, 0.5)) + group.create_task(job(2, 1.5)) + # sleep for 1 second + await asyncio.sleep(1) + # add an exception-raising task to force the group to terminate + group.create_task(force_terminate_task_group()) + except* TerminateTaskGroup: + pass + + asyncio.run(main()) +``` +其输出如下: +```py +Task 1: start +Task 2: start +Task 1: done +``` +### Sleeping +#### `async asyncio.sleep(delay, result=None)` +该方法会阻塞`delay`秒。 + +如果`result`有值,那么当coroutinue完成时result会被返回给调用方。 + +sleep会一直挂起当前任务,允许其他任务执行。如果为sleep传参为0,其将会让其他tasks执行,可作为一种优化手段。其通常是在运行时间较长的任务中使用,避免长任务一直占用event loop。 + +```py +await sleep(0) +``` +### 并发运行任务 +#### `awaitable asyncio.gather(*aws, return_exceptions=False)` +并发的运行aws中的awaitable objects。 + +如果aws中的某个awaitable是coroutinue,那么其将自动作为task被调度。 + +如果所有的awaitable objects都成功执行,那么结果将会聚合到returned list中。returned list中结果的顺序和aws中awaitable object的顺序一致。 + +##### return_exceptions +return_exceptions的值决定了gather的行为: +- `False`(默认):如果该参数为False,那么那么aws中抛出的第一个异常将会被立刻传递到`等待gather返回结果的task`,aws中其他的tasks并不会被cancelled,仍然会继续运行 +- `True`:exceptions也将被当作成功的返回结果,并且被组装到returned list中 + +如果`gather()`被cancelled,那么gather中所有未完成的awaitable objects也都会被取消。 + +如果aws中的任一Task或Future被取消,其将被看作抛出CancelledError,在这种情况下`gather()`并不会被cancelled。这样能够避免某一个task/future的取消造成gather中其他的task/future也被取消。 + +`asyncio.gather`的使用示例如下所示: +```py + +import asyncio + +async def factorial(name, number): + f = 1 + for i in range(2, number + 1): + print(f"Task {name}: Compute factorial({number}), currently i={i}...") + await asyncio.sleep(1) + f *= i + print(f"Task {name}: factorial({number}) = {f}") + return f + +async def main(): + # Schedule three calls *concurrently*: + L = await asyncio.gather( + factorial("A", 2), + factorial("B", 3), + factorial("C", 4), + ) + print(L) + + asyncio.run(main()) + + # Expected output: + # + # Task A: Compute factorial(2), currently i=2... + # Task B: Compute factorial(3), currently i=2... + # Task C: Compute factorial(4), currently i=2... + # Task A: factorial(2) = 2 + # Task B: Compute factorial(3), currently i=3... + # Task C: Compute factorial(4), currently i=3... + # Task B: factorial(3) = 6 + # Task C: Compute factorial(4), currently i=4... + # Task C: factorial(4) = 24 + # [2, 6, 24] + +```