130 lines
4.2 KiB
Python
130 lines
4.2 KiB
Python
# MicroPython uasyncio module
|
|
# MIT license; Copyright (c) 2019-2022 Damien P. George
|
|
|
|
from . import core
|
|
|
|
|
|
def _run(waiter, aw):
|
|
try:
|
|
result = await aw
|
|
status = True
|
|
except BaseException as er:
|
|
result = None
|
|
status = er
|
|
if waiter.data is None:
|
|
# The waiter is still waiting, cancel it.
|
|
if waiter.cancel():
|
|
# Waiter was cancelled by us, change its CancelledError to an instance of
|
|
# CancelledError that contains the status and result of waiting on aw.
|
|
# If the wait_for task subsequently gets cancelled externally then this
|
|
# instance will be reset to a CancelledError instance without arguments.
|
|
waiter.data = core.CancelledError(status, result)
|
|
|
|
|
|
async def wait_for(aw, timeout, sleep=core.sleep):
|
|
aw = core._promote_to_task(aw)
|
|
if timeout is None:
|
|
return await aw
|
|
|
|
# Run aw in a separate runner task that manages its exceptions.
|
|
runner_task = core.create_task(_run(core.cur_task, aw))
|
|
|
|
try:
|
|
# Wait for the timeout to elapse.
|
|
await sleep(timeout)
|
|
except core.CancelledError as er:
|
|
status = er.value
|
|
if status is None:
|
|
# This wait_for was cancelled externally, so cancel aw and re-raise.
|
|
runner_task.cancel()
|
|
raise er
|
|
elif status is True:
|
|
# aw completed successfully and cancelled the sleep, so return aw's result.
|
|
return er.args[1]
|
|
else:
|
|
# aw raised an exception, propagate it out to the caller.
|
|
raise status
|
|
|
|
# The sleep finished before aw, so cancel aw and raise TimeoutError.
|
|
runner_task.cancel()
|
|
await runner_task
|
|
raise core.TimeoutError
|
|
|
|
|
|
def wait_for_ms(aw, timeout):
|
|
return wait_for(aw, timeout, core.sleep_ms)
|
|
|
|
|
|
class _Remove:
|
|
@staticmethod
|
|
def remove(t):
|
|
pass
|
|
|
|
|
|
async def gather(*aws, return_exceptions=False):
|
|
if not aws:
|
|
return []
|
|
|
|
def done(t, er):
|
|
# Sub-task "t" has finished, with exception "er".
|
|
nonlocal state
|
|
if gather_task.data is not _Remove:
|
|
# The main gather task has already been scheduled, so do nothing.
|
|
# This happens if another sub-task already raised an exception and
|
|
# woke the main gather task (via this done function), or if the main
|
|
# gather task was cancelled externally.
|
|
return
|
|
elif not return_exceptions and not isinstance(er, StopIteration):
|
|
# A sub-task raised an exception, indicate that to the gather task.
|
|
state = er
|
|
else:
|
|
state -= 1
|
|
if state:
|
|
# Still some sub-tasks running.
|
|
return
|
|
# Gather waiting is done, schedule the main gather task.
|
|
core._task_queue.push(gather_task)
|
|
|
|
ts = [core._promote_to_task(aw) for aw in aws]
|
|
for i in range(len(ts)):
|
|
if ts[i].state is not True:
|
|
# Task is not running, gather not currently supported for this case.
|
|
raise RuntimeError("can't gather")
|
|
# Register the callback to call when the task is done.
|
|
ts[i].state = done
|
|
|
|
# Set the state for execution of the gather.
|
|
gather_task = core.cur_task
|
|
state = len(ts)
|
|
cancel_all = False
|
|
|
|
# Wait for the a sub-task to need attention.
|
|
gather_task.data = _Remove
|
|
try:
|
|
yield
|
|
except core.CancelledError as er:
|
|
cancel_all = True
|
|
state = er
|
|
|
|
# Clean up tasks.
|
|
for i in range(len(ts)):
|
|
if ts[i].state is done:
|
|
# Sub-task is still running, deregister the callback and cancel if needed.
|
|
ts[i].state = True
|
|
if cancel_all:
|
|
ts[i].cancel()
|
|
elif isinstance(ts[i].data, StopIteration):
|
|
# Sub-task ran to completion, get its return value.
|
|
ts[i] = ts[i].data.value
|
|
else:
|
|
# Sub-task had an exception with return_exceptions==True, so get its exception.
|
|
ts[i] = ts[i].data
|
|
|
|
# Either this gather was cancelled, or one of the sub-tasks raised an exception with
|
|
# return_exceptions==False, so reraise the exception here.
|
|
if state is not 0:
|
|
raise state
|
|
|
|
# Return the list of return values of each sub-task.
|
|
return ts
|