首页 > 其他分享 >利用线程池给客户端传文件

利用线程池给客户端传文件

时间:2024-06-07 21:24:06浏览次数:14  
标签:task thread int ret 线程 file fd 池给 客户端

主函数


#include <func.h>
#include "threadPool.h"
#include "server.h"

#define EVENTSNUM 100

int pipefd[2];

void sigHandle(int signo){
    printf("%d signal\n",signo);
    int exitNum = 1;
    write(pipefd[1],&exitNum,sizeof(int));
}

int main(int argc,char* argv[]){
    
    if(argc != 4){
        error(1,errno,"need three arguments");
    }
    char* ip = argv[1];
    int port = atoi(argv[2]);
    int threadNum = atoi(argv[3]);
    pipe(pipefd);

    pid_t pid = fork();
    if(pid > 0){
        signal(SIGUSR1,sigHandle);
        wait(NULL);
        exit(0);
    }

    //创建线程池
    threadPool *thread = calloc(1,sizeof(threadPool));
    ThreadPoolInit(thread,threadNum);
    ThreadPoolStart(thread);


    //设置套接字,绑定ip和端口号,并监听
    int listenFd = serverCreate(ip,port);
    if(listenFd == -1){
        perror("serverCreate");
    }
    printf("listening\n");
    //创建epoll
    int epfd = epoll_create1(0);
    if(epfd == -1){
        perror("epoll_create");
    }
    
    int ret = listenAdd(epfd,listenFd);
    if(ret == -1){
        perror("listenAdd");
    }
    listenAdd(epfd,pipefd[0]);
    struct epoll_event events[EVENTSNUM];
    
    while(1){
        int nReady = epoll_wait(epfd,events,EVENTSNUM,-1);
        for(int i = 0;i < nReady;i++){
            int fd = events[i].data.fd;
            if(fd == listenFd){
                printf("connection\n");
                int peerFd = accept(listenFd,NULL,NULL);
                if(peerFd == -1){
                    perror("accept");
                }
                TaskEnqueue(&thread->taskQueue,peerFd);
            }else if(fd == pipefd[0]){
                int result;
                printf("will exit\n");
                read(fd,&result,sizeof(result));
                ThreadPoolStop(thread);
                ThreadPoolDestroy(thread);
                close(epfd);
                close(listenFd);
                free(thread);
                exit(0);
            }
        }
    }
    return 0;
}

server


#include "server.h"
#include <func.h>
int serverCreate(char*ip,int port){
    int sfd = socket(AF_INET,SOCK_STREAM,0);
    if(sfd == -1){
        perror("socket");
    }

    struct sockaddr_in serverAddr;
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_addr.s_addr = inet_addr(ip);
    serverAddr.sin_port = htons(port);
    int on = 1;
    setsockopt(sfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on));

    int ret = bind(sfd,(struct sockaddr*)&serverAddr,sizeof(serverAddr));
    if(ret == -1){
        perror("bind");
    }
    ret = listen(sfd,10);
    if(ret == -1){
        perror("listen");
    }
    return sfd;
}

int listenAdd(int epfd,int fd){
    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN;
    int ret = epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&ev);
    if(ret == -1){
        perror("epoll_ctl");
    }
    return 0;
}

int listenDel(int epfd,int fd){
    int ret = epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);
    if(ret == -1){
        perror("epoll_ctl");
    }
    return 0;
}

int sendFile(int clientFd,char* fileName){
    fileInfo file;

    memset(&file,0,sizeof(file));

    file.len = strlen(fileName);

    strcpy(file.fileContent,fileName);    
    send(clientFd,&file,4 + file.len,0);
    printf("fileName: %s\n",file.fileContent);

    int fd = open(fileName,O_RDWR);
    struct stat buf;
    int ret = fstat(fd,&buf);
    if(ret == -1){
        error(1,errno,"fstat");
    }


    file.len = buf.st_size;
    char buffer[100];
    read(fd,buffer,sizeof(buffer));
    strncpy(file.fileContent,buffer,file.len);
    send(clientFd,&file,4 + file.len,0);
    
    close(fd);
    printf("fileContent: %s\n",file.fileContent);
    return 0;
}

