首页 > 其他分享 >nacos——03

nacos——03

时间:2022-11-05 22:31:11浏览次数:49  
标签:event 03 return publisher nacos public subscriber final

今天来聊聊nacos的事件机制。

说起事件这块,肯定包含这几个要素:事件、事件生成者(即事件发布者)、事件消费者(事件订阅者)、通知。

nacos的事件相关核心类都在​​com.alibaba.nacos.common.notify​​包下

nacos——03_注册中心

事件

nacos有两种事件,一个是基础事件,一个是慢事件(名字觉得不好)。之所以称之为慢事件估计是因为共享一个队列的缘故吧。

Event

package com.alibaba.nacos.common.notify;
/**
* An abstract class for event.
* 抽象事件类,作为nacos的事件基类(其下还有一个SlowEvent),具体实现类不下几十个
*/
@SuppressWarnings({"PMD.AbstractClassShouldStartWithAbstractNamingRule"})
public abstract class Event implements Serializable {

private static final long serialVersionUID = -3731383194964997493L;

private static final AtomicLong SEQUENCE = new AtomicLong(0);

private final long sequence = SEQUENCE.getAndIncrement();

/**
* Event sequence number, which can be used to handle the sequence of events.
* 这里通过AtomicLong的getAndIncrement方法设置,初始为0,需要保证唯一性不能重复,涉及到事件处理顺序
* @return sequence num, It's best to make sure it's monotone.
*/
public long sequence() {
return sequence;
}

/**
* Event scope.
* 事件范围,基本都是默认null
* @return event scope, return null if for all scope
*/
public String scope() {
return null;
}

/**
* Whether is plugin event. If so, the event can be dropped when no publish and subscriber without any hint. Default
* false
* 插件事件没有研究
* @return {@code true} if is plugin event, otherwise {@code false}
*/
public boolean isPluginEvent() {
return false;
}
}


SlowEvent

package com.alibaba.nacos.common.notify;
/**
* This event share one event-queue.
* 抽象慢事件类,估计是因为共享一个队列吧
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class SlowEvent extends Event {

/**
* 重写后固定为0
* @return
*/
@Override
public long sequence() {
return 0;
}
}


慢事件的子类如下,其他为测试类

nacos——03_配置中心_02

发布者

这里有两个接口,EventPublisher、ShardedEventPublisher,分别有两个默认实现类DefaultPublisher、DefaultSharePublisher。关系如下图所示

nacos——03_nacos_03


EventPublisher

package com.alibaba.nacos.common.notify;
/**
* Event publisher.
* 事件发布者接口,主要方法有初始化、注册/取消订阅者、发布时间、通知订阅者
*/
public interface EventPublisher extends Closeable {

/**
* 初始化
*/
void init(Class<? extends Event> type, int bufferSize);

/**
* 当前事件数量
*/
long currentEventSize();

/**
* 添加订阅者
*/
void addSubscriber(Subscriber subscriber);

/**
* 取消订阅者的订阅
*/
void removeSubscriber(Subscriber subscriber);

/**
* 发布事件
*/
boolean publish(Event event);

/**
* 通知订阅者
*/
void notifySubscriber(Subscriber subscriber, Event event);

}

DefaultPublisher

package com.alibaba.nacos.common.notify;

/**
* The default event publisher implementation.
* 默认的事件发布者实现,实现 EventPublisher 的同时又继承了 Thread
* <p>Internally, use {@link ArrayBlockingQueue <Event/>} as a message staging queue.
* 在内部使用 ArrayBlockingQueue 作为消息暂存队列
*/
public class DefaultPublisher extends Thread implements EventPublisher {

protected static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);

private volatile boolean initialized = false;

private volatile boolean shutdown = false;

private Class<? extends Event> eventType;

/**
* nacos自己通过jdk的 ConcurrentHashMap 搞了一个 ConcurrentHashSet,存放所有订阅者
*/
protected final ConcurrentHashSet<Subscriber> subscribers = new ConcurrentHashSet<>();

/**
* 队列最大值,如果init方法赋值为-1时在最终会被赋值为 NotifyCenter.ringBufferSize,该值默认为16384
*/
private int queueMaxSize = -1;

/**
* 内部具体是实现是 ArrayBlockingQueue
*/
private BlockingQueue<Event> queue;

