基于 Windows I/O 完成端口(IOCP)的多线程任务队列系统小case

发布于:2025-05-01 ⋅ 阅读:(49) ⋅ 点赞:(0)

这段代码实现了一个基于 Windows I/O 完成端口(IOCP)的多线程任务队列系统。它通过 IOCP 将任务分发到线程池中执行,并通过一个线程安全的队列(std::list<std::string>)来管理任务数据。以下是对代码的详细讲解:

1. 宏定义和枚举

#define IOCP_LIST_EMPTR 0
#define IOCP_LIST_PUSH 1
#define IOCP_LIST_POP  2

enum {
    IocpListEmpty,
    IocpListPush,
    IocpListPop
};
  • 宏定义和枚举用于定义操作类型:

    • IOCP_LIST_EMPTRIocpListEmpty:表示清空队列的操作。

    • IOCP_LIST_PUSHIocpListPush:表示向队列中添加数据的操作。

    • IOCP_LIST_POPIocpListPop:表示从队列中取出数据的操作。

2. 数据结构 IOCP_PARAM

typedef struct IocpParam
{
    int nOperator; // 操作类型
    std::string strData; // 数据
    _beginthread_proc_type cbFunc; // 回调函数
    IocpParam(int op, const char* sData, _beginthread_proc_type cb = NULL) {
        nOperator = op;
        strData = sData;
        cbFunc = cb;
    }
    IocpParam() {
        nOperator = -1;
    }
} IOCP_PARAM;
  • IOCP_PARAM 是一个结构体,用于封装任务信息:

    • nOperator:指定操作类型(如 IocpListPushIocpListPop)。

    • strData:存储任务相关的数据。

    • cbFunc:回调函数指针,用于在任务完成后调用。

3. 线程函数 threadQueueEntry

void threadQueueEntry(HANDLE hIOCP) {
    std::list<std::string> lstString; // 线程安全的队列
    DWORD dwTransferred = 0;
    ULONG_PTR CompletionKey = 0;
    OVERLAPPED* poverlapped = NULL;
    while (GetQueuedCompletionStatus(hIOCP, &dwTransferred, &CompletionKey, &poverlapped, INFINITE)) {
        if ((dwTransferred == 0) && (CompletionKey == NULL)) {
            printf("thread is prepare to exit!\r\n");
            break;
        }
        IOCP_PARAM* pParam = (IOCP_PARAM*)CompletionKey;
        if (pParam->nOperator == IocpListPush) {
            lstString.push_back(pParam->strData);
        }
        else if (pParam->nOperator == IocpListPop) {
            std::string* pStr = NULL;
            if (lstString.size() > 0) {
                pStr = new std::string(lstString.front());
                lstString.pop_front();
            }
            if (pParam->cbFunc) {
                pParam->cbFunc(pStr);
            }
        }
        else if (pParam->nOperator == IocpListEmpty) {
            lstString.clear();
        }
        delete pParam;
    }
    _endthread();
}
  • 这是线程的入口函数,负责处理从完成端口(IOCP)接收到的任务。

  • 使用 GetQueuedCompletionStatus 函数从完成端口获取任务:

    • 如果 dwTransferred 为 0 且 CompletionKeyNULL,表示线程准备退出。

    • 否则,根据 CompletionKey 获取任务参数 pParam

  • 根据 pParam->nOperator 的值执行不同的操作:

    • IocpListPush:将数据添加到队列 lstString 中。

    • IocpListPop:从队列中取出数据,并调用回调函数 cbFunc

    • IocpListEmpty:清空队列。

  • 最后释放任务参数 pParam

4. 回调函数 func

void func(void* arg) {
    std::string* pstr = (std::string*)arg;
    if (pstr != NULL) {
        printf("pop from list :%s\r\n", pstr->c_str());
        delete pstr;
    }
    else {
        printf("list is empty,no data!\r\n");
    }
}
  • 这是一个简单的回调函数,用于处理从队列中取出的数据:

    • 如果 pstr 不为空,打印数据并释放内存。

    • 如果为空,打印队列为空的消息。

5. 主函数 main

int main()
{
    if (!CTool::Init()) return 1;
    printf("press any key to exit...\r\n");
    HANDLE hIOCP = INVALID_HANDLE_VALUE; // I/O 完成端口
    hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, NULL, 1); // 创建完成端口
    HANDLE hThread = (HANDLE)_beginthread(threadQueueEntry, 0, hIOCP); // 创建线程

    DWORD tick = GetTickCount64();
    while (_kbhit() != 0) { // 检测按键
        if (GetTickCount64() - tick > 1300) {
            PostQueuedCompletionStatus(hIOCP, sizeof(IOCP_PARAM), (ULONG_PTR)new IOCP_PARAM(IocpListPop, "hello world"), NULL);
        }
        if (GetTickCount64() - tick > 2000) {
            PostQueuedCompletionStatus(hIOCP, sizeof(IOCP_PARAM), (ULONG_PTR)new IOCP_PARAM(IocpListPush, "hello world"), NULL);
            tick = GetTickCount64();
        }
        Sleep(1);
    }

    if (hIOCP != NULL) {
        PostQueuedCompletionStatus(hIOCP, 0, NULL, NULL); // 唤醒完成端口
        WaitForSingleObject(hIOCP, INFINITE);
    }
    CloseHandle(hIOCP);

    printf("exit done!\r\n");
    ::exit(0);
}
  • 主函数的主要逻辑:

    1. 初始化:调用 CTool::Init() 进行初始化(假设这是一个工具类)。

    2. 创建完成端口:使用 CreateIoCompletionPort 创建一个完成端口,指定线程池大小为 1。

    3. 创建线程:通过 _beginthread 创建一个线程,线程函数为 threadQueueEntry

    4. 任务调度

      • 使用 PostQueuedCompletionStatus 向完成端口提交任务。

      • 每隔 1300 毫秒提交一个 IocpListPop 任务。

      • 每隔 2000 毫秒提交一个 IocpListPush 任务。

    5. 退出逻辑

      • 按下任意键后,通过 PostQueuedCompletionStatus 提交一个退出信号(dwTransferred 为 0,CompletionKeyNULL)。

      • 等待完成端口线程退出。

      • 关闭完成端口句柄。

总结

这段代码通过 Windows 的 I/O 完成端口(IOCP)实现了一个简单的任务队列系统。它利用 IOCP 的多线程能力,将任务分发到线程池中执行,并通过一个线程安全的队列管理任务数据。代码的核心在于:

  • 使用 CreateIoCompletionPort 创建完成端口。

  • 使用 PostQueuedCompletionStatus 提交任务。

  • 在线程中通过 GetQueuedCompletionStatus 获取任务并处理。

这种设计可以高效地处理并发任务,适用于需要高性能 I/O 操作的场景。


网站公告

今日签到

点亮在社区的每一天
去签到