同步函数中调用异步函数的思路

发布于:2024-12-06 ⋅ 阅读:(95) ⋅ 点赞:(0)

在一个同步函数中,调用异步函数,但是此时并不知道当前的同步函数是否已经处于某个loop循环中。

1.先判断当前是否处于循环中。
def is_event_loop_running():
    try:
        # 尝试获取当前运行的事件循环
        loop = asyncio.get_running_loop()
        return loop is not None
    except RuntimeError:
        # 如果没有运行中的事件循环,会抛出 RuntimeError
        return False
2.如果未处于任何循环中。则使用asyncio.run来运行异步函数。
result = asyncio.run(async_slot_function())
3.如果处于循环中,则获取循环loop,如果loop正在运行,则创建任务执行目标函数,如果loop未运行则直接使用loop来运行函数。
loop = asyncio.get_running_loop()
# 当前loop是否正在运行
if loop.is_running():
    task = loop.create_task(async_slot_function())
    # 为任务添加回调函数
    task.add_done_callback(lambda t: print(t.result()))
else:
    result = loop.run_until_complete(async_slot_function())
4.完整代码
        # 判断是否处于某个loop中
        if is_event_loop_running():
            loop = asyncio.get_running_loop()
            # 当前loop是否正在运行
            if loop.is_running():
                task = loop.create_task(async_slot_function())
                # 为任务添加回调函数
                task.add_done_callback(lambda t: print(t.result()))
            else:
                result = loop.run_until_complete(async_slot_function())
        else:
            result = asyncio.run(async_slot_function())
5.这里会出现一个问题,以任务的形式来执行异步函数,并不会立即返回任务结果。如果需要结果进行返回,需要进行等待。使用asyncio.run_coroutine_threadsafe来执行异步函数,并对结果进行等待返回。
result_future = asyncio.run_coroutine_threadsafe(async_slot_function(), loop)
try:
    # 设置超时时间为 3 秒
    result = result_future.result(timeout=3)
except asyncio.TimeoutError:
    logging.error("Task timed out after 10 seconds")
    result = False
except Exception as e:
    logging.error(f"Task failed: {e}\n{traceback.format_exc()}")
    result = False
6.完整代码
# 同步函数    
def insert_api(self, data=None) -> bool:
        default_data = self.get_default_data()
        # 异步函数
        async def async_slot_function():
            pool = await create_pool()
            try:
                # 插入数据
                await insert(pool=pool, table_name=TaskPlanInterface.TABLE_NAME, data=default_data)
                return True
            except:
                traceback.print_exc()
                exc_info = traceback.format_exc()
                return False
            finally:
                await close_pool(pool)

        # 判断是否处于某个loop中
        if is_event_loop_running():
            loop = asyncio.get_running_loop()
            # 当前loop是否正在运行
            if loop.is_running():
                result_future = asyncio.run_coroutine_threadsafe(async_slot_function(), loop)
                try:
                    # 设置超时时间为 3 秒
                    result = result_future.result(timeout=3)
                except asyncio.TimeoutError:
                    logging.error("Task timed out after 10 seconds")
                    result = False
                except Exception as e:
                    logging.error(f"Task failed: {e}\n{traceback.format_exc()}")
                    result = False
            else:
                result = loop.run_until_complete(async_slot_function())
        else:
            result = asyncio.run(async_slot_function())


网站公告

今日签到

点亮在社区的每一天
去签到