protected volatile Long lastEventSequence = -1L;
/**
* 原子更新事件属性lastEventSequence
*/
private static final AtomicReferenceFieldUpdater<DefaultPublisher, Long> UPDATER = AtomicReferenceFieldUpdater
.newUpdater(DefaultPublisher.class, Long.class, "lastEventSequence");

@Override
public void init(Class<? extends Event> type, int bufferSize) {
setDaemon(true);
setName("nacos.publisher-" + type.getName());
this.eventType = type;
// queueMaxSize在start中为-1,会被赋值为 NotifyCenter.ringBufferSize,
// 但 bufferSize 不可能为-1,否则下面的 ArrayBlockingQueue 就会报 IllegalArgumentException 异常
this.queueMaxSize = bufferSize;
this.queue = new ArrayBlockingQueue<>(bufferSize);
start();
}

public ConcurrentHashSet<Subscriber> getSubscribers() {
return subscribers;
}

@Override
public synchronized void start() {
// 初始为false
if (!initialized) {
// start just called once
super.start();
if (queueMaxSize == -1) {
queueMaxSize = ringBufferSize;
}
// 启动完成后置为true
initialized = true;
}
}

@Override
public long currentEventSize() {
return queue.size();
}

/**
* 继承自Thread的方法,在start方法后执行
*/
@Override
public void run() {
openEventHandler();
}

void openEventHandler() {
try {

// This variable is defined to resolve the problem which message overstock in the queue.
int waitTimes = 60;
// To ensure that messages are not lost, enable EventHandler when
// waiting for the first Subscriber to register
// 这里如果当前线程未关闭且无订阅者且 waitTimes>0 就会在此for循环中等待(最长等待60秒),
// 直到条件被打破就会立即进入下方事件处理的循环中,避免发生事件积压
for (; ; ) {
if (shutdown || hasSubscriber() || waitTimes <= 0) {
break;
}
ThreadUtils.sleep(1000L);
waitTimes--;
}

for (; ; ) {
if (shutdown) {
break;
}
final Event event = queue.take();
// 接收事件并通知订阅者
receiveEvent(event);
UPDATER.compareAndSet(this, lastEventSequence, Math.max(lastEventSequence, event.sequence()));
}
} catch (Throwable ex) {
LOGGER.error("Event listener exception : ", ex);
}
}

private boolean hasSubscriber() {
return CollectionUtils.isNotEmpty(subscribers);
}

@Override
public void addSubscriber(Subscriber subscriber) {
subscribers.add(subscriber);
}

@Override
public void removeSubscriber(Subscriber subscriber) {
subscribers.remove(subscriber);
}

/**
* 在外部通过调用NotifyCenter.publishEvent发布事件,最终会进到这里
* @param event {@link Event}
* @return
*/
@Override
public boolean publish(Event event) {
// 检查是否已初始化完成
checkIsStart();
// 写入事件
boolean success = this.queue.offer(event);
if (!success) {
LOGGER.warn("Unable to plug in due to interruption, synchronize sending time, event : {}", event);
// 未能写入事件时需要将事件通知订阅者
receiveEvent(event);
return true;
}
return true;
}

void checkIsStart() {
if (!initialized) {
throw new IllegalStateException("Publisher does not start");
}
}

@Override
public void shutdown() {
this.shutdown = true;
this.queue.clear();
}

public boolean isInitialized() {
return initialized;
}

/**
* Receive and notifySubscriber to process the event.
*
* @param event {@link Event}.
*/
void receiveEvent(Event event) {
final long currentEventSequence = event.sequence();
// 此事件没有订阅者就直接返回了不需要后续处理
if (!hasSubscriber()) {
LOGGER.warn("[NotifyCenter] the {} is lost, because there is no subscriber.", event);
return;
}

// Notification single event listener
// 循环遍历所有订阅者,如果订阅了且未失效就通知该订阅者
for (Subscriber subscriber : subscribers) {
if (!subscriber.scopeMatches(event)) {
continue;
}

// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}

// Because unifying smartSubscriber and subscriber, so here need to think of compatibility.
// Remove original judge part of codes.
notifySubscriber(subscriber, event);
}
}

