Linux -- 线程池

发布于:2025-02-11 ⋅ 阅读:(55) ⋅ 点赞:(0)

目录

线程池

代码:

makefile:

Log.hpp 代码:

获取当前时间

可变参数 

为什么用宏来封装日志函数

LockGuard.hpp 代码:

Task.hpp 代码:

Thread.hpp 代码:

ThreadPool.hpp 代码:

bind 函数

test.cc 代码:

运行结果:

向显示器输出日志:

向文件输入日志信息: 


线程池

线程池是一种多线程处理形式,处理过程中将任务分为若干个线程,使用线程池可以有效地管理并发任务的执行。线程池的基本思想是在启动一个程序的时候创建一组线程(通常根据CPU核心数和任务类型来决定线程的数量),并将这些线程放入一个池中,在有新的任务需要进行时,就从池中取出一个空闲的线程来执行任务,当这个任务执行完毕后,该线程并不会死亡,而是再次返回到线程池中处于等待状态。

使用线程池的好处包括:

  1. 减少创建和销毁线程的开销:创建和销毁线程都是相对昂贵的操作,尤其是当应用频繁地创建和销毁线程时,这种开销会变得显著。线程池通过重用现有的线程来执行新任务,减少了这种开销。

  2. 控制并发度:线程池可以帮助限制应用程序中的并发线程数量,防止过多的线程消耗完系统的资源。

  3. 提高响应速度:对于需要立即响应的任务,如果线程已经存在并等待工作,那么任务可以更快地开始执行。

  4. 资源复用:线程池中的线程可以在多个任务之间复用,提高了线程的利用率

  5. 更好的管理:线程池通常提供一些管理功能,如设置核心线程数、最大线程数、线程存活时间等,以及排队策略和拒绝策略。

线程池维护着多个线程,等待着监督管理者分配可并发执行的任务,这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

代码:

makefile:

testPool:test.cc
	g++ -o $@ $^ -std=c++14 -lpthread

.PHONY:clean
clean:
	rm -f testPool

Log.hpp 代码:

获取当前时间

time_t curr_time = time(nullptr) 的作用就是获取当前时间并以 秒 为单位存储在 curr_time 中。由于获取的时间以秒为单位,为了得到格式为 年月日时分秒 的时间格式,需要调用 localtime 函数。

localtime 函数接收一个指向 time_t 类型的指针,并返回一个指向 struct tm 的指针,该结构体包含分解后的本地时间信息(年、月、日、时、分、秒等)。

struct tm 结构体中包含以下成员:

  • int tm_sec; // 秒,正常范围为 0 至 59,但允许至 61 来适应闰秒
  • int tm_min; // 分钟,范围 0 至 59
  • int tm_hour; // 小时,范围 0 至 23
  • int tm_mday; // 一个月中的第几天,范围 1 至 31
  • int tm_mon; // 月份,范围 0 至 11,其中 0 表示一月,11 表示十二月
  • int tm_year; // 自公元 1900 年以来的年数,因此需要加上 1900 才能得到完整的年份

可变参数 

va_list 是 C 和 C++ 标准库中定义的一种类型,用于处理可变参数函数.

va_start 初始化一个 va_list 类型的变量,使其指向第一个可变参数。此宏必须在访问任何可变参数之前调用。用法如下:va_start(va_list ap, last_fixed_arg);

  • ap要初始化的 va_list 变量。
  • last_fixed_arg:最后一个固定参数的名字,它是可变参数列表之前的那个参数

vsnsprintf 用于格式化字符串,并将结果写入到指定大小的缓冲区中。

#include <stdarg.h>
#include <stdio.h>

int vsnprintf(char *str, size_t size, const char *format, va_list ap);
  • str:指向要写入格式化输出的字符数组(即目标缓冲区)的指针
  • size:目标缓冲区的最大长度,包括结尾的空字符 \0。如果 size 为 0,则不写入任何内容到 str,但仍然计算并返回需要的字符数(不包括结尾的空字符)。
  • format格式化字符串,定义了如何解析后续参数。
  • ap:由 va_list 定义的变量,包含了所有要被格式化的参数

va_end 清理va_start 初始化的 va_list 变量。它应该总是被调用来结束对可变参数的访问。

为什么用宏来封装日志函数

因为在写日志时,需要获取当前运行的文件名和行数,为了更方便地获取这两个变量,使用宏 __FILE____LINE__它们可以在编译时被替换为当前文件的名称和当前行号。这些宏对于调试、日志记录以及错误报告非常有用,因为它们可以提供代码中某个特定位置的信息。 用宏来封装日志函数后,每一次调用日志函数都会自动获取文件名和行数,使用者只需要传日志等级和想备注的日志信息,不需要关心其他信息

