观察者模式重启线程
看代码的过程中发现了观察者模式用于重启线程的实例,就顺便研究了一下。
观察者模式
先引用介绍一下观察者模式:
意图: 定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。
主要解决: 一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。
何时使用: 一个对象(目标对象)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。
如何解决: 使用面向对象技术,可以将这种依赖关系弱化。
关键代码: 在抽象类里有一个 ArrayList 存放观察者们。
观察者模式算是一种比较简单的设计模式了,主要就是一个思路:当被观察者发生改变时,通知它所有的观察者,从而观察者们可以做出对应的操作。在 Java 中,已经有观察者模式的支持类了,接下来也是基于这个类实现的。
观察者线程
现在的场景是这样:监控系统将获取到的监控数据保存到队列中,系统中的一个线程从队列中获取并处理监控信息,它不能也不可以停下来(007),否则会积压大量数据。但没法保证这个线程运行一直是正常的,当线程运行中遇到异常,它可能会中断、停止或是产生一些奇奇怪怪的行为。而为了保证这个信息处理线程的不间断运行,我们可以采用观察者模式实现(监工模式)。
先用简单的例子实现一下观察者模式的用法。
首先创建一个线程,它的工作就是从0到10打印数字:
public class BeObservered implements Runnable {
private int count = 0;
@Override
public void run() {
try {
while (true) {
Thread.sleep(500);
System.out.println("当前线程名:" + Thread.currentThread().getName());
System.out.println("当前线程ID:" + Thread.currentThread().getId());
System.out.println("当前线程优先级:" + Thread.currentThread().getPriority());
System.out.println("当前线程Count:" + count.get());
count++;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
现在的要求是,当这个线程打印到10的时候,需要把它砍掉,再开一个新的线程来打印。这里就可以通过观察者模式来实现:这个打印数字的线程就是被观察者,再创建一个观察者类,当这个线程打印到10的时候,通知观察者,观察者将这个线程停掉并重开一个即可。
说起来简单,做起来也简单,先把这个线程改造一下,继承被观察者类:
public class BeObservered extends Observable implements Runnable {
// 坏,多个线程共用一个count,疯狂增加
//private int count = 0;
// 好,定义count为线程局部变量
private ThreadLocal<Integer> count = ThreadLocal.withInitial(() -> 0);
private volatile ThreadLocal<Boolean> flag = ThreadLocal.withInitial(() -> true);
@Override
public void run() {
try {
while (flag.get()) {
Thread.sleep(500);
System.out.println("当前线程名:" + Thread.currentThread().getName());
System.out.println("当前线程ID:" + Thread.currentThread().getId());
System.out.println("当前线程优先级:" + Thread.currentThread().getPriority());
System.out.println("当前线程Count:" + count.get());
if (count.get() >= 10) {
doBusiness();
}
count.set(count.get() + 1);
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
private void doBusiness() {
// 通知观察者的两步操作 不可缺少
super.setChanged();
notifyObservers();
}
public void setFlag(boolean flag){
this.flag.set(flag);
}
}
其中有几个改动,说明一下:
BeObservered
继承了Observable
类,它就是一个被观察者了;count
的类型从int
变成了ThreadLocal <Integer>
,做出这个变化时还没有加入观察者停止之前线程的功能,而是另开一个线程继续打印。在这个场景下,原本的count
不是线程私有变量,而是类变量,会被多个线程共享。导致当第一个线程打印到10时,通知观察者启动第二个线程,而第二个线程的count也是10,又会通知观察者...如此往复,被观察者阵营指数扩大,马上就要起义了。因此需要将count
设置为线程私有变量,即ThreadLocal
,每个线程的count
都是独立的,就不会出现新生的线程的count
就是10的情况了。- 增加
doBusiness
方法,其中先是将自己设置为已改变,并通知所有观察者。这两步操作缺一不可,因为如果不设置自身为已改变,是不会对观察者发出通知的。 - 增加
flag
变量,线程通过检测flag
变量来执行打印任务。这个变量的设置是为了观察者能通过改变flag
而终止线程的打印。
接着是线程的观察者的实现,也非常简单:
public class MeObserver implements Observer {
private BeObservered beObservered;
@Override
public void update(Observable o, Object arg) {
System.out.println("观察者:" + Thread.currentThread().getName() + "通知,开始处理...");
beObservered.setFlag(false);
BeObservered run = new BeObservered();
run.addObserver(this);
new Thread(run).start();
setBeObservered(run);
}
public void setBeObservered(BeObservered beObservered){
this.beObservered = beObservered;
}
}
也简单说一下其中的几个地方:
- 观察者需要实现观察接口
Observer
,实现了这个接口,它就能观察别人了; - 维护一个被观察者变量
beObservered
,表示当前被监工的线程; - 观察者需要重写
update
方法,当观察者收到被观察者发生改变的通知后,会执行update
方法,在这里,我们将维护的线程,也就是执行到10的线程停掉,并创建一个新线程,将自己加入到新线程的观察者列表中,并维护这个新的被观察者。
如此一来,观察者就能一直监督一个线程从0工作到10,这个线程工作完成后再换下一个线程,卸磨杀驴,疯狂产出,无穷无尽...感觉这个例子有点奇怪的含义,当时写的时候没想到,现在越想越不对劲勒。
解决问题
回到监控系统中,同理,只需要为处理监控信息的线程创建一个观察者,当处理线程抛出异常时,通知观察者,观察者将之前的线程停掉,另开一个线程继续处理即可。这样就实现了线程的自动重启,保证了线程处理信息不中断。
参考代码,被观察者,信息处理线程:
public class AlarmRecordThread extends Observable implements Runnable {
protected IServicePreAlarm servicePreAlarm;
private Logger log = LoggerFactory.getLogger(AlarmRecordThread.class);
public AlarmRecordThread(IServicePreAlarm servicePreAlarm){
this.servicePreAlarm = servicePreAlarm;
}
public void doBusiness(){
super.setChanged();
notifyObservers();
}
@Override
public void run() {
while(true){
try{
AlarmHandlerDto dto = AlarmDtoQueue.queue.poll();
//log.info("queueSize:" + AlarmDtoQueue.queue.size());
if(dto!=null){
servicePreAlarm.singleHandler(dto);
}else{
Thread.sleep(500);//200
}
}catch (Exception e) {
doBusiness();
log.error("---------警报处理线程异常--------:",e);
break;
}
}
}
}
参考代码,观察者,负责重启信息处理线程:
@Component
public class ARThreadListener implements Observer {
@Autowired
IServicePreAlarm serviceMysql;
private Logger log = LoggerFactory.getLogger(ARThreadListener.class);
@Override
public void update(Observable o, Object arg) {
AlarmRecordThread run = new AlarmRecordThread(serviceMysql);
run.addObserver(this);
new Thread(run).start();
log.info("--------警报处理线程重启成功--------");
}
}
这里的代码没把之前的线程停掉,感觉会有问题。。。
标签:count,run,Thread,重启,观察者,线程,public From: https://www.cnblogs.com/qiyuanc/p/work6.html