int sendBigFile(int clientFd,char* fileName){
    fileInfo file;
    file.len = strlen(fileName);

    strcpy(file.fileContent,fileName);    
    send(clientFd,&file,4 + file.len,0);
    printf("fileName: %s\n",file.fileContent);

    int fd = open(fileName,O_RDONLY);
    struct stat buf;
    int ret = fstat(fd,&buf);
    if(ret == -1){
        error(1,errno,"fstat");
    }


    file.len = buf.st_size;
    while(1){

        ret = read(fd,file.fileContent,1024);
        if(ret == 0){
            break;
        }
        file.len = ret;
        send(clientFd,&file,4 + file.len,0);
    } 

    printf("send file success\n");
    return 0;
}

threadPool


#include "threadPool.h"
#include "server.h"

void* ThreadFunc(void* args){
    taskQueue *q = args;
    char* fileName = "dog.jpg";
    while(1){
        int clienFd = TaskDequeue(q);
        if(clienFd == -1){
            break;
        }
        sendBigFile(clienFd,fileName);
        close(clienFd);
    }
    return NULL;
}

void ThreadPoolInit(threadPool* thread,int num){
    if(thread != NULL){
        thread->threads = calloc(num,sizeof(pthread_t));
        thread->threadNum = num;
        TaskQueueInit(&thread->taskQueue);

    }
}

void ThreadPoolStart(threadPool* thread){
    for(int i = 0;i < thread->threadNum;i++){
        printf("thread %d create\n",i);
        pthread_create(&thread->threads[i],NULL,ThreadFunc,&thread->taskQueue);
    }
}

void broadcastAll(taskQueue* q){
   q->exitFlag = 1;
   pthread_cond_broadcast(&q->taskCond);
}

void ThreadPoolStop(threadPool* thread){
    while(!TaskIsEmpty(&thread->taskQueue)){
        sleep(1);
        printf("there have tasks");
    }
    broadcastAll(&thread->taskQueue);
    for(int i = 0;i < thread->threadNum;i++){
        pthread_join(thread->threads[i],NULL);
    }

}

void ThreadPoolDestroy(threadPool* thread){
    if(thread != NULL){
        free(thread->threads);
        TaskQueueDestroy(&thread->taskQueue);        
    }
}

void TaskQueueInit(taskQueue* q){
    q = calloc(1,sizeof(taskQueue));
    q->pFront = q->pRear = NULL;
    pthread_cond_init(&q->taskCond,NULL);
    pthread_mutex_init(&q->taskMutex,NULL);
    q->taskSize = 0;
    q->exitFlag = 0;
}

void TaskEnqueue(taskQueue* task,int peerFd){
    pthread_mutex_lock(&task->taskMutex);
    if(task != NULL){
        taskNode* node = calloc(1,sizeof(taskNode));
        node->peerFd = peerFd;
        node->next = NULL;

        if(task->taskSize == 0){
            task->pRear = task->pFront = node;
        }else{
            task->pRear->next = node;
            task->pRear = node;
        }
        task->taskSize++;
        pthread_cond_signal(&task->taskCond);
        printf("task in the queue\n");
    }
    pthread_mutex_unlock(&task->taskMutex);
}

int TaskDequeue(taskQueue* task){

    if(task != NULL){
        pthread_mutex_lock(&task->taskMutex);
        while(task->taskSize == 0 && task->exitFlag == 0){
            pthread_cond_wait(&task->taskCond,&task->taskMutex);
        }
        
        int ret = -1;
        if(task->exitFlag == 0){
            taskNode* retNode = task->pFront;
            ret = retNode->peerFd;
            if(task->taskSize > 1){
                task->pFront = task->pFront->next;
            }else{
                task->pFront = task->pRear = NULL;
            }
            task->taskSize--;
            free(retNode);
        }
        pthread_mutex_unlock(&task->taskMutex);
        return ret;
    }

    return -1;
}