#pragma once

// 编写日志

#include <iostream>
#include <fstream>
#include <cstdio>
#include <string>
#include <ctime>
#include <cstdarg>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>
#include "LockGuard.hpp"

bool gIsSave = false;                  // 是否保存到文件中
std::string logname = "log.txt"; // 保存日志的文件

// 日志等级
enum Level
{
    DEBUG = 0, // 调试
    INFO,      // 常规信息
    WARNING,   // 警告
    ERROR,     // 出错
    FATAL      // 致命错误
};

// 日志等级转为字符串
std::string LevelToString(int level)
{
    switch (level)
    {
    case DEBUG:
        return "Debug";
    case INFO:
        return "Info";
    case WARNING:
        return "Warning";
    case ERROR:
        return "Error";
    case FATAL:
        return "Fatal";
    default:
        return "Unknown";
    }
}

// 保存到日志文件中
void SaveFile(std::string &filename, const std::string &message)
{
    // 创建文件流,并尝试打开文件,追加模式下写入
    std::ofstream out(filename, std::ios::app);

    // 检查文件是否打开成功
    if (!out.is_open())
        return; // 打开失败

    out << message; // 写入数据
    out.close();    // 关闭文件流
}

// 获取当前时间,并转为字符串
std::string GetTimeString()
{
    // 获取当前时间
    time_t curr_time = time(nullptr);

    // 将时间转为时间结构体
    struct tm *format_time = localtime(&curr_time);

    // 转换失败,可能为无效时间
    if (format_time == nullptr)
        return "None";

    char time_buffer[1024];

    // 转为想要的字符串格式放入缓冲区中
    snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d-%d-%d",
             format_time->tm_year + 1900,
             format_time->tm_mon + 1,
             format_time->tm_mday,
             format_time->tm_hour,
             format_time->tm_min,
             format_time->tm_sec);

    return time_buffer;
}

pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
// 日志文件,行数,是否保存到文件中,日志等级,想要输入的字符串(可变参数)
void LogMessage(std::string filename, int line, bool issave, int level, const char *format, ...)
{
    std::string levelstr = LevelToString(level); // 等级字符串
    std::string timestr = GetTimeString();       // 时间字符串
    pid_t selfpid = getpid();

    // 处理可变参数
    char buffer[1024];
    va_list arg;           // 声明可变参数列表
    va_start(arg, format); // 初始化arg,arg从format中读可变参数

    // 按照 format 给的方式,格式控制字符串,并写入缓冲区(arg包含了format的参数列表)
    // 比如 ("helloworld %d %d",cnt,num),format 就是helloworld %d %d,参数列表就是cnt,num
    vsnprintf(buffer, sizeof(buffer), format, arg);

    // 清理arg
    va_end(arg);

    std::string message = "[" + timestr + "]" + "[" + levelstr + "]" +
                          "[" + std::to_string(selfpid) + "]" +
                          "[" + filename + "]" + "[" + std::to_string(line) + "]" + buffer + "\n";

    LockGuard lockguard(&lock); // 保护显示器资源,防止输出混乱

    if (!issave)
        std::cout << message; // 向显示器输出
    else
        SaveFile(logname, message); // 写入文件
}

// 宏定义,便于日志的使用,##__VA_ARGS__可以确保在没有提供额外参数的情况下不会留下多余的逗号
#define LOG(level, format, ...)                                             \
    do                                                                       \
    {                                                                        \
        LogMessage(__FILE__, __LINE__, gIsSave, level, format, ##__VA_ARGS__); \
    } while (0)

// 日志存入文件
#define EnableFile()    \
    do                  \
    {                   \
        gIsSave = true; \
    } while (0)

// 日志输出到屏幕
#define EnableScreen()   \
    do                   \
    {                    \
        gIsSave = false; \
    } while (0)

LockGuard.hpp 代码:

#ifndef __LOCK_GUARD_HPP__
#define __LOCK_GUARD_HPP__


//对锁进行封装


#include<pthread.h>
#include<iostream>
class LockGuard
{
public:
    LockGuard(pthread_mutex_t *mutex)
        :_mutex(mutex)
    {
        pthread_mutex_lock(_mutex);//加锁
    }
    ~LockGuard()
    {
        pthread_mutex_unlock(_mutex);
    }
private:
    pthread_mutex_t *_mutex;
};

#endif

Task.hpp 代码:

#pragma once

#include <iostream>
#include <string>
#include <functional>

class Task
{
public:
    Task() {}
    Task(int a, int b)
        : _a(a), _b(b), _result(0)
    {
    }

    void Excute()
    {
        _result=_a+_b;
    }
    std::string ResultToString()
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=" + std::to_string(_result);
    }

    std::string DebugToString()
    {
        return std::to_string(_a) + "+" + std::to_string(_b) + "=?";
    }

    void operator()()
    {
        Excute();
    }

private:
    int _a;
    int _b;
    int _result;
};

