多线程异步日志系统与实现及 TCP/IP C/S 模型

发布于:2025-08-04 ⋅ 阅读:(17) ⋅ 点赞:(0)

一、多线程日志系统分析

多线程程序对日志库提出了新的需求:线程安全,即多个线程可以并发写日志,多个线程的日志消息不会出现交织。线程安全不难办到,简单的办法是用一个全局 mutex 保护 IO,或者每个线程单独写一个日志文件,但这两种做法的高效性就堪忧了。前者会造成全部线程抢一个锁,后者有可能让业务线程阻塞在写磁盘操作上。

每个进程中的多线程程序最好只写一个日志文件,这样分析日志更容易,不必在多个文件中跳来跳去。再说多线程写多个文件也不一定能提速。解决办法不难想到,用一个工作线程负责收集日志消息,并写入日志文件,其他业务线程只管往这个 "日志线程" 发送日志消息,这称为 "异步日志"。

二、多线程日志系统设计

日志系统采用的是双缓冲(double buffering) 技术,基本思路是准备两块 buffer:A 和 B,前端负责往 buffer A 填数据(日志消息),后端负责将 buffer B 的数据写入文件。当 buffer A 写满之后,交换 A 和 B,让后端将 buffer A 的数据写入文件,而前端则往 buffer B 填入新的日志消息,如此往复。

用两个 buffer 的好处是在新建日志消息的时候不必等待磁盘文件操作,也避免每条新日志消息都触发(唤醒)后端日志线程。换言之,前端不是将一条条日志消息分别传送给后端,而是将多条日志消息拼成一个大的 buffer 传送给后端,相当于批处理,减少了线程唤醒的频度,降低开销。另外,为了及时将日志消息写入文件,即便 buffer A 未满,日志库也会每 3 秒执行一次上述交换写入操作。

三、日志系统核心类实现

1. LogFile 类关键成员变量

count_(0),                       // 写数据次数计数, 超过限值checkEveryN_时清除, 然后重新计数
mutex_(threadSafe ? new mutex{} : nullptr), // 互斥锁指针, 根据是否需要线程安全来初始化
startOfPeriod_(0),               // 本次写log周期的起始时间(秒)
lastRoll_(0),                    // 上次roll日志文件时间(秒)
lastFlush_(0)                    // 上次flush日志文件时间(秒)
{
    //assert(basename.find('/') == string::npos);
    rollFile(); 
}
LogFile::~LogFile() {}

2. AsyncLogging 类定义

#include<atomic>
#include<thread>
#include<memory>
#include<mutex>
#include<condition_variable> 
#include<string>
#include<vector>
using namespace std;
//own
#include"LogFile.hpp"
namespace tulun
{
    class AsyncLogging
    {
    private:
        AsyncLogging(const AsyncLogging&) = delete;
        AsyncLogging& operator=(const AsyncLogging&) = delete;
    private:
        void workthreadfunc();      // 工作线程
    private:
        const int flushInterval_;   // 定期(flushInterval_秒)将缓冲区的数据写到文件中
        std::atomic<bool> running_; // 是否正在运行
        const string basename_;     // 日志filename名字
        const size_t rollSize_;     // 回滚大小
        std::unique_ptr<std::thread> pthread_; // 执行该异步日志记录器的线程
        std::mutex mutex_;                  // 互斥锁
        std::condition_variable cond_;      // 条件变量
        std::string currentBuffer_;         // 当前的缓冲区
        std::vector<std::string>  buffers_; // 数据缓冲区队列
        tulun::LogFile output_;             // 定义日志文件对象 
    public:
        // rollSize //回滚大小
        //flushInterval = 3 ; // 刷新间隔,默认值3秒
        AsyncLogging(const string &basename,size_t rollSize,int flushInterval = 3);
        ~AsyncLogging();
        void append(const string &info);
        void append(const char *info,int len);
        void start();
        void stop();
        void flush();
    };
}

3. AsyncLogging 类实现

