TCP实现线程池竞争任务

发布于:2025-09-02 ⋅ 阅读:(14) ⋅ 点赞:(0)

服务端:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<strings.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include<stdlib.h>
#include<string.h>
#include<sys/wait.h>
#include<pthread.h>

/* TCP port the server binds to and listens on. */
#define SERV_PORT 8000
/* Maximum number of bytes requested from a connection in a single read(). */
#define MAXLINE 80


/*
 * Print msg followed by the description of the current errno (via perror)
 * and terminate the process with exit status 1.
 *
 * The body is wrapped in do { ... } while (0) so that `prrexit("x");`
 * expands to exactly one statement and stays valid inside an unbraced
 * if/else.
 */
#define prrexit(msg) \
    do { \
        perror(msg); \
        exit(1); \
    } while (0)

/* One queued unit of work: a connection waiting to be served. */
typedef struct Task Task;
struct Task {
    int   fd;    /* descriptor of the connection to serve */
    Task *next;  /* following task in the queue, NULL for the last one */
};
/* Task pool: a singly linked FIFO queue of tasks plus its synchronisation. */
typedef struct Task_pool Task_pool;
struct Task_pool {
    Task *head;               /* oldest task, removed first; NULL when empty */
    Task *tail;               /* newest task, where pushes append; NULL when empty */
    pthread_mutex_t lock;     /* held while head/tail are read or changed */
    pthread_cond_t havetask;  /* signalled after a task has been enqueued */
};

/*
 * Allocate and initialise an empty task pool.
 *
 * Returns a pointer to the new pool, or NULL if the memory allocation or the
 * initialisation of the mutex / condition variable fails.  A non-NULL result
 * is released with task_pool_free().
 */
Task_pool *task_pool_init(void){
    Task_pool *tp = malloc(sizeof *tp);
    if (tp == NULL) {
        return NULL;
    }

    tp->head = NULL;
    tp->tail = NULL;

    if (pthread_mutex_init(&tp->lock, NULL) != 0) {
        free(tp);
        return NULL;
    }
    if (pthread_cond_init(&tp->havetask, NULL) != 0) {
        /* Undo the half-finished construction before reporting failure. */
        pthread_mutex_destroy(&tp->lock);
        free(tp);
        return NULL;
    }

    return tp;
}


/*
 * Append the connection descriptor fd to the tail of the queue and wake the
 * threads waiting for work.
 *
 * The pool takes over fd.  The function returns nothing, so when the queue
 * node cannot be allocated the error is reported and fd is closed here
 * instead of being leaked without the caller knowing.
 */
void task_pool_push( Task_pool *tp,int fd){
    /* Allocate before locking so the mutex is held only while relinking. */
    Task *t = malloc(sizeof *t);
    if (t == NULL) {
        perror("malloc");
        close(fd);
        return;
    }
    t->fd = fd;
    t->next = NULL;

    pthread_mutex_lock(&tp->lock);

    if (tp->tail == NULL) {
        /* Empty queue: the new node is both the first and the last one. */
        tp->head = t;
        tp->tail = t;
    } else {
        tp->tail->next = t;
        tp->tail = t;
    }

    /*
     * Every waiter is woken and they compete for the task; the ones that
     * lose find the queue empty again and go back to waiting.
     */
    pthread_cond_broadcast(&tp->havetask);

    pthread_mutex_unlock(&tp->lock);
}


/*
 * Remove the oldest task from the queue and return a copy of it, blocking
 * while the queue is empty.
 *
 * The `next` member of the returned copy is always NULL: the node it used to
 * point at still belongs to the queue and may be freed by another thread, so
 * handing that pointer out would invite a use-after-free.
 */
Task task_pool_pop(Task_pool *tp){
    pthread_mutex_lock(&tp->lock);

    /*
     * This has to be a loop, not an `if`: the condition is broadcast to all
     * waiters and pthread_cond_wait may also wake up spuriously, so the
     * queue can be empty again by the time this thread holds the lock.
     */
    while (tp->head == NULL) {
        pthread_cond_wait(&tp->havetask, &tp->lock);
    }

    Task *node = tp->head;
    Task tmp = *node;
    tmp.next = NULL;

    tp->head = node->next;
    if (tp->head == NULL) {
        /* The last task was just removed, so the queue is empty again. */
        tp->tail = NULL;
    }

    pthread_mutex_unlock(&tp->lock);

    /* The node is already unlinked, so it can be freed without the lock. */
    free(node);
    return tmp;
}