Thread.hpp 代码:

#ifndef __THREAD_HPP__
#define __THREAD_HPP__

#include <iostream>
#include <string>
#include <unistd.h>
#include <functional>
#include <pthread.h>

namespace ThreadModule
{
    using func_t = std::function<void(std::string)>;

    class Thread
    {
    public:
        void Excute()
        {
            _func(_threadname);
        }

    public:
        Thread(func_t func, const std::string name = "none-name")
            : _func(func), _threadname(name), _stop(true)
        {
        }
        static void *threadroutine(void *args) // 类成员函数,形参是有this指针的!!
        {
            Thread *self = static_cast<Thread *>(args);
            self->Excute();
            return nullptr;
        }
        bool Start()
        {
            int n = pthread_create(&_tid, nullptr, threadroutine, this);
            if (!n)
            {
                _stop = false;
                return true;
            }
            else
            {
                return false;
            }
        }
        void Detach()
        {
            if (!_stop)
            {
                pthread_detach(_tid);
            }
        }
        void Join()
        {
            if (!_stop)
            {
                pthread_join(_tid, nullptr);
            }
        }
        std::string name()
        {
            return _threadname;
        }
        void Stop()
        {
            _stop = true;
        }
        ~Thread() {}

    private:
        pthread_t _tid;
        std::string _threadname;
       
        func_t _func;
        bool _stop;
    };
}

#endif

ThreadPool.hpp 代码:

bind 函数

std::bind 是 C++ 标准库中提供的一个函数模板,它位于 <functional> 头文件中。std::bind 用于创建可调用对象(如函数、lambda 表达式或其它可调用对象)的包装器,允许你固定一部分参数(即绑定),并将剩余的参数留待后续调用时提供。这使得你可以创建部分应用的函数,或者调整参数传递给目标函数的方式。

std::bind 的参数可以分为几个类别,主要包括:

  1. 目标可调用对象:这是你想要绑定的函数、成员函数、lambda 表达式或任何其他可调用对象。它作为 std::bind 的第一个参数。

  2. 绑定参数(固定参数):这些是你希望在调用时固定的参数值。它们可以是普通值、变量、引用或其他表达式的结果。当最终调用被绑定的对象时,这些参数将直接传递给目标可调用对象。

  3. 占位符:如果你不想在 std::bind 时就提供所有参数,而是想留一些参数到后续调用时再提供,你可以使用占位符。C++ 标准库提供了 std::placeholders::_1, std::placeholders::_2, 等等,用于表示第一个、第二个参数位置等。占位符允许你在创建绑定对象时不指定某些参数的具体值,并在实际调用时传入这些参数。

  4. 成员函数和对象实例:当你绑定一个成员函数时,除了成员函数指针本身外,你还必须提供一个对象实例(或者指向该对象的指针或引用),这样 std::bind 才知道要操作哪个对象上的成员函数。

#pragma once

// 线程池的封装

#include <pthread.h>
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Thread.hpp"
#include "LockGuard.hpp"
#include "Log.hpp"
using namespace ThreadModule;