#include"AsyncLogging.hpp"
namespace tulun
{
    static const int BufMaxLen = 4000;
    static const int BufQueueSize = 16;
    AsyncLogging::AsyncLogging(const std::string& basename, size_t rollSize, int flushInterval)
    :flushInterval_(flushInterval), // 刷新间隔
    running_(false),
    rollSize_(rollSize),
    pthread_(nullptr),
    // latch_(1),
    output_(basename,rollSize,false)  // 定义日志文件对象 
    {
        currentBuffer_.reserve(BufMaxLen);
        buffers_.reserve(BufQueueSize); // vector预定大小,避免自动增长(效率更高)
    }
    
    AsyncLogging::~AsyncLogging()
    {
        if (running_)
        {
            stop();
        }
    }
    
    void AsyncLogging::start()
    {
        running_ = true;
        // 执行该异步日志记录器的线程
        pthread_.reset(new std::thread(&AsyncLogging::workthreadfunc,this));
        // latch_.wait();
    }
    
    void AsyncLogging::stop()
    {
        running_ = false;
        cond_.notify_all();
        pthread_->join();
    }
    
    void AsyncLogging::append(const string &info)
    {
        append(info.c_str(),info.size());
    }
    
    /******************************************************************** 
    Description : 
    前端在生成一条日志消息时,会调用AsyncLogging::append()。
    如果currentBuffer_够用,就把日志内容写入到currentBuffer_中,
    如果不够用(就认为其满了),就把currentBuffer_放到已满buffer_数组中,
    等待消费者线程(即后台线程)来取。则将预备好的另一块缓冲
    (nextBuffer_)移用为当前缓冲区(currentBuffer_)。
    *******************************************************************/
    void AsyncLogging::append(const char *info,int len)
    {
        std::unique_lock<std::mutex> _lock(mutex_);
        if(currentBuffer_.size() >= BufMaxLen || 
        (currentBuffer_.capacity() - currentBuffer_.size()) < len)
        {
            buffers_.push_back(std::move(currentBuffer_));
            currentBuffer_.reserve(BufMaxLen);
        }
        else
        {
            currentBuffer_.append(info,len);
        }
        cond_.notify_all();
    }
    
    void AsyncLogging::workthreadfunc()
    {
        //tulun::LogFile output(basename_,rollSize_,false); //定义日志文件对象 
        std::vector<std::string> buffersToWrite;// 线程函数的局部队列
        // latch_.countDown();
        while (running_)
        {
            //std::this_thread::sleep_for(std::chrono::milliseconds(5000));
            {
                std::unique_lock<std::mutex> _lock(mutex_);
                if (buffers_.empty())
                {
                    cond_.wait_for(_lock, std::chrono::seconds(flushInterval_));
                    // 时间点到达 ,还要获得mutex_ 才能从wait_for 函数返回;
                }
                // 无论cond是因何(一是超时,二是当前缓冲区写满了)而醒来,都要将currentBuffer_放到buffers_中。 
                // 如果是因为时间到(3秒)而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。
                // 如果已经有一个前端buffer满了,那么在前端线程中就已经把一个前端buffer放到buffers_中了。
                // 此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer,
                // 因为在前端线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)。
                buffers_.push_back(std::move(currentBuffer_));
                currentBuffer_.reserve(BufMaxLen);
                buffersToWrite.swap(buffers_);
                buffers_.reserve(BufQueueSize);
                // 释放mutex_ ;
            }
            ////异步写文件
            // 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除。
            // 前端陷入死循环,拼命发送日志消息,超过后端的处理能力,这是典型的生产速度超过消费速度, 
            // 会造成数据在内存中的堆积,严重时引发性能问题(可用内存不足)或程序崩溃(分配内存失败)。 
            if (buffersToWrite.size() > 25)  
            {  
                char buf[256]; 
                snprintf(buf, sizeof buf, "Dropped log messages at larger buffers\n"); 
                fputs(buf, stderr); 
                // 丢掉多余日志,以腾出内存,仅保留两块缓冲区 
                buffersToWrite.erase(buffersToWrite.begin() + 2, buffersToWrite.end()); 
            }
            // 将buffersToWrite的数据写入到日志文件中
            for (const auto& buffer : buffersToWrite)
            {
                output_.append(buffer.c_str(),buffer.size());
            }
            buffersToWrite.clear();
        }
        output_.flush();
    }
    
