CPP线程管理类实现

发布于:2024-05-02 ⋅ 阅读:(21) ⋅ 点赞:(0)

 一个线程管理模块应该包含Task(任务类)、Thread(线程类)、线程管理类(ThreadManager)。

#pragma once
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
#include <queue>
#include <functional>
#include <condition_variable>

typedef std::function<void(void*)> ThreadTaskFun;

// 任务类
class Task {
public:
  Task(ThreadTaskFun, void* param = nullptr) : func_(std::move(func)), taskParams(param){}
  Task() {}

  void execute() {
    if (func_) {
      func_(taskParams);
    }
  }

private:
  std::function<void(void*)> func_;
  void* taskParams;
};

// 线程类
class Thread {
public:
  Thread() : thread_(), active_(false) {}

  void start() {
    active_ = true;
    thread_ = std::thread([this]() {
      while (active_) {
        Task task;
        {
          std::unique_lock<std::mutex> lock(mutex_);
          // 等待任务队列不为空
          condition_.wait(lock, [this]() { return !tasks_.empty() || !active_; });
          if (!active_) {
            break;
          }
          task = std::move(tasks_.front());
          tasks_.pop();
        }
        task.execute();
      }
    });
  }

  void stop() {
    if (active_) {
      active_ = false;
      condition_.notify_one();
      if (thread_.joinable()) {
        thread_.join();
      }
    }
  }

  void addTask(Task task) {
    std::lock_guard<std::mutex> lock(mutex_);
    tasks_.push(std::move(task));
    condition_.notify_one();
  }

private:
  std::thread thread_;
  std::queue<Task> tasks_;
  std::mutex mutex_;
  std::condition_variable condition_;
  bool active_;
};

// 线程管理类
class ThreadManager {
public:
  ThreadManager(int num_threads) {
    for (int i = 0; i < num_threads; ++i) {
      threads_.emplace_back(std::make_unique<Thread>());
    }
  }

  ~ThreadManager()
  {
    stopThreads();
  }

  void startThreads() {
    for (auto& thread : threads_) {
      thread->start();
    }
  }

  void stopThreads() {
    for (auto& thread : threads_) {
      thread->stop();
    }
  }

  void addTask(Task task) {
    if (index_ >= threads_.size()) {
      index_ = 0; // 循环使用线程
    }
    threads_[index_]->addTask(std::move(task));
    index_++;
  }

private:
  std::vector<std::unique_ptr<Thread>> threads_;
  int index_ = 0;
};

Demo

#include "threadpool.h"

class Resources
{
public:
  Resources()
  {
    resourcesThreadFun = std::bind(&Resources::exampleTaskFunction, this, std::placeholders::_1);
  }
  void exampleTaskFunction(void* param) {
    int index = 0;
    std::cout << "Executing task " << index << " in thread: " << std::this_thread::get_id() << std::endl;
  }

  ThreadTaskFun resourcesThreadFun;

};

int main() {
  ThreadManager manager(4); // 创建4个线程
  manager.startThreads();

  Resources* pResource = new Resources();
  

  // 添加10个示例任务
  for (int i = 0; i < 100; ++i) {
    Task task(pResource->resourcesThreadFun);
    manager.addTask(std::move(task));
  }

  std::cout << "main thread over!" << std::endl;

  return 0;
}