@Override
public void notifySubscriber(final Subscriber subscriber, final Event event) {

LOGGER.debug("[NotifyCenter] the {} will received by {}", event, subscriber);

final Runnable job = () -> subscriber.onEvent(event);
final Executor executor = subscriber.executor();
// 如订阅者有自己的执行器通过执行器异步执行run方法,否则直接执行run
if (executor != null) {
executor.execute(job);
} else {
try {
job.run();
} catch (Throwable e) {
LOGGER.error("Event callback exception: ", e);
}
}
}
}

ShardedEventPublisher

package com.alibaba.nacos.common.notify;

/**
* 共享事件发布者接口(此发布者可以支持多事件),继承了 EventPublisher,新增带有事件类型的注册/取消订阅者两个方法
*/
public interface ShardedEventPublisher extends EventPublisher {

/**
* Add listener for default share publisher.
*/
void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);

/**
* Remove listener for default share publisher.
*/
void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType);
}

DefaultSharePublisher

package com.alibaba.nacos.common.notify;
/**
* 默认共享发布者,继承了 DefaultPublisher,同时实现了 ShardedEventPublisher
* 比较简单,就三个方法,带有事件类型的注册/取消订阅者,及接收事件并通知订阅者方法
*/
public class DefaultSharePublisher extends DefaultPublisher implements ShardedEventPublisher {
/**
* 这里表明一种事件可以有多个订阅者
*/
private final Map<Class<? extends SlowEvent>, Set<Subscriber>> subMappings = new ConcurrentHashMap<>();

private final Lock lock = new ReentrantLock();

@Override
public void addSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// Actually, do a classification based on the slowEvent type.
// 注意这里只处理 SlowEvent 类型事件
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// For stop waiting subscriber, see {@link DefaultPublisher#openEventHandler}.
subscribers.add(subscriber);
// 如果此事件之前有订阅者直接取出订阅者集合并将当前订阅者加入,否则生成一个新的订阅者集合并将当前订阅者加入
lock.lock();
try {
Set<Subscriber> sets = subMappings.get(subSlowEventType);
if (sets == null) {
Set<Subscriber> newSet = new ConcurrentHashSet<>();
newSet.add(subscriber);
subMappings.put(subSlowEventType, newSet);
return;
}
sets.add(subscriber);
} finally {
lock.unlock();
}
}

@Override
public void removeSubscriber(Subscriber subscriber, Class<? extends Event> subscribeType) {
// Actually, do a classification based on the slowEvent type.
// 注意这里只处理 SlowEvent 类型事件
Class<? extends SlowEvent> subSlowEventType = (Class<? extends SlowEvent>) subscribeType;
// For removing to parent class attributes synchronization.
subscribers.remove(subscriber);
// 取出对应事件的订阅者集合从中删除当前订阅者
lock.lock();
try {
Set<Subscriber> sets = subMappings.get(subSlowEventType);

if (sets != null) {
sets.remove(subscriber);
}
} finally {
lock.unlock();
}
}

@Override
public void receiveEvent(Event event) {

final long currentEventSequence = event.sequence();
// get subscriber set based on the slow EventType.
final Class<? extends SlowEvent> slowEventType = (Class<? extends SlowEvent>) event.getClass();

// Get for Map, the algorithm is O(1).
Set<Subscriber> subscribers = subMappings.get(slowEventType);
if (null == subscribers) {
LOGGER.debug("[NotifyCenter] No subscribers for slow event {}", slowEventType.getName());
return;
}

// Notification single event subscriber
for (Subscriber subscriber : subscribers) {
// Whether to ignore expiration events
if (subscriber.ignoreExpireEvent() && lastEventSequence > currentEventSequence) {
LOGGER.debug("[NotifyCenter] the {} is unacceptable to this subscriber, because had expire",
event.getClass());
continue;
}
// 这里调用父类的通知方法进行通知
// Notify single subscriber for slow event.
notifySubscriber(subscriber, event);
}
}
}

订阅者


Subscriber

package com.alibaba.nacos.common.notify.listener;
/**
* 订阅接口的抽象类,事件必须是 Event 的子类
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class Subscriber<T extends Event> {

/**
* 事件回调方法
*/
public abstract void onEvent(T event);

/**
* 订阅的事件类型,这里仅能是单个
*/
public abstract Class<? extends Event> subscribeType();

/**
* 如果自己有执行器就可以异步在执行器中执行,否则就是同步
*/
public Executor executor() {
return null;
}

/**
* 是否忽略失效的事件
*/
public boolean ignoreExpireEvent() {
return false;
}

