首页 > 其他分享 >Rust简明教程第九章-多线程和并发

Rust简明教程第九章-多线程和并发

时间:2024-07-05 19:55:47浏览次数:28  
标签:教程 thread unwrap Arc 线程 new let 多线程 Rust

并发

并发指在同一时刻只能有一条指令执行,但多个进程指令被快速的轮换执行,使得在宏观上具有多个进程同时执行的效果

  • 进程是一个程序的实例
  • 线程是一个进程中执行的一个单一线性执行流程,一个进程包含多个线程,线程可以并发执行
  • main是主线程,系统的入口

区别:

  • 并发指一个系统能够同时处理多个任务,在单处理器上多个任务交替执行,看起来像同时执行
  • 并行指一个系统能够同时处理多个任务,每个任务在不同核心执行,真正同时执行任务

交替执行,主线程结束新线程也会结束,本例新线程只执行5次

use std::thread;
use std::time::Duration;
fn main() {
    thread::spawn(|| {
        for i in 1..=10 {
            println!("新线程{}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..=5 {
        println!("主线程{}", i);
        thread::sleep(Duration::from_millis(1));
    }
}

使用join handle等待所有线程结束后再退出程序

use std::thread;
use std::time::Duration;
fn main() {
    let handle = thread::spawn(|| {
        for i in 1..=10 {
            println!("新线程{}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..=5 {
        println!("主线程{}", i);
        thread::sleep(Duration::from_millis(1));
    }
    handle.join().unwrap(); //阻塞主线程,直到新线程结束
}

使用其他线程的数据

use std::thread;
fn main() {
    let v = vec![1, 2, 3];
    let handle = thread::spawn(move || {
        //整个程序是主线程,将v的所有权move到线程,使用其他线程的数据
        println!("动态数组:{:?}", v); //动态数组:[1, 2, 3]
    });
    handle.join().unwrap();
}

Channel 管道

  • 使用消息传递跨线程传递数据

  • Channel:用通信共享内存,包含发送端、接收端

  • 如果发送端、接收端任意一端被丢弃,那么Channel就关闭了

recv:阻塞当前线程,有消息返回消息,发送端关闭返回RecvErr

use std::sync::mpsc;
use std::thread;
fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let val = String::from("hello");
        tx.send(val).unwrap(); //发送消息
    });
    //recv阻止当前线程执行
    let received = rx.recv().unwrap(); //接收消息,有消息返回消息,发送端关闭返回RecvErr
    println!("接收:{}", received); //接收:hello
}

try_recv:不阻塞当前线程,无消息返回Empty错误,Channel关闭返回Disconnected

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    // 创建一个通道,用于在不同线程之间传递消息
    let (sender, receiver) = mpsc::channel();
    // 启动一个新线程,发送消息到通道
    thread::spawn(move || {
        let messages = vec!["hello", "world", "from", "another", "thread"];
        for msg in messages {
            sender.send(msg.to_string()).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    // 在主线程中接收通道中的消息
    loop {
        match receiver.try_recv() {
            Ok(msg) => println!("收到消息: {}", msg),
            Err(mpsc::TryRecvError::Empty) => {
                println!("没有接收到消息, 等待...");
                thread::sleep(Duration::from_secs(2));
            }
            Err(mpsc::TryRecvError::Disconnected) => {
                println!("出错!, 关闭...");
                break;
            }
        }
    }
}

Rc<T>

引用计数类型,其数据可以有多个所有者

  • 通过不可变引用,使程序不同部分共享只读数据
  • 实现了clone trait,通过Rc::clone(&rc)增加引用计数
  • 共享不可变数据的所有权,避免大型数据结构的开销
  • 单线程场景使用,性能开销低
#[derive(Debug)]
struct SharedData {
    message: String,
}
fn main() {
    use std::rc::Rc;
    // 创建一个包含共享数据的 Rc 智能指针
    let shared_data = Rc::new(SharedData {
        message: String::from("Hello, Rc!"),
    });
    // 克隆 Rc 智能指针,增加引用计数
    let shared_data1 = Rc::clone(&shared_data);
    let shared_data2 = Rc::clone(&shared_data);

    // 输出引用计数
    println!("引用计数:{}", Rc::strong_count(&shared_data)); //引用计数:3
    // 访问共享数据
    println!("共享数据1:{:?}", shared_data1); //共享数据1:SharedData { message: "Hello, Rc!" }
    println!("共享数据2:{:?}", shared_data2); //共享数据2:SharedData { message: "Hello, Rc!" }
}

RefCell<T>

  • 运行时检查内部可变性
  • 允许在不可变引用(&T)存在的情况下获取可变引用(&mut T),也就是运行时动态修改数据
  • 单线程场景使用,性能开销低
use std::cell::RefCell;
fn main() {
    // 创建一个包含可变数据的 RefCell
    let data = RefCell::new(vec![1, 2, 3]);

    // 在不可变引用存在的情况下,获取可变引用并修改数据
    {
        let mut borrow = data.borrow_mut();
        borrow.push(4);
    }
    // 获取不可变引用并访问数据
    let borrow = data.borrow();
    println!("Data: {:?}", *borrow); //Data: [1, 2, 3, 4]
}

Mutex<T>

共享状态并发

mutal exclusion互斥锁,允许多线程访问数据,但同一时刻只允许一个线程来访问某些数据

  • 访问数据前需要获得lock互斥锁
  • 使用完数据必须解锁,例如在块内使用自动调用drop
  • 多线程场景使用,性能开销高
use std::sync::Mutex;
fn main() {
    let m = Mutex::new(5);
    {
        let mut n = m.lock().unwrap();
        *n = 6;// move所有权并修改值
    } //drop解锁
    println!("m:{:?}", m); //m:Mutex { data: 6, poisoned: false, .. }
}

Arc<T>:Atomic Reference Counted

  • Arc允许多个线程之间共享数据不可变所有权,并且在引用计数管理上是线程安全的
  • Arc 本身不提供内部可变性,因此不能直接用于多个线程修改同一数据,通常与 MutexRwLock 结合使用
  • 多线程场景使用,性能开销高
use std::sync::{Arc, Mutex};
use std::thread;
fn main() {
    // 创建一个共享的可变数据结构
    let counter = Arc::new(Mutex::new(0));
    // 创建多个线程来增加计数器的值
    let mut handles = vec![];
    for _ in 0..10 {
        let counter = Arc::clone(&counter); // 每个线程都需要共享 Arc 的引用
        let handle = thread::spawn(move || {
            // 获取锁,并在作用域结束时自动释放锁
            let mut num = counter.lock().unwrap();
            *num += 1;
        });
        handles.push(handle);
    }
    // 等待所有线程完成
    for handle in handles {
        handle.join().unwrap();
    }
    // 输出最终计数器的值
    println!("计数最终值: {:?}", counter.lock().unwrap()); //计数最终值: 10
}
  • 用户线程:默认情况下创建的线程或线程池都是用户线程
  • 守护线程:后台线程或服务线程
use std::thread;
use std::time::Duration;
fn main() {
    // 创建一个用户线程
    let user_thread = thread::spawn(|| {
        user_task();
    });
    // 创建一个守护线程
    let _daemon_thread = thread::spawn(|| {
        daemon_task();
    });
    // 主线程休眠3秒
    thread::sleep(Duration::from_secs(3));
    println!("主线程退出");
    // 等待用户线程结束
    user_thread.join().unwrap();
}
// 用户线程任务
fn user_task() {
    println!("用户线程开始执行");
    thread::sleep(Duration::from_secs(2));
    println!("用户线程执行完毕");
}
// 守护线程任务
fn daemon_task() {
    println!("守护线程开始执行");
    loop {
        thread::sleep(Duration::from_millis(500));
        println!("守护线程执行中");
    }
}

线程同步

取款案例,不加锁会造成数据竞争

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
struct BankAccount {
    balance: f64,
}
impl BankAccount {
    fn new(initial_balance: f64) -> Self {
        BankAccount {
            balance: initial_balance,
        }
    }
    // 使用 Mutex 确保线程安全
    fn deposit(&mut self, amount: f64) {
        if amount > 0.0 {
            self.balance += amount;
            println!("{} 存款 {},当前余额: {}", thread::current().name().unwrap(), amount, self.get_balance());
        }
    }
    fn get_balance(&self) -> f64 {
        self.balance
    }
}
fn main() {
        //Arc原子引用计数类型,允许多个线程安全地共享所有权
    let account = Arc::new(Mutex::new(BankAccount::new(1000.0)));
    let account1 = Arc::clone(&account);
    let account2 = Arc::clone(&account);
    // 创建两个线程同时进行存款操作
    let t1 = thread::Builder::new()
        .name("线程1".to_string())
        .spawn(move || {
            for _ in 0..10 {
                let mut account = account1.lock().unwrap();
                //存款100
                account.deposit(100.0);
                thread::sleep(Duration::from_millis(100));
            }
        })
        .unwrap();

    let t2 = thread::Builder::new()
        .name("线程2".to_string())
        .spawn(move || {
            for _ in 0..10 {
                let mut account = account2.lock().unwrap();
                //存款200
                account.deposit(200.0);
                thread::sleep(Duration::from_millis(100));
            }
        })
        .unwrap();

    t1.join().unwrap();
    t2.join().unwrap();
}

死锁

多个进程在运行过程中因争夺资源而造成的一种僵局

产生死锁的条件:

  • 互斥条件:一个资源每次只能被一个进程使用
  • 请求与保持条件:一个进程因请求资源而阻塞时,不能强行剥夺
  • 不剥夺条件:进程已获得的资源在未使用完之前不能强行剥夺
  • 循环等待条件:若干进程之间形成一种头尾相接的循环等待资源关系

Task1 获取到 lock1 并等待 lock2,而 Task2 获取到 lock2 并等待 lock1 时,两个线程互相等待对方释放锁,导致死锁

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // 定义两个对象锁
    let lock1 = Arc::new(Mutex::new(()));
    let lock2 = Arc::new(Mutex::new(()));

    let lock1_task1 = Arc::clone(&lock1);
    let lock2_task1 = Arc::clone(&lock2);

    // 创建线程1,执行 Task1
    let thread1 = thread::spawn(move || {
        let _lock1 = lock1_task1.lock().unwrap();
        println!("任务1获得锁1");
        thread::sleep(Duration::from_millis(1000));
        let _lock2 = lock2_task1.lock().unwrap();
        println!("任务1获得锁2");
    });

    let lock1_task2 = Arc::clone(&lock1);
    let lock2_task2 = Arc::clone(&lock2);

    // 创建线程2,执行 Task2
    let thread2 = thread::spawn(move || {
        let _lock2 = lock2_task2.lock().unwrap();
        println!("任务2获得锁2");
        thread::sleep(Duration::from_millis(1000));
        let _lock1 = lock1_task2.lock().unwrap();
        println!("任务2获得锁1");
    });

    // 等待两个线程完成
    thread1.join().unwrap();
    thread2.join().unwrap();
}

通过使用获得锁的相同顺序避免死锁

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    // 定义两个对象锁
    let lock1 = Arc::new(Mutex::new(()));
    let lock2 = Arc::new(Mutex::new(()));

    // 创建线程1,执行 Task1
    let lock1_task1 = Arc::clone(&lock1);
    let lock2_task1 = Arc::clone(&lock2);
    let thread1 = thread::spawn(move || {
        let _lock1 = lock1_task1.lock().unwrap();
        println!("任务1获得锁1");
        thread::sleep(Duration::from_millis(1000));
        let _lock2 = lock2_task1.lock().unwrap();
        println!("任务1获得锁2");
    });

    // 创建线程2,执行 Task2
    let lock1_task2 = Arc::clone(&lock1);
    let lock2_task2 = Arc::clone(&lock2);
    let thread2 = thread::spawn(move || {
        let _lock1 = lock1_task2.lock().unwrap();
        println!("任务2获得锁1");
        thread::sleep(Duration::from_millis(1000));
        let _lock2 = lock2_task2.lock().unwrap();
        println!("任务2获得锁2");
    });

    // 等待两个线程完成
    thread1.join().unwrap();
    thread2.join().unwrap();
}

线程通信:生产者和消费者

用通信共享内存

管程法:使用缓冲区

use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
const MAX_SIZE: usize = 5;
struct Monitor {
    buffer: Mutex<VecDeque<i32>>,
    condvar: Condvar,
}
impl Monitor {
    fn new() -> Self {
        Monitor {
            //mutex互斥
            buffer: Mutex::new(VecDeque::new()),
            //条件变量
            condvar: Condvar::new(),
        }
    }
    fn produce(&self, value: i32) {
        let mut buffer = self.buffer.lock().unwrap();
        //比5小就生产
        while buffer.len() >= MAX_SIZE {
            buffer = self.condvar.wait(buffer).unwrap();
        }
        buffer.push_back(value);
        println!("生产者: {}", value);
        //唤醒所有等待队列中阻塞的线程
        self.condvar.notify_all();
    }
    fn consume(&self) -> i32 {
        let mut buffer = self.buffer.lock().unwrap();
        while buffer.is_empty() {
            //未空则等待
            buffer = self.condvar.wait(buffer).unwrap();
        }
        let value = buffer.pop_front().unwrap();
        println!("消费者: {}", value);
         //唤醒所有等待队列中阻塞的线程
        self.condvar.notify_all();
        value
    }
}
fn main() {
    let monitor = Arc::new(Monitor::new());
    let monitor_producer = Arc::clone(&monitor);
    let monitor_consumer = Arc::clone(&monitor);

    let producer_thread = thread::spawn(move || {
        let mut value = 0;
        loop {
            monitor_producer.produce(value);
            value += 1;
            //模拟生产者
            thread::sleep(Duration::from_secs(1));
        }
    });
    let consumer_thread = thread::spawn(move || loop {
        monitor_consumer.consume();
        //模拟消费者
        thread::sleep(Duration::from_secs(2));
    });
    // 等待线程结束
    producer_thread.join().unwrap();
    consumer_thread.join().unwrap();
}

信号灯法:标志位

信号量

  • emptyfull 信号量用于跟踪缓冲区中的可用空间和已填充空间。
  • 生产者在生产一个项目之前,会尝试获取 empty 信号量许可。如果缓冲区已满,则生产者会等待,直到有空间可用
  • 消费者在消费一个项目之前,会尝试获取 full 信号量许可。如果缓冲区为空,则消费者会等待,直到有项目可用

互斥锁

  • mutex 信号量确保生产者和消费者不会同时访问缓冲区。它充当一个二进制信号量(互斥锁),以确保对共享资源的互斥访问
  • 生产者或消费者在访问缓冲区时,会获取 mutex 信号量许可,并在完成访问后,释放许可(通过增加许可数)

Cargo.toml里添加,cargo run一下

[dependencies]
tokio = { version = "1", features = ["full"] }
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::Semaphore;

const MAX_SIZE: usize = 5;

struct SemaphoreBuffer {
    buffer: Mutex<VecDeque<i32>>,
    mutex: Arc<Semaphore>,
    empty: Arc<Semaphore>,
    full: Arc<Semaphore>,
}
impl SemaphoreBuffer {
    fn new() -> Self {
        SemaphoreBuffer {
            //缓冲区
            buffer: Mutex::new(VecDeque::new()),
            //三个信号
            mutex: Arc::new(Semaphore::new(1)),//互斥
            empty: Arc::new(Semaphore::new(MAX_SIZE)),//空信号
            full: Arc::new(Semaphore::new(0)),//满信号
        }
    }

    async fn produce(&self, value: i32) {
        self.empty.acquire().await.unwrap().forget();
        self.mutex.acquire().await.unwrap().forget();
        let mut buffer = self.buffer.lock().unwrap();
        //生产
        buffer.push_back(value);
        println!("生产者: {}", value);
        drop(buffer);
        //增加信号许可证
        self.mutex.add_permits(1);
        self.full.add_permits(1);
    }

    async fn consume(&self) -> i32 {
        //获得full和mutex许可证
        self.full.acquire().await.unwrap().forget();
        self.mutex.acquire().await.unwrap().forget();
        let mut buffer = self.buffer.lock().unwrap();
        //消费
        let value = buffer.pop_front().unwrap();
        println!("消费者: {}", value);
        drop(buffer);
        //增加信号许可证
        self.mutex.add_permits(1);
        self.empty.add_permits(1);
        value
    }
}

#[tokio::main]
async fn main() {
    let semaphore_buffer = Arc::new(SemaphoreBuffer::new());
    let producer_buffer = Arc::clone(&semaphore_buffer);
    let consumer_buffer = Arc::clone(&semaphore_buffer);
    let producer_thread = tokio::spawn(async move {
        let mut value = 0;
        loop {
            producer_buffer.produce(value).await;
            value += 1;
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });
    let consumer_thread = tokio::spawn(async move {
        loop {
            consumer_buffer.consume().await;
            tokio::time::sleep(Duration::from_secs(2)).await;
        }
    });
    //等待异步任务完成再结束
    let _ = tokio::try_join!(producer_thread, consumer_thread);
}

线程池

经常创建和销毁线程对性能影响大,提前创建好线程放入线程池,使用时直接获取,使用结束放回线程池,避免频繁创建销毁线程

use tokio::time::{sleep, Duration};
use tokio::task::JoinHandle;
struct WorkerTask {
    task_name: String,
}
impl WorkerTask {
    fn new(task_name: String) -> Self {
        WorkerTask { task_name }
    }
    async fn execute(&self) {
        println!("执行任务: {}", self.task_name);
        // 模拟任务执行
        sleep(Duration::from_secs(1)).await;
        println!("任务完成啦: {}", self.task_name);
    }
}
#[tokio::main]
async fn main() {
    let mut handles: Vec<JoinHandle<()>> = Vec::new();
    // 创建并行任务数量为5的线程池
    let max_threads = 5;
    let mut current_threads = 0;

    // 模拟20个任务
    for i in 0..20 {
        let task_name = format!("任务 {}", i + 1);
        let worker_task = WorkerTask::new(task_name);
        if current_threads >= max_threads {
            // 如果当前并行任务数达到最大值,等待第一个任务完成
            let handle = handles.remove(0);
            handle.await.unwrap();
            current_threads -= 1;
        }
        // 提交任务给线程池执行
        let handle = tokio::spawn(async move {
            worker_task.execute().await;
        });
        handles.push(handle);
        current_threads += 1;
    }
    // 等待所有任务完成
    for handle in handles {
        handle.await.unwrap();
    }
}

标签:教程,thread,unwrap,Arc,线程,new,let,多线程,Rust
From: https://blog.csdn.net/weixin_62799021/article/details/140217084

相关文章

  • Andriod SDK安装教程
    前言最简单的方式我们使用ANDROIDSTUDIO这款开发工具下载对应的AndriodSDK。可是我们如果不开发安卓,只是用它的一些SDK包的话而安装整个开发工具,就没必要了。这里讲的是用独立的命令行工具来操作。下载命令行工具点击此处进入下载页面,滑动到最下边,选择合适的系统下载,我......
  • 【教程】一步一步构建一个RBF神经网络-详细解说
    本文来自《老饼讲解-BP神经网络》https://www.bbbdata.com/目录一、什么是RBF神经网络1.1.RBF神经网络介绍二、matlab实现RBF神经网络2.1.matlab实现RBF代码示例2.2.代码解说一、什么是RBF神经网络1.1.RBF神经网络介绍RBF神经网络是指使用RBF作为激活函数......
  • python多线程与多进程开发实践及填坑记(1)
    1.需求分析1.1.概述基于Flask、Pika、Multiprocessing、Thread搭建一个架构,完成多线程、多进程工作。具体需求如下:并行计算任务:使用multiprocessing模块实现并行计算任务,提高计算效率、计算能力。消息侦听任务:使用threading模块完成RabbitMQ消息队列的侦听任务,将接收到......
  • 什么是知识付费系统?知识付费系统的功能有哪些?知识付费系统平台搭建教程
    技术栈:前端:uniappvue3后端:php thinkphp8数据库:mysql5.7技术搭建咨询:ywxs5787  备注来意知识付费系统一、知识付费系统界面演示:二、知识付费平台如何搭建知识付费系统拥有了源码,使用宝塔搭建的知识付费系统,步骤操作如下:1、准备阶段1.下载并安......
  • 制作CentOS7.9的U盘系统启动盘超详细操作教程
    1、下载centos7.9操作系统阿里云镜像源https://mirrors.aliyun.com/centos/7.9.2009/isos/x86_64/?spm=a2c6h.25603864.0.0.7ed1f5ad877Uus2、制作u盘注意事项:制作u盘会让直接格式化u盘里面的所有数据,已有数据的u盘需要做好备份,建议购买新的u盘格式化操作。2.1格式化u盘,选......
  • Docker安装Niginx(详细教程)
    1.在Docker官方寻找Nginx镜像    Docker官方镜像2.下载Nginx镜像dockerpullnginx3.创建Nginx配置文件创建挂载目录mkdir/home/nginx/confmkdir/home/nginx/logmkdir/home/nginx/html创建Docker容器,并将配置文件拷贝到宿主机上#创建并运行Docker容器......
  • ElasticSearch入门教程(保姆级)
    目录一、引言1.1海量数据1.2全文检索1.3高亮显示二、ES概述2.1ES的介绍2.2ES的由来2.3ES和Solr2.4倒排索引三、ElasticSearch安装3.1安装ES&Kibana3.2安装IK分词器四、ElasticSearch基本操作4.1ES的结构4.1.1索引Index,分片和备份4.1.2类型Type......
  • 超详细Python教程——函数和模块的使用
    函数和模块的使用在讲解本章节的内容之前,我们先来研究一道数学题,请说出下面的方程有多少组正整数解。事实上,上面的问题等同于将8个苹果分成四组每组至少一个苹果有多少种方案。想到这一点问题的答案就呼之欲出了。可以用Python的程序来计算出这个值,代码如下所示。"""......
  • AI大模型从零到专家:全面教程,一课掌握!
    在学习大模型之前,你不必担心自己缺乏相关知识或认为这太难。我坚信,只要你有学习的意愿并付出努力,你就能够掌握大模型,并能够用它们完成许多有意义的事情。在这个快速变化的时代,虽然新技术和概念不断涌现,但希望你能静下心来,踏实地学习。一旦你精通了某项技术,你就能够用它来实......
  • 鸿蒙OpenHarmony南向/北向快速开发教程-迅为RK3568开发板
    鸿蒙OpenHarmony南向/北向快速开发教程-迅为RK3568开发板 大家期待已久的迅为RK3568开发板终于迎来了鸿蒙4.1系统的强势支持!想知道如何实现快速开发学习吗?跟着我们一起来探索吧!    迅为RK3568开发板:     想象一下,你手中的RK3568开发板能够轻松运行鸿蒙4.1......