首页 > 系统相关 >linux 8 基于线程池和epoll监听实现聊天服务器

linux 8 基于线程池和epoll监听实现聊天服务器

时间:2023-07-31 15:44:26浏览次数:44  
标签:task epoll int cfd pthread thrPool 线程 linux pool

1.立项

功能
1.聊天总人数显示
2.账号密码注册功能-保留名字-永久保留id->保留id功能取消
3.总聊天室-进入前可输入名字 顺序id
4.聊天室聊天
5.单对单聊天
6.id=cfd串联起来

4.服务器代码
#include "threadpoolsimple.h"
//初始化结构体
#include<stdio.h>
ThreadPool* thrPool = NULL;
typedef struct  sumfd
{
    int cfd;
    char name[64];

}sumfd;
struct sumfd sumfd1[1024];
int beginnum = 1000;
///int sumfd[1024] = { 0 };//所有fd保存处 拷贝一份 不去对ev进行读取 或者再次写入
int lx = 0;
//线程回调 线程抢任务的具体方法
void* thrRun(void* arg)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    ThreadPool* pool = (ThreadPool*)arg;//参数给予
    int taskpos = 0;//任务位置
    PoolTask* task = (PoolTask*)malloc(sizeof(PoolTask));//开辟一个任务数组

    while (1)
    {
        //获取任务,先要尝试加锁
        pthread_mutex_lock(&thrPool->pool_lock);

        //无任务并且线程池不是要摧毁
        while (thrPool->job_num <= 0 && !thrPool->shutdown)
        {
            //如果没有任务,线程会阻塞
            pthread_cond_wait(&thrPool->not_empty_task, &thrPool->pool_lock);//阻塞此处等待信号
        }

        if (thrPool->job_num)
        {
            //有任务需要处理
            taskpos = (thrPool->job_pop++) % thrPool->max_job_num;
            //printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
            //为什么要拷贝?避免任务被修改,生产者会添加任务
            memcpy(task, &thrPool->tasks[taskpos], sizeof(PoolTask));//将任务数组结构体拷贝一份 防止覆盖任务
            task->arg = task;
            thrPool->job_num--;
            //task = &thrPool->tasks[taskpos];
            pthread_cond_signal(&thrPool->empty_task);//通知生产者
        }

        if (thrPool->shutdown)
        {
            //代表要摧毁线程池,此时线程退出即可
            //pthread_detach(pthread_self());//临死前分家
            pthread_mutex_unlock(&thrPool->pool_lock);
            free(task);
            pthread_exit(NULL);
        }

        //释放锁
        pthread_mutex_unlock(&thrPool->pool_lock);
        task->task_func(task->arg);//执行回调函数
    }

    //printf("end call %s-----\n",__FUNCTION__);
}

//创建线程池
void create_threadpool(int thrnum, int maxtasknum)
{
    printf("begin call %s-----\n", __FUNCTION__);
    thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));//给结构体赋值

    thrPool->thr_num = thrnum;//线程个数
    thrPool->max_job_num = maxtasknum;//最大任务个数
    thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
    thrPool->job_push = 0;//任务队列添加的位置
    thrPool->job_pop = 0;//任务队列出队的位置
    thrPool->job_num = 0;//初始化的任务个数为0

    thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask) * maxtasknum));//申请最大的任务队列数组

    //初始化锁和条件变量
    pthread_mutex_init(&thrPool->pool_lock, NULL);//上锁
    pthread_cond_init(&thrPool->empty_task, NULL);//条件变量1
    pthread_cond_init(&thrPool->not_empty_task, NULL);//条件变量2

    int i = 0;
    thrPool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrnum);//申请n个线程id的空间 线程数组 
    //设置线程自动分离
    pthread_attr_t attr;
    pthread_attr_init(&attr);
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
    //创建线程
    for (i = 0; i < thrnum; i++)
    {
        pthread_create(&thrPool->threads[i], &attr, thrRun, (void*)thrPool);//创建多个线程->追线程回调
    }
    //printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool* pool)
{
    pool->shutdown = 1;//开始自爆
    pthread_cond_broadcast(&pool->not_empty_task);//诱杀 

    int i = 0;
    for (i = 0; i < pool->thr_num; i++)
    {
        pthread_join(pool->threads[i], NULL);
    }

    pthread_cond_destroy(&pool->not_empty_task);
    pthread_cond_destroy(&pool->empty_task);
    pthread_mutex_destroy(&pool->pool_lock);

    free(pool->tasks);
    free(pool->threads);
    free(pool);
}