/*
 * Destroy the pool: discard every task still queued, destroy the mutex and
 * the condition variable, and free the pool itself.
 *
 * The descriptors of tasks that were queued but never popped are closed
 * here; otherwise dropping their nodes would leak them.  Passing NULL is a
 * no-op.
 *
 * The caller must make sure no other thread is still using the pool:
 * destroying a mutex or condition variable that is in use is undefined.
 */
void task_pool_free(Task_pool *tp){
    if (tp == NULL) {
        return;
    }

    pthread_mutex_lock(&tp->lock);

    Task *p = tp->head;
    while (p != NULL) {
        Task *following = p->next;
        close(p->fd);
        free(p);
        p = following;
    }
    tp->head = NULL;
    tp->tail = NULL;

    pthread_mutex_unlock(&tp->lock);

    pthread_mutex_destroy(&tp->lock);
    pthread_cond_destroy(&tp->havetask);
    free(tp);
}





/*
 * Worker thread body.  arg is the Task_pool the thread takes its work from.
 *
 * The thread detaches itself and then loops forever: it pops a connection
 * from the pool and sends every chunk it receives back in upper case, until
 * the peer closes the connection, an I/O error occurs, or a chunk starting
 * with "quit" arrives.  It then closes the connection and pops the next one.
 * Each received chunk is also copied to standard output.
 */
void *up_server(void *arg){
    /* Detach so the thread's resources are released without a join. */
    pthread_detach(pthread_self());

    Task_pool *tp = arg;
    /* One spare byte so a full MAXLINE read can still be NUL-terminated. */
    char buff[MAXLINE + 1];

    while(1){
        Task tmp = task_pool_pop(tp);
        int connfd = tmp.fd;
        printf("get task fd=%d\n", connfd);

        while(1){
            ssize_t n = read(connfd, buff, MAXLINE);
            if (n < 0) {
                perror("read error");
                break;
            }
            if (n == 0) {
                /* Orderly shutdown by the peer; errno carries no meaning here. */
                fprintf(stderr, "connection closed fd=%d\n", connfd);
                break;
            }

            buff[n] = '\0';  /* terminate so strncmp below sees a string */
            if (write(1, buff, (size_t)n) < 0) {
                perror("write to stdout");
            }

            if (strncmp(buff, "quit", 4) == 0) {
                break;
            }

            for (ssize_t i = 0; i < n; i++) {
                /* toupper requires a value representable as unsigned char. */
                buff[i] = (char)toupper((unsigned char)buff[i]);
            }

            /* write() may accept fewer bytes than requested; send the rest. */
            ssize_t sent = 0;
            while (sent < n) {
                ssize_t w = write(connfd, buff + sent, (size_t)(n - sent));
                if (w <= 0) {
                    if (w < 0) {
                        perror("write error");
                    }
                    break;
                }
                sent += w;
            }
            if (sent < n) {
                break;
            }
        }

        printf("finish task fd=%d\n", connfd);
        close(connfd);
    }

    /* Not reached: the loop above never exits. */
    return NULL;
}

/*
 * Server entry point: create the task pool, start the worker threads, listen
 * on SERV_PORT on all local addresses, and hand every accepted connection to
 * the pool, where the workers compete for it.
 */