/**
* 事件范围使用默认 null
*/
public boolean scopeMatches(T event) {
return true;
}
}

SmartSubscriber

package com.alibaba.nacos.common.notify.listener;
/**
* 聪明的订阅者,继承了 Subscriber,支持订阅多个事件
*/
@SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
public abstract class SmartSubscriber extends Subscriber {

/**
* 这里是感兴趣的事件集合
*/
public abstract List<Class<? extends Event>> subscribeTypes();

/**
* final修饰,不允许子类修改
*/
@Override
public final Class<? extends Event> subscribeType() {
return null;
}

/**
* final修饰,不允许子类修改
*/
@Override
public final boolean ignoreExpireEvent() {
return false;
}
}

通知

NotifyCenter

package com.alibaba.nacos.common.notify;
/**
* Unified Event Notify Center.
* 统一事件通知中心
* 主要方法包括注添加订阅者、取消发布者、获取发布者、发布事件、注册订阅者、注册/取消事件与发布者关联
*
* @author <a href="mailto:[email protected]">liaochuntao</a>
* @author zongtanghu
*/
public class NotifyCenter {

private static final Logger LOGGER = LoggerFactory.getLogger(NotifyCenter.class);
/**
* 内部 ArrayBlockingQueue 缓冲区大小,高写入吞吐量的应用程序,需要适当增加该值,默认值为16384
*/
public static int ringBufferSize;

/**
* 共享发布者的消息暂存队列缓冲区的大小,默认值为1024
*/
public static int shareBufferSize;

private static final AtomicBoolean CLOSED = new AtomicBoolean(false);
/**
* DefaultPublisher
*/
private static final EventPublisherFactory DEFAULT_PUBLISHER_FACTORY;
/**
* 单例
*/
private static final NotifyCenter INSTANCE = new NotifyCenter();
/**
* DefaultSharePublisher 发布者
*/
private DefaultSharePublisher sharePublisher;
/**
* DefaultPublisher 发布者类
*/
private static Class<? extends EventPublisher> clazz;

/**
* Publisher management container.
* DefaultPublisher 发布者容器
*/
private final Map<String, EventPublisher> publisherMap = new ConcurrentHashMap<>(16);

static {
// Internal ArrayBlockingQueue buffer size. For applications with high write throughput,
// this value needs to be increased appropriately. default value is 16384
// 内部 ArrayBlockingQueue 缓冲区大小,高写入吞吐量的应用程序,需要适当增加该值,默认值为16384
String ringBufferSizeProperty = "nacos.core.notify.ring-buffer-size";
ringBufferSize = Integer.getInteger(ringBufferSizeProperty, 16384);

// The size of the public publisher's message staging queue buffer
// 共享发布者的消息暂存队列缓冲区的大小,默认值为1024
String shareBufferSizeProperty = "nacos.core.notify.share-buffer-size";
shareBufferSize = Integer.getInteger(shareBufferSizeProperty, 1024);
// 通过SPI机制加载事件发布者
final Collection<EventPublisher> publishers = NacosServiceLoader.load(EventPublisher.class);
Iterator<EventPublisher> iterator = publishers.iterator();
// 拿到第一个,如果没有就返回 DefaultPublisher
if (iterator.hasNext()) {
clazz = iterator.next().getClass();
} else {
clazz = DefaultPublisher.class;
}
// 1、生成并初始化 DefaultPublisher
DEFAULT_PUBLISHER_FACTORY = (cls, buffer) -> {
try {
EventPublisher publisher = clazz.newInstance();
publisher.init(cls, buffer);
return publisher;
} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
throw new NacosRuntimeException(SERVER_ERROR, ex);
}
};

try {

// Create and init DefaultSharePublisher instance.
// 2、生成并初始化 DefaultSharePublisher
INSTANCE.sharePublisher = new DefaultSharePublisher();
INSTANCE.sharePublisher.init(SlowEvent.class, shareBufferSize);

} catch (Throwable ex) {
LOGGER.error("Service class newInstance has error : ", ex);
}
// 3、注册关闭钩子
ThreadUtils.addShutdownHook(NotifyCenter::shutdown);
}

@JustForTest
public static Map<String, EventPublisher> getPublisherMap() {
return INSTANCE.publisherMap;
}

@JustForTest
public static EventPublisher getPublisher(Class<? extends Event> topic) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, topic)) {
return INSTANCE.sharePublisher;
}
return INSTANCE.publisherMap.get(topic.getCanonicalName());
}

