一、多线程日志系统分析
多线程程序对日志库提出了新的需求:线程安全,即多个线程可以并发写日志,多个线程的日志消息不会出现交织。线程安全不难办到,简单的办法是用一个全局 mutex 保护 IO,或者每个线程单独写一个日志文件,但这两种做法的高效性就堪忧了。前者会造成全部线程抢一个锁,后者有可能让业务线程阻塞在写磁盘操作上。
每个进程中的多线程程序最好只写一个日志文件,这样分析日志更容易,不必在多个文件中跳来跳去。再说多线程写多个文件也不一定能提速。解决办法不难想到,用一个工作线程负责收集日志消息,并写入日志文件,其他业务线程只管往这个 "日志线程" 发送日志消息,这称为 "异步日志"。
二、多线程日志系统设计
日志系统采用的是双缓冲(double buffering) 技术,基本思路是准备两块 buffer:A 和 B,前端负责往 buffer A 填数据(日志消息),后端负责将 buffer B 的数据写入文件。当 buffer A 写满之后,交换 A 和 B,让后端将 buffer A 的数据写入文件,而前端则往 buffer B 填入新的日志消息,如此往复。
用两个 buffer 的好处是在新建日志消息的时候不必等待磁盘文件操作,也避免每条新日志消息都触发(唤醒)后端日志线程。换言之,前端不是将一条条日志消息分别传送给后端,而是将多条日志消息拼成一个大的 buffer 传送给后端,相当于批处理,减少了线程唤醒的频度,降低开销。另外,为了及时将日志消息写入文件,即便 buffer A 未满,日志库也会每 3 秒执行一次上述交换写入操作。
三、日志系统核心类实现
1. LogFile 类关键成员变量
count_(0), // 写数据次数计数, 超过限值checkEveryN_时清除, 然后重新计数
mutex_(threadSafe ? new mutex{} : nullptr), // 互斥锁指针, 根据是否需要线程安全来初始化
startOfPeriod_(0), // 本次写log周期的起始时间(秒)
lastRoll_(0), // 上次roll日志文件时间(秒)
lastFlush_(0) // 上次flush日志文件时间(秒)
{
//assert(basename.find('/') == string::npos);
rollFile();
}
LogFile::~LogFile() {}
2. AsyncLogging 类定义
#include<atomic>
#include<thread>
#include<memory>
#include<mutex>
#include<condition_variable>
#include<string>
#include<vector>
using namespace std;
//own
#include"LogFile.hpp"
namespace tulun
{
class AsyncLogging
{
private:
AsyncLogging(const AsyncLogging&) = delete;
AsyncLogging& operator=(const AsyncLogging&) = delete;
private:
void workthreadfunc(); // 工作线程
private:
const int flushInterval_; // 定期(flushInterval_秒)将缓冲区的数据写到文件中
std::atomic<bool> running_; // 是否正在运行
const string basename_; // 日志filename名字
const size_t rollSize_; // 回滚大小
std::unique_ptr<std::thread> pthread_; // 执行该异步日志记录器的线程
std::mutex mutex_; // 互斥锁
std::condition_variable cond_; // 条件变量
std::string currentBuffer_; // 当前的缓冲区
std::vector<std::string> buffers_; // 数据缓冲区队列
tulun::LogFile output_; // 定义日志文件对象
public:
// rollSize //回滚大小
//flushInterval = 3 ; // 刷新间隔,默认值3秒
AsyncLogging(const string &basename,size_t rollSize,int flushInterval = 3);
~AsyncLogging();
void append(const string &info);
void append(const char *info,int len);
void start();
void stop();
void flush();
};
}
3. AsyncLogging 类实现
#include"AsyncLogging.hpp"
namespace tulun
{
static const int BufMaxLen = 4000;
static const int BufQueueSize = 16;
AsyncLogging::AsyncLogging(const std::string& basename, size_t rollSize, int flushInterval)
:flushInterval_(flushInterval), // 刷新间隔
running_(false),
rollSize_(rollSize),
pthread_(nullptr),
// latch_(1),
output_(basename,rollSize,false) // 定义日志文件对象
{
currentBuffer_.reserve(BufMaxLen);
buffers_.reserve(BufQueueSize); // vector预定大小,避免自动增长(效率更高)
}
AsyncLogging::~AsyncLogging()
{
if (running_)
{
stop();
}
}
void AsyncLogging::start()
{
running_ = true;
// 执行该异步日志记录器的线程
pthread_.reset(new std::thread(&AsyncLogging::workthreadfunc,this));
// latch_.wait();
}
void AsyncLogging::stop()
{
running_ = false;
cond_.notify_all();
pthread_->join();
}
void AsyncLogging::append(const string &info)
{
append(info.c_str(),info.size());
}
/********************************************************************
Description :
前端在生成一条日志消息时,会调用AsyncLogging::append()。
如果currentBuffer_够用,就把日志内容写入到currentBuffer_中,
如果不够用(就认为其满了),就把currentBuffer_放到已满buffer_数组中,
等待消费者线程(即后台线程)来取。则将预备好的另一块缓冲
(nextBuffer_)移用为当前缓冲区(currentBuffer_)。
*******************************************************************/
void AsyncLogging::append(const char *info,int len)
{
std::unique_lock<std::mutex> _lock(mutex_);
if(currentBuffer_.size() >= BufMaxLen ||
(currentBuffer_.capacity() - currentBuffer_.size()) < len)
{
buffers_.push_back(std::move(currentBuffer_));
currentBuffer_.reserve(BufMaxLen);
}
else
{
currentBuffer_.append(info,len);
}
cond_.notify_all();
}
void AsyncLogging::workthreadfunc()
{
//tulun::LogFile output(basename_,rollSize_,false); //定义日志文件对象
std::vector<std::string> buffersToWrite;// 线程函数的局部队列
// latch_.countDown();
while (running_)
{
//std::this_thread::sleep_for(std::chrono::milliseconds(5000));
{
std::unique_lock<std::mutex> _lock(mutex_);
if (buffers_.empty())
{
cond_.wait_for(_lock, std::chrono::seconds(flushInterval_));
// 时间点到达 ,还要获得mutex_ 才能从wait_for 函数返回;
}
// 无论cond是因何(一是超时,二是当前缓冲区写满了)而醒来,都要将currentBuffer_放到buffers_中。
// 如果是因为时间到(3秒)而醒,那么currentBuffer_还没满,此时也要将之写入LogFile中。
// 如果已经有一个前端buffer满了,那么在前端线程中就已经把一个前端buffer放到buffers_中了。
// 此时,还是需要把currentBuffer_放到buffers_中(注意,前后放置是不同的buffer,
// 因为在前端线程中,currentBuffer_已经被换成nextBuffer_指向的buffer了)。
buffers_.push_back(std::move(currentBuffer_));
currentBuffer_.reserve(BufMaxLen);
buffersToWrite.swap(buffers_);
buffers_.reserve(BufQueueSize);
// 释放mutex_ ;
}
////异步写文件
// 如果将要写入文件的buffer列表中buffer的个数大于25,那么将多余数据删除。
// 前端陷入死循环,拼命发送日志消息,超过后端的处理能力,这是典型的生产速度超过消费速度,
// 会造成数据在内存中的堆积,严重时引发性能问题(可用内存不足)或程序崩溃(分配内存失败)。
if (buffersToWrite.size() > 25)
{
char buf[256];
snprintf(buf, sizeof buf, "Dropped log messages at larger buffers\n");
fputs(buf, stderr);
// 丢掉多余日志,以腾出内存,仅保留两块缓冲区
buffersToWrite.erase(buffersToWrite.begin() + 2, buffersToWrite.end());
}
// 将buffersToWrite的数据写入到日志文件中
for (const auto& buffer : buffersToWrite)
{
output_.append(buffer.c_str(),buffer.size());
}
buffersToWrite.clear();
}
output_.flush();
}
void AsyncLogging::flush()
{
std::vector<std::string> buffersToWrite;//
std::unique_lock<std::mutex> _lock(mutex_);
buffers_.push_back(std::move(currentBuffer_));
buffersToWrite.swap(buffers_);
for (const auto& buffer : buffersToWrite)
{
output_.append(buffer.c_str(),buffer.size());
}
output_.flush();
buffersToWrite.clear();
}
}
四、日志系统测试代码
1. 基础测试
#include"Logger.hpp"
tulun::LogFile *plog = nullptr;
void writefile(const string &info)
{
plog->append(info);
}
void flushfile()
{
plog->flush();
}
int main()
{
//tulun::LogFile logfile("yhping",1024*1024*1000);
tulun::Logger::setOutput(writefile);
tulun::Logger::setFlush(flushfile);
plog = new tulun::LogFile("yhping",1024*1024*1000);
for(int i = 0;i<1000000;++i)
{
LOG_INFO<<"main"<<i;
}
return 0;
}
2. 异步日志单线程测试
#include"Logger.hpp"
tulun::AsyncLogging *asynclog = nullptr;
void asyncWriteFile(const string &info)
{
asynclog->append(info);
}
void asyncFlushFile()
{
asynclog->flush();
}
int main()
{
asynclog = new tulun::AsyncLogging("yhping",1024*10);
tulun::Logger::setOutput(asyncWriteFile);
tulun::Logger::setFlush(asyncFlushFile);
asynclog->start();
for(int i = 0;i<1000;++i)
{
LOG_INFO<<"main "<<i;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
return 0;
}
3. 异步日志多线程测试
#include"Logger.hpp"
tulun::AsyncLogging *asynclog = nullptr;
void asyncWriteFile(const string &info)
{
asynclog->append(info);
}
void asyncFlushFile()
{
asynclog->flush();
}
void func(char ch)
{
for(int i = 0;i<1000;++i)
{
LOG_INFO<<"main "<<i<<ch;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
int main()
{
asynclog = new tulun::AsyncLogging("yhping",1024*10);
tulun::Logger::setOutput(asyncWriteFile);
tulun::Logger::setFlush(asyncFlushFile);
asynclog->start();
std::thread tha(func,'a');
std::thread thb(func,'b');
std::thread thc(func,'c');
tha.join();
thb.join();
thc.join();
return 0;
}
五、倒计时同步工具类
1. CountDownLatch 类定义
#include<mutex>
#include<condition_variable>
using namespace std;
namespace tulun
{
class CountDownLatch
{
public:
explicit CountDownLatch(int count);
void wait();
void countDown();
int getCount() const;
private:
mutable std::mutex mutex_;
std::condition_variable condition_;
int count_;
};
}
2. CountDownLatch 类实现
#include"CountDownLatch.hpp"
namespace tulun
{
CountDownLatch::CountDownLatch(int count)
:count_(count) {}
void CountDownLatch::wait()
{
std::unique_lock<std::mutex> _lock(mutex_);
while(count_ > 0)
{
condition_.wait(_lock);
}
}
void CountDownLatch::countDown()
{
std::unique_lock<std::mutex> _lock(mutex_);
count_-=1;
if(count_ == 0)
{
condition_.notify_all();
}
}
int CountDownLatch::getCount() const
{
std::unique_lock<std::mutex> _lock(mutex_);
return count_;
}
}
3. 集成 CountDownLatch 的 AsyncLogging 修改
在 AsyncLogging 类中添加倒计时对象:
tulun::CountDownLatch latch_; // 倒计时对象
构造函数初始化:
latch_(1),
修改 start () 和 workthreadfunc ():
void AsyncLogging::start()
{
running_ = true;
// 执行该异步日志记录器的线程
pthread_.reset(new std::thread(&AsyncLogging::workthreadfunc,this));
latch_.wait();
}
void AsyncLogging::workthreadfunc()
{
//tulun::LogFile output(basename_,rollSize_,false); //定义日志文件对象
std::vector<std::string> buffersToWrite;// 线程函数的局部队列
latch_.countDown();
while (running_)
{
/// 原有逻辑
}
}
六、TCP/IP 协议 C/S 模型实现
1. 服务器代码实现
#include<stdio.h>
#include<assert.h>
#include<netinet/in.h> // sockaddr_in
#include<string.h> // bzero
#include<arpa/inet.h> // inet_ntop; inet_pton
#include<sys/socket.h> // socket;
#include<sys/types.h> // uint16_t
#include<unistd.h> // read
#include<errno.h> // errno;
const uint16_t PORT = 12345;
const char *IP = "127.0.0.1";
const int LISTENSIZE = 10;
const int BUFFSIZE = 128;
int main(int argc,char *argv[])
{
int listenfd = 0, connfd = 0;
int ret = 0;
char buff[BUFFSIZE]={};
struct sockaddr_in seraddr,cliaddr;
bzero(&seraddr,sizeof(seraddr));
seraddr.sin_family = AF_INET;
inet_pton(AF_INET,IP,&seraddr.sin_addr);
seraddr.sin_port = htons(PORT);
listenfd = socket(PF_INET,SOCK_STREAM,0);
assert(listenfd != -1);
ret = bind(listenfd,(struct sockaddr*) &seraddr,sizeof(seraddr));
assert(ret != -1);
ret = listen(listenfd,LISTENSIZE);
assert(ret != -1);
while(1)
{
socklen_t cliaddrlen = sizeof(cliaddr); // socklen_t => unsigned int;
connfd = accept(listenfd,(struct sockaddr*)&cliaddr,&cliaddrlen);
assert(connfd != -1);
//int len = read(connfd,buff,sizeof(buff));
int len = recv(connfd,buff,sizeof(buff),0);
if(len > 0)
{
printf("client : data: %d %s \n",len ,buff);
send(connfd,buff,len,0);
close(connfd);
}else if(len == 0)
{
close(connfd);
}else
{
printf("error %s \n",strerror(errno));
close(connfd);
}
}
close(listenfd);
return 0;
}
2.客户端代码实现
#include<stdio.h>
#include<arpa/inet.h>
#include<sys/types.h>
#include<unistd.h>
#include<sys/socket.h>
#include<string.h>
#define MAXLEN 64
#define PORT 2344
int main()
{
struct sockaddr_in serveraddr;
int connfd;
char buf[MAXLEN] = "";
// 创建socket
connfd = socket(AF_INET, SOCK_STREAM, 0);
if (connfd < 0)
{
perror("socket creation failed");
return 1;
}
// 初始化服务器地址结构
bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET; // IPv4协议
serveraddr.sin_port = htons(PORT); // 端口转换
inet_pton(AF_INET, "127.0.0.1", &serveraddr.sin_addr); // IP地址转换
// 连接服务器
if (connect(connfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0)
{
perror("connection failed");
close(connfd);
return 1;
}
// 循环读取用户输入并发送给服务器
while (~scanf("%s", buf))
{
// 输入"quit"退出
if (!strncmp("quit", buf, 4))
break;
// 发送数据
write(connfd, buf, strlen(buf));
// 接收服务器响应
int retlen = read(connfd, buf, MAXLEN);
if (retlen > 0)
{
printf("Server response: %s\n", buf);
}
}
// 发送退出信号并关闭连接
write(connfd, "quit", 4);
close(connfd);
return 0;
}