首页 > 其他分享 >Rust 无畏并发

Rust 无畏并发

时间:2024-02-27 15:23:02浏览次数:26  
标签:std use thread unwrap 无畏 并发 let 线程 Rust

本文在原文基础上有删减,原文链接 无畏并发

目录

并发编程(Concurrent programming),代表程序的不同部分相互独立地执行,而 并行编程(parallel programming)代表程序不同部分同时执行。

注意:出于简洁的考虑将很多问题归类为 并发,而不是更准确的区分 并发和(或)并行,谈到 并发 时请自行脑内替换为 并发和(或)并行

使用线程同时运行代码

无法预先保证不同线程中的代码的执行顺序会导致诸如此类的问题:

  • 竞态条件(Race conditions),多个线程以不一致的顺序访问数据或资源
  • 死锁(Deadlocks),两个线程相互等待对方,这会阻止两者继续运行
  • 只会发生在特定情况且难以稳定重现和修复的 bug

编程语言有一些不同的方法来实现线程,Rust 标准库使用 1:1 线程实现,这代表程序的每一个语言级线程使用一个系统线程。

使用 spawn 创建新线程

为了创建一个新线程,需要调用 thread::spawn 函数并传递一个闭包,并在其中包含希望在新线程运行的代码。

创建一个打印某些内容的新线程,但是主线程打印其它内容:

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        //强制线程停止执行一小段时间,这会允许其他不同的线程运行
        thread::sleep(Duration::from_millis(1));
    }
}

注意:当 Rust 程序的主线程结束时,新线程也会结束,而不管其是否执行完毕。

使用 join 等待所有线程结束

可以通过将 thread::spawn 的返回值储存在变量中来修复新建线程部分没有执行或者完全没有执行的问题,返回值类型是 JoinHandle。JoinHandle 是一个拥有所有权的值,当对其调用 join 方法时,它会等待其线程结束。

使用创建的线程的 JoinHandle 并调用 join 来确保新建线程在 main 退出前结束运行:

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 1..10 {
            println!("hi number {} from the spawned thread!", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 1..5 {
        println!("hi number {} from the main thread!", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

这两个线程会交替执行,主线程会由于 handle.join() 调用会等待直到新建线程执行完毕。

将 move 闭包与线程一同使用

move 关键字经常用于传递给 thread::spawn 的闭包,因为闭包会获取从环境中取得的值的所有权,因此会将这些值的所有权从一个线程传送到另一个线程。

使用 move 关键字强制获取它使用的值的所有权:

use std::thread;

fn main() {
    let v = vec![1, 2, 3];
    
    //Rust 不知道这个新建线程会执行多久,无法知晓对 v 的引用是否一直有效
    //let handle = thread::spawn(|| {

    //使用 move 关键字强制获取它使用的值的所有权    
    let handle = thread::spawn(move || {    
        
        println!("Here's a vector: {:?}", v);
    });

    // 增加 move 将不能在主线程中对其调用 drop 
    //drop(v); 

    handle.join().unwrap();
}

使用消息传递在线程间传送数据

一个日益流行的确保安全并发的方式是 消息传递(message passing),这里线程或 actor 通过发送包含数据的消息来相互沟通。

这个思想来源于 Go 编程语言文档中 的口号:“不要通过共享内存来通讯;而是通过通讯来共享内存。”(“Do not communicate by sharing memory; instead, share memory by communicating.”)

为了实现消息传递并发,Rust 标准库提供了一个 信道(channel)实现。信道是一个通用编程概念,表示数据从一个线程发送到另一个线程

编程中的信息渠道(信道)有两部分组成,一个发送者(transmitter)和一个接收者(receiver)。发送者位于上游位置发送信息,收者则位于下游接受信息。代码中的一部分调用发送者的方法以及希望发送的数据,另一部分则检查接收端收到的消息。当发送者或接收者任一被丢弃时可以认为信道被 关闭(closed)了。

创建一个信道并将其两端赋值给 tx 和 rx,现在还不能编译:

use std::sync::mpsc;

fn main() {.
    //使用 mpsc::channel 函数创建一个新的信道
    let (tx, rx) = mpsc::channel();
}

mpsc多个生产者单个消费者(multiple producer, single consumer)的缩写。简而言之,Rust 标准库实现信道的方式意味着一个信道可以有多个产生值的 发送(sending)端,但只能有一个消费这些值的 接收(receiving)端

mpsc::channel 函数返回一个元组,使用一个 let 语句和模式来解构此元组:

  • 第一个元素是发送端 -- tx 发送者(transmitter)
  • 第二个元素是接收端 -- rx 接收者(receiver)

将 tx 移动到一个新建的线程中并发送 “hi” :

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    //创建一个新线程并使用 move 将 tx 移动到闭包中
    thread::spawn(move || {
        let val = String::from("hi");
        //send 方法返回一个 Result<T, E> 类型
        //出错的时候调用 unwrap 产生 panic,真正的程序则需要合理地处理它
        tx.send(val).unwrap();
    });
}

信道的发送端有一个 send 方法用来获取需要放入信道的值,send 方法返回一个 Result<T, E> 类型。

在主线程中接收并打印内容 “hi”:

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
    //打印结果 Got: hi
}

信道的接收者有两个有用的方法(recv 是 receive 的缩写):

  • recv:,方法会阻塞主线程执行直到从信道中接收一个值。有发送值时 recv 会在一个 Result<T, E> 中返回它,信道发送端关闭时 recv 会返回一个错误表明不会再有新的值到来了。
  • try_recv:不会阻塞,相反它立刻返回一个 Result<T, E>:Ok 值包含可用的信息,而 Err 值代表此时没有任何消息。可以编写一个循环来频繁调用 try_recv,在有可用消息时进行处理,其余时候则处理一会其他工作直到再次检查。

信道与所有权转移

尝试在新建线程中的信道中发送完 val 值 之后 再使用它(代码无法通过编译):

use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hi");
        tx.send(val).unwrap();
        println!("val is {}", val);
    });

    let received = rx.recv().unwrap();
    println!("Got: {}", received);
}