@JustForTest
public static EventPublisher getSharePublisher() {
return INSTANCE.sharePublisher;
}

/**
* Shutdown the several publisher instance which notify center has.
*/
public static void shutdown() {
if (!CLOSED.compareAndSet(false, true)) {
return;
}
LOGGER.warn("[NotifyCenter] Start destroying Publisher");
// 1、关闭已注册的 DefaultPublisher
for (Map.Entry<String, EventPublisher> entry : INSTANCE.publisherMap.entrySet()) {
try {
EventPublisher eventPublisher = entry.getValue();
eventPublisher.shutdown();
} catch (Throwable e) {
LOGGER.error("[EventPublisher] shutdown has error : ", e);
}
}
// 2、关闭 DefaultSharePublisher
try {
INSTANCE.sharePublisher.shutdown();
} catch (Throwable e) {
LOGGER.error("[SharePublisher] shutdown has error : ", e);
}

LOGGER.warn("[NotifyCenter] Destruction of the end");
}

/**
* Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will
* preempt a placeholder Publisher with default EventPublisherFactory first.
*
* @param consumer subscriber
*/
public static void registerSubscriber(final Subscriber consumer) {
registerSubscriber(consumer, DEFAULT_PUBLISHER_FACTORY);
}

/**
* Register a Subscriber. If the Publisher concerned by the Subscriber does not exist, then PublihserMap will
* preempt a placeholder Publisher with specified EventPublisherFactory first.
* 注册订户,如果订阅服务器所关注的发布服务器不存在,则PublihserMap将首先使用指定的EventPublisherFactory抢占占位符发布服务器。
* @param consumer subscriber
* @param factory publisher factory.
*/
public static void registerSubscriber(final Subscriber consumer, final EventPublisherFactory factory) {
// If you want to listen to multiple events, you do it separately,
// based on subclass's subscribeTypes method return list, it can register to publisher.
// 如想侦听多个事件,可以根据子类的subscribeTypes方法返回列表单独执行,它可以向发布者注册。
// SmartSubscriber 类型订阅者注册
if (consumer instanceof SmartSubscriber) {
// 循环遍历事件集合,根据事件类型进行注册
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
// For case, producer: defaultSharePublisher -> consumer: smartSubscriber.
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
// DefaultSharePublisher 中添加 SlowEvent
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
} else {
// For case, producer: defaultPublisher -> consumer: subscriber.
// 向 DefaultPublisher 中注册 Event
addSubscriber(consumer, subscribeType, factory);
}
}
return;
}

// Subscriber 类型订阅者注册
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.addSubscriber(consumer, subscribeType);
return;
}
// 向 DefaultPublisher 中注册 Event
addSubscriber(consumer, subscribeType, factory);
}

/**
* Add a subscriber to publisher.
*
* @param consumer subscriber instance.
* @param subscribeType subscribeType.
* @param factory publisher factory.
*/
private static void addSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType,
EventPublisherFactory factory) {

final String topic = ClassUtils.getCanonicalName(subscribeType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, subscribeType, ringBufferSize);
}
EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) publisher).addSubscriber(consumer, subscribeType);
} else {
publisher.addSubscriber(consumer);
}
}

/**
* Deregister subscriber.
* 注销订阅者
* @param consumer subscriber instance.
*/
public static void deregisterSubscriber(final Subscriber consumer) {
// SmartSubscriber 类型订阅者取消注册
if (consumer instanceof SmartSubscriber) {
// 循环遍历事件集合,根据事件类型进行注销
for (Class<? extends Event> subscribeType : ((SmartSubscriber) consumer).subscribeTypes()) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
// DefaultSharePublisher 中删除 SlowEvent
INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
} else {
// DefaultPublisher 中删除 Event
removeSubscriber(consumer, subscribeType);
}
}
return;
}
// Subscriber 类型订阅者取消注册
final Class<? extends Event> subscribeType = consumer.subscribeType();
if (ClassUtils.isAssignableFrom(SlowEvent.class, subscribeType)) {
INSTANCE.sharePublisher.removeSubscriber(consumer, subscribeType);
return;
}

// DefaultPublisher 中删除 Event
if (removeSubscriber(consumer, subscribeType)) {
return;
}
throw new NoSuchElementException("The subscriber has no event publisher");
}

