5.10-套接字通信 - C++

发布于:2025-05-12 ⋅ 阅读:(12) ⋅ 点赞:(0)

套接字通信

1.1 通信效率问题


  • 服务器端

    • 单线程 / 单进程

      • 无法使用,不支持多客户端
    • 多线程 / 多进程

      • 写程序优先考虑多线程:
      • 什么时候考虑多进程?
        • 启动了一个可执行程序 A ,要在 A 中启动一个可执行程序 B
      • 支持多客户端连接
    • IO 多路转接

      • 单线程 / 进程

        • 支持多客户端连接但是效率不是最高的

          • 所有的客户端请求都是顺序处理的 -> 排队
        • 多线程

          int main()
          {
              // 	1. 监听 fd
          	int lfd = socket();
          }// 2. 绑定
              bind();
          	// 3. 监听
          	listen();
          	// 4. 初始化 epoll()
          	int epfd = epoll_create(x);
          	// 5. epooll 添加检测节点  -> lfd
          	epoll_ctl(epfd, epoll_ctl_add, lfd, &ev);
          	while (1)
              {
                  int num = epoll_wait(epfd, evs, 1024, NULL);
                  
                  for (int i = 0; i < num; i++)
                  {
                      if (curfd == lfd)
                      {
                          pthread_create(&tid, NULL, acceptConn, &epfd);
                          // accept();
                      }
                      else
                      {
                          ...
                          // read();
                          // write();
                      }
                  }
              }
          }
          
          • 线程池

            • 多个线程的一个集合,可以回收用完的线程

              • 线程池的个数取决于业务逻辑
                • 密集型业务逻辑:需要大量 cpu 时间进行数据处理

                  • 线程个数 == 电脑核心数
                • 进行 io 操作

                  • 线程个数 == 两倍 cpu 核心数
            • 不需要重复频繁地创建销毁线程

            • 设计思路

              1. 需要两个角色
              	- 管理者 -> 1 个
              	- 工作的线程 -> N 个
              2. 管理者
              	- 不工作(不处理业务逻辑,监测工作的线程的状态,管理线程的个数)
              	- 假设工作线程不够用了,动态创建新的线程
              	- 假设工作的线程太多了,销毁一部分工作的线程
              	- 动态监测工作的线程的状态
              3. 工作的线程
              	- 处理业务逻辑
              4. 需要一个任务队列
              	- 存储任务 -> 唤醒阻塞线程 -> 条件变量
              	- 工作的线程处理任务队列中的任务
              	- 没有任务 -> 阻塞
              
  • 客户端

    // 创建TcpSocket对象 == 一个连接,这个对象就可以和服务器通信了,多个连接需要创建多个这样的对象
    class TcpSocket
    {
    public:
        TcpSocket()
        {
            m_connfd = socket(af_inet, sock_stream, 0);
        }
    
        TcpSocket(int fd)
        {
            m_connfd = fd;  // 传递进行的fd是可以直接通信的文件描述符,不需要连接操作
        }
    
        ~TcpSocket();
        /* 客户端 连接服务器 */
        int conectToHost(string ip, unsigned short port, int connecttime)
        {
            connect(m_connfd, &serverAddress, &len);
        }
        /* 客户端 关闭和服务端的连接 */
        int disConnect();
        /* 客户端 发送报文 */
        int sendMsg(string sendMsg, int sendtime = 10000)
        {
            senf(m_connfd, data, datalen, 0);
        }
        /* 客户端 接受报文 */
        string recvMsg(int timeout)
        {
            recv(m_connfd, buffer, size, 0);
            return buffer;
        }
    
    private:
        int m_connfd;
    };
    
  • 服务器

    // 思想: 服务端不负责通信,只负责监听,如果通信使用客户端类
    class TcpServer
    {
    public:
        // 初始化监听的套接字: 创建,绑定,监听
        TcpServer();
        ~TcpServer();  // 在这里边关闭监听的fd
    
        TcpSocket* acceptConn(int timeout = 999999)
        {
            int fd = accept(m_lfd, &address, &len);
            // 通信fd -> 类
            TcpSocket* tcp = new TcpSocket(fd);
            if (tcp != nullptr)
            {
                return tcp;
    		}
            return nullptr;
        }
    
    private:
        int m_lfd;  // 监听的fd
    };
    
    // 使用
    void * callback(void* arg)
    {
    	TcpSocket * tcp = (TcpSocket *) arg;
    	tcp->sendMsg();
    	tcp->recvMsg();
    	tcp->disConnect();
    	delete tcp;
    }
    
    
    int main()
    {
    	TcpServer * server = new Tcpserver;
    	while (1)
    	{
    		TcpSocket * tcp = server->acceptConn();
    		// 创建子进程 -> 通信
    		pthread_create(&tid, NULL, callback, arg)
    	}
    	
    	delete server;
    	return 0;
    }
    
    // 客户端程序
    int main()
    {
    	TcpSocket * tcp = new TcpSocket;
    	tcp->ConnectToHost(ip, port, timeout);
        tcp->sendMsg();
    	tcp->recvMsg();
    	tcp->disConnect();
    	delete tcp;
    }
    