//添加任务到线程池
void addtask(ThreadPool* pool, int fd, struct epoll_event* evs)
{
    //printf("begin call %s-----\n",__FUNCTION__);
    pthread_mutex_lock(&pool->pool_lock);

    //实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
    while (pool->max_job_num <= pool->job_num)
    {
        pthread_cond_wait(&pool->empty_task, &pool->pool_lock);
    }

    int taskpos = (pool->job_push++) % pool->max_job_num;
    //printf("add task %d  tasknum===%d\n",taskpos,beginnum);
    pool->tasks[taskpos].tasknum = beginnum++;
    pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
    pool->tasks[taskpos].task_func = taskRun;
    pool->tasks[taskpos].fd = fd;
    pool->tasks[taskpos].evs = evs;
    pool->job_num++;

    pthread_mutex_unlock(&pool->pool_lock);

    pthread_cond_signal(&pool->not_empty_task);//通知包身工
    //printf("end call %s-----\n",__FUNCTION__);
}

//任务回调函数
void taskRun(void* arg)
{

    //
    PoolTask* task = (PoolTask*)arg;
    char buf[1024] = "";
    int n = Read(task->fd, buf, sizeof(buf));
    if (n == 0)
    {
        printf("客户端%d 即将关闭\n", task->fd);
        for (int i = 0; i < 1024; i++)
        {
            if (sumfd1[i].cfd == task->fd)
            {
                sumfd1[i].cfd = 0;

                break;
            }

        }
        close(task->fd);//关闭cfd
        epoll_ctl(task->epfd, EPOLL_CTL_DEL, task->fd, task->evs);//将cfd上树

    }
    else if (n > 0)
    {
        for (int i = 0; i < 1024; i++)
        {
            if (sumfd1[i].cfd != 0)
            {
                //"ID XX 发送:
                char buf1[80] = "id ";
int iu=0;
for(iu=0;iu<1024;iu++)
{
if(sumfd1[iu].cfd==task->fd)
{
break;
}


}
               
                sprintf(buf1, "name: %s  发送:%s ", sumfd1[iu].name,buf);
                send(sumfd1[i].cfd, buf1, sizeof(buf1), 0);
               // send(sumfd1[i].cfd, buf, sizeof(buf), 0);
            }
            else
            {
                lx++;

            }
            if (lx >= 4)
            {
                lx = 0;
                break;
            }

        }
    }
    lx = 0;


}


void prtip(struct sockaddr_in* cliaddr)
{
    char ip[16] = "";
    printf("主机 ip=%s port=%d 即将链接\n", inet_ntop(AF_INET, &(cliaddr->sin_addr.s_addr), ip, 16),
        ntohs(cliaddr->sin_port));

}
char bufname[90] = "";
int kg=0;
int main()
{

    create_threadpool(3, 20);
    printf("请输入服务器端口号\n");
    char buf[8] = "";

    read(STDIN_FILENO, buf, sizeof(buf));
    printf("   \n");
    printf("等待客户端链接\n");

    //
   


    int port = atoi((char*)buf);
    //创建 绑定
    int lfd = tcp4bind(port, NULL);
    //监听
    Listen(lfd, 128);
    //前置操作
    int hhs = epoll_create(1);//创建树
    struct epoll_event ev, evs[1024];//进行上树等等结构体
    //将lfd上树
    ev.data.fd = lfd;
    ev.events = EPOLLIN;
    epoll_ctl(hhs, EPOLL_CTL_ADD, lfd, &ev);

    //存放信息
    struct sockaddr_in as;
    socklen_t len = sizeof(&as);
    char buf12[600] = "";
    while (1)
    {
        //监听开始
int sum;
if(kg==0)
{
  sum= epoll_wait(hhs, evs, 1024, -1);
}else
{
sum=0;
}
       
        printf("监听中\n");
        if (sum == 0)
        {
            continue;
        }
        else if (sum > 0)
        {
            //判断是lfd 还是cfd
            for (int i = 0; i < sum; i++)
            {
                if (evs[i].data.fd == lfd && evs[i].events & EPOLLIN)
                {
                    //lfd变化 
                    int cfd = Accept(lfd, (struct sockaddr*)&as, &len);
                    //设置非阻塞
                    int flage = fcntl(cfd, F_GETFL);//获取CFD标志位
                    flage |= O_NONBLOCK;
                    fcntl(cfd, F_SETFL, flage);
                    prtip(&as);
                    int m = 0;
                    int i2 = 0;
                       for (i2 = 0; i2 < 1024; i2++)//实际循环次数会很少
                    {
                        if (sumfd1[i2].cfd == 0)
                        {
                            sumfd1[i2].cfd = cfd;
kg=1;
                             printf("请在5S内输入名字\n");

                             sleep(5);
                             char buf2[80]="";
                             Read(cfd ,buf2, sizeof(buf2));
                             strcpy(sumfd1[i2].name,buf2);
                             
                             kg=0;
                            m = i2;
                            break;

                        }
                    }

                    //加入到监听集合
                    //上树
                    ev.data.fd = cfd;
                    ev.events = EPOLLIN | EPOLLET;
                    epoll_ctl(hhs, EPOLL_CTL_ADD, cfd, &ev);
                   
                   // sumfd1
                    
                    //"ID XX 发送:
                    strcpy(buf12, " ");
                    sprintf(buf12, "你的编号 %d 成功注册 切记不要随意告诉别人哦\n当前聊天室总人数%d \n",cfd, m + 1);
                    send(cfd, buf12, sizeof(buf12), 0);


                    lx = 0;

                }
                else if (evs[i].events & EPOLLIN)
                {
                    addtask(thrPool, evs[i].data.fd, &ev);

                }




            }//for



        }//else



    }///while
    return 0;
}


