こんにちは、福田asyncio.を紹介します。
Python 3.11の新機能
高速化が話題のPython 3.asyncio.です。asyncio.は複数のタスクを並行処理する高レベルAPIになります。同様の既存機能asyncio.やasyncio.)
- 同時に発生した例外の捕捉
- 例外発生時のタスクのキャンセル
この特徴によって、発生した例外を漏らさずに記録でき、制御しやすい並行処理の実装がより簡単にできるようになりました。私個人としては、かゆいところに手が届くイチオシの新機能です。[2]
asyncioを利用していてすでに例外やタスクのキャンセルを意識されている方も、あまりasyncioを使っておらず例外やキャンセルを意識されていない方も、この記事を通してPython 3.asyncio.を試してみよう!
なおこの記事は、Python 3.
asyncio.TaskGroup の基本の書き方
公式ドキュメントにあるサンプルコードを見てみましょう。asyncio.はコンテキストマネージャーを利用し、その中で複数のタスクを同時に実行します。サンプルコードではsome_とanother_という非同期関数
asyncio.TaskGroupのサンプルコードimport asyncio
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(some_coro(...))
task2 = tg.create_task(another_coro(...))
print("Both tasks have completed now.")
引用元:コルーチンとTask — Python 3.
しかし、複数のタスクを同時に処理するのであれば、既存機能のasyncio.やasyncio.でも同様の書き方ができます。
asyncio.TaskGroup のサンプルコードをasyncio.gather()で書き換えimport asyncio
async def main():
await asyncio.gather(
some_coro(...),
another_coro(...)
)
print("Both tasks have completed now.")
違いは例外の扱いにあります。まずは、
同時に発生した例外の捕捉
ここでは同時に発生した例外処理をPython 3.asyncio.と、Python 3.asyncio.で、それぞれ比較してみましょう。
Python 3.10以前のasyncioの例外処理
Python 3.asyncio.の利用例を紹介します。asyncio.では最初の例外を送出するか、すべてのタスクを実行した後に結果をリストで取得するしかありませんでした。
asyncio.での例外処理gather() return_指定なし:最初の例外を送出exceptions return_指定あり:すべてのタスクの完了を待ち、例外を含む結果をリストで取得exceptions=True
return_指定なしの場合、try-exceptで囲い捕捉します。return_の場合、戻り値のリストをチェックする必要があります。
実際にコードにして動きを見てみましょう。同時に実行するための簡易な非同期関数を3つ用意します。うち2つは、例外を送出します。
まずはreturn_を指定せずにasyncio.で動かしてみます。
asyncio.gather()で最初の例外を送出import asyncio
async def coro_success():
return "成功"
async def coro_value_err():
raise ValueError
async def coro_type_err():
raise TypeError
async def main():
"""return_exceptionsなし"""
try:
results = await asyncio.gather(
coro_success(), coro_value_err(), coro_type_err()
)
print(f"{results=}")
except BaseException as err:
print(f"{err=}")
asyncio.run(main())
実行すると次のような結果になります。最初の例外のみを捕捉できます。同時に発生した coro_ のTypeErrorは捕捉できません。
$ python3.10 asyncio_gather_except.py err=ValueError()
次にasyncio.のreturn_を指定します。return_はすべてのタスクを実行し、例外を含むすべてのタスクの結果をリストで取得します。
async def main():
"""return_exceptions=Trueあり"""
results = await asyncio.gather(
coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True)
print(f"{results=}")
これを実行すると、次のようになります。
$ python3.10 asyncio_gather_except.py results=['成功', ValueError(), TypeError()]
この場合、例外を処理するには、戻り値のリストをチェックする必要があります。
async def main():
"""return_exceptions=Trueあり"""
results = await asyncio.gather(
coro_success(), coro_value_err(), coro_type_err(), return_exceptions=True)
print(f"{results=}")
for result in results:
match result:
case ValueError():
print("ValueError")
case TypeError():
print("TypeError")
asyncio.ではすべてのタスクを実行しないと、すべての例外を捕捉できないことがわかります。たとえば、10個のHTTPリクエストを同時に行う処理があるとします。最初のタスクに例外が発生した場合、残りの失敗するかもしれない9つが完了するまで待つことになります
asyncio.での例外処理には、以下の2つの課題があります。
asyncio.の例外処理での課題gather() return_指定なし:同時に発生した例外を1つしか捕捉できないexceptions return_指定あり:戻り値のリストをチェックする必要がある。またすべてのタスクを実行しなければならないexceptions=True
これらの課題を解決し読みやすく書けるのがasyncio.です。
asyncio.TaskGroupによる新しい例外処理の書き方
asyncio.は、Python 3.except*を利用することで、逐次関数の例外処理と同様に書けます。見慣れないexcept*については後述します。
ではasyncio.で同時に発生する例外を捕捉してみましょう。先ほどのasyncio.の例をasyncio.で書き換えてみます。
なお except* で捕捉した場合、 err はExceptionGroupオブジェクトになります。実際に発生した例外は、exceptions属性にタプルで格納されています。
async def main():
try:
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(coro_success())
task2 = tg.create_task(coro_value_err())
task3 = tg.create_task(coro_type_err())
results = [task1.result(), task2.result(), task3.result()]
except* ValueError as err:
print(f"{err.exceptions=}")
except* TypeError as err:
print(f"{err.exceptions=}")
実行すると次のような結果になります。複数のタスクから同時に発生する例外を捕捉できました。
$ python3.11 asyncio_taskgroup_except.py err.exceptions=(ValueError(),) err.exceptions=(TypeError(),)
同期処理を書くように、直感的に実装できるようになっています。
タスクのキャンセル
さて、asyncio.のもう1つの特徴である、タスクのキャンセルについて見てみましょう。Python 3.asyncio.と、Python 3.asyncio.をそれぞれ比較してみましょう。
タスクがキャンセルされずに残ってしまうケース
asyncioで複数のタスクを同時に処理する際に例外が送出されると、完了していないタスクが残り意図せず動作してしまっている場合があります。asyncio.を利用し、タスクが残ることを確認してみましょう。
非同期関数のcoro_を追加しています。asyncio.を利用して出力します。
それぞれのタスクの状態を、Taskオブジェクトのインスタンス属性の_stateで確認します。_stateは
import asyncio
async def coro_success():
return "成功"
async def coro_value_err():
raise ValueError
async def coro_long():
await asyncio.sleep(1)
print("完了していないタスクが出力しています")
return "成功?"
async def main():
try:
task1 = asyncio.create_task(coro_success())
task2 = asyncio.create_task(coro_value_err())
task3 = asyncio.create_task(coro_long(), name="残るコルーチン") # 分かりやすくするためタスクに名づけ
results = await asyncio.gather(*[task1, task2, task3])
print(f"{results=}")
except ValueError as err:
print(f"{err=}")
print(f"タスク1の状態 {task1._state=}")
print(f"タスク2の状態 {task2._state=}")
print(f"タスク3の状態 {task3._state=}")
await asyncio.sleep(1.5) # 1.5秒待つことでException発生後にcoro_log()が動いていることを確認できる
asyncio.run(main())
実行すると次のような結果になります。追加した非同期関数のcoro_がPENDINGの状態で残っており、"完了していないタスクが出力しています"が出力されていることから、例外発生後にcoro_が動作しているとわかります。
$ python3.10 asyncio_gather_cancel.py err=ValueError() タスク1の状態 task1._state='FINISHED' タスク2の状態 task2._state='FINISHED' タスク3の状態 task3._state='PENDING' 完了していないタスクが出力しています
このように例外が発生すると、タスクが残ってしまう場合があります。タスクが残ってしまうと、意図せず後続処理の間に動作が完了したり、実装によってはI/
Python 3.10以前のタスクのキャンセル
上記のサンプルコードを修正し、タスクのキャンセル処理を追加します。修正のポイントは次の3つです。
- キャンセルされるタスク
(非同期関数) に asyncio.の捕捉と再送出 [3] を追加するCancelledError - 完了していないタスクに対しキャンセルを指示する
- キャンセルの完了を待つ
まず、キャンセルする非同期関数のcoro_へasyncio.の捕捉とキャンセル時の処理を追加します。[4] 次にasyncio.を実行しているmain()にタスクの完了を判定するTask.と、タスクのキャンセルを指示するTask.を追加します。最後にキャンセル処理の完了を待つ必要があるため、ここではasyncio.を利用し完了を待ちます。
async def coro_long():
try:
await asyncio.sleep(1)
print("完了していないタスクが出力しています")
return "成功?"
except asyncio.CancelledError as err: # キャンセル処理を追加
print("キャンセルされたタスクが出力しています")
raise asyncio.CancelledError # 再送出する
async def main():
try:
task1 = asyncio.create_task(coro_success())
task2 = asyncio.create_task(coro_value_err())
task3 = asyncio.create_task(coro_long(), name="残るコルーチン") # 分かりやすくするためタスクに名づけ
results = await asyncio.gather(*[task1, task2, task3])
print(f"{results=}")
except ValueError as err:
print(f"{err=}")
print(f"タスク1の状態 {task1._state=}")
print(f"タスク2の状態 {task2._state=}")
print(f"タスク3の状態 {task3._state=}") # この時点ではキャンセルされていない
for task in [task1, task2, task3]:
if task.done() is False:
task.cancel() # 未完了のタスクをキャンセル
print(f"タスク3の状態 {task3._state=}") # この時点ではキャンセル依頼されただけでキャンセルされていない
await asyncio.sleep(1) # キャンセル処理完了を待つ必要がある
print(f"タスク3の状態 {task3._state=}") # CancelledError内の処理が完了後、キャンセルになる
実行すると次のような結果になります。"完了していないタスクが出力しています"が出力されず、代わりに"キャンセルされたタスクが出力しています"が出力されます。
また、タスクの状態もtask.実行直後はasyncio.を利用しキャンセル処理の完了を待つと
$ python3.10 asyncio_gather_cancel.py err=ValueError() タスク1の状態 task1._state='FINISHED' タスク2の状態 task2._state='FINISHED' タスク3の状態 task3._state='PENDING' タスク3の状態 task3._state='PENDING' キャンセルされたタスクが出力しています タスク3の状態 task3._state='CANCELLED'
なお、サンプルコードではキャンセル処理の完了待ちに asyncio. を利用していますが、本来であれば未完了のタスクに対し asyncio. などを再度実行しなければなりません。
for task in [task1, task2, task3]:
if task.done() is False:
task.cancel() # 未完了のタスクをキャンセル
# await asyncio.sleep(1) # キャンセル処理完了を待つ必要がある
# asyncio.sleep(1)の代わりに、キャンセル完了を待つため asyncio.gather() を実行する
results = await asyncio.gather(*[task3], return_exceptions=True)
例外発生時にタスクが残ってしまうことを避けるため、キャンセル処理は必要です。サンプルコードと出力からわかるように、asyncio.では終了していないタスクをキャンセルする処理、そしてキャンセル完了を待つ処理の実装が必要です。
asyncio.TaskGroupによるタスクのキャンセル
asyncio.では、コンテキストマネージャーを抜けるタイミングで未完了のタスクをキャンセルします。例外が発生した場合に開発者は、タスクがキャンセルされたか、またキャンセル処理が完了しているか、を意識する必要がなくなります。
非常にシンプルで読みやすい実装になります。
async def main():
try:
async with asyncio.TaskGroup() as g:
task1 = g.create_task(coro_success())
task2 = g.create_task(coro_value_err())
task3 = g.create_task(coro_long(), name="残るコルーチン")
results = [task1.result(), task2.result(), task3.result()]
print(f"{results=}")
except* ValueError as err:
print(f"{err.exceptions=}")
print(f"完了していないタスク {task1._state=}")
print(f"完了していないタスク {task2._state=}")
print(f"完了していないタスク {task3._state=}")
実行すると、キャンセル処理が実行され、またタスクも
$ python3.11 asyncio_taskgroup_cancel.py キャンセルされたタスクが出力しています err.exceptions=(ValueError(),) タスク1の状態 task1._state='FINISHED' タスク2の状態 task2._state='FINISHED' タスク3の状態 task3._state='CANCELLED'
まとめ
asyncioはI/
今回の asyncio. を利用すると、例外処理や例外発生時のキャンセルを非常に読みやすく書けるようになっています。 asyncio. を利用して
最後に私事ですが、asyncio.とasyncioについてもう少し掘り下げた内容をPyCon JP 2022で紹介します