2 套接字超时

套接字通信过程中的默认的阻塞函数

等待并接受客户端连接

通信、接受数据、发送数据、连接服务器时

设置超时处理的原因:不想让进程(线程)一直在对应为止阻塞

超时处理的思路:

  1. 定时器
  2. sleep(10)
    • 以上两种不可用,在指定时间内阻塞函数满足条件直接解除阻塞,以上两种不满足要求
  3. IO 多路转接函数
    • 这些函数最后一个参数是设置阻塞的时长,如果有 fd发生变化,函数直接返回
    • 帮助委托内核检测 fd 状态:读写异常

2.1 accept 超时


// 等待并接受客户端连接
// 如果没有客户端连接,一直阻塞
// 检测 accept 函数的 fd 读缓冲区就可以了
int accept(int sockfd, struct sockaddr * addr, socklen_t * addrlen);

// 使用select 函数检测状态
int select(int nfds, fd_set * readfds,
          fd_set * writefds,
          fd_set * exceptfds,
          struct timeval * timeout);
// 通信的fd放到 fd_set 中检测
if (ret == 0)
{
    // 超时
}
else if (ret == 1)
{
    // 有新连接
    accept();	// 绝对不阻塞
}
else
{
    // error, -1
}

读超时、写超时略

2.2 connect 超时

// 连接服务器 -> 处于连接过程中,函数不返回 -> 程序阻塞在这个函数上,可通过返回值判断是不是连接成功了
// 返回值 0,成功,-1,失败
// 该函数默认有一个超时处理:75s 或 175s
	- 设置 connect 函数操作的文件描述符为非阻塞
    - 使用 select 检测
    - 设置 connect 函数操作的文件描述符为阻塞 -> 状态还原
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

// 与上述同理

2.3 tcp 粘包问题

造成原因:

  1. 发送的时候,内核进行了优化,数据达到一定量发一次
  2. 网络环境不好,有延迟
  3. 接收方频率低,一次性读到了多条客户端发送的数据

解决方案:

  • 发送的时候强制缓存区发送数据
  • 在发送数据的时候添加包头
    • 包头:一块内存,存储了当前消息的属性信息
      • 属于谁:char[12]
      • 有多大:int