const static int gdefaultthreadnum = 10;
template <typename T>
class ThreadPool
{
private:
    void LockQueue()
    {
        pthread_mutex_lock(&_mutex);
    }
    void UnlockQueue()
    {
        pthread_mutex_unlock(&_mutex);
    }
    void ThreadSleep() // 谁调用谁休眠
    {
        pthread_cond_wait(&_cond, &_mutex);
    }
    void ThreadWakeUp() // 唤醒一个线程
    {
        pthread_cond_signal(&_cond);
    }
    void ThreadWakeUpAll() // 唤醒全部线程
    {
        pthread_cond_broadcast(&_cond);
    }
    void HandlerTask(std::string name) // 第一个参数是this指针
    {
        while (true) // 一直处理任务,直到线程池退出且任务队列为空
        {
            LockQueue(); // 任务队列是临界资源,需要保护

            // 任务队列为空,且线程池还在运行,则线程休眠
            while (_task_queue.empty() && _isrunning)
            {
                _waitnum++;
                ThreadSleep();
                _waitnum--;
            }

            // 任务队列为空,且线程池不运行了,退出
            if (_task_queue.empty() && !_isrunning)
            {
                UnlockQueue(); // 解锁
                break;         // 退出
            }

            // 还有任务没处理,则处理任务
            T t = _task_queue.front(); // 取出任务
            _task_queue.pop();
            UnlockQueue(); // 线程已经取出任务,任务已为线程私有,且任务可能比较耗时,解锁

            LOG(DEBUG, "%s get a task", name.c_str());

            t();

            LOG(DEBUG, "%s handler a task,result is: %s", name.c_str(), t.ResultToString().c_str());
        }
    }

public:
    ThreadPool(int num = gdefaultthreadnum)
        : _threadnum(num), _waitnum(0), _isrunning(false)
    {
        pthread_mutex_init(&_mutex, nullptr);
        pthread_cond_init(&_cond, nullptr);
        LOG(INFO, "ThreadPool Construct");
    }
    // 只是传了线程需要的参数,线程还没有创建出来
    void InitThreadPool()
    {
        for (int i = 0; i < _threadnum; i++)
        {
            std::string name = "thread-" + std::to_string(i + 1);

            // 由于 HandlerTask 的第一个参数是 this 指针,第二个参数是 string 类型
            // 而 Thread.hpp 中 Thread 类的构造函数要求传的函数的参数只能有 string 类型
            // 可以用 bind 函数对 HandlerTask 的第一个参数进行绑定(绑了 this 指针)
            // (_1 就是绑定第一个参数,_2 就是绑定第二个参数)
            // 绑定之后,使用 HandlerTask 就只需要传 string 类型的参数
            // 相当于函数的参数从( this,string )变成了( string )
            _threads.emplace_back(std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1), name);
            LOG(INFO, "init thread %s done", name.c_str());
        }
        _isrunning = true;
    }

    // 创建线程
    void Start()
    {
        for (auto &thread : _threads)
        {
            thread.Start();
        }
    }

    void Stop()
    {
        LockQueue(); // 状态值也是临界资源,需要保护

        _isrunning = false;

        // 线程池不运行了,需要唤醒所以在条件变量下等待的线程
        // 否则线程会一直阻塞等待条件变量,无法被 join
        ThreadWakeUpAll();

        UnlockQueue();
    }

    void Wait()
    {
        for (auto &thread : _threads)
        {
            thread.Join();
            LOG(INFO, "%s is quit...", thread.name().c_str());
        }
    }

    bool Enqueue(const T &t)
    {
        bool ret = false;
        LockQueue();

        // 线程池在运行中,才可以放入任务
        if (_isrunning)
        {
            _task_queue.push(t);

            // 有线程在等任务,唤醒线程
            if (_waitnum > 0)
                ThreadWakeUp();

            LOG(DEBUG, "enqueue task success");
            ret = true; // 任务插入成功
        }

        UnlockQueue();

        return ret;
    }
    ~ThreadPool()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_cond);
    }

private:
    int _threadnum; // 线程的个数

    std::vector<Thread> _threads; // 管理线程
    std::queue<T> _task_queue;    // 任务队列

    pthread_mutex_t _mutex; // 互斥锁
    pthread_cond_t _cond;   // 信号量

    bool _isrunning; // 线程池的启动状态
    int _waitnum;    // 线程等待的个数
};

test.cc 代码:

srand(time(nullptr) ^ getpid() ^ pthread_self()) 通过将这些值进行按位异或(^操作符),可以获得一个更加独特且难以预测的种子值

#include "ThreadPool.hpp"
#include "Task.hpp"
#include "Log.hpp"
#include <iostream>
#include <string>
#include <memory>
#include <ctime>

int main()
{
    srand(time(nullptr) ^ getpid() ^ pthread_self());
    //EnableScreen(); // 开启日志显示器打印功能
    EnableFile();
    std::unique_ptr<ThreadPool<Task>> tp = std::make_unique<ThreadPool<Task>>(5); // C++14新特性
    tp->InitThreadPool();
    tp->Start();

    int tasknum = 10;
    while (tasknum)
    {
        int a = rand() % 10 + 1;
        usleep(1234);
        int b = rand() % 5 + 1;
        Task t(a, b);
        LOG(INFO, "main thread push task: %s", t.DebugToString().c_str());
        tp->Enqueue(t);
        sleep(1);
        tasknum--;
    }

    tp->Stop();
    tp->Wait();

    return 0;
}

运行结果:

向显示器输出日志:

向文件输入日志信息: 


网站公告

今日签到

点亮在社区的每一天
去签到