send 函数获取其参数的所有权并移动这个值归接收者所有,这可以防止在发送后再次意外地使用这个值。

发送多个值并观察接收者的等待

发送多个消息,并在每次发送后暂停一段时间:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        //字符串 vector
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("thread"),
        ];

        //遍历 vector
        for val in vals {
            //单独的发送每一个字符串
            tx.send(val).unwrap();
            //调用 thread::sleep 函数来暂停一秒
            thread::sleep(Duration::from_secs(1));
        }
    });

    //将 rx 当作一个迭代器,打印每一个接收到的值
    //当信道被关闭时迭代器也将结束
    for received in rx {
        println!("Got: {}", received);
    }
}

输出如下:

Got: hi
Got: from
Got: the
Got: thread

通过克隆发送者来创建多个生产者

从多个生产者发送多个消息:

// --snip--
let (tx, rx) = mpsc::channel();
let tx1 = tx.clone();
thread::spawn(move || {
    let vals = vec![
        String::from("hi"),
        String::from("from"),
        String::from("the"),
        String::from("thread"),
    ];
    for val in vals {
        tx1.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});
thread::spawn(move || {
    let vals = vec![
        String::from("more"),
        String::from("messages"),
        String::from("for"),
        String::from("you"),
    ];
    for val in vals {
        tx.send(val).unwrap();
        thread::sleep(Duration::from_secs(1));
    }
});
for received in rx {
    println!("Got: {}", received);
}
// --snip--

共享状态并发

消息传递是一个很好的处理并发的方式,但并不是唯一一个。另一种方式是让多个线程拥有相同的共享数据

互斥器一次只允许一个线程访问数据

互斥器(mutex)是 mutual exclusion 的缩写,任意时刻其只允许一个线程访问某些数据。为了访问互斥器中的数据,线程首先需要通过获取互斥器的 锁(lock)来表明其希望访问数据,互斥器通过锁系统 保护(guarding)其数据。

Mutex<T>的 API

出于简单的考虑,在一个单线程上下文中探索 Mutex<T> 的 API:

use std::sync::Mutex;

fn main() {
    //使用关联函数 new 来创建一个 Mutex<T>
    let m = Mutex::new(5);

    {
        //使用 lock 方法获取锁以访问互斥器中的数据
        //调用会阻塞当前线程,直到拥有锁为止
        //如果另一个线程拥有锁并且那个线程 panic 了,则 lock 调用会失败,选择 unwrap 并在遇到这种情况时使线程 panic。
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

lock 调用 返回 一个叫做 MutexGuard 的智能指针,它实现了 Deref 来指向其内部数据,也提供了一个 Drop 实现当 MutexGuard 离开作用域时自动释放锁,锁的释放是自动发生的。

在线程间共享 Mutex<T>

程序启动了 10 个线程,每个线程都通过 Mutex<T> 来增加计数器的值:

//这段代码无法通过编译!
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Mutex::new(0);
    let mut handles = vec![];

    for _ in 0..10 {
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

错误信息: counter 值在上一次循环中被移动了,不能将 counter 锁的所有权移动到多个线程中。

多线程和多所有权

尝试使用 Rc<T> 来允许多个线程拥有 Mutex<T>

//这段代码无法通过编译!
use std::rc::Rc;
use std::sync::Mutex;
use std::thread;

fn main() {
    let counter = Rc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Rc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

Rc<T> 并不能安全的在线程间共享,Rc<T> 并没有使用任何并发原语来确保改变计数的操作不会被其他线程打断,在计数出错时可能会导致诡异的 bug。

原子引用计数 Arc<T>

Arc<T> 是一个类似 Rc<T> 并可以安全的用于并发环境的类型,字母 “a” 代表 原子性(atomic),所以这是一个 原子引用计数(atomically reference counted)类型。

原子性是另一类这里还未涉及到的并发原语:请查看标准库中 std::sync::atomic 的文档来获取更多细节。

使用 Arc<T> 包装一个 Mutex<T> 能够实现在多线程之间共享所有权:

use std::sync::{Arc, Mutex};
use std::thread;

fn main() {
    let counter = Arc::new(Mutex::new(0));
    let mut handles = vec![];

    for _ in 0..10 {
        let counter = Arc::clone(&counter);
        let handle = thread::spawn(move || {
            let mut num = counter.lock().unwrap();

            *num += 1;
        });
        handles.push(handle);
    }

    for handle in handles {
        handle.join().unwrap();
    }

    println!("Result: {}", *counter.lock().unwrap());
}

RefCell<T>/Rc<T> 与 Mutex<T>/Arc<T> 的相似性

  • 使用 RefCell<T> 可以改变 Rc<T> 中的内容那样,可以使用 Mutex<T> 来改变 Arc<T> 中的内容
  • 使用 Rc<T> 有造成引用循环的风险,这时两个 Rc<T> 值相互引用造成内存泄漏,同理 Mutex<T> 也有造成 死锁(deadlock)的风险

使用 Sync 和 Send trait 的可扩展并发

由于不需要语言提供并发相关的基础设施,并发方案不受标准库或语言所限。然而有两个并发概念是内嵌于语言中的:std::marker 中的 Sync 和 Send trait

通过 Send 允许在线程间转移所有权

Send 标记 trait 表明实现了 Send 的类型值的所有权可以在线程间传送,任何完全由 Send 的类型组成的类型也会自动被标记为 Send,几乎所有基本类型都是 Send 的。

Sync 允许多线程访问

Sync 标记 trait 表明一个实现了 Sync 的类型可以安全的在多个线程中拥有其值的引用,基本类型是 Sync 的,完全由 Sync 的类型组成的类型也是 Sync 的。

手动实现 Send 和 Sync 是不安全的

通常并不需要手动实现 Send 和 Sync trait,手动实现这些标记 trait 涉及到编写不安全的 Rust 代码,在创建新的由不是 Send 和 Sync 的部分构成的并发类型时需要多加小心,以确保维持其安全保证。

标签:std,use,thread,unwrap,无畏,并发,let,线程,Rust
From: https://www.cnblogs.com/timefiles/p/18036944

相关文章

  • [Rust] module with public and private methods
    Methods:modsausage_factory{//privatemethodfnget_secret_recipe()->String{String::from("Ginger")}//publicmethodpubfnmake_sausage(){get_secret_recipe();println!("sausage!&qu......
  • Rust开发日记
    Gettingstarted-RustProgrammingLanguage(rust-lang.org)  安装好配置环境变量Path:%CARGO_HOME%和%RUSTUP_HOME% 建立config文件,不要扩展名。[source.crates-io]registry="https://github.com/rust-lang/crates.io-index"#替换成你偏好的镜像源replace-......
  • 《安富莱嵌入式周报》第333期:F35战斗机软件使用编程语言占比,开源10V基准电源,不断电运
    周报汇总地址:http://www.armbbs.cn/forum.php?mod=forumdisplay&fid=12&filter=typeid&typeid=104 视频版:https://www.bilibili.com/video/BV1y1421f7ip目录:1、F35战斗机软件使用编程语言占比2、开源10V基准电源,不断电运行一年,误差小于1uV3、资讯(1)苹果开源配置语言Pkl......
  • 类变量在高并发环境下引发的线程安全问题
    ###背景生产环境中,登录接口出现偶发性的异常,排查发现是获取当前时间的工具类抛出异常,以下为代码片段:``````java/***时间工具类*/publicclassDateUtil{ Loggerlogger=LoggerFactory.getLogger(this.getClass());privatefinalstaticSimpleDateFormatshortSdf=new......
  • 基于Rust的Tile-Based游戏开发杂记(01)导入
    什么是Tile-Based游戏?Tile-based游戏是一种使用tile(译为:瓦片,瓷砖)作为基本构建单位来设计游戏关卡、地图或其他视觉元素的游戏类型。在这样的游戏中,游戏世界的背景、地形、环境等都是由一系列预先定义好的小图片(即tiles)拼接而成的网格状结构。每个tile通常代表一个固定的尺寸区域,......
  • 深入解析Python并发编程的多线程和异步编程
    本文分享自华为云社区《Python并发编程探秘:多线程与异步编程的深入解析》,作者:柠檬味拥抱。在Python编程中,多线程是一种常用的并发编程方式,它可以有效地提高程序的执行效率,特别是在处理I/O密集型任务时。Python提供了threading模块,使得多线程编程变得相对简单。本文将深入探讨thre......
  • 设置PHP最大连接数及php-fpm -static高并发
    设置PHP最大连接数及php-fpm高并发参数调整 服务器中找到php-fpm.conf配置(有的会在引入的www.conf中)1234567891011121314151617181920212223[global]pid=/usr/local/php/var/run/php-fpm.piderror_log=/usr/local/php/var/log/ph......
  • [Rust] impl TryFrom and try_into()
    TheTryFromandtry_into()methodsarepartofthe.standardlibaray'sconversiontraits,designedtohandleconversionsbetweentypesinafalliblemanner-thatis,conversionsthatmightfail.Thisisincontrastto`From`and`into()`methods,wh......
  • 利用IO复用技术Epoll与线程池实现多线程的Reactor高并发模型
    Reactor模型是一种常见的高并发设计模式,特别是在网络编程中。在Reactor模型中,一个或多个输入同时传递给一个或多个服务处理程序。服务处理程序对输入进行处理,然后将结果传递给相应的输出处理程序。使用IO复用技术(如epoll)和线程池,可以实现多线程的Reactor高并发模型。下面是一个简......
  • Go语言精进之路读书笔记第36条——使用atomic包实现伸缩性更好的并发读取
    atomic包提供了两大类原子操作接口:一类是针对整型变量的,包括有符号整型、无符号整型以及对应的指针类型;另一个类是针对自定义类型的。atomic包十分适合一些对性能十分敏感、并发量较大且读多写少的场合。如果要对一个复杂的临界区数据进行同步,那么首选依旧是sync包中的原语。36.......