定义
观察者模式定义了对象之间的一对多依赖,当一个对象改变状态时,它的所有依赖者都会受到通知并自动更新。
观察者模式中有两个角色,一个是主题(Subject),一个是观察者(Observer)。观察者会观察主题,当主题发生了变化,观察者会做出相应的处理。主题跟观察者的关系是一对多。观察者模式也叫发布订阅模式或者事件监听模式。比如很多主播会说,”点关注,不迷路“。当主播上线时,那些订阅了主播的用户会收到通知消息。
简单观察者实现
主题提供三个行为:
- 注册功能:这样观察者才能订阅道它。
- 取消注册功能:观察者能随时取消订阅。
- 通知功能:告诉观察者们有变化。
因此主题会依赖观察者,在它内部维护一个观察者集合,负责观察者的初始化和删除。
观察者要提供一个行为:
- 更新功能:观察者收到通知消息后,随即做出自己的逻辑处理。
需求描述
举个例子。有个商城项目,我们要设计一个缓存模型,要把项目中的重要的实体对象,比如有商品对象goods
、订单对象order
等,在服务启动时从 MySQL 库中查出来,存入 Redis 缓存,然后供服务层使用。项目中有一个数据管理服务,它专门负责实体数据的增删改,比如上架、修改、删除一些商品。数据管理服务的操作修改的是 MySQL 库中的数据。当数据管理服务更新了库里的数据,就要立即通知缓存层更新 Redis 缓存。缓存层肯定有不同的缓存类,比如 GoodsCache
、OrderCache
等。
代码实现
我们可以简单的把数据管理服务当作是一个主题,它变化了(增、删、改),要通知缓存类更新缓存数据。那么缓存类就看作是观察者。
首先是数据管理服务这个主题接口:
public interface DataManageSubject {
/**
* 注册观察者
* @param observer 观察者
*/
void register(ICacheObserver observer);
/**
* 删除观察者
* @param observer 观察者
*/
void unregister(ICacheObserver observer);
/**
* 通知观察者
*/
void notifyObservers();
}
缓存类对应观察者接口:
public interface ICacheObserver {
void update();
}
主题实现类:
public class DataManageSubjectImpl implements DataManageSubject {
private List<ICacheObserver> observers = new ArrayList<>();
@Override
public void register(ICacheObserver observer) {
observers.add(observer);
}
@Override
public void unregister(ICacheObserver observer) {
observers.remove(observer);
}
@Override
public void notifyObservers() {
observers.forEach(ICacheObserver::update);
}
public void updateData() {
System.out.println("admin update mysql data...");
}
}
观察者实现类:
public class GoodsCache implements ICacheObserver {
@Override
public void update() {
System.out.println("query data from db and set into redis successfully");
}
}
public class OrderCache implements ICacheObserver {
@Override
public void update() {
System.out.println("query data from db and set into redis successfully");
}
}
当数据管理服务有了数据更新操作后,它需要主动通知缓存类。
public class DataManageEventPublisher {
public static void main(String[] args) {
DataManageSubject dataManage = new DataManageSubjectImpl();
// 注册观察者
GoodsCache goodsCache = new GoodsCache();
OrderCache orderCache = new OrderCache();
dataManage.register(goodsCache);
dataManage.register(orderCache);
// 更新数据
dataManage.updateData();
// 通知观察者
dataManage.notifyObservers();
}
}
以上代码只为简单描述观察者模式的数据结构。在 JDK 中有内置的观察者模式,java.util
包下有Observer
接口,和一个Observable
类,Observable
是一个抽象类,他就是主题,Observer
就是观察者。从这个代码中我们可以体会到观察者模式的设计思路。
但是它的缺点就是主题跟观察者强耦合了,其实可以把它们的耦合关系交给DataManageEventPublisher
管理,这样主题跟观察者就能解耦了。
在 Spring 中应用观察者模式
观察者思想的应用之一就是事件监听机制。事件监听机制应用广泛,事件(Event)就是一个主题,多个监听器(Listener)监听这个主题。JDK 也设计了事件监听的模型,java.util.EventObject
和java.uitl.EventListener
就是监听机制的顶级父类,Spring 中事件监听模型就是继承这两个类。
再来看事件监听机制中的角色:
- 事件及事件源:事件拥有事件源,它是通过事件对象的构造器传入的一个
Object
对象。事件源是逻辑上被认定为该事件最初发生的对象。说白了,事件源就是用来描述和初始化一个事件。最重要的是,是EventObject
定义了这么一个成员变量。 - 监听器:对应观察者模式中的观察者。监听特定事件,并在内部定义了事件发生后的响应逻辑。
- 事件发布器:事件发布器负责对外提供事件发布和增删监听器的接口,维护事件和监听器的映射关系,并在事件发生时通知监听器。也就是说,事件发布器维护了事件和监听器的关系,将它们解耦了。
接下来看代码实现:
1、服务启动时,加载数据到 Redis
/**
* 启动监听器
*/
@Component
public class StartupListener implements ApplicationListener<ContextRefreshedEvent> {
@Autowired(required=false)
private StartupCache[] startupCaches;
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (event.getApplicationContext().getParent() == null) {
Objects.requireNonNull(startupCaches, "testBeans not initialized...");
for (StartupCache startupCache : startupCaches) {
startupCache.onStartup();
}
}
}
}
/**
* 启动缓存基类
*/
public abstract class StartupCache {
public abstract void onStartup();
}
2、数据变更事件和事件源
/**
* 数据管理服务数据变更事件
*/
public class DataChangeEvent extends ApplicationEvent {
public DataChangeEvent(DataChangeSource source) {
super(source);
}
}
/**
* 数据变更事件源
*/
@Data
public class DataChangeSource {
/**
* 更新的是哪一种数据
*/
private String type;
/**
* 操作类型
*/
private String operation;
}
3、监听器:缓存层的类都是监听器,随时监听数据管理服务的事件
/**
* 商品缓存,相当于监听器角色
*/
@Repository
public class GoodsCache extends StartupCache {
private static final String DATA_TYPE = "goods";
@Override
public void onStartup() {
System.out.println("server start: query all from db and set into redis");
}
@EventListener(DataChangeEvent.class)
public void refresh(DataChangeEvent event) {
DataChangeSource source = (DataChangeSource)event.getSource();
if (DATA_TYPE.equals(source.getType())) {
if ("update".equals(source.getOperation())) {
update();
}
}
}
public void update() {
System.out.println("goodsCache update redis cache successfully...");
}
}
注意:使用@EventListene
r跟实现ApplicationListener
是一样的效果,所以选择更简洁的注解方式。
4、商品服务类,模拟数据更新操作
/**
* 商品Service,模拟数据管理服务中对商品的数据更新操作
*/
@Service
public class GoodsService {
@Autowired
private ApplicationEventPublisher applicationEventPublisher;
public void update() {
// 模拟商品数据更新
System.out.println("data manager update goods data in db...");
// 发布事件
DataChangeSource dataChangeSource = new DataChangeSource();
dataChangeSource.setOperation("update");
dataChangeSource.setType("goods");
applicationEventPublisher.publishEvent(new DataChangeEvent(dataChangeSource));
}
}
5、测试类,当数据管理服务更新商品信息,缓存类同步更新缓存。
public class CacheModelTest {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring.xml");
// GoodsService更新数据
GoodsService goodsService = context.getBean(GoodsService.class);
goodsService.update();
}
}
server start: query all from db and set into redis
data manager update goods data in db...
goodsCache update redis cache successfully...
总结
这种写法也同样适用于 Spring Boot。
参考资料
深入理解Spring的容器内事件发布监听机制
Spring Events