1.立项
功能
1.聊天总人数显示
2.账号密码注册功能-保留名字-永久保留id->保留id功能取消
3.总聊天室-进入前可输入名字 顺序id
4.聊天室聊天
5.单对单聊天
6.id=cfd串联起来
4.服务器代码
#include "threadpoolsimple.h"
//初始化结构体
#include<stdio.h>
ThreadPool* thrPool = NULL;
typedef struct sumfd
{
int cfd;
char name[64];
}sumfd;
struct sumfd sumfd1[1024];
int beginnum = 1000;
///int sumfd[1024] = { 0 };//所有fd保存处 拷贝一份 不去对ev进行读取 或者再次写入
int lx = 0;
//线程回调 线程抢任务的具体方法
void* thrRun(void* arg)
{
//printf("begin call %s-----\n",__FUNCTION__);
ThreadPool* pool = (ThreadPool*)arg;//参数给予
int taskpos = 0;//任务位置
PoolTask* task = (PoolTask*)malloc(sizeof(PoolTask));//开辟一个任务数组
while (1)
{
//获取任务,先要尝试加锁
pthread_mutex_lock(&thrPool->pool_lock);
//无任务并且线程池不是要摧毁
while (thrPool->job_num <= 0 && !thrPool->shutdown)
{
//如果没有任务,线程会阻塞
pthread_cond_wait(&thrPool->not_empty_task, &thrPool->pool_lock);//阻塞此处等待信号
}
if (thrPool->job_num)
{
//有任务需要处理
taskpos = (thrPool->job_pop++) % thrPool->max_job_num;
//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
//为什么要拷贝?避免任务被修改,生产者会添加任务
memcpy(task, &thrPool->tasks[taskpos], sizeof(PoolTask));//将任务数组结构体拷贝一份 防止覆盖任务
task->arg = task;
thrPool->job_num--;
//task = &thrPool->tasks[taskpos];
pthread_cond_signal(&thrPool->empty_task);//通知生产者
}
if (thrPool->shutdown)
{
//代表要摧毁线程池,此时线程退出即可
//pthread_detach(pthread_self());//临死前分家
pthread_mutex_unlock(&thrPool->pool_lock);
free(task);
pthread_exit(NULL);
}
//释放锁
pthread_mutex_unlock(&thrPool->pool_lock);
task->task_func(task->arg);//执行回调函数
}
//printf("end call %s-----\n",__FUNCTION__);
}
//创建线程池
void create_threadpool(int thrnum, int maxtasknum)
{
printf("begin call %s-----\n", __FUNCTION__);
thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));//给结构体赋值
thrPool->thr_num = thrnum;//线程个数
thrPool->max_job_num = maxtasknum;//最大任务个数
thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
thrPool->job_push = 0;//任务队列添加的位置
thrPool->job_pop = 0;//任务队列出队的位置
thrPool->job_num = 0;//初始化的任务个数为0
thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask) * maxtasknum));//申请最大的任务队列数组
//初始化锁和条件变量
pthread_mutex_init(&thrPool->pool_lock, NULL);//上锁
pthread_cond_init(&thrPool->empty_task, NULL);//条件变量1
pthread_cond_init(&thrPool->not_empty_task, NULL);//条件变量2
int i = 0;
thrPool->threads = (pthread_t*)malloc(sizeof(pthread_t) * thrnum);//申请n个线程id的空间 线程数组
//设置线程自动分离
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
//创建线程
for (i = 0; i < thrnum; i++)
{
pthread_create(&thrPool->threads[i], &attr, thrRun, (void*)thrPool);//创建多个线程->追线程回调
}
//printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool* pool)
{
pool->shutdown = 1;//开始自爆
pthread_cond_broadcast(&pool->not_empty_task);//诱杀
int i = 0;
for (i = 0; i < pool->thr_num; i++)
{
pthread_join(pool->threads[i], NULL);
}
pthread_cond_destroy(&pool->not_empty_task);
pthread_cond_destroy(&pool->empty_task);
pthread_mutex_destroy(&pool->pool_lock);
free(pool->tasks);
free(pool->threads);
free(pool);
}
//添加任务到线程池
void addtask(ThreadPool* pool, int fd, struct epoll_event* evs)
{
//printf("begin call %s-----\n",__FUNCTION__);
pthread_mutex_lock(&pool->pool_lock);
//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理)
while (pool->max_job_num <= pool->job_num)
{
pthread_cond_wait(&pool->empty_task, &pool->pool_lock);
}
int taskpos = (pool->job_push++) % pool->max_job_num;
//printf("add task %d tasknum===%d\n",taskpos,beginnum);
pool->tasks[taskpos].tasknum = beginnum++;
pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
pool->tasks[taskpos].task_func = taskRun;
pool->tasks[taskpos].fd = fd;
pool->tasks[taskpos].evs = evs;
pool->job_num++;
pthread_mutex_unlock(&pool->pool_lock);
pthread_cond_signal(&pool->not_empty_task);//通知包身工
//printf("end call %s-----\n",__FUNCTION__);
}
//任务回调函数
void taskRun(void* arg)
{
//
PoolTask* task = (PoolTask*)arg;
char buf[1024] = "";
int n = Read(task->fd, buf, sizeof(buf));
if (n == 0)
{
printf("客户端%d 即将关闭\n", task->fd);
for (int i = 0; i < 1024; i++)
{
if (sumfd1[i].cfd == task->fd)
{
sumfd1[i].cfd = 0;
break;
}
}
close(task->fd);//关闭cfd
epoll_ctl(task->epfd, EPOLL_CTL_DEL, task->fd, task->evs);//将cfd上树
}
else if (n > 0)
{
for (int i = 0; i < 1024; i++)
{
if (sumfd1[i].cfd != 0)
{
//"ID XX 发送:
char buf1[80] = "id ";
int iu=0;
for(iu=0;iu<1024;iu++)
{
if(sumfd1[iu].cfd==task->fd)
{
break;
}
}
sprintf(buf1, "name: %s 发送:%s ", sumfd1[iu].name,buf);
send(sumfd1[i].cfd, buf1, sizeof(buf1), 0);
// send(sumfd1[i].cfd, buf, sizeof(buf), 0);
}
else
{
lx++;
}
if (lx >= 4)
{
lx = 0;
break;
}
}
}
lx = 0;
}
void prtip(struct sockaddr_in* cliaddr)
{
char ip[16] = "";
printf("主机 ip=%s port=%d 即将链接\n", inet_ntop(AF_INET, &(cliaddr->sin_addr.s_addr), ip, 16),
ntohs(cliaddr->sin_port));
}
char bufname[90] = "";
int kg=0;
int main()
{
create_threadpool(3, 20);
printf("请输入服务器端口号\n");
char buf[8] = "";
read(STDIN_FILENO, buf, sizeof(buf));
printf(" \n");
printf("等待客户端链接\n");
//
int port = atoi((char*)buf);
//创建 绑定
int lfd = tcp4bind(port, NULL);
//监听
Listen(lfd, 128);
//前置操作
int hhs = epoll_create(1);//创建树
struct epoll_event ev, evs[1024];//进行上树等等结构体
//将lfd上树
ev.data.fd = lfd;
ev.events = EPOLLIN;
epoll_ctl(hhs, EPOLL_CTL_ADD, lfd, &ev);
//存放信息
struct sockaddr_in as;
socklen_t len = sizeof(&as);
char buf12[600] = "";
while (1)
{
//监听开始
int sum;
if(kg==0)
{
sum= epoll_wait(hhs, evs, 1024, -1);
}else
{
sum=0;
}
printf("监听中\n");
if (sum == 0)
{
continue;
}
else if (sum > 0)
{
//判断是lfd 还是cfd
for (int i = 0; i < sum; i++)
{
if (evs[i].data.fd == lfd && evs[i].events & EPOLLIN)
{
//lfd变化
int cfd = Accept(lfd, (struct sockaddr*)&as, &len);
//设置非阻塞
int flage = fcntl(cfd, F_GETFL);//获取CFD标志位
flage |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flage);
prtip(&as);
int m = 0;
int i2 = 0;
for (i2 = 0; i2 < 1024; i2++)//实际循环次数会很少
{
if (sumfd1[i2].cfd == 0)
{
sumfd1[i2].cfd = cfd;
kg=1;
printf("请在5S内输入名字\n");
sleep(5);
char buf2[80]="";
Read(cfd ,buf2, sizeof(buf2));
strcpy(sumfd1[i2].name,buf2);
kg=0;
m = i2;
break;
}
}
//加入到监听集合
//上树
ev.data.fd = cfd;
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(hhs, EPOLL_CTL_ADD, cfd, &ev);
// sumfd1
//"ID XX 发送:
strcpy(buf12, " ");
sprintf(buf12, "你的编号 %d 成功注册 切记不要随意告诉别人哦\n当前聊天室总人数%d \n",cfd, m + 1);
send(cfd, buf12, sizeof(buf12), 0);
lx = 0;
}
else if (evs[i].events & EPOLLIN)
{
addtask(thrPool, evs[i].data.fd, &ev);
}
}//for
}//else
}///while
return 0;
}
3.总结
不完善 客户端没写 会等技术上来用qt重写一次聊天服务器
标签:task,epoll,int,cfd,pthread,thrPool,线程,linux,pool From: https://www.cnblogs.com/lzfyz/p/17593624.html