socket编程-UDP(3)-聊天室系统
Mutex.hpp
#pragma once // 防止头文件重复包含
#include <iostream> // 标准输入输出(实际未使用,可移除)
#include <pthread.h> // POSIX线程库,提供互斥锁原语
namespace MutexModule
{
class Mutex
{
public:
Mutex()
{
pthread_mutex_init(&_mutex, nullptr); // 初始化互斥锁,nullptr表示默认属性
}
void Lock()
{
int n = pthread_mutex_lock(&_mutex); // 阻塞直到获取锁
(void)n; // 忽略返回值(实际工程中应检查错误)
}
void Unlock()
{
int n = pthread_mutex_unlock(&_mutex); // 释放锁
(void)n;
}
~Mutex()
{
pthread_mutex_destroy(&_mutex);
}
pthread_mutex_t *Get()
{
return &_mutex;
}
private:
pthread_mutex_t _mutex;
};
class LockGuard
{
public:
LockGuard(Mutex &mutex):_mutex(mutex)
{
_mutex.Lock();// 构造时自动加锁
}
~LockGuard()
{
_mutex.Unlock();
}
private:
Mutex &_mutex;// 引用形式的Mutex对象(避免拷贝问题)
};
多线程:
Cond.hpp
#pragma once
#include <iostream>
#include <pthread.h>
#include "Mutex.hpp"
using namespace MutexModule;
namespace CondModule
{
class Cond
{
public:
Cond()
{
pthread_cond_init(&_cond, nullptr);
}
void Wait(Mutex &mutex)//线程等待条件
{
int n = pthread_cond_wait(&_cond, mutex.Get());// 原子操作:解锁mutex并等待
(void)n;
}
void Signal()//唤醒单个线程
{
// 唤醒在条件变量下等待的一个线程
int n = pthread_cond_signal(&_cond);// 唤醒至少一个等待线程
(void)n;
}
void Broadcast()//唤醒所有线程
{
// 唤醒所有在条件变量下等待的线程
int n = pthread_cond_broadcast(&_cond);// 唤醒所有等待线程
(void)n;
}
~Cond()
{
pthread_cond_destroy(&_cond);
}
private:
pthread_cond_t _cond;// POSIX原生条件变量
};
};
Log.hpp
#ifndef __LOG_HPP__
#define __LOG_HPP__
#include <iostream>
#include <cstdio>
#include <string>
#include <filesystem> //C++17
#include <sstream>
#include <fstream>
#include <memory>
#include <ctime>
#include <unistd.h>
#include "Mutex.hpp"
namespace LogModule
{
using namespace MutexModule;
const std::string gsep = "\r\n";
// 策略模式,C++多态特性
// 2. 刷新策略 a: 显示器打印 b:向指定的文件写入
// 刷新策略基类
class LogStrategy
{
public:
~LogStrategy() = default;
virtual void SyncLog(const std::string &message) = 0;
};
// 显示器打印日志的策略 : 子类
class ConsoleLogStrategy : public LogStrategy
{
public:
ConsoleLogStrategy()
{
}
void SyncLog(const std::string &message) override
{
LockGuard lockguard(_mutex);
std::cout << message << gsep;
}
~ConsoleLogStrategy()
{
}
private:
Mutex _mutex;
};
// 文件打印日志的策略 : 子类
const std::string defaultpath = "./log";
const std::string defaultfile = "my.log";
class FileLogStrategy : public LogStrategy
{
public:
FileLogStrategy(const std::string &path = defaultpath, const std::string &file = defaultfile)
: _path(path),
_file(file)
{
LockGuard lockguard(_mutex);
if (std::filesystem::exists(_path))
{
return;
}
try
{
std::filesystem::create_directories(_path);
}
catch (const std::filesystem::filesystem_error &e)
{
std::cerr << e.what() << '\n';
}
}
void SyncLog(const std::string &message) override
{
LockGuard lockguard(_mutex);
std::string filename = _path + (_path.back() == '/' ? "" : "/") + _file; // "./log/" + "my.log"
std::ofstream out(filename, std::ios::app); // 追加写入的 方式打开
if (!out.is_open())
{
return;
}
out << message << gsep;
out.close();
}
~FileLogStrategy()
{
}
private:
std::string _path; // 日志文件所在路径
std::string _file; // 日志文件本身
Mutex _mutex;
};
// 形成一条完整的日志&&根据上面的策略,选择不同的刷新方式
// 1. 形成日志等级
enum class LogLevel
{
DEBUG,
INFO,
WARNING,
ERROR,
FATAL
};
std::string Level2Str(LogLevel level)
{
switch (level)
{
case LogLevel::DEBUG:
return "DEBUG";
case LogLevel::INFO:
return "INFO";
case LogLevel::WARNING:
return "WARNING";
case LogLevel::ERROR:
return "ERROR";
case LogLevel::FATAL:
return "FATAL";
default:
return "UNKNOWN";
}
}
//时间戳生成
std::string GetTimeStamp()
{
// 输出格式:2023-08-20 14:30:45
time_t curr = time(nullptr);
struct tm curr_tm;
localtime_r(&curr, &curr_tm);
char timebuffer[128];
snprintf(timebuffer, sizeof(timebuffer),"%4d-%02d-%02d %02d:%02d:%02d",
curr_tm.tm_year+1900,
curr_tm.tm_mon+1,
curr_tm.tm_mday,
curr_tm.tm_hour,
curr_tm.tm_min,
curr_tm.tm_sec
);
return timebuffer;
}
// 1. 形成日志 && 2. 根据不同的策略,完成刷新
class Logger
{
public:
Logger()
{
EnableConsoleLogStrategy();
}
void EnableFileLogStrategy()
{
_fflush_strategy = std::make_unique<FileLogStrategy>();
}
void EnableConsoleLogStrategy()
{
_fflush_strategy = std::make_unique<ConsoleLogStrategy>();
}
// 表示的是未来的一条日志
class LogMessage
{
public:
LogMessage(LogLevel &level, std::string &src_name, int line_number, Logger &logger)
: _curr_time(GetTimeStamp()),
_level(level),
_pid(getpid()),
_src_name(src_name),
_line_number(line_number),
_logger(logger)
{
// 日志的左边部分,合并起来
std::stringstream ss;
ss << "[" << _curr_time << "] "
<< "[" << Level2Str(_level) << "] "
<< "[" << _pid << "] "
<< "[" << _src_name << "] "
<< "[" << _line_number << "] "
<< "- ";
_loginfo = ss.str();
}
// LogMessage() << "hell world" << "XXXX" << 3.14 << 1234
template <typename T>
LogMessage &operator<<(const T &info)
{
// a = b = c =d;
// 日志的右半部分,可变的
std::stringstream ss;
ss << info;
_loginfo += ss.str();
return *this;
}
~LogMessage()
{
if (_logger._fflush_strategy)
{
_logger._fflush_strategy->SyncLog(_loginfo);
}
}
private:
std::string _curr_time;
LogLevel _level;
pid_t _pid;
std::string _src_name;
int _line_number;
std::string _loginfo; // 合并之后,一条完整的信息
Logger &_logger;
};
// 这里故意写成返回临时对象
LogMessage operator()(LogLevel level, std::string name, int line)
{
return LogMessage(level, name, line, *this);
}
~Logger()
{
}
private:
std::unique_ptr<LogStrategy> _fflush_strategy;
};
// 全局日志对象
Logger logger;
// 使用宏,简化用户操作,获取文件名和行号
#define LOG(level) logger(level, __FILE__, __LINE__)
#define Enable_Console_Log_Strategy() logger.EnableConsoleLogStrategy()
#define Enable_File_Log_Strategy() logger.EnableFileLogStrategy()
}
#endif
InetAddr.hpp
#pragma once
#include <iostream>
#include <string>
#include <sys/socket.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <netinet/in.h>
class InetAddr
{
public:
InetAddr(struct sockaddr_in &addr) : _addr(addr)
{
_port = ntohs(_addr.sin_port); // 从网络中拿到的!网络序列
_ip = inet_ntoa(_addr.sin_addr); // 4字节网络风格的IP -> 点分十进制的字符串风格的IP
}
uint16_t Port() {return _port;}
std::string Ip() {return _ip;}
const struct sockaddr_in &NetAddr() { return _addr; }
bool operator==(const InetAddr &addr)
{
return addr._ip == _ip && addr._port == _port;
}
std::string StringAddr()
{
return _ip + ":" + std::to_string(_port);
}
~InetAddr()
{}
private:
struct sockaddr_in _addr;//原始网络地址结构体
std::string _ip;//字符串格式的IP地址
uint16_t _port;//主机字节序的端口号
};
Route.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include "InetAddr.hpp"
#include "Log.hpp"
using namespace LogModule;
class Route
{
private:
bool IsExist(InetAddr &peer)
{
for (auto &user : _online_user)
{
if (user == peer)
{
return true;
}
}
return false;
}
void AddUser(InetAddr &peer)
{
LOG(LogLevel::INFO) << "新增一个在线用户: " << peer.StringAddr();
_online_user.push_back(peer);
}
void DeleteUser(InetAddr &peer)
{
for (auto iter = _online_user.begin(); iter != _online_user.end(); iter++)
{
if (*iter == peer)
{
LOG(LogLevel::INFO) << "删除一个在线用户:" << peer.StringAddr() << "成功";
_online_user.erase(iter);
break;
}
}
}
public:
Route()
{
}
void MessageRoute(int sockfd, const std::string &message, InetAddr &peer)
{
// 1. 新用户自动注册
if (!IsExist(peer))
{
AddUser(peer);
}
// 2. 构造带地址前缀的消息
std::string send_message = peer.StringAddr() + "# " + message; // 127.0.0.1:8080# 你好
// 3. 广播给所有在线用户
for (auto &user : _online_user)
{
sendto(sockfd, send_message.c_str(), send_message.size(), 0, (const struct sockaddr *)&(user.NetAddr()), sizeof(user.NetAddr()));
}
// 4. 处理退出指令
// 这个用户一定已经在线了
if (message == "QUIT")
{
LOG(LogLevel::INFO) << "删除一个在线用户: " << peer.StringAddr();
DeleteUser(peer);
}
}
~Route()
{
}
private:
// 首次给我发消息,等同于登录
std::vector<InetAddr> _online_user; // 在线用户
};
Thread.hpp
#ifndef _THREAD_H_
#define _THREAD_H_
// ...(标准库和POSIX线程头文件)
#include <iostream>
#include <string>
#include <pthread.h>
#include <cstdio>
#include <cstring>
#include <functional>
namespace ThreadModlue
{
static uint32_t number = 1; // 用于生成线程名称的计数器
class Thread
{
using func_t = std::function<void()>; // 线程执行的任务(无参无返回)
private:
void EnableDetach()
{
_isdetach = true;
}
void EnableRunning()
{
_isrunning = true;
}
static void *Routine(void *args) // 属于类内的成员函数,默认包含this指针!
{
Thread *self = static_cast<Thread *>(args);// 将void*转为Thread对象指针
self->EnableRunning();
if (self->_isdetach)
self->Detach(); // 若需分离,调用Detach
pthread_setname_np(self->_tid, self->_name.c_str());// 设置线程名(Linux特有)
self->_func(); // 执行用户任务
return nullptr;
}
// bug
public:
Thread(func_t func)// 生成唯一线程名
: _tid(0),
_isdetach(false),
_isrunning(false),
res(nullptr),
_func(func)
{
_name = "thread-" + std::to_string(number++);
}
void Detach()
{
if (_isdetach)
return;
if (_isrunning)
pthread_detach(_tid);// 分离运行中的线程
EnableDetach();
}
bool Start()
{
if (_isrunning)
return false;// 避免重复启动
int n = pthread_create(&_tid, nullptr, Routine, this);// 创建线程
if (n != 0)
{
return false;
}
else
{
return true;
}
}
bool Stop()
{
if (_isrunning)
{
int n = pthread_cancel(_tid);//强制终止线程
if (n != 0)
{
return false;
}
else
{
_isrunning = false;
return true;
}
}
return false;
}
void Join()
{
if (_isdetach)
{
return;// 分离线程不可Join
}
int n = pthread_join(_tid, &res);// 阻塞等待线程结束
if (n != 0)
{
}
else
{
}
}
pthread_t Id()
{
return _tid;
}
~Thread()
{
}
private:
pthread_t _tid;//线程ID
std::string _name;//线程名称
bool _isdetach;//标记线程是否为分离状态
bool _isrunning;//标记线程是否在运行
void *res;//存储线程返回值
func_t _func;//用户定义的任务函数
};
}
#endif
ThreadPool.hpp
#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <queue>
#include "Log.hpp"
#include "Thread.hpp"
#include "Cond.hpp"
#include "Mutex.hpp"
// .hpp header only
namespace ThreadPoolModule
{
using namespace ThreadModlue;
using namespace LogModule;
using namespace CondModule;
using namespace MutexModule;
static const int gnum = 5;
template <typename T>
class ThreadPool
{
private:
void WakeUpAllThread()
{
LockGuard lockguard(_mutex);
if (_sleepernum)
_cond.Broadcast();
LOG(LogLevel::INFO) << "唤醒所有的休眠线程";
}
void WakeUpOne()
{
_cond.Signal();
LOG(LogLevel::INFO) << "唤醒一个休眠线程";
}
ThreadPool(int num = gnum) : _num(num), _isrunning(false), _sleepernum(0)
{
for (int i = 0; i < num; i++)
{
_threads.emplace_back(
[this]()
{
HandlerTask();
});
}
}
void Start()
{
if (_isrunning)
return;
_isrunning = true;
for (auto &thread : _threads)
{
thread.Start();
LOG(LogLevel::INFO) << "start new thread success: " << thread.Name();
}
}
ThreadPool(const ThreadPool<T> &) = delete;
ThreadPool<T> &operator=(const ThreadPool<T> &) = delete;
public:
//单例模式:全局唯一线程池实例
static ThreadPool<T> *GetInstance()
{
if (inc == nullptr)// 双重检查锁定
{
LockGuard lockguard(_lock);
LOG(LogLevel::DEBUG) << "获取单例....";
if (inc == nullptr)
{
LOG(LogLevel::DEBUG) << "首次使用单例, 创建之....";
inc = new ThreadPool<T>();// 懒加载初始化:首次调用时创建线程池。
inc->Start();
}
}
return inc;
}
void Stop()
{
if (!_isrunning)
return;
_isrunning = false;
// 唤醒所有的线层
WakeUpAllThread();
}
void Join()
{
for (auto &thread : _threads)
{
thread.Join();
}
}
//生产者-消费者:主线程投递任务(Enqueue),工作线程(HandlerTask)从队列取任务执行。
void HandlerTask()
{
char name[128];
pthread_getname_np(pthread_self(), name, sizeof(name));
// 条件等待:队列空且线程池运行中
while (true)
{
T t;
{
LockGuard lockguard(_mutex);
// 1. a.队列为空 b. 线程池没有退出
while (_taskq.empty() && _isrunning)
{
_sleepernum++;
_cond.Wait(_mutex);// 自动释放锁并休眠
_sleepernum--;
}
// 退出条件:线程池关闭且队列空
// 2. 内部的线程被唤醒
if (!_isrunning && _taskq.empty())
{
LOG(LogLevel::INFO) << name << " 退出了, 线程池退出&&任务队列为空";
break;
}
// 一定有任务
// 取任务
t = _taskq.front(); // 从q中获取任务,任务已经是线程私有的了!!!
_taskq.pop();
}// 锁作用域结束,自动释放
t(); // 处理任务,需/要在临界区内部处理吗?1 0
}
}
bool Enqueue(const T &in)
{
if (_isrunning)
{
LockGuard lockguard(_mutex);
_taskq.push(in);
if (_threads.size() == _sleepernum)
WakeUpOne();// 有休眠线程才唤醒
return true;
}
return false;// 线程池未运行
}
~ThreadPool()
{
}
private:
std::vector<Thread> _threads;//工作线程集合
int _num; // 线程池中,线程的个数
std::queue<T> _taskq;//任务队列(存储可调用对象)
Cond _cond;//控制线程休眠与唤醒
Mutex _mutex;//保护任务队列和状态变量
bool _isrunning;//线程池运行状态标志
int _sleepernum;
// bug??
static ThreadPool<T> *inc; // 单例指针
static Mutex _lock;//当前休眠的线程数(优化唤醒策略)
};
template <typename T>
ThreadPool<T> *ThreadPool<T>::inc = nullptr;
template <typename T>
Mutex ThreadPool<T>::_lock;
}
UdpSever.hpp
#pragma once
#include <iostream>
#include <string>
#include <functional>
#include <strings.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "Log.hpp"
#include "InetAddr.hpp"
using namespace LogModule;
using func_t = std::function<void(int sockfd, const std::string&, InetAddr&)>;
const int defaultfd = -1;
class UdpServer
{
public:
UdpServer(uint16_t port, func_t func)
: _sockfd(defaultfd),
_port(port),
_isrunning(false),
_func(func)
{}
void Init()
{
// 1. 创建UDP套接字
_sockfd=socket(AF_INET, SOCK_DGRAM, 0);
if(_sockfd<0)
{
LOG(LogLevel::FATAL) << "socket error!";
exit(1);
}
LOG(LogLevel::INFO) << "socket success, sockfd : " << _sockfd;
// 2. 绑定地址(INADDR_ANY监听所有网卡)
struct sockaddr_in local;
bzero(&local, sizeof(local));
local.sin_family = AF_INET;
local.sin_port = htons(_port); // 端口转网络字节序
local.sin_addr.s_addr = INADDR_ANY; // 监听所有IP
int n=bind(_sockfd, (struct sockaddr*)&local, sizeof(local));
if (n < 0)
{
LOG(LogLevel::FATAL) << "bind error";
exit(2);
}
LOG(LogLevel::INFO) << "bind success, sockfd : " << _sockfd;
}
void Start() {
while (_isrunning) {
char buffer[1024];
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 接收消息
ssize_t s = recvfrom(_sockfd, buffer, sizeof(buffer)-1, 0,
(struct sockaddr*)&peer, &len);
if (s > 0) {
buffer[s] = 0; // 添加字符串结束符
InetAddr client(peer); // 封装客户端地址
_func(_sockfd, std::string(buffer), client); // 调用回调函数
}
}
}
~UdpServer()
{}
private:
int _sockfd;//sockfd:服务端套接字(用于回复消息)
uint16_t _port;
bool _isrunning;
func_t _func; // 服务器的回调函数,用来进行对数据进行处理
};
UdpServer.cc
#include <iostream>
#include <memory>
#include "Route.hpp"
#include "UdpServer.hpp" // 网络通信的功能
#include "ThreadPool.hpp"
using namespace ThreadPoolModule;
using task_t = std::function<void()>;
int main(int argc, char *argv[]) {
// 参数检查
uint16_t port = std::stoi(argv[1]); // 从命令行获取端口
// 1. 初始化路由服务
Route r;
// 2. 获取线程池单例
auto tp = ThreadPool<task_t>::GetInstance();
// 3. 创建UDP服务器,绑定Lambda回调
std::unique_ptr<UdpServer> usvr = std::make_unique<UdpServer>(port,
[&r, &tp](int sockfd, const std::string &message, InetAddr& peer) {
// 将路由任务封装为函数对象,提交到线程池
task_t t = std::bind(&Route::MessageRoute, &r, sockfd, message, peer);
tp->Enqueue(t);
}
);
usvr->Init(); // 初始化套接字
usvr->Start(); // 启动主循环
}
UdpClient.cc
#include <iostream>
#include <string>
#include <cstring>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include "Thread.hpp"
int sockfd = 0; // UDP套接字描述符
std::string server_ip; // 服务器IP
uint16_t server_port = 0; // 服务器端口
pthread_t id; // 接收线程ID(用于终止线程)
using namespace ThreadModlue;
void Recv() {
while (true) {
char buffer[1024];
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
// 阻塞接收数据
int m = recvfrom(sockfd, buffer, sizeof(buffer)-1, 0, (struct sockaddr*)&peer, &len);
if (m > 0) {
buffer[m] = 0; // 添加字符串结束符
std::cerr << buffer << std::endl; // 打印服务器响应
}
}
}
void Send() {
// 配置服务器地址
struct sockaddr_in server;
memset(&server, 0, sizeof(server));
server.sin_family = AF_INET;
server.sin_port = htons(server_port); // 端口转网络字节序
server.sin_addr.s_addr = inet_addr(server_ip.c_str()); // IP转网络字节序
// 发送上线通知
sendto(sockfd, "inline", 6, 0, (struct sockaddr*)&server, sizeof(server));
while (true) {
std::string input;
std::cout << "Please Enter# ";
std::getline(std::cin, input); // 读取用户输入
// 发送消息
sendto(sockfd, input.c_str(), input.size(), 0, (struct sockaddr*)&server, sizeof(server));
if (input == "QUIT") {
pthread_cancel(id); // 终止接收线程
break;
}
}
}
int main(int argc, char *argv[]) {
if (argc != 3)
{
std::cerr << "Usage: " << argv[0] << " server_ip server_port" << std::endl;
return 1;
}
// 参数检查
server_ip = argv[1];
server_port = std::stoi(argv[2]);
// 创建UDP套接字
sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0)
{
std::cerr << "socket error" << std::endl;
return 2;
}
// 启动收发线程
Thread recver(Recv);
Thread sender(Send);
recver.Start();
sender.Start();
id = recver.Id(); // 保存接收线程ID
// 等待线程结束
recver.Join();
sender.Join();
}