首页 > 其他分享 >观察者模式重启线程

观察者模式重启线程

时间:2023-04-11 10:37:55浏览次数:35  
标签:count run Thread 重启 观察者 线程 public

观察者模式重启线程

看代码的过程中发现了观察者模式用于重启线程的实例,就顺便研究了一下。

观察者模式

先引用介绍一下观察者模式:

意图: 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。

主要解决: 一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。

何时使用: 一个对象(目标对象)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。

如何解决: 使用面向对象技术,可以将这种依赖关系弱化。

关键代码: 在抽象类里有一个 ArrayList 存放观察者们。

观察者模式算是一种比较简单的设计模式了,主要就是一个思路:当被观察者发生改变时,通知它所有的观察者,从而观察者们可以做出对应的操作。在 Java 中,已经有观察者模式的支持类了,接下来也是基于这个类实现的。

观察者线程

现在的场景是这样:监控系统将获取到的监控数据保存到队列中,系统中的一个线程从队列中获取并处理监控信息,它不能也不可以停下来(007),否则会积压大量数据。但没法保证这个线程运行一直是正常的,当线程运行中遇到异常,它可能会中断、停止或是产生一些奇奇怪怪的行为。而为了保证这个信息处理线程的不间断运行,我们可以采用观察者模式实现(监工模式)。

先用简单的例子实现一下观察者模式的用法。

首先创建一个线程,它的工作就是从0到10打印数字:

public class BeObservered implements Runnable {
  
    private int count = 0;

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(500);
                System.out.println("当前线程名:" + Thread.currentThread().getName());
                System.out.println("当前线程ID:" + Thread.currentThread().getId());
                System.out.println("当前线程优先级:" + Thread.currentThread().getPriority());
                System.out.println("当前线程Count:" + count.get());

		count++;
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
}

现在的要求是,当这个线程打印到10的时候,需要把它砍掉,再开一个新的线程来打印。这里就可以通过观察者模式来实现:这个打印数字的线程就是被观察者,再创建一个观察者类,当这个线程打印到10的时候,通知观察者,观察者将这个线程停掉并重开一个即可。

说起来简单,做起来也简单,先把这个线程改造一下,继承被观察者类:

public class BeObservered extends Observable implements Runnable {

    // 坏,多个线程共用一个count,疯狂增加
    //private int count = 0;

    // 好,定义count为线程局部变量
    private ThreadLocal<Integer> count = ThreadLocal.withInitial(() -> 0);

    private volatile ThreadLocal<Boolean> flag = ThreadLocal.withInitial(() -> true);

