零.前置芝士(可以不了解)
信息量
定义
信号量(semaphore)是操作系统中用来解决并发中的互斥和同步问题的一种方法。是可以用来保证两个或多个关键代码段不被并发调用。
目的
类似计数器,常用在多线程同步任务上,信号量可以在当前线程某个任务完成后,通知别的线程,再进行别的任务。
同步:处理竞争就是同步,安排线程执行的先后顺序就是同步,每个线程都有一定的个先后执行顺序。
互斥:互斥访问不可共享的临界资源,同时会引发两个新的控制问题(互斥可以说是特殊的同步)。
竞争:当并发进程竞争使用同一个资源的时候,我们就称为竞争进程。)
那信号量的值n代表什么意思呢?
n>0:当前有可用资源,可用资源数量为n
n=0:资源都被占用,可用资源数量为0
n<0:资源都被占用,并且还有n个进程正在排队
故信号量有两个伪代码
struct semaphore{
int count;//信号量大小
queryType q;//阻塞队列,用来让进程排队
}
//申请资源
void semWait(semaphore s){
s.count--;
if(s.count<0){
//将当前进程放入阻塞队列排队
}
//设置为<0是因为count-1后,如果为-1或更小说明已经没有可用资源,故放入队列
}
//释放资源
void semSignal(semaphore s){
s.count++;//一个进程终会结束,离开后可用资源++
if(s.count<=0){
//从队列中唤醒一个进程
}
//设置为<=0是因为count+1后,如果之前为-1或更小则还有进程排队,故唤醒一个排队的进程
}
(伪代码仅为了更好的理解信号量的实际作用,接下来会实例使用申请释放操作)
实例
消费者-生产者模型
问题描述
该问题描述了共享固定大小缓冲区的两个线程——即所谓的“生产者”和“消费者”——在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据。
问题分析
该问题需要注意的几点:
- 在缓冲区为空时,消费者不能再进行消费
- 在缓冲区为满时,生产者不能再进行生产
- 在一个线程进行生产或消费时,其余线程不能再进行生产或消费等操作,即缓冲区是临界资源,各进程必须互斥的访问
- 注意同步信号量与互斥锁的顺序
由于前两点原因,因此需要保持线程间的同步,即一个线程消费(或生产)完,其他线程才能进行竞争CPU,获得消费(或生产)的机会。对于这一点,可以使用同步信号量进行线程间的同步:生产者线程在product之前,需要wait直至获取自己所需的信号量之后,才会进行product的操作;同样,对于消费者线程,在consume之前需要wait直到没有线程在访问共享区(缓冲区),再进行consume的操作
伪代码实现
semaphore mutex = 1; //互斥信号量,实现对缓冲区的互斥访问;
semaphore empty = n; //同步信号量,表示空闲缓冲区的数量;
semaphore full = 0; //同步信号量,表示产品的数量,也即非空缓冲区的数量.
producer () {
while(1) {
semWait(empty);//消耗一个空闲缓冲区
semWait(mutex);//保证在生产时不会有其他线程访问缓冲区
//把产品放入缓冲区;
semSignal(mutex);
semSignal(full);//增加一个非空缓冲区
}
}
consumer () {
while(1) {
semWait(full);//消耗一个非空缓冲区
semWait(mutex);
//从缓冲区取出一个产品;
semSignal(mutex);
semSignal(empty);//增加一个空闲缓冲区
}
}
(至于同步信号量和互斥锁的顺序问题先记住,原因不重要)
一.多消费者-单一生产者模型
问题描述和问题分析和前文类似,不再赘述
实现代码
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.Semaphore;
import java.util.Random;
public class ProducerConsumerExample {
private Semaphore locksemaphore;//互斥锁,用来防止buffer被同时使用
private Semaphore consumersemaphore;//消费者有多个,需要同步信号量保证同步
private Thread[] consumerThreads;//消费者线程们
private Thread producerThread;//生产者线程
private List<Integer> buffer;//缓冲区,存放产品
private int buffersize;//缓冲区最大长度
private Random rand;//随机数
//设置启动线程函数
private void startthread(){
locksemaphore=new Semaphore(1);//互斥信号量设为1,代表同时只有一个线程能使用
consumersemaphore=new Semaphore(0);//设为0,一开始没有产品
buffer=new ArrayList<>();
buffersize=5;//可以更改
rand=new Random();
producerThread=new Thread(new Producer());
int consumercount=3;//可以更改
consumerThreads=new Thread[consumercount];
for(int i=0;i<consumercount;i++)
consumerThreads[i]=new Thread(new Consumer(i+1));
producerThread.start();
consumerThreads[0].start();
consumerThreads[1].start();
consumerThreads[2].start();
//下面是让线程停止
try{
Thread.sleep(1000);//让线程运行一会
System.out.println("还剩"+buffer.size());
producerThread.interrupt();
consumerThreads[1].interrupt();
consumerThreads[2].interrupt();
consumerThreads[0].interrupt();
}catch (InterruptedException e){
Thread.currentThread().interrupt();
}
}
//生产者线程类
private class Producer implements Runnable {
public void run() {
try {
for (int i = 0; i < 1000; i++) {
locksemaphore.acquire();//关锁,防止其他线程冲突
//注意,因为只有一个生产者,所以直接判断缓冲区资源数即可,不用信号量判断
if(buffer.size()<buffersize){
int r=rand.nextInt(100);
buffer.add(r);//生成产品——一个随机数
System.out.println("生产者生产: " + r);
consumersemaphore.release();//增加非空缓冲区
}
locksemaphore.release();//解锁
int r=rand.nextInt(100);
Thread.sleep(r);//等一会消费者的消耗
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private class Consumer implements Runnable {
private int id;//不同的消费者编号
public Consumer(int id){
this.id=id;
}
public void run() {
try {
while (true) {
consumersemaphore.acquire();//先申请资源
locksemaphore.acquire();//关锁
if(!buffer.isEmpty()){
int num=buffer.remove(0);//消费
System.out.println("消费者"+id+"消费:"+num);
}
locksemaphore.release();//解锁
int r=rand.nextInt(300);
Thread.sleep(r);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args) {
ProducerConsumerExample demo=new ProducerConsumerExample();
demo.startthread();
}
}
二.UI设计
三.实验代码实现
import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.Semaphore;
import java.util.Random;
public class Test3 extends JFrame {
private JButton startbutton;
private JButton stopbutton;
private JLabel producerlabel;
private JLabel consumerlabel;
private JTextArea textArea;
private JPanel buttonpanel;
private boolean running;
private Semaphore locksemaphore;//互斥锁,用来防止buffer被同时使用
private Semaphore consumersemaphore;//消费者有多个,需要同步信号量保证同步
private Thread[] consumerThreads;//消费者线程们
private Thread producerThread;//生产者线程
private List<Integer> buffer;//缓冲区,存放产品
private int buffersize;//缓冲区最大长度
private Random rand;//随机数
public Test3(){
super("多消费者-单一生产者问题模拟");
setSize(400,300);
setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
textArea=new JTextArea();
textArea.setEditable(false);//禁止编辑area
JScrollPane scrollPane=new JScrollPane(textArea);//滚动条
add(scrollPane, BorderLayout.CENTER);//将滑动条放在边框布局的中部
producerlabel=new JLabel("生产者:1");
consumerlabel=new JLabel("消费者:3");
startbutton=new JButton("开始");
stopbutton=new JButton("停止");
stopbutton.setEnabled(false);//同样禁止更改
ButtonListener listener=new ButtonListener();//监听对象
startbutton.addActionListener(listener);
stopbutton.addActionListener(listener);
//将按钮和标签加入panel
buttonpanel=new JPanel();
buttonpanel.add(startbutton);
buttonpanel.add(stopbutton);
buttonpanel.add(producerlabel);
buttonpanel.add(consumerlabel);
add(buttonpanel,BorderLayout.SOUTH);//将按钮面板放在边框布局的中部
}
//下面是专门用来接收开始和结束按钮响应的类
private class ButtonListener implements ActionListener{
@Override
public void actionPerformed(ActionEvent e) {
Object source=e.getSource();
if(source==startbutton){
startsimulation();
}else if(source==stopbutton){
stopsimulation();
}
}
}
private void startsimulation(){
running=true;
locksemaphore=new Semaphore(1);//互斥信号量设为1,代表同时只有一个线程能使用
consumersemaphore=new Semaphore(0);//设为0,一开始没有产品
buffer=new ArrayList<>();
buffersize=5;//可以更改
rand=new Random();
producerThread=new Thread(new Producer());
int consumercount=3;//可以更改
consumerThreads=new Thread[consumercount];
for(int i=0;i<consumercount;i++)
consumerThreads[i]=new Thread(new Consumer(i+1));
producerThread.start();
consumerThreads[0].start();
consumerThreads[1].start();
consumerThreads[2].start();
startbutton.setEnabled(false);//开始了不能再按
stopbutton.setEnabled(true);//可以按了
appendMessage("模拟开始");
}
private void stopsimulation(){
running=false;
producerThread.interrupt();
for(Thread consumerThread:consumerThreads){
consumerThread.interrupt();
}
startbutton.setEnabled(true);
stopbutton.setEnabled(false);
appendMessage("模拟停止");
}
private void appendMessage(String message){
textArea.append(message+"\n");
textArea.setCaretPosition(textArea.getDocument().getLength());
}
private int produceItem(){
int item=rand.nextInt(100);
appendMessage("生产者生产了一个项目:"+item);
return item;
}
private void consumeItem(int item,int consumserId){
appendMessage("消费者"+consumserId+"消费了一个项目:"+item);
}
private class Producer implements Runnable {
@Override
public void run() {
try {
while(running) {
Thread.sleep(800);
locksemaphore.acquire();
if(buffer.size()<buffersize){
buffer.add(produceItem());
//System.out.println("生产者生产: " + r);
consumersemaphore.release();
}
locksemaphore.release();
int r=rand.nextInt(100);
//Thread.sleep(800);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
private class Consumer implements Runnable {
private int id;
public Consumer(int id){
this.id=id;
}
public void run() {
try {
while (running) {
consumersemaphore.acquire();
if(!running) break;
locksemaphore.acquire();
if(!buffer.isEmpty()){
int num=buffer.remove(0);
//System.out.println("消费者"+id+"消费:"+num);
consumeItem(num,id);
}
locksemaphore.release();
int r=rand.nextInt(300);
Thread.sleep(r);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public static void main(String[] args){
Test3 test=new Test3();
test.setVisible(true);
}
}
标签:java,private,信号量,互斥,实验,new,缓冲区,线程
From: https://www.cnblogs.com/jasony/p/17901813.html