在一个同步函数中,调用异步函数,但是此时并不知道当前的同步函数是否已经处于某个loop循环中。
1.先判断当前是否处于循环中。
def is_event_loop_running():
try:
# 尝试获取当前运行的事件循环
loop = asyncio.get_running_loop()
return loop is not None
except RuntimeError:
# 如果没有运行中的事件循环,会抛出 RuntimeError
return False
2.如果未处于任何循环中。则使用asyncio.run来运行异步函数。
result = asyncio.run(async_slot_function())
3.如果处于循环中,则获取循环loop,如果loop正在运行,则创建任务执行目标函数,如果loop未运行则直接使用loop来运行函数。
loop = asyncio.get_running_loop()
# 当前loop是否正在运行
if loop.is_running():
task = loop.create_task(async_slot_function())
# 为任务添加回调函数
task.add_done_callback(lambda t: print(t.result()))
else:
result = loop.run_until_complete(async_slot_function())
4.完整代码
# 判断是否处于某个loop中
if is_event_loop_running():
loop = asyncio.get_running_loop()
# 当前loop是否正在运行
if loop.is_running():
task = loop.create_task(async_slot_function())
# 为任务添加回调函数
task.add_done_callback(lambda t: print(t.result()))
else:
result = loop.run_until_complete(async_slot_function())
else:
result = asyncio.run(async_slot_function())
5.这里会出现一个问题,以任务的形式来执行异步函数,并不会立即返回任务结果。如果需要结果进行返回,需要进行等待。使用asyncio.run_coroutine_threadsafe来执行异步函数,并对结果进行等待返回。
result_future = asyncio.run_coroutine_threadsafe(async_slot_function(), loop)
try:
# 设置超时时间为 3 秒
result = result_future.result(timeout=3)
except asyncio.TimeoutError:
logging.error("Task timed out after 10 seconds")
result = False
except Exception as e:
logging.error(f"Task failed: {e}\n{traceback.format_exc()}")
result = False
6.完整代码
# 同步函数
def insert_api(self, data=None) -> bool:
default_data = self.get_default_data()
# 异步函数
async def async_slot_function():
pool = await create_pool()
try:
# 插入数据
await insert(pool=pool, table_name=TaskPlanInterface.TABLE_NAME, data=default_data)
return True
except:
traceback.print_exc()
exc_info = traceback.format_exc()
return False
finally:
await close_pool(pool)
# 判断是否处于某个loop中
if is_event_loop_running():
loop = asyncio.get_running_loop()
# 当前loop是否正在运行
if loop.is_running():
result_future = asyncio.run_coroutine_threadsafe(async_slot_function(), loop)
try:
# 设置超时时间为 3 秒
result = result_future.result(timeout=3)
except asyncio.TimeoutError:
logging.error("Task timed out after 10 seconds")
result = False
except Exception as e:
logging.error(f"Task failed: {e}\n{traceback.format_exc()}")
result = False
else:
result = loop.run_until_complete(async_slot_function())
else:
result = asyncio.run(async_slot_function())