    @Override
    public void run() {
        try {
            while (flag.get()) {
                Thread.sleep(500);
                System.out.println("当前线程名:" + Thread.currentThread().getName());
                System.out.println("当前线程ID:" + Thread.currentThread().getId());
                System.out.println("当前线程优先级:" + Thread.currentThread().getPriority());
                System.out.println("当前线程Count:" + count.get());

                if (count.get() >= 10) {
                    doBusiness();
                }
                count.set(count.get() + 1);
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private void doBusiness() {
        // 通知观察者的两步操作 不可缺少
        super.setChanged();
        notifyObservers();
    }

    public void setFlag(boolean flag){
        this.flag.set(flag);
    }
}

其中有几个改动,说明一下:

  1. BeObservered 继承了 Observable 类,它就是一个被观察者了;
  2. count 的类型从 int 变成了 ThreadLocal <Integer>,做出这个变化时还没有加入观察者停止之前线程的功能,而是另开一个线程继续打印。在这个场景下,原本的 count 不是线程私有变量,而是类变量,会被多个线程共享。导致当第一个线程打印到10时,通知观察者启动第二个线程,而第二个线程的count也是10,又会通知观察者...如此往复,被观察者阵营指数扩大,马上就要起义了。因此需要将 count 设置为线程私有变量,即 ThreadLocal,每个线程的 count 都是独立的,就不会出现新生的线程的 count 就是10的情况了。
  3. 增加 doBusiness 方法,其中先是将自己设置为已改变,并通知所有观察者。这两步操作缺一不可,因为如果不设置自身为已改变,是不会对观察者发出通知的。
  4. 增加 flag 变量,线程通过检测 flag 变量来执行打印任务。这个变量的设置是为了观察者能通过改变 flag 而终止线程的打印。

接着是线程的观察者的实现,也非常简单:

public class MeObserver implements Observer {

    private BeObservered beObservered;

    @Override
    public void update(Observable o, Object arg) {
        System.out.println("观察者:" + Thread.currentThread().getName() + "通知,开始处理...");
        beObservered.setFlag(false);

        BeObservered run = new BeObservered();
        run.addObserver(this);
        new Thread(run).start();

        setBeObservered(run);
    }

    public void setBeObservered(BeObservered beObservered){
        this.beObservered = beObservered;
    }
}

也简单说一下其中的几个地方:

  1. 观察者需要实现观察接口 Observer,实现了这个接口,它就能观察别人了;
  2. 维护一个被观察者变量 beObservered,表示当前被监工的线程;
  3. 观察者需要重写 update 方法,当观察者收到被观察者发生改变的通知后,会执行 update 方法,在这里,我们将维护的线程,也就是执行到10的线程停掉,并创建一个新线程,将自己加入到新线程的观察者列表中,并维护这个新的被观察者。

如此一来,观察者就能一直监督一个线程从0工作到10,这个线程工作完成后再换下一个线程,卸磨杀驴,疯狂产出,无穷无尽...感觉这个例子有点奇怪的含义,当时写的时候没想到,现在越想越不对劲勒。

解决问题

回到监控系统中,同理,只需要为处理监控信息的线程创建一个观察者,当处理线程抛出异常时,通知观察者,观察者将之前的线程停掉,另开一个线程继续处理即可。这样就实现了线程的自动重启,保证了线程处理信息不中断。

参考代码,被观察者,信息处理线程:

public class AlarmRecordThread extends Observable implements  Runnable {

    protected IServicePreAlarm servicePreAlarm;
    private Logger log = LoggerFactory.getLogger(AlarmRecordThread.class);

    public AlarmRecordThread(IServicePreAlarm servicePreAlarm){
        this.servicePreAlarm = servicePreAlarm;
    }

    public void doBusiness(){
        super.setChanged();
        notifyObservers();
    }

    @Override
    public void run() {
        while(true){
            try{
                AlarmHandlerDto dto = AlarmDtoQueue.queue.poll();
                //log.info("queueSize:" + AlarmDtoQueue.queue.size());
                if(dto!=null){
                    servicePreAlarm.singleHandler(dto);
                }else{
                    Thread.sleep(500);//200
                }
            }catch (Exception e) {
                doBusiness();
                log.error("---------警报处理线程异常--------:",e);
                break;
            }

        }

    }

}

参考代码,观察者,负责重启信息处理线程:

@Component
public class ARThreadListener implements Observer {

    @Autowired
    IServicePreAlarm serviceMysql;
    private Logger log = LoggerFactory.getLogger(ARThreadListener.class);

    @Override
    public void update(Observable o, Object arg) {
        AlarmRecordThread run = new AlarmRecordThread(serviceMysql);
        run.addObserver(this);
        new Thread(run).start();
        log.info("--------警报处理线程重启成功--------");

    }
}

这里的代码没把之前的线程停掉,感觉会有问题。。。

标签:count,run,Thread,重启,观察者,线程,public
From: https://www.cnblogs.com/qiyuanc/p/work6.html

相关文章

  • SpringBoot线程池和Java线程池的实现原理
    使用默认的线程池方式一:通过@Async注解调用publicclassAsyncTest{@Asyncpublicvoidasync(Stringname)throwsInterruptedException{System.out.println("async"+name+""+Thread.currentThread().getName());Thread.sleep(10......
  • 多线程事务的提交解决办法
    多线程处理的时候,如果发生了错误,不会因为加了@Transcational注解而生效,这里需要额外使用SqlSessionTemplate{//插入主表electronicTaxBillMapper.insertBatch(masterList);//更新出库单状态outOrderDetailMapper.updateByOrderCodeList(orderCodeList);//切......
  • 线程中的终极异常处理处理
    提问线程中的终极异常处理处理回答为了异常阻塞主线程是不值得的使用事件通知方式,这样不会阻塞主线程捕捉AggregateException......
  • 使用vCenter的主机配置文件重置ESXi主机密码,无需重启
    说在前面ESXi主机多次输入用户名密码错误之后,账号会被锁定900秒(15分钟),在此期间即使输入了正确的密码也不会进入到系统里;(而且会导致计时器重置,不妨等一等)前言在日常过程中由于配置好ESXi主机后很久不登录,或者更改密码时疏忽导致改过的密码怎么输入都不对,这个时候肯定......
  • Flask快速入门day 06 (sqlalchemy的使用,scoped-session线程安全)
    目录Flask框架之sqlalchemy的使用一、SQLAlchemy基本使用1、简介2、操作原生sql3、表创建4、ORM操作4、1.基本使用4、2.增删改查4、3.高级查询二、外键关系1、一对多1、1.表模型1、2.新增和基于对象的查询2、多对多2、1.表模型2、2.新增和基于对象查询3、连表查询三、scoped_sessi......
  • 进程与线程&并行与并发的概念
    一、进程与线程进程程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理IO的当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进......
  • Java并发(一)----进程、线程、并行、并发
    一、进程与线程进程程序由指令和数据组成,但这些指令要运行,数据要读写,就必须将指令加载至CPU,数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理IO的当一个程序被运行,从磁盘加载这个程序的代码至内存,这时就开启了一个进......
  • Java中创建线程的方式以及线程池创建的方式、推荐使用ThreadPoolExecutor以及示例
    场景Java中创建线程的方式有三种1、通过继承Thread类来创建线程定义一个线程类使其继承Thread类,并重写其中的run方法,run方法内部就是线程要完成的任务,因此run方法也被称为执行体,使用start方法来启动线程。2、通过实现Runanle接口来创建线程首先定义Runnable接口,并重写Runnab......
  • Jmeter线程组间传递变量
    做接口测试,上一个线程组(A线程组)提取的变量,需要传递给下一个线程组(B线程组)使用。故需要将A线程组内提取的变量设置为全局变量。实现如下:1.json提取变量(A线程组)通过json提取器,将A线程组请求中的billId提取出来,如下:2. BeanShell取样器定义变量(A线程组)添加【BeanShell......
  • 线程和队列应用--消费者和生产者
    1、用一个队列存储商品2、创建一个专门生产商品的线程类,当商品数量少于50时,开始生产商品,每次生产200个商品,每生产一轮,暂停1s3、创建一个专门消费商品的线程类,当商品数量大于10时就开始消费,循环消费,每次消费3个,当商品数量少于10的时候,暂停2s    ......