3 进程间通信:共享内存

  1. 使用流程

    1. 向内核申请一块内存 -> 指定大小
    2. 如果有两个进程需要进行通信,可以使用共享内存通信,先创建两个进程
    3. 进程 A 和进程 B 分别和共享内存进程关联
    	- 拿到共享内存的地址 -> 首地址
    4. 两个进程可以公国这个首地址对共享内存进行读(写)操作
    5. 如果这个进程不再使用这块共享内存,需要和共享内存断开连接
    	- 进程退出对共享内存没有任何影响
    6. 当不再使用共享内存的时候,需要将共享内存销毁
    
  2. 共享内存头文件

    #include <sys/ipc.h>
    #include <sys/shm.h>
    
  3. 共享内存操作函数

    • 创建或打开一块共享内存区

      // 创建共享内存
      // 共享内存已经存在
      // 可以创建多块共享内存
      int shmget(key_t key, size_t size, int sh mhflg);
      	key:通过 key 记录共享内存在内核中的位置,为一个大于零的整数,等于 0 不行,随便指定一个就可以
      	size:创建共享内存的时候指定共享内存的大小。如果已经创建,设为 0
      	shmflg:创建共享内存的时候使用,指定打开文件的方式
      返回值:
      	成功:创建(打开)成功,得到一个整形数
      	失败:-1
      	
      	
      // 应用
      // 1. 创建共享内存
      int shmid = shmget(100, 4096, IPC_CREAT | 0664);
      int shmid = shmget(200, 4096, IPC_CREAT | 0664);
      // 2. open share memory
      int shmid = shmget(100, 0, 0)
      
      
    • 将当前进程与共享内存关联

      void * shmat(int shmid, const void * shmaddr, int shmflg);
      	参数:
      		- shmid:通过这个参数访问共享内存,shmget() 的返回值
      		- shmaddr:指定共享内存在内核中的位置,指定为空,委托内核寻找
      		- shmflg:关联成功后对内存的操作权限
      			- SHM_RDONLY:只读
      			- 0:读写
      成功:
      	内存的地址
      失败:
      	(void *)-1
      
      
      // 函数调用:
      shmat(shmid, NULL, 0);
      // write on shm
      memcpy(ptr, "xxx", len);
      printf("%s", (char*)ptr);
      
    • depart shm

      int shmdt(const void * shmaddr);
      	arg: address of shm
          return:
      		suc: 0
              fai: -1
      
    • control shm

      int shmctl(int shmid, int com, struct shmid_ds * buf);
      
      arg:
      	-shimd: reuturn value of shmget
          - cmd: operation to shm
      		- IPC_STAT: get status of cmd
              - IPC_SET: set shm
              - IPC_RMID: mark shm to be destroyed
          - buf: as a struct can describe the status of shm
      reutrn value:
      	succ: 0
          fail: -1
      
      // demo: destroy shm
      shmctl(shmid, IPC_RMID, nullptr);
      

      demo: communication of processes

      read:

      #include <stdio.h>
      #include <stdlib.h>
      #include <unistd.h>
      #include <string.h>
      #include <sys/ipc.h>
      #include <sys/shm.h>
      
      int main()
      {
          // 1. open shm
          int shmid = shmget(100, 0, 0);
          // 2. relate shm with cur process
          void* ptr = shmat(shmid, NULL, 0);
          // 3. write on shm
          printf("content: %s\n", (char *)ptr);
          
          
          printf("press any key to continue ...\n");
          getchar();
          // 4. release relevance
      
          shmdt(ptr);
          // 5. destory shm
          shmctl(shmid, IPC_RMID, NULL);
      
          return 0;
      }
      

      write:

      #include <stdio.h>
      #include <stdlib.h>
      #include <unistd.h>
      #include <string.h>
      #include <sys/ipc.h>
      #include <sys/shm.h>
      
      int main()
      {
          // 1. create shm
          int shmid = shmget(100, 4096, IPC_CREAT|0664);
          // 2. relate shm with cur process
          void* ptr = shmat(shmid, NULL, 0);
          // 3. write on shm
          const char * tmp = "this is a damo for shm ...";
          memcpy(ptr, tmp, strlen(tmp) + 1);
          
          printf("press any key to continue ...\n");
          getchar();
          // 4. release relevance
      
          shmdt(ptr);
          // 5. destory shm
          shmctl(shmid, IPC_RMID, NULL);
      
          return 0;
      }
      
  4. question:

  • question 1: how dose the operation system know the number of process that relate with a shm?
    • shm keep a struct struct shmid_ds, that has a member shm_nattch.
    • shm_nattch records the number of being related process.
  • question 2: whether can call shmctl to destroyed a shm more than once?
    • yes
    • because shmctl function is just marking a shm to be destroy, not destroying it directly.
    • when it was destroyed truly?
      • when shm_nattch == 0

the commands to observe the shm:

ipcs -m
  1. ftok function prototype

    key_t ftok(const char *pathname, int proj_id);
    	- pathname: the absolute path
    	- proj_id: only use the top 1 byte
    		- value range: 1 - 255
    		
    		
    key_t t = fotk("/home/", 'a');
    
  2. difference of shm and mmp

    . . .

4 the API encapsulation of shm

class BashShm
{
public:
    BashShm(int key);	// open a shm according a given key.
    BashShm(string path);	// **with no size**, according to string path -> int key, open a shm.
    BashShm(int key, int size);	// create a shm by the given key and size.
    BashShm(string path, int size);	// string -> key, according to size.
    
    void * mapshm()
    {
        m_ptr = shmat(shmid);
        return shmat();
    }
    
    int unmapshm()
    {
        return shmdt(m_ptr);
    }
    int delshm()
    {
        return shmctl(shmid);
    }
    
private:
    int m_shmid;	// reuturn value of `shmat()`
    void * m_ptr
}

网站公告

今日签到

点亮在社区的每一天
去签到