3.总结

不完善 客户端没写 会等技术上来用qt重写一次聊天服务器

标签:task,epoll,int,cfd,pthread,thrPool,线程,linux,pool
From: https://www.cnblogs.com/lzfyz/p/17593624.html

相关文章

  • Linux文件系统与日志分析
    目录Linux文件系统与日志分析日志是解决问题的唯一手段1.inode表结构1.1元数据1.2inode内容1.3inode内容1.4目录文件的结构1.5inode的号码1.6恢复XFS类型的文件2.日志服务管理2.1系统日志介绍2.2rsyslog系统日志服务2.3rsyslog日志管理2.3.1系统日志术语2.4日志文件2.......
  • linux的hw_breakpoint
    参考:https://martin.uy/blog/hardware-breakpoints-in-the-linux-kernel-through-perf_events/https://www.cnblogs.com/sunkang/archive/2011/05/04/2038816.htmlhttps://www.cnblogs.com/hellokitty2/p/16212629.htmlhttps://blog.csdn.net/shenhuxi_yu/article/details/......
  • Linux集群监控部署: prometheus 普罗米修斯 + Grafana
    前言之前我们有用到top、free、iostat等等命令,去监控服务器的性能,但是这些命令,我们只针对单台服务器进行监控,通常我们线上都是一个集群的项目,难道我们需要每一台服务器都去敲命令监控吗?这样显然不是符合逻辑的,Linux中就提供了一个集群监控工具–prometheus。prometheus监......
  • Linux fdisk command All In One
    LinuxfdiskcommandAllInOnediskpartition/磁盘分区$fdisk-hUsage:fdisk[options]<disk>changepartitiontablefdisk[options]-l[<disk>...]listpartitiontable(s)Displayormanipulateadiskpartitiontable.Options:......
  • linux环境中,如何查看网络设备的序列号?
    通过iplink查看网络设备的序列号 iplink  查询结果中,最左边的一列,就是这个网络接口,在主机上的序列号。......
  • CoaXPress 2.0 FPGA HOST IP Core Linux Demo
      目录Hello-FPGACoaXPress2.0HostFPGAIPCoreLinuxDemo41说明42设备连接73VIVADOFPGA工程74调试说明10图1‑1资料目录4图1‑2VIVADO工程目录结构5图1‑3SDK工程目录结构5图1‑4设备树信息6图1‑5petalinux应用程序6图2‑1ZCU10......
  • Alpine Linux使用入门(Docker视角)
    前言我们在了解AlpineLinux时,多数都应该是从docker系统镜像了解的这个操作系统,今天我们就简单说一下AlpineLinux的基础使用AlpineLinux是一种基于musl和BusyBox的Linux发行版,专为安全性、简单性和资源效率而设计。体积非常小巧,适合用来做Docker镜像。如果你有Centos或者Ubun......
  • Linux网络编程
    1Socket在linux网络编程中我们主要使用套接字Socke进行不同主机上进程间的通信,该套接字提供了透明传输接口使得我们不需要根据协议栈进行手动封装数据包,我们不必在意协议栈上下层之间的具体服务,而是仅需调用提供的api即可套接字通信的一般流程为:创建套接字:在应用程序中使用网......
  • 【高并发】SimpleDateFormat类到底为啥不是线程安全的?(附六种解决方案,建议收藏)
    大家好,我是冰河~~首先问下大家:你使用的SimpleDateFormat类还安全吗?为什么说SimpleDateFormat类不是线程安全的?带着问题从本文中寻求答案。提起SimpleDateFormat类,想必做过Java开发的童鞋都不会感到陌生。没错,它就是Java中提供的日期时间的转化类。这里,为什么说SimpleDateFormat......
  • linux中如何修改网络命名空间中veth设备端点的名字?
    查看原有的设备名称为veth1  [root@centos7~]#ipnetnsexecns1iplink1:lo:<LOOPBACK>mtu65536qdiscnoopstateDOWNmodeDEFAULTgroupdefaultqlen1000link/loopback00:00:00:00:00:00brd00:00:00:00:00:005:veth1@if6:<BROADCAST,MULTIC......