/**
* Remove subscriber.
*
* @param consumer subscriber instance.
* @param subscribeType subscribeType.
* @return whether remove subscriber successfully or not.
*/
private static boolean removeSubscriber(final Subscriber consumer, Class<? extends Event> subscribeType) {

final String topic = ClassUtils.getCanonicalName(subscribeType);
EventPublisher eventPublisher = INSTANCE.publisherMap.get(topic);
if (null == eventPublisher) {
return false;
}
if (eventPublisher instanceof ShardedEventPublisher) {
((ShardedEventPublisher) eventPublisher).removeSubscriber(consumer, subscribeType);
} else {
eventPublisher.removeSubscriber(consumer);
}
return true;
}

/**
* Request publisher publish event Publishers load lazily, calling publisher. Start () only when the event is
* actually published.
* 请求发布者 发布 事件发布者延迟加载,仅当事件实际发布时才调用发布者 start 方法。
* @param event class Instances of the event.
*/
public static boolean publishEvent(final Event event) {
try {
return publishEvent(event.getClass(), event);
} catch (Throwable ex) {
LOGGER.error("There was an exception to the message publishing : ", ex);
return false;
}
}

/**
* Request publisher publish event Publishers load lazily, calling publisher.
* 请求发布者 发布 事件发布者延迟加载
* @param eventType class Instances type of the event type.
* @param event event instance.
*/
private static boolean publishEvent(final Class<? extends Event> eventType, final Event event) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher.publish(event);
}

final String topic = ClassUtils.getCanonicalName(eventType);

EventPublisher publisher = INSTANCE.publisherMap.get(topic);
if (publisher != null) {
return publisher.publish(event);
}
if (event.isPluginEvent()) {
return true;
}
LOGGER.warn("There are no [{}] publishers for this event, please register", topic);
return false;
}

/**
* Register to share-publisher.
*
* @param eventType class Instances type of the event type.
* @return share publisher instance.
*/
public static EventPublisher registerToSharePublisher(final Class<? extends SlowEvent> eventType) {
return INSTANCE.sharePublisher;
}

/**
* Register publisher with default factory.
* 通过默认工厂注册发布者
* @param eventType class Instances type of the event type.
* @param queueMaxSize the publisher's queue max size.
*/
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType, final int queueMaxSize) {
return registerToPublisher(eventType, DEFAULT_PUBLISHER_FACTORY, queueMaxSize);
}

/**
* Register publisher with specified factory.
* 通过指定工厂注册发布者
* @param eventType class Instances type of the event type.
* @param factory publisher factory.
* @param queueMaxSize the publisher's queue max size.
*/
public static EventPublisher registerToPublisher(final Class<? extends Event> eventType,
final EventPublisherFactory factory, final int queueMaxSize) {
if (ClassUtils.isAssignableFrom(SlowEvent.class, eventType)) {
return INSTANCE.sharePublisher;
}

final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
// MapUtils.computeIfAbsent is a unsafe method.
MapUtil.computeIfAbsent(INSTANCE.publisherMap, topic, factory, eventType, queueMaxSize);
}
return INSTANCE.publisherMap.get(topic);
}

/**
* Register publisher.
* DefaultPublisher容器 publisherMap 中添加 Event:EventPublisher
* @param eventType class Instances type of the event type.
* @param publisher the specified event publisher
*/
public static void registerToPublisher(final Class<? extends Event> eventType, final EventPublisher publisher) {
if (null == publisher) {
return;
}
final String topic = ClassUtils.getCanonicalName(eventType);
synchronized (NotifyCenter.class) {
INSTANCE.publisherMap.putIfAbsent(topic, publisher);
}
}

/**
* Deregister publisher.
* DefaultPublisher容器 publisherMap 中删除 Event:EventPublisher,并将 EventPublisher 关闭
* @param eventType class Instances type of the event type.
*/
public static void deregisterPublisher(final Class<? extends Event> eventType) {
final String topic = ClassUtils.getCanonicalName(eventType);
EventPublisher publisher = INSTANCE.publisherMap.remove(topic);
try {
publisher.shutdown();
} catch (Throwable ex) {
LOGGER.error("There was an exception when publisher shutdown : ", ex);
}
}

}

标签:event,03,return,publisher,nacos,public,subscriber,final
From: https://blog.51cto.com/u_15847681/5826247

相关文章