SPI学习记录
场景
采用Netty实现一个RPC框架,对于其中的服务发现机制,我们希望具有较强的扩展性,因此采用SPI机制来实现。
- 首先定义一个SPI注解,所有带有SPI注解的类支持SPI机制。
SPI注解类
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface SPI {
}
服务发现接口
@SPI
public interface ServiceDiscovery {
/**
* lookup service by rpcServiceName
*
* @param rpcRequest rpc service pojo
* @return service address
*/
InetSocketAddress lookupService(RpcRequest rpcRequest);
}
- 定义扩展加载类
其中包含3个static final修饰的类变量,分别对应服务发现的目录、扩展类加载器缓存、扩展类缓存。
对于每个类加载器实例对象,需要维护3个信息:目标加载类class对象(对应接口类)、目标加载类的实例对象(这里用holder封装了一层)、目标加载类的class对象(对应实际加载类)
扩展加载类
public final class ExtensionLoader<T> {
private static final String SERVICE_DIRECTORY = "META-INF/extensions/";
private static final Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();
private static final Map<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();
private final Class<?> type;
private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();
- 通过类加载器ExtensionLoader,加载指定的类即可直接使用
客户端加载服务发现机制
public NettyRpcClient() {
// initialize resources such as EventLoopGroup, Bootstrap
eventLoopGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
// The timeout period of the connection.
// If this time is exceeded or the connection cannot be established, the connection fails.
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
// If no data is sent to the server within 15 seconds, a heartbeat request is sent
p.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));
p.addLast(new RpcMessageEncoder());
p.addLast(new RpcMessageDecoder());
p.addLast(new NettyRpcClientHandler());
}
});
this.serviceDiscovery = ExtensionLoader.getExtensionLoader(ServiceDiscovery.class).getExtension("zk");
this.unprocessedRequests = SingletonFactory.getInstance(UnprocessedRequests.class);
this.channelProvider = SingletonFactory.getInstance(ChannelProvider.class);
}