void TaskQueueDestroy(taskQueue* task){
    if(task != NULL){
        pthread_mutex_destroy(&task->taskMutex);
        pthread_cond_destroy(&task->taskCond);
    }
}

int TaskIsEmpty(taskQueue* task){
    return task->taskSize == 0;
}

标签:task,thread,int,ret,线程,file,fd,池给,客户端
From: https://www.cnblogs.com/EavenWang/p/18237877

相关文章

  • 使用jmeter,响应体response body中有两个同名的cookies时,如何获取第二个cookie进行跨线
     如图两个同名cookie:.AspNetCore.Cookies正则表达式提取器引用名称:loginCookie正则表达式:Set-Cookie:(.AspNetCore.Cookies=.*?;)模板:$1$(确保正确匹配到第二个.AspNetCore.Cookies)匹配数字2  beanshell后置处理程序${__setProperty(loginCookie,${loginCookie},)......
  • Python实现投递多线程任务
    使用Python的apscheduler库中的BackgroundScheduler实现投递多线程任务的示例代码。这个示例将展示如何根据任务ID投递和停止任务,设置任务同时执行的上限,以及删除全部任务。首先,确保你已经安装了apscheduler库:``pipinstallapscheduler``代码示例:``fromapscheduler.sched......
  • Python简单实现多线程例子
    使用Python实现多线程的例子,演示如何在主线程内分别启动ABC三个线程,并实现启动和停止指定线程的功能``importthreadingimporttime#定义一个全局标志,用于控制线程的运行状态stop_thread_A=Falsestop_thread_B=Falsestop_thread_C=False#线程A的函数......
  • 49.线程池的关闭方法
    shutdown方法1.线程池状态变为shutdown2.不会接收新任务3.已提交的任务会执行完4.此方法不会阻塞调用线程执行ExecutorServiceexecutorService=Executors.newFixedThreadPool(2);executorService.submit(()->{log.debug("task1running");......
  • 48.线程池提交任务的方法
     execute方法submit方法提交任务task,用返回值Future获得任务执行结果。Future用于主线程接受线程池中线程的返回结果。ExecutorServiceexecutorService=Executors.newFixedThreadPool(2);//提交第一个任务返回结果Future<String>future=execu......
  • UDP——实现C/S架构,有一台服务器,服务器中存储n首音频,要求客户端可以直接下载服务器的
    实现C/S架构,有一台服务器,服务器中存储n首音频,要求客户端可以直接下载服务器的音频,并且可以正常在客户端播放。服务器/*************************************************************************************************************************** filename: udp_ser......
  • Kafka源码分析(六)——Producer:Sender线程——Batch筛选
    作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬学习必须往深处挖,挖的越深,基础越扎实!阶段1、深入多线程阶段2、深入多线程设计模式阶段3、深入juc源码解析阶段4、深入jdk其余源码解析......
  • Kafka源码分析(七)——Producer:Sender线程——Broker连接检查
    作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬学习必须往深处挖,挖的越深,基础越扎实!阶段1、深入多线程阶段2、深入多线程设计模式阶段3、深入juc源码解析阶段4、深入jdk其余源码解析......
  • C语言通过socket实现TCP客户端
    socket概念​ 从wiki上了解,socket这个词追溯到1971年RFC147的发布。​ 目前我的理解:常用于指操作系统提供的API,该API允许使用TCP、UDP进行连接,但不仅限于TCP、UDP协议。实现目的利用系统提供函数接口,通过C语言实现对TCP服务器(IP地址)的连接,以及收发数据。实现......
  • 实战:干掉高德地图7.2.0版iOS客户端的反动态调试保护
    沙梓社snakeninny315年2月 高德是中国领先的数字地图内容、导航和位置服务解决方案提供商。苹果自带的地图采用的就是高德的数据,足见高德之权威 昨天突发奇想,对高德地图上中一个官方不提供的功能产生了浓厚的兴趣,试图通过hack的方式来实现这个功能。谁知刚架上LLDB......