int main(void){
    /* Number of worker threads; roughly one per CPU core is intended. */
    enum { WORKER_COUNT = 4 };

    struct sockaddr_in serveraddr, claddr;
    int listenfd, connfd;
    socklen_t claddr_len;
    char str[INET_ADDRSTRLEN];
    int i;

    Task_pool *tp = task_pool_init();
    if (tp == NULL) {
        fprintf(stderr, "task_pool_init failed\n");
        exit(1);
    }

    /* The workers start right away and block until the first task arrives. */
    for (i = 0; i < WORKER_COUNT; i++) {
        pthread_t tid;
        int err = pthread_create(&tid, NULL, up_server, tp);
        if (err != 0) {
            /* pthread_create returns the error code instead of setting errno. */
            fprintf(stderr, "pthread_create: %s\n", strerror(err));
            exit(1);
        }
        printf("new thread is %#lx\n", (unsigned long)tid);
    }

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (listenfd < 0) {
        prrexit("socket");
    }

    /* Server address: any local interface, port SERV_PORT. */
    bzero(&serveraddr, sizeof(serveraddr));
    serveraddr.sin_family = AF_INET;
    serveraddr.sin_port = htons(SERV_PORT);
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listenfd, (struct sockaddr *)&serveraddr, sizeof(serveraddr)) < 0) {
        prrexit("bind");
    }

    if (listen(listenfd, 3) < 0) {
        prrexit("listen");
    }

    printf("Accepting connections...\n");

    while(1){
        claddr_len = sizeof(claddr);
        connfd = accept(listenfd, (struct sockaddr *)&claddr, &claddr_len);
        if (connfd < 0) {
            prrexit("accept");
        }

        printf("received from %s:%d\n",
               inet_ntop(AF_INET, &claddr.sin_addr, str, sizeof(str)),
               ntohs(claddr.sin_port));

        /* Queue the connection; whichever worker pops it will serve it. */
        task_pool_push(tp, connfd);
    }

    /* Not reached: the accept loop above never exits. */
    task_pool_free(tp);
    return 0;
}

客户端:

#include<stdio.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<netinet/ip.h>
#include<string.h>
#include<unistd.h>
#include<ctype.h>
#include<arpa/inet.h>
#include<stdlib.h>


/* TCP port of the server to connect to. */
#define SERV_PORT 8000
/* Maximum number of bytes requested per read() call. */
#define MAXLINE 80

/*
 * Client entry point: connect to the server at 127.0.0.1:SERV_PORT, then
 * repeatedly read a line from standard input, send it without its trailing
 * newline, and print the server's reply.
 *
 * The loop ends on end of input, on an I/O error, when the server closes the
 * connection, or after sending a line that starts with "quit".
 */
int main(void){
    struct sockaddr_in servaddr;
    /* One spare byte so a full MAXLINE read can still be NUL-terminated. */
    char buff[MAXLINE + 1];

    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd < 0) {
        perror("socket");
        exit(1);
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(SERV_PORT);
    if (inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr) != 1) {
        fprintf(stderr, "inet_pton: invalid address\n");
        close(sockfd);
        exit(1);
    }

    if (connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        perror("connect");
        close(sockfd);
        exit(1);
    }

    printf("Connect to server .Type 'quit' to exit.\n");

    /*
     * Kept separate from the socket read count so that the check after the
     * loop reports stdin errors only.
     */
    ssize_t nin;
    while ((nin = read(0, buff, MAXLINE)) > 0) {
        /* Strip the newline only if there is one; never drop real data. */
        if (buff[nin - 1] == '\n') {
            nin--;
        }
        buff[nin] = '\0';
        size_t len = (size_t)nin;

        if (len == 0) {
            /*
             * Sending nothing would leave the server silent and the read
             * below blocked forever, so just ask for another line.
             */
            printf("Enter next message : ");
            fflush(stdout);
            continue;
        }

        /* Only the first four bytes are compared. */
        if (strncmp(buff, "quit", 4) == 0) {
            printf("Quitting ..\n");
            if (write(sockfd, buff, len) < 0) {
                perror("write error");
            }
            break;
        }

        ssize_t bytes_written = write(sockfd, buff, len);
        if (bytes_written < 0) {
            perror("write error");
            break;
        }
        if ((size_t)bytes_written != len) {
            /* errno is not set for a short write, so do not use perror. */
            fprintf(stderr, "write error: short write\n");
            break;
        }

        /* Wait for the server's reply. */
        ssize_t nrecv = read(sockfd, buff, MAXLINE);
        if (nrecv <= 0) {
            if (nrecv == 0) {
                printf("Server closed the connection.\n");
            } else {
                perror("read error");
            }
            break;
        }

        buff[nrecv] = '\0';  /* terminate the reply before printing it */

        printf("Server  response: %s\n", buff);
        printf("Enter next message : ");
        fflush(stdout);  /* the prompt has no newline, so flush explicitly */
    }

    if (nin < 0) {
        perror("read from stdin error");
    }
    close(sockfd);
    printf("Client exited.\n");
    return 0;
}