1.WWWWH
DPDK是什么?为什么需要DPDK?
DPDK 是一个“网络加速工具包”。传统Linux网络栈数据包到网卡有很多分层,需要经过中断开销、内存拷贝、协议栈检查,所以开发了DPDK用来让CPU和网卡两个高速选手直接对话。
DPDK是如何实现这样的高效通道呢?
1.将驱动从内核搬到用户态,避免了数据拷贝,上下文切换
2.主动轮询检查网卡状态,避免多次中断的开销,因为网卡传输速率快,不用老一套(中断请求,等待数据就绪,IO响应)
3.网卡数据直接映射到用户态内存,避免网卡到内核到用户态拷贝多次。
4.大页内存 + 网卡队列与核绑定,避免TLB缓存总是失效 + 线程频繁切换
为什么DPDK不能和协程框架一样下载即用,需要配环境?
DPDK 的配置本质上是 把硬件的控制权从内核夺回来,自己精细管理,不能像普通库那样“开箱即用”。需要绑定CPU核、禁用中断改用轮询、增大页内存减少缓存失效、配置多队列等。
DPDK能用在哪?
可用于需要提升吞吐量的地方,比如数据备份、防火墙(提升查包速度)。注意并不能减少网络延迟,因为DPDK传输的是大包。
2.作用流程图
3.在dpdk环境下开发
window给arp添加网卡
netsh -c i i add neighbors 5 192.168.88.9 00-0c-29-18-ef-9d
window给arp删除网卡
netsh -c "i i" delete neighbors 5 "192.168.88.9"
3.1实现能够接收到网卡数据
ustack.c:
#include <stdio.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <arpa/inet.h>
#include <rte_mempool.h> // 内存池核心头文件
#include <rte_mbuf.h> // 数据包缓冲区头文件
int portid = 0;
#define MEM_BUFF_SIZE 4096
#define BURST_SIZE 128
static int init_my_port(struct rte_mempool *mbuf_pool);
static const struct rte_eth_conf port_conf_default = { //网卡配置信息
.rxmode = {.max_rx_pkt_len = RTE_ETHER_MAX_LEN}
};
static int init_my_port(struct rte_mempool *mbuf_pool) {
uint16_t port_cnt = rte_eth_dev_count_avail();
if (port_cnt == 0) {
rte_exit(EXIT_FAILURE, "No available eth bind\n");
}
//struct rte_eth_dev_info port_info;
//rte_eth_dev_info_get(portid,&port_info);
const int recv_queue = 1;
const int send_queue = 0;
rte_eth_dev_configure(portid, recv_queue, send_queue, &port_conf_default);
if (rte_eth_rx_queue_setup(portid, 0, 128, rte_eth_dev_socket_id(portid), NULL, mbuf_pool) < 0) {
//设置网卡接收队列,为portid号网卡的0号队列 配置一个128长度的接收队列,并且绑定内存池用于存放收到的数据包
rte_exit(EXIT_FAILURE, "Setup RX queue error\n");
}
if (rte_eth_dev_start(portid) < 0) {
//让网卡进入工作状态(相当于通电)
rte_exit(EXIT_FAILURE, "start port error\n");
}
return 0;
}
int main(int argc, char *argv[]) {
if (rte_eal_init(argc, argv) < 0) { //检测网卡配置信息
rte_exit(EXIT_FAILURE, "error with eal init\n");
}
printf("hello\n");
struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("mbuff_pool", MEM_BUFF_SIZE, 0, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
rte_socket_id());
if (mbuf_pool == NULL) {
rte_exit(EXIT_FAILURE, "create mbuf pool error\n");
}
init_my_port(mbuf_pool);
while (1) {
//接收数据
struct rte_mbuf *mbufs[BURST_SIZE] = {0};
uint16_t recv_cnt = rte_eth_rx_burst(portid, 0, mbufs, BURST_SIZE);
if (recv_cnt > BURST_SIZE) {
rte_exit(EXIT_FAILURE, "recv error\n");
}
//解析数据包
int i = 0;
for (i = 0; i < recv_cnt; i++) {
//处理以太网头
struct rte_ether_hdr *eth_header = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr *);
if (eth_header->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) {
continue; //转大小端
//不是以太网协议 丢弃
}
struct rte_ipv4_hdr *ip_header = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_ipv4_hdr *,
sizeof(struct rte_ether_hdr));
if (ip_header->next_proto_id == IPPROTO_UDP) {
struct rte_udp_hdr *udp_header = (struct rte_udp_hdr *)(ip_header + 1);
printf("udp : %s\n", (char *)(udp_header + 1));
}
}
}
return 0;
}
编译:在ustack目录下运行
export RTE_SDK=/home/king/share/dpdk/dpdk-stable-19.08.2
export RTE_TARGET=x86_64-native-linux-gcc
make
运行结果:
3.2实现能够发送UDP数据
流程:检测网卡配置信息,创建内存池,初始化网卡(设置网卡的接收和发送队列)。开始通过rx_burst接收数据包,然后再一个一个解析数据包。具体解析过程就是,先获取以太网头,再获取IP头,从IP头中可以得知传输是通过UDP还是TCP协议,随后分情况处理。
UDP协议发送过程是:获取udp头后,获取接收包的目的和源地址,因为发送时需要调换。随后从内存池申请一片空间用于存储要发送的数据包,并且给这个包按顺序分配以太网头、IP头、udp头,最后加上接收到的UDP包中的数据部分,tx_burst发送即可。
TCP协议发送过程也类似,但是在组装包的时候没有数据部分,只有三层协议头。要传输数据需要先模拟三次握手的过程。(见下文)
代码:
#include <stdio.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <arpa/inet.h>
#include <rte_mempool.h> // 内存池核心头文件
#include <rte_mbuf.h> // 数据包缓冲区头文件
int portid = 0;
#define MEM_BUFF_SIZE 4096
#define BURST_SIZE 128
#define ENABLE_SEND 1
uint8_t global_smac[RTE_ETHER_ADDR_LEN];
uint8_t global_dmac[RTE_ETHER_ADDR_LEN];
uint32_t global_sip;
uint32_t global_dip;
uint16_t global_sport;
uint16_t global_dport;
static int init_my_port(struct rte_mempool *mbuf_pool);
int encode_udp_package(uint8_t *msg, uint8_t *data, uint16_t total_len);
int encode_tcp_package(uint8_t *msg, uint16_t total_len);
static const struct rte_eth_conf port_conf_default = { //网卡配置信息
.rxmode = {.max_rx_pkt_len = RTE_ETHER_MAX_LEN}
};
static int init_my_port(struct rte_mempool *mbuf_pool) {
uint16_t port_cnt = rte_eth_dev_count_avail();
if (port_cnt == 0) {
rte_exit(EXIT_FAILURE, "No available eth bind\n");
}
struct rte_eth_dev_info port_info;
rte_eth_dev_info_get(portid, &port_info);
const int recv_queue = 1;
const int send_queue = 1;
rte_eth_dev_configure(portid, recv_queue, send_queue, &port_conf_default);
if (rte_eth_rx_queue_setup(portid, 0, 128, rte_eth_dev_socket_id(portid), NULL, mbuf_pool) < 0) {
//设置网卡接收队列,为portid号网卡的0号队列 配置一个128长度的接收队列,并且绑定内存池用于存放收到的数据包
rte_exit(EXIT_FAILURE, "Setup RX queue error\n");
}
struct rte_eth_txconf tx_conf = port_info.default_txconf;
tx_conf.offloads = port_conf_default.rxmode.offloads;
if (rte_eth_tx_queue_setup(portid, 0, 512, rte_eth_dev_socket_id(portid), &tx_conf) < 0) {
//设置网卡发送队列,为portid号网卡的0号队列 配置一个512长度的发送队列
rte_exit(EXIT_FAILURE, "Setup TX queue error\n");
}
if (rte_eth_dev_start(portid) < 0) {
//让网卡进入工作状态(相当于通电)
rte_exit(EXIT_FAILURE, "start port error\n");
}
return 0;
}
int encode_udp_package(uint8_t *msg, uint8_t *data, uint16_t total_len) {
//msg表示装包的内存,data是数据,total_len是数据包总长度
//以太网头(mac头)
struct rte_ether_hdr *eth_header = (struct rte_ether_hdr *)msg; //在msg所指的内存空间,开辟出一个固定大小的mac头
rte_memcpy(eth_header->d_addr.addr_bytes, global_dmac, RTE_ETHER_ADDR_LEN);
rte_memcpy(eth_header->s_addr.addr_bytes, global_smac, RTE_ETHER_ADDR_LEN);
eth_header->ether_type = htons(RTE_ETHER_TYPE_IPV4);
//ip头
struct rte_ipv4_hdr *ip_header = (struct rte_ipv4_hdr *)(eth_header + 1);//msg + sizeof(struct rte_ether_hdr)
ip_header->version_ihl = 0x45;
ip_header->type_of_service = 0;
ip_header->total_length = htons(total_len - sizeof(struct rte_ether_hdr));
ip_header->packet_id = 0;
ip_header->fragment_offset = 0;
ip_header->time_to_live = 64;//TTL 最多经过路由数量
ip_header->next_proto_id = IPPROTO_UDP;
ip_header->src_addr = global_sip;
ip_header->dst_addr = global_dip;
ip_header->hdr_checksum = 0;
ip_header->hdr_checksum = rte_ipv4_cksum(ip_header);//计算检验和
//udp头·
struct rte_udp_hdr *udp_header = (struct rte_udp_hdr *)(ip_header + 1);
udp_header->src_port = global_sport;
udp_header->dst_port = global_dport;
uint16_t udplen = total_len - sizeof(struct rte_ether_hdr) - sizeof(struct rte_ipv4_hdr);
udp_header->dgram_len = htons(udplen);
//数据部分
rte_memcpy((uint8_t *)(udp_header + 1), data, udplen);
udp_header->dgram_cksum = 0;
udp_header->dgram_cksum = rte_ipv4_udptcp_cksum(ip_header, udp_header);
return 0;
}
int encode_tcp_package(uint8_t *msg, uint16_t total_len) {
//msg表示装包的内存,data是数据,total_len是数据包总长度
//以太网头(mac头)
struct rte_ether_hdr *eth_header = (struct rte_ether_hdr *)msg; //在msg所指的内存空间,开辟出一个固定大小的mac头
rte_memcpy(eth_header->d_addr.addr_bytes, global_dmac, RTE_ETHER_ADDR_LEN);
rte_memcpy(eth_header->s_addr.addr_bytes, global_smac, RTE_ETHER_ADDR_LEN);
eth_header->ether_type = htons(RTE_ETHER_TYPE_IPV4);
//ip头
struct rte_ipv4_hdr *ip_header = (struct rte_ipv4_hdr *)(eth_header + 1);//msg + sizeof(struct rte_ether_hdr)
ip_header->version_ihl = 0x45;
ip_header->type_of_service = 0;
ip_header->total_length = htons(total_len - sizeof(struct rte_ether_hdr));
ip_header->packet_id = 0;
ip_header->fragment_offset = 0;
ip_header->time_to_live = 64;//TTL 最多经过路由数量
ip_header->next_proto_id = IPPROTO_TCP;
ip_header->src_addr = global_sip;
ip_header->dst_addr = global_dip;
ip_header->hdr_checksum = 0;
ip_header->hdr_checksum = rte_ipv4_cksum(ip_header);//计算检验和
//tcp头· 为什么tcp头没有记录数据长度?因为可以通过ip数据报记录的总长度减去ip头和tcp头得到
struct rte_tcp_hdr *tcp_header = (struct rte_tcp_hdr *)(ip_header + 1);
tcp_header->src_port = global_sport;
tcp_header->dst_port = global_dport;
tcp_header->sent_seq = htonl(12345);//tcp三次握手随机起始seq
tcp_header->recv_ack = 0x0;
tcp_header->data_off = 0x50;//头部长度
tcp_header->tcp_flags = 0x1 << 1;
tcp_header->rx_win = htons(4096);//还能接收多少
tcp_header->cksum = 0;
tcp_header->cksum = rte_ipv4_udptcp_cksum(ip_header, tcp_header);
//没有数据部分 因为tcp三次握手发包不带数据
return 0;
}
int main(int argc, char *argv[]) {
if (rte_eal_init(argc, argv) < 0) { //检测网卡配置信息
rte_exit(EXIT_FAILURE, "error with eal init\n");
}
printf("hello\n");
struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("mbuff_pool", MEM_BUFF_SIZE, 0, 0, RTE_MBUF_DEFAULT_BUF_SIZE,
rte_socket_id());
if (mbuf_pool == NULL) {
rte_exit(EXIT_FAILURE, "create mbuf pool error\n");
}
init_my_port(mbuf_pool);
while (1) {
//接收数据
struct rte_mbuf *mbufs[BURST_SIZE] = {0};
uint16_t recv_cnt = rte_eth_rx_burst(portid, 0, mbufs, BURST_SIZE);
if (recv_cnt > BURST_SIZE) {
rte_exit(EXIT_FAILURE, "recv error\n");
}
//解析数据包
int i = 0;
for (i = 0; i < recv_cnt; i++) {
//处理以太网头(物理层mac地址)
struct rte_ether_hdr *eth_header = rte_pktmbuf_mtod(mbufs[i], struct rte_ether_hdr *);
if (eth_header->ether_type != rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4)) {
continue; //转大小端
}
//处理ip头(网络层ip地址)
struct rte_ipv4_hdr *ip_header = rte_pktmbuf_mtod_offset(mbufs[i], struct rte_ipv4_hdr *,
sizeof(struct rte_ether_hdr));
if (ip_header->next_proto_id == IPPROTO_UDP) {
//处理udp头(传输层端口号)
struct rte_udp_hdr *udp_header = (struct rte_udp_hdr *)(ip_header + 1);
printf("udp : %s\n", (char *)(udp_header + 1));
//获取发送包的 源与目的
rte_memcpy(global_smac, eth_header->d_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(global_dmac, eth_header->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(&global_sip, &ip_header->dst_addr, sizeof(uint32_t));
rte_memcpy(&global_dip, &ip_header->src_addr, sizeof(uint32_t));
rte_memcpy(&global_sport, &udp_header->dst_port, sizeof(uint16_t));
rte_memcpy(&global_dport, &udp_header->src_port, sizeof(uint16_t));
struct in_addr addr;
addr.s_addr = ip_header->src_addr;
printf("udp: sip %s:%d --> ", inet_ntoa(addr), ntohs(udp_header->src_port));
addr.s_addr = ip_header->dst_addr;
printf("dip %s:%d \n", inet_ntoa(addr), ntohs(udp_header->dst_port));
//准备发送包
uint16_t length = ntohs(udp_header->dgram_len);//udp包长度
uint16_t total_len = length + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_ether_hdr);
struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mbuf_pool);
if (!mbuf) {
rte_exit(EXIT_FAILURE, "Error rte_pktmbuf_alloc\n");
}
mbuf->pkt_len = total_len;
mbuf->data_len = total_len;
uint8_t *msg = rte_pktmbuf_mtod(mbuf, uint8_t *);
//msg指向从内存池申请的用来存发送包的mbuf
encode_udp_package(msg, (uint8_t *)(udp_header + 1), total_len);
rte_eth_tx_burst(portid, 0, &mbuf, 1);
} else if (ip_header->next_proto_id == IPPROTO_TCP) {
struct rte_tcp_hdr *tcp_header = (struct rte_tcp_hdr *)(ip_header + 1);
struct in_addr addr;
addr.s_addr = ip_header->src_addr;
printf("tcp :sip %s:%d --> ", inet_ntoa(addr), ntohs(tcp_header->src_port));
addr.s_addr = ip_header->dst_addr;
printf("dip %s:%d \n", inet_ntoa(addr), ntohs(tcp_header->dst_port));
rte_memcpy(global_smac, eth_header->d_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(global_dmac, eth_header->s_addr.addr_bytes, RTE_ETHER_ADDR_LEN);
rte_memcpy(&global_sip, &ip_header->dst_addr, sizeof(uint32_t));
rte_memcpy(&global_dip, &ip_header->src_addr, sizeof(uint32_t));
rte_memcpy(&global_sport, &tcp_header->dst_port, sizeof(uint16_t));
rte_memcpy(&global_dport, &tcp_header->src_port, sizeof(uint16_t));
//准备一个包发送
uint16_t total_len = sizeof(struct rte_tcp_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_ether_hdr);
struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mbuf_pool);
if (!mbuf) {
rte_exit(EXIT_FAILURE, "Error rte_pktmbuf_alloc\n");
}
mbuf->pkt_len = total_len;
mbuf->data_len = total_len;
uint8_t *msg = rte_pktmbuf_mtod(mbuf, uint8_t *);
encode_tcp_package(msg, total_len);
rte_eth_tx_burst(portid, 0, &mbuf, 1);
}
}
}
return 0;
}
运行结果: 可以看到udp发送对方能够接收到数据,但是tcp只能看到服务端接收到了数据包(连接请求),通过wareshark可以看到服务端回应发送了数据包,但是不符合TCP的标准,只是符合了TCP协议,三次握手的过程没有,对方接收无法接收,下面尝试建立TCP连接。
3.3TCP发送数据前的三次握手实现
获取到服务端发来的数据包中的seqnum,随后完成两点。
1.把标志位中的ACK和SYN置1
2.在打包tcp包的过程中给recv_ack值赋上seqnum+1,再回发,即可实现三次握手前两次。
//标志位
tcp_header->tcp_flags = RTE_TCP_SYN_FLAG | RTE_TCP_ACK_FLAG;
//ack值
tcp_seqnum = ntohl(tcp_header->sent_seq);//net to host
tcp_header->recv_ack = htonl(tcp_seqnum + 1);// host to net long
//第一次发送的seq值是随机的
在实现了上述之后,运行会发现能连接成功但是客户端不停的发送第三次握手的包,原因在于我们只有一个状态,所以在while(1)循环中,服务器不断给客户端发送第二次握手的包,客户端也就会不断回复。
要解决这个问题并且实现后续能接收到客户端发来的data数据,需要根据TCP三次握手的状态变化编写一个状态机,根据不同状态对客户端发来的包做出不同回应。大致代码如下:
在确定是tcp协议后:
if (tcp_flags & RTE_TCP_SYN_FLAG) {
if (tcp_status == USTACK_TCP_STATUS_LISTEN) {//第一次收到
//准备一个包发送
uint16_t total_len = sizeof(struct rte_tcp_hdr) + sizeof(struct rte_ipv4_hdr) + sizeof(struct rte_ether_hdr);
struct rte_mbuf *mbuf = rte_pktmbuf_alloc(mbuf_pool);
if (!mbuf) {
rte_exit(EXIT_FAILURE, "Error rte_pktmbuf_alloc\n");
}
mbuf->pkt_len = total_len;
mbuf->data_len = total_len;
uint8_t *msg = rte_pktmbuf_mtod(mbuf, uint8_t *);
encode_tcp_package(msg, total_len);
rte_eth_tx_burst(portid, 0, &mbuf, 1);
tcp_status = USTACK_TCP_STATUS_SYN_RCVD;
}
}
if (tcp_flags & RTE_TCP_ACK_FLAG) {
if (tcp_status == USTACK_TCP_STATUS_SYN_RCVD) { //第二次收到 三次握手结束
tcp_status = USTACK_TCP_STATUS_ESTABLISHED;
}
}
if (tcp_flags & RTE_TCP_PSH_FLAG) { //建立连接后收到了客户端发来的数据
if (tcp_status == USTACK_TCP_STATUS_ESTABLISHED) {
uint8_t header_len = (tcp_header->data_off >> 4) * sizeof(uint32_t);
uint8_t *data = ((uint8_t *)tcp_header + header_len);
printf("tcp : recv data %s\n", data);
}
}
3.4实现TCP协议栈的并发
在服务器能用TCP发送数据的前提下,如何做到同时有多个用户连接并且交互?
tcp.c代码思路图:
要做到并发,可以在tcp_server_entry中,采用前面学过的一请求一线程,每naccept(hook accept)一次,就创建一个线程。也可以结合epoll提升性能,但却没有想的那么简单:
直接把naccept返回的fd加入epoll就好了?这是错误的做法,因为现在的tcp协议栈都是自己编写的,已经完全脱离了内核的掌控在用户态自己实现一套流程,所以现在得到的fd也只是应用层的一个变量,在内核无意义,加入epoll监听也无意义,所以我们要做的第一步就是,了解epoll的原理。
3.4.1epoll的实现原理
epoll在上述数据流程图中的位置:
epoll的功能实现在于三个接口,create、wait、ctl。且采用的是红黑树的数据结构实现,不采用hash(O(1)查找)、b树的原因主要在于红黑树有三大优势:
1.不低的查找性能,适配epoll强查找的需求
2.线性增长的内存,不会像b树一样一次分配一块,而是分配一个node
3.允许不连续的内存使用,不会像hash一样需要大片连续内存,服务器在运行一段时间后可能无法找到大片连续内存。
3.4.1.1epoll的数据结构
epoll使用到了两个结构体,两种数据结构。分别是:
struct epitem { //数据结构中的一个结点node
RB_ENTRY(epitem) rbn;
LIST_ENTRY(epitem) rdlink;
int rdy; //就绪
int sockfd;
struct epoll_event event;
};
struct eventpoll { //描述整个epoll 与epfd对应
int fd;
ep_rb_tree rbr; //红黑树的根
int rbcnt;
LIST_HEAD( ,epitem) rdlist; //就绪队列的头
int rdnum;
int waiting;
pthread_mutex_t mtx; //rbtree update
pthread_spinlock_t lock; //rdlist update
pthread_cond_t cond; //block for event
pthread_mutex_t cdmtx; //mutex for cond
};
3.4.1.2epoll的三个对外函数
epoll有下面三个函数供用户使用:
epoll_create的具体工作就是:
创建一个eventpoll,然后用bitmap分配一个epfd后,将eventpoll的fd = epfd,实现二者绑定,与accept的工作类似。
epoll_ctl(add)的具体工作就是:
(1)根据epfd,找到对应的eventpoll,然后找到对应的那颗红黑树,通过要添加的fd,先在红黑树中比对查找有没有这个fd,如果没有就建立一个epitem然后和这个fd绑定。
(2)每个通过ctl注册的fd都会关联一个回调函数,当fd的事件发送,协议栈就会触发回调将事件添加进就绪队列。
epoll_wait的具体工作是:
(1)根据timewait参数检测就绪队列情况.若timewait == 0,则立即返回,若timewait > 0则pthread_cond_timewait(timewait) 后返回 ,若timewait < 0则pthread_cond_wait阻塞,等待就绪队列有事件后signal解救。
(2)将就绪队列的事件epitem->event全都memcpy到用户态events[i]数组
3.4.1.3epoll的一个对内函数
epoll监听那么多个fd,但是上面介绍的都是epoll开始监听后的操作,那谁来通知epoll有数据到了?epoll_event_callback
callback具体工作就是,拿着fd,找到红黑树中的结点epitem,将对应的rdy状态置1,并且加入就绪队列(就绪到整集),且释放条件信号量cond。
在哪调用的callback?
1.tcp_handle_syn_rcvd(),三次握手的最后一次。
2.tcp_handle_established(),在tcp协议栈中,服务器端在established状态接收到PUSH 或者 FIN后传输数据的时候。
代码参考: