首页 > 其他分享 >Rust常用并发示例代码

Rust常用并发示例代码

时间:2022-09-04 16:22:42浏览次数:72  
标签:std thread 示例 并发 let ThreadId println 线程 Rust

记录几个常用的并发用法:


1、如何让线程只创建1次

先看一段熟悉的java代码:

void method1() {
    new Thread(() -> {
        while (true) {
            System.out.println(String.format("thread-id:%s,timestamp:%d",
                    Thread.currentThread().getId(), System.currentTimeMillis()));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
        }
    }).start();
}

如果method1()被多次调用,就会创建多个线程,如果希望不管调用多少次,只能有1个线程,在不使用线程池的前提下,有1个简单的办法:

AtomicBoolean flag = new AtomicBoolean(false);

void method1() {
    //AtomicBoolean保证线程安全,getAndSet是1个原子操作,method1只有第1次执行时,才能if判断才能通过
    if (!flag.getAndSet(true)) {
        new Thread(() -> {
            while (true) {
                System.out.println(String.format("thread-id:%s,timestamp:%d",
                        Thread.currentThread().getId(), System.currentTimeMillis()));
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            }
        }).start();
    }
}

在rust中也可以套用这个思路,完整代码如下:


use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

//声明1个全局静态变量(AtomicBool能保证线程安全)
static FLAG: AtomicBool = AtomicBool::new(false);

fn method1() {
    //fetch_update类似java中的AtomicBoolean.getAndSet
    if FLAG
        .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |_| Some(true))
        .unwrap()
    {
        std::thread::spawn(move || loop {
            println!(
                "thread-id:{:?},timestamp:{}",
                thread::current().id(),
                timestamp()
            );
            thread::sleep(Duration::from_millis(1000));
        });
    }
}

//辅助方法,获取系统时间戳(不用太关注这个方法)
fn timestamp() -> i64 {
    let start = SystemTime::now();
    let since_the_epoch = start
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards");
    let ms = since_the_epoch.as_secs() as i64 * 1000
        + (since_the_epoch.subsec_nanos() as f64 / 1_000_000.0) as i64;
    ms
}
fn main() {
    //调用2次
    method1();
    method1();

    //用1个死循环,防止main线束(仅演示用)
    loop {
        thread::sleep(Duration::from_millis(1000));
    }
}

输出:

thread-id:ThreadId(2),timestamp:1662265684621
thread-id:ThreadId(2),timestamp:1662265685623
thread-id:ThreadId(2),timestamp:1662265686627
thread-id:ThreadId(2),timestamp:1662265687628
thread-id:ThreadId(2),timestamp:1662265688630
...

从输出的线程id上看,2次method1()只创建了1个线程


2、如何让线程执行完再继续

fn main() {
    let mut thread_list = Vec::<thread::JoinHandle<()>>::new();
    for _i in 0..5 {
        let t = thread::spawn(|| {
            for n in 1..3 {
                println!("{:?}, n:{}", thread::current().id(), n);
                thread::sleep_ms(5);
            }
        });
        thread_list.push(t);
    }
    //运行后会发现,大概率只有下面这行会输出,因为main已经提前线束了,上面的线程没机会执行,就被顺带着被干掉了
    println!("main thread");
}

上面这段代码,如果希望在main主线程结束前,让所有创建出来的子线程执行完,可以使用join方法

fn main() {
    let mut thread_list = Vec::<thread::JoinHandle<()>>::new();
    for _i in 0..5 {
        let t = thread::spawn(|| {
            for n in 1..3 {
                println!("{:?}, n:{}", thread::current().id(), n);
                thread::sleep_ms(5);
            }
        });
        thread_list.push(t);
    }
 
    //将所有线程join,强制执行完后,才继续
    for t in thread_list {
        t.join().unwrap();
    }
 
    println!("main thread");
}

3、线程互斥锁

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn main() {
    //声明1个互斥锁Mutex,注意在多线程中使用时,必须套一层Arc
    let flag = Arc::new(Mutex::new(false));
    let mut handlers = vec![];
    for _ in 0..10 {
        let flag = Arc::clone(&flag);
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            //只有1个线程会lock成功
            let mut b = flag.lock().unwrap();
            if !*b {
                //抢到锁的,把标志位改成true,其它线程就没机会执行println
                *b = true;
                println!("sub\t=>\t{:?}", thread::current().id());
            }
        });
        handlers.push(handle);
    }
    for h in handlers {
        h.join().unwrap();
    }
    println!("main\t=>\t{:?}", thread::current().id());
}

上面的效果,9个子线程中,只会有1个抢到锁,并输出println,输出类似下面这样:

sub     =>      ThreadId(2)
main    =>      ThreadId(1)

4、线程之间发送数据

use std::sync::mpsc;
use std::sync::mpsc::{Receiver, Sender};
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
 
fn main() {
    let (sender, receiver) = mpsc::channel();
    let t1 = send_something(sender);
    let t2 = receive_something(receiver);
 
    t1.join().unwrap();
    t2.join().unwrap();
}
 
/**
 * 线程发送消息测试
 */
fn send_something(tx: Sender<String>) -> JoinHandle<()> {
    thread::spawn(move || {
        //模拟先做其它业务处理
        thread::sleep(Duration::from_millis(100));
 
        let msg_list = vec![
            String::from("a"),
            String::from("b"),
            String::from("c"),
            //约定:\n是数据的结束符
            String::from("\n"),
        ];
 
        //发送一堆消息
        for msg in msg_list {
            tx.send(msg).unwrap();
        }
    })
}
 
/**
 * 线程收消息
 */
fn receive_something(rx: Receiver<String>) -> JoinHandle<()> {
    thread::spawn(move || loop {
        //try_recv 不会阻塞
        let s = rx.try_recv();
        if s.is_ok() {
            let msg = s.unwrap();
            if msg == "\n" {
                //约定:收到\n表示后面没数据了,可以退出
                println!("end!");
                break;
            } else {
                println!("got msg:{}", msg);
            }
        }
        //模拟没数据时干点其它事情
        println!("do another thing!");
        thread::sleep(Duration::from_millis(100));
    })
}

输出:

do another thing!
do another thing!
got msg:a
do another thing!
got msg:b
do another thing!
got msg:c
do another thing!
end!

5、线程池示例

先要引用threadpool的依赖

[dependencies]
threadpool="1.8.1"

然后就可以使用了:

use std::thread;
use std::time::Duration;
use threadpool::ThreadPool;

fn main() {
    let n_workers = 3;
    //创建1个名为test-pool的线程池
    let pool = ThreadPool::with_name(String::from("test-pool"), n_workers);

    for _ in 0..10 {
        pool.execute(|| {
            println!(
                "{:?},{:?}",
                thread::current().id(),
                thread::current().name()
            );
            thread::sleep(Duration::from_millis(100));
        });
    }

    //待线程池中的所有任务都执行完
    pool.join();
}

输出:

ThreadId(2),Some("test-pool")
ThreadId(3),Some("test-pool")
ThreadId(4),Some("test-pool")
ThreadId(2),Some("test-pool")
ThreadId(3),Some("test-pool")
ThreadId(4),Some("test-pool")
ThreadId(2),Some("test-pool")
ThreadId(3),Some("test-pool")
ThreadId(4),Some("test-pool")
ThreadId(2),Some("test-pool")

6、指定线程名称

use std::thread;

fn main() {
    let t1 = thread::Builder::new()
        //子线程命名
        .name(format!("my-thread"))
        .spawn(|| {
            //打印子线程的id和name
            println!(
                "{:?},{:?}",
                thread::current().id(),
                thread::current().name()
            );
        })
        .unwrap();
    t1.join().unwrap();

    //打印主线程的id和name
    println!(
        "{:?},{:?}",
        thread::current().id(),
        thread::current().name()
    );
}

输出:

ThreadId(2),Some("my-thread")
ThreadId(1),Some("main")

7、如何暂停/恢复线程运行

use std::time::Duration;

use std::thread;

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    let t = thread::spawn(move || loop {
        //获取当前时间的秒数
        let seconds = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        println!("{}", seconds);
        //每当秒数为5的倍数,就把自己暂停,同时对外发消息pause
        if seconds % 5 == 0 {
            tx.send("pause").unwrap();
            println!("\nwill be parked !!!");
            //将自己暂停
            thread::park();
        }
        thread::sleep(Duration::from_millis(1000));
    });

    //不断收消息,发现是pause后,过3秒将线程t解封
    loop {
        let flag = rx.recv();
        if flag.is_ok() && flag.unwrap() == "pause" {
            thread::sleep(Duration::from_millis(3000));
            //解封t
            t.thread().unpark();
            println!("unparked !!!\n");
        }
    }
}

这样就实现了一个简易版的ScheudleThread,可以周期性的运行,运行效果:

1662278909
1662278910

will be parked !!!
unparked !!!

1662278914
1662278915

will be parked !!!
unparked !!!

1662278919
1662278920
...

标签:std,thread,示例,并发,let,ThreadId,println,线程,Rust
From: https://www.cnblogs.com/yjmyzz/p/16654831.html

相关文章

  • LIME模型---"Why Should I Trust You?": Explaining the Predictions of Any Classifi
    文章核心思想速读:提出了一个LIME解释技术能够对任何分类器做出的预测进行解释。L指LOCAL,意思是模型是针对局部某个样本的解释,而不是所有样本I指:INTERPRETABLE,可解释性,能......
  • 并发学习记录10:共享模型之无锁
    一个小例子引入importjava.util.ArrayList;importjava.util.List;importjava.util.concurrent.atomic.AtomicInteger;interfaceAccount{IntegergetBalance......
  • 并发学习记录09:共享模型之内存
    Java内存模型JMM指的是Javamemorymodel,它定义了主存,工作内存等抽象概念,相当于做一个隔离层,将底层CPU寄存器,缓存,硬件内存,CPU指令优化提供的功能通过一个简单接口给使用......
  • 学习 Go,一段旅程:标准库包和并发 #5
    学习Go,一段旅程:标准库包和并发#5大家好!很高兴再次见到你,我希望你做得很好。在本文中,我想分享我在学习Go编程语言方面的进展。本周,我了解了标准库包和并发。标准库包......
  • Rust 标准库
    1.One-Liners1.strings//1.拼接字符串format!("{x}{y}")//2.显示Displaywrite!(x,"{y}")//3.分隔符分开string,s.split(pattern)s.split("abc");s.split('......
  • 提升linux下tcp服务器并发连接数限制
    提升linux下tcp服务器并发连接数限制-星辰大海ゞ-博客园 https://www.cnblogs.com/wjoyxt/p/6155672.html1、修改用户进程可打开文件数限制   在Linux平台上,无论......
  • 记一次 Linux 生产环境,高并发处理过程
    一、关闭用不到的服务器资源:定时任务,减轻数据库的压力。二、前端H5调用接口个数减少,减少请求连接次数。缓解服务器及数据库的压力,同时增大客户端与服务器端的连接超时时......
  • 并发的核心:CAS 是什么?Java8是如何优化 CAS 的?
    大家可能都听说说Java中的并发包,如果想要读懂Java中的并发包,其核心就是要先读懂CAS机制,因为CAS可以说是并发包的底层实现原理。今天就带大家读懂CAS是......
  • 并发的核心:CAS 是什么?Java8是如何优化 CAS 的?_2
    大家可能都听说说Java中的并发包,如果想要读懂Java中的并发包,其核心就是要先读懂CAS机制,因为CAS可以说是并发包的底层实现原理。今天就带大家读懂CAS是......
  • vb.net 开发 VSIX VS插件开发 Addin 入门 示例
    使用vb.net写一个小的VS插件,简直就是灾难. 微软网站上机器翻译的中文,惨不忍睹.折腾了一天多,把心得给大家分享一下.同时也希望有高手指导一下.如果创建项目没......