//端1
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include <signal.h>
#define TINGYU_PATH "/home/ubuntu/youan/tingyu_fifo" // 定义管道1的路径
#define YOUAN_PATH "/home/ubuntu/youan/youan_fifo" // 定义管道2的路径
// 定义全局变量
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
// 线程函数声明
void* send_thread(void* arg);
void* recv_thread(void* arg);
void cleanup(int sig); // 清理函数声明
int tingyu_fd, youan_fd; // 全局文件描述符
int main() {
int ret;
pthread_t send_tid, recv_tid; // 发送和接收线程的标识符
// 设置信号处理程序
signal(SIGINT, cleanup);
// 创建管道1
ret = mkfifo(TINGYU_PATH, 0777);
if (ret < 0 && errno != EEXIST) {
perror("mkfifo TINGYU_PATH failed"); // 创建失败,打印错误信息
return -1;
}
// 创建管道2
ret = mkfifo(YOUAN_PATH, 0777);
if (ret < 0 && errno != EEXIST)
{
perror("mkfifo YOUAN_PATH failed"); // 创建失败,打印错误信息
return -1;
}
// 打开管道1用于读取(从接收端接收数据)
tingyu_fd = open(TINGYU_PATH, O_RDONLY | O_NONBLOCK);
if (tingyu_fd < 0) {
perror("open TINGYU_PATH failed"); // 打开失败,打印错误信息
return -1;
}
// 打开管道2用于写入(向接收端发送数据)
youan_fd = open(YOUAN_PATH, O_WRONLY);
if (youan_fd < 0) {
perror("open YOUAN_PATH failed"); // 打开失败,打印错误信息
close(tingyu_fd); // 关闭管道1的文件描述符
return -1;
}
// 创建发送线程
ret = pthread_create(&send_tid, NULL, send_thread, (void*)&youan_fd);
if (ret)
{
perror("pthread_create send_tid failed"); // 创建线程失败,打印错误信息
close(tingyu_fd);
close(youan_fd);
return -1;
}
// 创建接收线程
ret = pthread_create(&recv_tid, NULL, recv_thread, (void*)&tingyu_fd);
if (ret) {
perror("pthread_create recv_tid failed"); // 创建线程失败,打印错误信息
close(tingyu_fd);
close(youan_fd);
return -1;
}
// 等待线程结束
pthread_join(send_tid, NULL);
pthread_join(recv_tid, NULL);
// 关闭文件描述符
close(tingyu_fd);
close(youan_fd);
// 删除管道文件
unlink(TINGYU_PATH);
unlink(YOUAN_PATH);
//销毁互斥
pthread_mutex_destroy(&lock);
return 0;
}
// 发送线程函数
void* send_thread(void* arg)
{
int pipefd = *(int*)arg; // 获取管道文件描述符
char buf[1024]; // 缓冲区
ssize_t ret;
printf("发送线程启动\n"); // 打印发送线程启动信息
while (1)
{
printf("输入要发送的消息:"); // 提示用户输入信息
if (fgets(buf, sizeof(buf), stdin) == NULL)
{ // 读取用户输入
perror("fgets failed"); // 输入失败,打印错误信息
break;
}
buf[strcspn(buf, "\n")] = '\0'; // 去掉换行符
// getchar(); // 读取回车键
pthread_mutex_lock(&lock); // 加锁
ret = write(pipefd, buf, strlen(buf)); // 写入管道
pthread_mutex_unlock(&lock); // 解锁
if (ret < 0)
{
perror("write to pipe failed"); // 写入失败,打印错误信息
break;
}
}
printf("发送线程结束\n"); // 打印发送线程结束信息
return NULL;
}
// 接收线程函数
void* recv_thread(void* arg)
{
int pipefd = *(int*)arg; // 获取管道文件描述符
char buf[1024]; // 缓冲区
ssize_t ret;
printf("接收线程启动\n"); // 打印接收线程启动信息
while (1)
{
pthread_mutex_lock(&lock); // 加锁
ret = read(pipefd, buf, sizeof(buf) - 1); // 从管道读取数据
pthread_mutex_unlock(&lock); // 解锁
if (ret < 0)
{
if (errno == EAGAIN || errno == EWOULDBLOCK)
{ // 如果没有数据可读
usleep(100000); // 等待100毫秒再试
continue;
}
perror("read from pipe failed"); // 读取失败,打印错误信息
break;
}
if (ret == 0)
{
// 管道关闭
break;
}
buf[ret] = '\0'; // 添加字符串结束符
printf("\n");
printf("收到消息: %s\n", buf); // 打印接收到的消息
}
printf("接收线程结束\n"); // 打印接收线程结束信息
return NULL;
}
// 清理函数,关闭文件描述符并删除管道
void cleanup(int sig) {
printf("收到信号CTRL+C,准备清理...\n"); // 打印清理信息
if (tingyu_fd >= 0) close(tingyu_fd); // 关闭管道1的文件描述符
if (youan_fd >= 0) close(youan_fd); // 关闭管道2的文件描述符
unlink(TINGYU_PATH); // 删除管道1
unlink(YOUAN_PATH); // 删除管道2
//销毁互斥
pthread_mutex_destroy(&lock);
printf("清理完成,程序退出...\n"); // 打印退出信息
exit(0); // 退出程序
}
//端二
#include <stdio.h>// 标准输入输出头文件
#include <stdlib.h>// 标准函数库头文件
#include <sys/types.h>// 系统头文件
#include <sys/stat.h>// 系统头文件
#include <fcntl.h>// 文件控制头文件
#include <unistd.h>// unistd.h头文件
#include <string.h>// 字符串头文件
#include <pthread.h>// 线程头文件
#include <errno.h>// 错误号头文件
#include <signal.h>// 信号处理头文件
#define TINGYU_PATH "/home/ubuntu/youan/tingyu_fifo"// 发送端管道文件路径
#define YOUAN_PATH "/home/ubuntu/youan/youan_fifo"// 接收端管道文件路径
void* send_thread(void* arg);// 发送线程
void* recv_thread(void* arg);// 接收线程
void cleanup(int sig);// 信号处理函数
int tingyu_fd, youan_fd; // 线程id
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; // 互斥锁
int main() {
int ret;
pthread_t send_tid, recv_tid;
//捕获Ctrl-C信号
signal(SIGINT, cleanup);
// 创建管道1
ret = mkfifo(TINGYU_PATH, 0777);
if (ret < 0 && errno != EEXIST)// 如果管道已经存在,则不再创建
{
perror("mkfifo TINGYU_PATH failed");
return -1;
}
// 创建管道2
ret = mkfifo(YOUAN_PATH, 0777);
if (ret < 0 && errno != EEXIST) // 如果管道已经存在,则不再创建
{
perror("mkfifo YOUAN_PATH failed");
return -1;
}
// 打开管道1用于写入(向发送端发送数据)
tingyu_fd = open(TINGYU_PATH, O_WRONLY);// 打开管道1用于写入
if (tingyu_fd < 0)
{
perror("open TINGYU_PATH failed");
return -1;
}
// 打开管道2用于读取(从发送端接收数据)
youan_fd = open(YOUAN_PATH, O_RDONLY | O_NONBLOCK);// 打开管道2用于读取,设置非阻塞模式
if (youan_fd < 0)
{
perror("open YOUAN_PATH failed");
close(tingyu_fd);
return -1;
}
// 创建发送线程
ret = pthread_create(&send_tid, NULL, send_thread, (void*)&tingyu_fd);
if (ret) {
perror("pthread_create send_tid failed");
close(tingyu_fd);
close(youan_fd);
return -1;
}
// 创建接收线程
ret = pthread_create(&recv_tid, NULL, recv_thread, (void*)&youan_fd);
if (ret) {
perror("pthread_create recv_tid failed");
close(tingyu_fd);
close(youan_fd);
return -1;
}
// 等待线程结束
pthread_join(send_tid, NULL);
pthread_join(recv_tid, NULL);
// 关闭文件描述符
close(tingyu_fd);
close(youan_fd);
// 删除管道文件
unlink(TINGYU_PATH);
unlink(YOUAN_PATH);
// 销毁互斥
pthread_mutex_destroy(&mutex);
return 0;
}
void* send_thread(void* arg)
{
int pipefd = *(int*)arg;
char buf[1024];
ssize_t ret;
printf("发送端线程开始\n");// 打印提示信息
while (1)
{
printf("输入要发送的消息:");
if (fgets(buf, sizeof(buf), stdin) == NULL)
{
perror("fgets failed");
break;
}
buf[strcspn(buf, "\n")] = '\0'; // 去掉换行符
// getchar(); // 读取回车键
pthread_mutex_lock(&mutex); // 加互斥锁
ret = write(pipefd, buf, strlen(buf));
pthread_mutex_unlock(&mutex); // 解互斥锁
if (ret < 0)
{
perror("write to pipe failed");
break;
}
}
printf("发送端线程结束\n");
return NULL;
}
void* recv_thread(void* arg) {
int pipefd = *(int*)arg;
char buf[1024];
ssize_t ret;
printf("接收端线程开始\n");
while (1)
{
pthread_mutex_lock(&mutex); // 加互斥锁
ret = read(pipefd, buf, sizeof(buf) - 1);
pthread_mutex_unlock(&mutex); // 解互斥锁
if (ret < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) //如果管道为空,则等待100ms后再次尝试读取
{
usleep(100000); // 100ms
continue;
}
perror("read from pipe failed");
break;
}
if (ret == 0)
{
// Pipe closed
break;
}
buf[ret] = '\0';
printf("\n");
printf("接收端收到消息: %s\n", buf);
}
printf("接收端线程结束\n");
return NULL;
}
void cleanup(int sig) {
printf("收到信号Ctrl-C,准备清理...\n");
if (tingyu_fd >= 0) close(tingyu_fd);
if (youan_fd >= 0) close(youan_fd);
unlink(TINGYU_PATH);
unlink(YOUAN_PATH);
//销毁互斥
pthread_mutex_destroy(&mutex);
printf("清理完成,程序退出...\n");
exit(0);
}
标签:线程,管道,ret,端线,20240815,fd,pthread,PATH,buf
From: https://blog.csdn.net/qq_65171301/article/details/141230548