    void AsyncLogging::flush()
    {
        std::vector<std::string> buffersToWrite;//  
        std::unique_lock<std::mutex> _lock(mutex_);
        buffers_.push_back(std::move(currentBuffer_));
        buffersToWrite.swap(buffers_);
        for (const auto& buffer : buffersToWrite)
        {
            output_.append(buffer.c_str(),buffer.size());
        }
        output_.flush();
        buffersToWrite.clear();
    }
}

四、日志系统测试代码

1. 基础测试

#include"Logger.hpp"
tulun::LogFile *plog = nullptr;
void writefile(const string &info)
{
    plog->append(info);
}
void flushfile()
{
    plog->flush();
}
int main()
{
    //tulun::LogFile logfile("yhping",1024*1024*1000);
    tulun::Logger::setOutput(writefile);
    tulun::Logger::setFlush(flushfile);
    plog = new tulun::LogFile("yhping",1024*1024*1000);
    
    for(int i = 0;i<1000000;++i)
    {
        LOG_INFO<<"main"<<i;
    }
    return 0;
}

2. 异步日志单线程测试

#include"Logger.hpp"
tulun::AsyncLogging *asynclog = nullptr;
void asyncWriteFile(const string &info)
{
    asynclog->append(info);
}
void asyncFlushFile()
{
    asynclog->flush();
}
int main()
{
    asynclog = new tulun::AsyncLogging("yhping",1024*10);
    tulun::Logger::setOutput(asyncWriteFile);
    tulun::Logger::setFlush(asyncFlushFile);
    asynclog->start();
    for(int i = 0;i<1000;++i)
    {
        LOG_INFO<<"main "<<i;
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }
    return 0;
}

3. 异步日志多线程测试

#include"Logger.hpp"
tulun::AsyncLogging *asynclog = nullptr;
void asyncWriteFile(const string &info)
{
    asynclog->append(info);
}
void asyncFlushFile()
{
    asynclog->flush();
}
void func(char ch)
{
    for(int i = 0;i<1000;++i)
    {
        LOG_INFO<<"main "<<i<<ch;
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    } 
}
int main()
{
    asynclog = new tulun::AsyncLogging("yhping",1024*10);
    tulun::Logger::setOutput(asyncWriteFile);
    tulun::Logger::setFlush(asyncFlushFile);
    asynclog->start();
    std::thread tha(func,'a');
    std::thread thb(func,'b');
    std::thread thc(func,'c');
    tha.join();
    thb.join();
    thc.join();
    return 0;
}

五、倒计时同步工具类

1. CountDownLatch 类定义

#include<mutex>
#include<condition_variable>
using namespace std;
namespace tulun
{
    class CountDownLatch 
    {
    public:
        explicit CountDownLatch(int count);
        void wait();
        void countDown();
        int getCount() const;
    private:
        mutable std::mutex mutex_;
        std::condition_variable condition_;
        int count_;
    }; 
}

2. CountDownLatch 类实现

#include"CountDownLatch.hpp"
namespace tulun
{
    CountDownLatch::CountDownLatch(int count)
    :count_(count)   {}
    
    void CountDownLatch::wait()
    {
        std::unique_lock<std::mutex> _lock(mutex_);
        while(count_ > 0)
        {
            condition_.wait(_lock);
        }
    }
    
    void CountDownLatch::countDown()
    {
        std::unique_lock<std::mutex> _lock(mutex_);
        count_-=1;
        if(count_ == 0)
        {
            condition_.notify_all();
        }
    }
    
    int CountDownLatch::getCount() const
    {
        std::unique_lock<std::mutex> _lock(mutex_);
        return count_;
    }
}

3. 集成 CountDownLatch 的 AsyncLogging 修改

在 AsyncLogging 类中添加倒计时对象:

tulun::CountDownLatch latch_; // 倒计时对象

构造函数初始化:

latch_(1),

