需求以及思路
客户端启动以后,需要去连接服务端,并在控制台输入消息发送到服务端,服务端收到该消息后发送给所有已连接的客户端。
所以客户端需要做的事情只有两个:
- 接收用户输入并将其发送到服务端
- 接收服务端消息并将其显示到控制台
服务端要做的事情也是两个:
- 接待新连接上的客户端,为其分配一个服务者
- 接收每个客户端的消息,并将其发送给所有客户端
通过分析以上的需求,很容易得出一个简单的实现思路,使用多线程或者多进程来实现。
- 客户端连接到服务端之后,创建一个线程来处理服务端的消息,然后在主线程中处理控制台输入。
- 服务端在主线程中监听请求,当有客户端连接时分配一个线程用于服务该客户端。
- 并且需要记录所有已连接的客户端,以便能够给所有客户端发送消息。
这种方式很简单粗暴,每次连接上一个客户端时服务端就会新建一个线程或者进程,这并不是很理智的一种做法。
使用多线程实现
客户端
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <iostream>
#include "thread"
int main(){
std::atomic_bool stop = false;
// 服务端地址
struct sockaddr_in serv_addr{};
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
serv_addr.sin_port = htons(1234);
//创建socket并连接到服务端
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(connect(fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr))<0){
std::cerr<<"connect failed."<<std::endl;
return -1;
}
//连接建立后,创建一个线程读取数据
std::thread t([&](){
thread_local char buffer[1024];
while(!stop){
memset(buffer,0,sizeof (buffer));
ssize_t bytes_len = recv(fd,buffer,sizeof(buffer),0);
if(bytes_len==0){
std::cout<<"server closed."<<std::endl;
break;
}else if(bytes_len<0){
std::cout<<"recv error. exit!"<<std::endl;
break;
}else{
std::cout<<buffer<<std::endl;
}
}
stop = true;
exit(0);
});
//主线程中输入并发送
while(!stop){
char buffer[1024];
std::cin>>buffer;
if(stop||strcmp(buffer,"quit")==0){
break;
}
ssize_t bytes_len = send(fd, buffer, sizeof(buffer),0);
if(bytes_len<0)
{
std::cout<<"send error. exit!"<<std::endl;
break;
}
}
stop = true;
t.join();
close(fd);
return 0;
}
服务端
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <list>
#include <thread>
std::atomic_bool stop = false;
std::mutex mu;
// 存储所有客户端文件描述符的容器
std::vector<int> clientFdVec;
// 存储所有服务线程的容器
std::vector<std::thread> clientWorkerVec;
/**
* @brief notify 发送消息给某个指定的客户端或者所有的客户端
* @param msg
* @param receiverFd -1表示发送给所有客户端,若要发送给指定客户端,该值需要>1
* @param exceptFd receiverFd为-1时生效,即不发送给某客户端
*/
void notify(const std::string & msg,int receiverFd=-1,int exceptFd=-1){
if(receiverFd<0){
std::lock_guard<std::mutex> lk(mu);
for(int cfd : clientFdVec){
if(cfd==exceptFd) {
continue;
}
send(cfd,msg.c_str(),msg.length(),0);
}
}else{
send(receiverFd,msg.c_str(),msg.length(),0);
}
}
/**
* @brief 客户端服务线程
* @param fd
*/
void clientWork(int fd){
char buffer[1024];
while(!stop){
memset(buffer,0,sizeof(buffer));
ssize_t bytes_len = recv(fd,buffer,sizeof(buffer),0);
if(bytes_len==0){
std::cout<<"client "<<fd<<" closed."<<std::endl;
break;
} else if(bytes_len<0){
std::cerr<<"client "<<fd<<" recv error."<<std::endl;
break;
}else{
std::string msg = std::to_string(fd)+":"+buffer;
notify(msg);
}
}
std::lock_guard<std::mutex> lk(mu);
auto it = std::remove_if(clientFdVec.begin(), clientFdVec.end(),[&](int element){
return element == fd;
});
clientFdVec.erase(it, clientFdVec.end());
}
int main() {
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
addr.sin_port = htons(1234);
// 创建socket
int fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
// 绑定socket和addr
if(bind(fd, (struct sockaddr *) &addr, sizeof(addr))<0){
std::cout << "bind failed!" << std::endl;
return -1;
}
// 监听指定地址端口
while(0==listen(fd, 20)){
//接收连接请求
sockaddr_in clientAddr{};
socklen_t clientAddrLen = sizeof(clientAddr);
int clientFd = accept(fd, (struct sockaddr*)&clientAddr, &clientAddrLen);
if(clientFd<0){
std::cerr<<"accept error."<<std::endl;
break;
}
//记录这个客户端
{
std::lock_guard<std::mutex> lk(mu);
clientFdVec.emplace_back(clientFd);
}
//开启一个线程服务这个客户端
clientWorkerVec.emplace_back(clientWork,clientFd);
//给所有客户端发送消息:welcome xxx!
notify(std::string("welcome ")+std::to_string(clientFd));
}
stop = true;
for(auto & t : clientWorkerVec){
t.join();
}
close(fd);
for(int cfd : clientFdVec){
close(cfd);
}
return 0;
}
使用select解决线程创建过多的问题
select基本用法
详细用法以及原理参考 关于Select Model的两篇译文
//#include <sys/select.h>
void FD_CLR(fd, fd_set *fdset);
void FD_COPY(fd_set *fdset_orig, fd_set *fdset_copy);
int FD_ISSET(fd, fd_set *fdset);
void FD_SET(fd, fd_set *fdset);
void FD_ZERO(fd_set *fdset);
int select(int nfds,
fd_set *restrict readfds,
fd_set *restrict writefds,
fd_set *restrict errorfds,
struct timeval *restrict timeout);
简单示例
#include <sys/select.h>
#include <cstdio>
#include <unistd.h>
int main() {
fd_set rd;
struct timeval tv{};
int ret;
FD_ZERO(&rd); //所有位置归零
// #include <unistd.h>
// 0:STDIN_FILENO 标准输入
// 1:STDOUT_FILENO 标准输出
// 2:STDERR_FILENO 标准错误输出
FD_SET(0, &rd); //标准输入文件描述符加入到rd集合中
// 设置超时时间位5s
tv.tv_sec = 3;
tv.tv_usec = 0;
// 将rd集合进行select,监听其可读事件
// 程序在这里阻塞,直到超时或者标准输入上有数据可读
ret = select(1, &rd, nullptr, nullptr, &tv);
if (ret == 0) // Timeout
{
printf("select timeout!\n");
} else if (ret == -1) // Failure
{
printf("fail to select!\n");
} else // Successful
{
printf("data is available!\n");
char buffer[1024]{0};
read(0,buffer,sizeof(buffer));
printf("the data is [%s]!\n",buffer);
}
return 0;
}
服务端
#include <iostream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <list>
#include <thread>
std::atomic_bool stop = false;
int main() {
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = inet_addr("127.0.0.1");
addr.sin_port = htons(1234);
fd_set readFdSet;
fd_set tmpReadFdSet;
FD_ZERO(&readFdSet);
int srvFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (bind(srvFd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
std::cout << "bind failed!" << std::endl;
return -1;
}
listen(srvFd, 5);
FD_SET(srvFd, &readFdSet);
int ret;
while (!stop) {
tmpReadFdSet = readFdSet;
ret = select(FD_SETSIZE, &tmpReadFdSet, nullptr, nullptr, nullptr);
if (ret == 0) {
std::cerr << "time out." << std::endl;
break;
} else if (ret < 0) {
std::cerr << "select error." << std::endl;
break;
}
for (int fd = 0; fd < FD_SETSIZE; fd++) {
if (!FD_ISSET(fd, &tmpReadFdSet)) {
continue;
}
if (fd == srvFd) //处理server
{
struct sockaddr_in client_address{};
socklen_t client_len = sizeof(client_address);
int cfd = accept(fd, (struct sockaddr *) &client_address, &client_len);
FD_SET(cfd, &readFdSet);
std::cout << "client[" << cfd << "]connected." << std::endl;
} else //处理消息
{
char buffer[1024];
memset(buffer, 0, sizeof(buffer));
ssize_t bytes_len = recv(fd, buffer, sizeof(buffer), 0);
if (bytes_len == 0) {
std::cout << "client " << fd << " closed." << std::endl;
FD_CLR(fd, &readFdSet);
std::string msg = std::to_string(fd) + " leaved.";
for (int i = 1; i < FD_SETSIZE; i++) {
if (i != srvFd && FD_ISSET(i, &readFdSet)) {
send(i, msg.c_str(), msg.length(), 0);
}
}//end for
break;
} else if (bytes_len < 0) {
std::cerr << "client " << fd << " recv error." << std::endl;
break;
} else {
//send message to all
std::string msg = std::to_string(fd) + ":" + buffer;
for (int i = 1; i < FD_SETSIZE; i++) {
if (i != srvFd && FD_ISSET(i, &readFdSet)) {
send(i, msg.c_str(), msg.length(), 0);
}
}//end for
}
}
}
}
return 0;
}
客户端
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <iostream>
#include "thread"
int main(){
std::atomic_bool stop = false;
// 服务端地址
struct sockaddr_in serv_addr{};
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
serv_addr.sin_port = htons(1234);
//创建socket并连接到服务端
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(connect(fd, (struct sockaddr*)&serv_addr, sizeof(serv_addr))<0){
std::cerr<<"connect failed."<<std::endl;
return -1;
}
//连接建立后,创建一个线程读取数据
std::thread t([&](){
thread_local char buffer[1024];
while(!stop){
memset(buffer,0,sizeof (buffer));
ssize_t bytes_len = recv(fd,buffer,sizeof(buffer),0);
if(bytes_len==0){
std::cout<<"server closed."<<std::endl;
break;
}else if(bytes_len<0){
std::cout<<"recv error. exit!"<<std::endl;
break;
}else{
std::cout<<buffer<<std::endl;
}
}
stop = true;
exit(0);
});
//主线程中输入并发送
while(!stop){
char buffer[1024];
std::cin>>buffer;
if(stop||strcmp(buffer,"quit")==0){
break;
}
ssize_t bytes_len = send(fd, buffer, sizeof(buffer),0);
if(bytes_len<0)
{
std::cout<<"send error. exit!"<<std::endl;
break;
}
}
stop = true;
t.join();
close(fd);
return 0;
}
标签:聊天室,addr,int,C++,buffer,fd,TCP,include,客户端
From: https://www.cnblogs.com/pengpengda/p/18165416