修改 start () 和 workthreadfunc ():

void AsyncLogging::start()
{
    running_ = true;
    // 执行该异步日志记录器的线程
    pthread_.reset(new std::thread(&AsyncLogging::workthreadfunc,this));
    latch_.wait();
}

void AsyncLogging::workthreadfunc()
{
    //tulun::LogFile output(basename_,rollSize_,false); //定义日志文件对象 
    std::vector<std::string> buffersToWrite;// 线程函数的局部队列
    latch_.countDown();
    while (running_)
    {
        /// 原有逻辑
    }
}

六、TCP/IP 协议 C/S 模型实现

1. 服务器代码实现

#include<stdio.h>
#include<assert.h>
#include<netinet/in.h> // sockaddr_in
#include<string.h>     // bzero
#include<arpa/inet.h>   // inet_ntop; inet_pton
#include<sys/socket.h>  // socket; 
#include<sys/types.h>   // uint16_t
#include<unistd.h>      // read
#include<errno.h>       // errno;
const uint16_t PORT = 12345;
const char *IP = "127.0.0.1";
const int LISTENSIZE = 10;
const int BUFFSIZE = 128;
int main(int argc,char *argv[])
{
    int listenfd = 0, connfd = 0;
    int ret = 0;
    char buff[BUFFSIZE]={};
    struct sockaddr_in seraddr,cliaddr;
    bzero(&seraddr,sizeof(seraddr)); 
    seraddr.sin_family = AF_INET;
    inet_pton(AF_INET,IP,&seraddr.sin_addr);
    seraddr.sin_port = htons(PORT);
    listenfd = socket(PF_INET,SOCK_STREAM,0);
    assert(listenfd != -1);
    
    ret = bind(listenfd,(struct sockaddr*) &seraddr,sizeof(seraddr));
    assert(ret != -1);
    ret = listen(listenfd,LISTENSIZE);
    assert(ret != -1);
    while(1)
    {
        socklen_t cliaddrlen = sizeof(cliaddr); // socklen_t => unsigned int;
        connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
        assert(connfd != -1);
        //int len = read(connfd,buff,sizeof(buff));
        int len = recv(connfd,buff,sizeof(buff),0);
        if(len > 0)
        {
            printf("client : data: %d %s \n",len ,buff);
            send(connfd,buff,len,0);
            close(connfd);
        }else if(len == 0)
        {
            close(connfd);
        }else
        {
            printf("error %s \n",strerror(errno));
            close(connfd);
        }
    }
    close(listenfd);
    return 0;
}

2.客户端代码实现

#include<stdio.h>
#include<arpa/inet.h>
#include<sys/types.h>
#include<unistd.h>
#include<sys/socket.h>
#include<string.h>
#define MAXLEN 64
#define PORT 2344

int main()
{
    struct sockaddr_in serveraddr;
    int connfd;
    char buf[MAXLEN] = "";
    
    // 创建socket
    connfd = socket(AF_INET, SOCK_STREAM, 0);
    if (connfd < 0)
    {
        perror("socket creation failed");
        return 1;
    }
    
    // 初始化服务器地址结构
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;                // IPv4协议
    serveraddr.sin_port = htons(PORT);              // 端口转换
    inet_pton(AF_INET, "127.0.0.1", &serveraddr.sin_addr);  // IP地址转换
    
    // 连接服务器
    if (connect(connfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0)
    {
        perror("connection failed");
        close(connfd);
        return 1;
    }
    
    // 循环读取用户输入并发送给服务器
    while (~scanf("%s", buf))
    {
        // 输入"quit"退出
        if (!strncmp("quit", buf, 4))
            break;
            
        // 发送数据
        write(connfd, buf, strlen(buf));
        
        // 接收服务器响应
        int retlen = read(connfd, buf, MAXLEN);
        if (retlen > 0)
        {
            printf("Server response: %s\n", buf);
        }
    }
    
    // 发送退出信号并关闭连接
    write(connfd, "quit", 4);
    close(connfd);
    return 0;
}


网站公告

今日签到

点亮在社区的每一天
去签到