Distributed Configuration Management with ZooKeeper: Making @Value Update Dynamically (Part 2)

Published: 2022-12-14 18:34:09
Tags: return, String, Zookeeper, configuration management, Value, private, key, org, import


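Overview:

The previous article used ZooKeeper for configuration management but, as noted at its end, did not make @Value fields change dynamically; in other words, configuration changes were not picked up at runtime. This article builds on that work to update configuration dynamically, which completes the approach of using ZooKeeper as a configuration center.

Implementation idea:

Implement BeanPostProcessor to hook into bean initialization, collect every Field annotated with @Value, and store it in a cache. When a change to a ZooKeeper node is detected, replace the value of the cached field with the new configuration data.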
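
The idea can be sketched without Spring or ZooKeeper. The following is a minimal illustration and not the implementation used in this article: `RefreshableFieldDemo`, `Target`, and `DemoBean` are hypothetical names, and the map stands in for the cache that the post-processor fills during bean initialization.

```java
import java.lang.reflect.Field;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo: remember where a config key was injected, then overwrite that field later.
class RefreshableFieldDemo {

    // One injection point: the bean instance plus the field that holds the config value.
    static final class Target {
        private final Object bean;
        private final Field field;

        Target(Object bean, Field field) {
            this.bean = bean;
            this.field = field;
            this.field.setAccessible(true);
        }

        void update(Object newValue) {
            try {
                field.set(bean, newValue);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot update " + field.getName(), e);
            }
        }
    }

    // Stand-in for a Spring bean whose field would carry @Value("${app.name}").
    static final class DemoBean {
        private String appName = "old-name";

        String getAppName() {
            return appName;
        }
    }

    // Cache filled while beans are initialized: config key -> injection point.
    private static final Map<String, Target> CACHE = new ConcurrentHashMap<>();

    static void register(String key, Object bean, String fieldName) {
        try {
            CACHE.put(key, new Target(bean, bean.getClass().getDeclaredField(fieldName)));
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No field " + fieldName + " on " + bean.getClass(), e);
        }
    }

    // Called when the config store reports a change; returns false for keys nobody injected.
    static boolean onConfigChanged(String key, Object newValue) {
        Target target = CACHE.get(key);
        if (target == null) {
            return false;
        }
        target.update(newValue);
        return true;
    }

    public static void main(String[] args) {
        DemoBean bean = new DemoBean();
        register("app.name", bean, "appName");
        onConfigChanged("app.name", "new-name");
        System.out.println(bean.getAppName()); // prints "new-name"
    }
}
```

The bean keeps working with a plain field; only the component that receives change notifications needs to know about the cache.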

The BeanPostProcessor implementation is as follows:

package com.hou.zk.demo.config;

import com.hou.zk.demo.config.Untils.PlaceholderHelper;
import com.hou.zk.demo.config.Untils.SpringValue;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.annotation.Configuration;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

@Configuration
public class SpringValueProcessor implements BeanPostProcessor {

    // Cache of @Value injection points, keyed by config key and read by the ZooKeeper listener.
    // Note: if several beans inject the same key, only the most recently registered one is kept.
    public static final ConcurrentHashMap<String, SpringValue> cacheMap = new ConcurrentHashMap<>();

    private final PlaceholderHelper placeholderHelper = new PlaceholderHelper();

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Class<?> clazz = bean.getClass();
        // Walk every field of the bean, including inherited ones
        for (Field field : findAllField(clazz)) {
            // Only fields annotated with @Value are of interest
            Value value = field.getAnnotation(Value.class);
            if (value != null) {
                // Register the field under every placeholder key it references
                Set<String> keys = placeholderHelper.extractPlaceholderKeys(value.value());
                for (String key : keys) {
                    SpringValue springValue = new SpringValue(key, value.value(), bean, beanName, field, false);
                    cacheMap.put(key, springValue);
                }
            }
        }
        return bean;
    }

    // Collect the fields declared on the class and on all of its superclasses
    private List<Field> findAllField(Class<?> clazz) {
        final List<Field> res = new LinkedList<>();
        for (Class<?> current = clazz; current != null && current != Object.class; current = current.getSuperclass()) {
            res.addAll(Arrays.asList(current.getDeclaredFields()));
        }
        return res;
    }
}
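
One limitation of the cache above is that it is keyed by config key alone, so when two beans inject the same key the later `put` replaces the earlier entry and only one bean gets refreshed. A possible variant keeps a list of targets per key. This is only a sketch under that assumption: `MultiTargetRegistry` and its methods are hypothetical names, and plain `Consumer` callbacks stand in for `SpringValue`.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical registry that keeps every injection point for a key instead of only the last one.
class MultiTargetRegistry {

    private final Map<String, List<Consumer<String>>> targets = new ConcurrentHashMap<>();

    // Register one more updater for the key; earlier registrations are preserved.
    void register(String key, Consumer<String> updater) {
        targets.computeIfAbsent(key, k -> new CopyOnWriteArrayList<>()).add(updater);
    }

    // Push the new value to every registered updater and report how many were notified.
    int publish(String key, String newValue) {
        List<Consumer<String>> updaters = targets.getOrDefault(key, Collections.emptyList());
        for (Consumer<String> updater : updaters) {
            updater.accept(newValue);
        }
        return updaters.size();
    }

    public static void main(String[] args) {
        MultiTargetRegistry registry = new MultiTargetRegistry();
        AtomicReference<String> first = new AtomicReference<>("old");
        AtomicReference<String> second = new AtomicReference<>("old");
        registry.register("app.name", first::set);
        registry.register("app.name", second::set);

        int notified = registry.publish("app.name", "new");
        System.out.println(notified + " " + first.get() + " " + second.get()); // prints "2 new new"
    }
}
```

`CopyOnWriteArrayList` suits this access pattern because registrations happen during startup while notifications only iterate.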

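The classes it depends on are as follows:

SpringValue.java (a holder that manages one @Value injection point) and PlaceholderHelper.java (which parses the keys and values of @Value placeholders). Both classes are taken from Ctrip's Apollo; their implementations follow: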
package com.hou.zk.demo.config.Untils;

import com.google.common.base.Strings;
import com.google.common.collect.Sets;
import java.util.Set;
import java.util.Stack;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanExpressionContext;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.beans.factory.config.Scope;
import org.springframework.util.StringUtils;

/**
 * Placeholder helper functions.
 */
public class PlaceholderHelper {

    private static final String PLACEHOLDER_PREFIX = "${";
    private static final String PLACEHOLDER_SUFFIX = "}";
    private static final String VALUE_SEPARATOR = ":";
    private static final String SIMPLE_PLACEHOLDER_PREFIX = "{";
    private static final String EXPRESSION_PREFIX = "#{";
    private static final String EXPRESSION_SUFFIX = "}";

    /**
     * Resolve placeholder property values, e.g.
     * <br />
     * <br />
     * "${somePropertyValue}" -> "the actual property value"
     */
    public Object resolvePropertyValue(ConfigurableBeanFactory beanFactory, String beanName, String placeholder) {
        // resolve string value
        String strVal = beanFactory.resolveEmbeddedValue(placeholder);

        BeanDefinition bd = (beanFactory.containsBean(beanName) ? beanFactory
            .getMergedBeanDefinition(beanName) : null);

        // resolve expressions like "#{systemProperties.myProp}"
        return evaluateBeanDefinitionString(beanFactory, strVal, bd);
    }

    private Object evaluateBeanDefinitionString(ConfigurableBeanFactory beanFactory, String value,
        BeanDefinition beanDefinition) {
        if (beanFactory.getBeanExpressionResolver() == null) {
            return value;
        }
        Scope scope = (beanDefinition != null ? beanFactory
            .getRegisteredScope(beanDefinition.getScope()) : null);
        return beanFactory.getBeanExpressionResolver()
            .evaluate(value, new BeanExpressionContext(beanFactory, scope));
    }

    /**
     * Extract keys from placeholder, e.g.
     * <ul>
     * <li>${some.key} => "some.key"</li>
     * <li>${some.key:${some.other.key:100}} => "some.key", "some.other.key"</li>
     * <li>${${some.key}} => "some.key"</li>
     * <li>${${some.key:other.key}} => "some.key"</li>
     * <li>${${some.key}:${another.key}} => "some.key", "another.key"</li>
     * <li>#{new java.text.SimpleDateFormat('${some.key}').parse('${another.key}')} => "some.key", "another.key"</li>
     * </ul>
     */
    public Set<String> extractPlaceholderKeys(String propertyString) {
        Set<String> placeholderKeys = Sets.newHashSet();

        if (Strings.isNullOrEmpty(propertyString) || (!isNormalizedPlaceholder(propertyString) && !isExpressionWithPlaceholder(propertyString))) {
            return placeholderKeys;
        }

        Stack<String> stack = new Stack<>();
        stack.push(propertyString);

        while (!stack.isEmpty()) {
            String strVal = stack.pop();
            int startIndex = strVal.indexOf(PLACEHOLDER_PREFIX);
            if (startIndex == -1) {
                placeholderKeys.add(strVal);
                continue;
            }
            int endIndex = findPlaceholderEndIndex(strVal, startIndex);
            if (endIndex == -1) {
                // invalid placeholder?
                continue;
            }

            String placeholderCandidate = strVal.substring(startIndex + PLACEHOLDER_PREFIX.length(), endIndex);

            // ${some.key:other.key}
            if (placeholderCandidate.startsWith(PLACEHOLDER_PREFIX)) {
                stack.push(placeholderCandidate);
            } else {
                // some.key:${some.other.key:100}
                int separatorIndex = placeholderCandidate.indexOf(VALUE_SEPARATOR);

                if (separatorIndex == -1) {
                    stack.push(placeholderCandidate);
                } else {
                    stack.push(placeholderCandidate.substring(0, separatorIndex));
                    String defaultValuePart =
                        normalizeToPlaceholder(placeholderCandidate.substring(separatorIndex + VALUE_SEPARATOR.length()));
                    if (!Strings.isNullOrEmpty(defaultValuePart)) {
                        stack.push(defaultValuePart);
                    }
                }
            }

            // has remaining part, e.g. ${a}.${b}
            if (endIndex + PLACEHOLDER_SUFFIX.length() < strVal.length() - 1) {
                String remainingPart = normalizeToPlaceholder(strVal.substring(endIndex + PLACEHOLDER_SUFFIX.length()));
                if (!Strings.isNullOrEmpty(remainingPart)) {
                    stack.push(remainingPart);
                }
            }
        }

        return placeholderKeys;
    }

    private boolean isNormalizedPlaceholder(String propertyString) {
        return propertyString.startsWith(PLACEHOLDER_PREFIX) && propertyString.contains(PLACEHOLDER_SUFFIX);
    }

    private boolean isExpressionWithPlaceholder(String propertyString) {
        return propertyString.startsWith(EXPRESSION_PREFIX) && propertyString.contains(EXPRESSION_SUFFIX)
            && propertyString.contains(PLACEHOLDER_PREFIX) && propertyString.contains(PLACEHOLDER_SUFFIX);
    }

    private String normalizeToPlaceholder(String strVal) {
        int startIndex = strVal.indexOf(PLACEHOLDER_PREFIX);
        if (startIndex == -1) {
            return null;
        }
        int endIndex = strVal.lastIndexOf(PLACEHOLDER_SUFFIX);
        if (endIndex == -1) {
            return null;
        }

        return strVal.substring(startIndex, endIndex + PLACEHOLDER_SUFFIX.length());
    }

    private int findPlaceholderEndIndex(CharSequence buf, int startIndex) {
        int index = startIndex + PLACEHOLDER_PREFIX.length();
        int withinNestedPlaceholder = 0;
        while (index < buf.length()) {
            if (StringUtils.substringMatch(buf, index, PLACEHOLDER_SUFFIX)) {
                if (withinNestedPlaceholder > 0) {
                    withinNestedPlaceholder--;
                    index = index + PLACEHOLDER_SUFFIX.length();
                } else {
                    return index;
                }
            } else if (StringUtils.substringMatch(buf, index, SIMPLE_PLACEHOLDER_PREFIX)) {
                withinNestedPlaceholder++;
                index = index + SIMPLE_PLACEHOLDER_PREFIX.length();
            } else {
                index++;
            }
        }
        return -1;
    }
}

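
To make the cases documented on `extractPlaceholderKeys` concrete, the following simplified stand-in reproduces them with a regular expression. It is not Apollo's algorithm: it simply collects every run of characters that directly follows `${` and contains no `$`, `{`, `}`, or `:`, so it does not check that braces are balanced and may differ from the real method on malformed input. `SimpleKeyExtractor` is a hypothetical name.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for PlaceholderHelper.extractPlaceholderKeys.
class SimpleKeyExtractor {

    // "${" followed by at least one character that cannot start a nested placeholder or a default.
    private static final Pattern KEY = Pattern.compile("\\$\\{([^${}:]+)");

    // Returns the keys in order of first appearance; empty for null or placeholder-free input.
    static Set<String> extract(String value) {
        Set<String> keys = new LinkedHashSet<>();
        if (value == null) {
            return keys;
        }
        Matcher matcher = KEY.matcher(value);
        while (matcher.find()) {
            keys.add(matcher.group(1));
        }
        return keys;
    }

    public static void main(String[] args) {
        System.out.println(extract("${some.key}"));                       // prints [some.key]
        System.out.println(extract("${some.key:${some.other.key:100}}")); // prints [some.key, some.other.key]
        System.out.println(extract("${${some.key}:${another.key}}"));     // prints [some.key, another.key]
    }
}
```

The default value `100` is not reported as a key because it is not preceded by `${`, which matches the documented behavior for these inputs.
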
package com.hou.zk.demo.config.Untils;

import java.lang.ref.WeakReference;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import org.springframework.core.MethodParameter;

/**
 * Spring @Value method info
 *
 * @author github.com/zhegexiaohuozi
 * @since 2018/2/6.
 */
public class SpringValue {

    private MethodParameter methodParameter;
    private Field field;
    private WeakReference<Object> beanRef;
    private String beanName;
    private String key;
    private String placeholder;
    private Class<?> targetType;
    private Type genericType;
    private boolean isJson;

    public SpringValue(String key, String placeholder, Object bean, String beanName, Field field, boolean isJson) {
        this.beanRef = new WeakReference<>(bean);
        this.beanName = beanName;
        this.field = field;
        this.key = key;
        this.placeholder = placeholder;
        this.targetType = field.getType();
        this.isJson = isJson;
        if (isJson) {
            this.genericType = field.getGenericType();
        }
    }

    public SpringValue(String key, String placeholder, Object bean, String beanName, Method method, boolean isJson) {
        this.beanRef = new WeakReference<>(bean);
        this.beanName = beanName;
        this.methodParameter = new MethodParameter(method, 0);
        this.key = key;
        this.placeholder = placeholder;
        Class<?>[] paramTps = method.getParameterTypes();
        this.targetType = paramTps[0];
        this.isJson = isJson;
        if (isJson) {
            this.genericType = method.getGenericParameterTypes()[0];
        }
    }

    public void update(Object newVal) throws IllegalAccessException, InvocationTargetException {
        if (isField()) {
            injectField(newVal);
        } else {
            injectMethod(newVal);
        }
    }

    private void injectField(Object newVal) throws IllegalAccessException {
        Object bean = beanRef.get();
        if (bean == null) {
            return;
        }
        boolean accessible = field.isAccessible();
        field.setAccessible(true);
        field.set(bean, newVal);
        field.setAccessible(accessible);
    }

    private void injectMethod(Object newVal)
        throws InvocationTargetException, IllegalAccessException {
        Object bean = beanRef.get();
        if (bean == null) {
            return;
        }
        methodParameter.getMethod().invoke(bean, newVal);
    }

    public String getBeanName() {
        return beanName;
    }

    public Class<?> getTargetType() {
        return targetType;
    }

    public String getPlaceholder() {
        return this.placeholder;
    }

    public MethodParameter getMethodParameter() {
        return methodParameter;
    }

    public boolean isField() {
        return this.field != null;
    }

    public Field getField() {
        return field;
    }

    public Type getGenericType() {
        return genericType;
    }

    public boolean isJson() {
        return isJson;
    }

    boolean isTargetBeanValid() {
        return beanRef.get() != null;
    }

    @Override
    public String toString() {
        Object bean = beanRef.get();
        if (bean == null) {
            return "";
        }
        if (isField()) {
            return String
                .format("key: %s, beanName: %s, field: %s.%s", key, beanName, bean.getClass().getName(), field.getName());
        }
        return String.format("key: %s, beanName: %s, method: %s.%s", key, beanName, bean.getClass().getName(),
            methodParameter.getMethod().getName());
    }
}

The calling code is as follows:

package com.hou.zk.demo.config;

import com.hou.zk.demo.config.Untils.SpringValue;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.retry.RetryOneTime;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.env.EnvironmentPostProcessor;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ZookeeperEnvironmentPostProcessor implements EnvironmentPostProcessor {

    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        try {
            System.out.println("Got the Environment, initializing configuration");
            String configCenterUrl = environment.getProperty("config.zookeeper.url");
            String configNodename = environment.getProperty("config.zookeeper.nodename");
            String childPrefix = "/" + configNodename + "/";

            CuratorFramework zkClient = CuratorFrameworkFactory.newClient(configCenterUrl, new RetryOneTime(1000));
            zkClient.start();

            // 1. Read the children of the config node; each child represents one config item.
            //    A concurrent map is used because the listener thread updates it later.
            Map<String, Object> configMap = new ConcurrentHashMap<>();
            List<String> configNames = zkClient.getChildren().forPath("/" + configNodename);
            for (String configName : configNames) {
                byte[] value = zkClient.getData().forPath(childPrefix + configName);
                String text = new String(value, StandardCharsets.UTF_8);
                configMap.put(configName, text);
                System.out.println(configName + ":" + text);
            }

            // 2. Expose the values to Spring with the highest precedence.
            MapPropertySource propertySource = new MapPropertySource("zkPropertySource", configMap);
            environment.getPropertySources().addFirst(propertySource);

            // 3. Watch the node so that later changes reach beans that already exist.
            //    The listener is registered before start() so that no event is missed.
            TreeCache treeCache = new TreeCache(zkClient, "/" + configNodename);
            treeCache.getListenable().addListener(new TreeCacheListener() {
                @Override
                public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
                    switch (treeCacheEvent.getType()) {
                        case NODE_UPDATED:
                            System.out.println("Received data change notification: " + treeCacheEvent.getData());
                            String path = treeCacheEvent.getData().getPath();
                            if (!path.startsWith(childPrefix)) {
                                // Ignore updates to the parent node itself
                                break;
                            }
                            String configName = path.substring(childPrefix.length());
                            String newValue = new String(treeCacheEvent.getData().getData(), StandardCharsets.UTF_8);
                            // Keep the property source in sync for beans created later
                            configMap.put(configName, newValue);
                            // Spring has already created the bean, so overwrite the cached @Value field.
                            // The raw String is injected as-is, which only suits String fields.
                            SpringValue springValue = SpringValueProcessor.cacheMap.get(configName);
                            if (springValue != null) {
                                springValue.update(newValue);
                            }
                            break;
                        default:
                            break;
                    }
                }
            });
            treeCache.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
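
The listener hands the raw `String` from ZooKeeper to `update`, which works for `String` fields only: `Field.set` throws `IllegalArgumentException` when a `String` is assigned to an `int` or `boolean` field. One way around this, sketched here as an assumption and not part of the original code, is to convert the text to the field's type first. `ValueConverter` is a hypothetical helper that covers only a handful of common types; with the classes above, the call could look like `springValue.update(ValueConverter.convert(newValue, springValue.getTargetType()))`.

```java
import java.lang.reflect.Field;

// Hypothetical helper: turn the text stored in a ZooKeeper node into the type of the target field.
class ValueConverter {

    static Object convert(String text, Class<?> targetType) {
        if (targetType == String.class) {
            return text;
        }
        String trimmed = text.trim();
        if (targetType == int.class || targetType == Integer.class) {
            return Integer.valueOf(trimmed);
        }
        if (targetType == long.class || targetType == Long.class) {
            return Long.valueOf(trimmed);
        }
        if (targetType == boolean.class || targetType == Boolean.class) {
            return Boolean.valueOf(trimmed);
        }
        if (targetType == double.class || targetType == Double.class) {
            return Double.valueOf(trimmed);
        }
        throw new IllegalArgumentException("Unsupported target type: " + targetType.getName());
    }

    // Stand-in for a bean with a numeric @Value field.
    static final class DemoBean {
        private int timeout = 30;
    }

    public static void main(String[] args) throws Exception {
        DemoBean bean = new DemoBean();
        Field field = DemoBean.class.getDeclaredField("timeout");
        field.setAccessible(true);
        // Setting the String "45" directly would fail; the converted Integer is unboxed into the int field.
        field.set(bean, convert("45", field.getType()));
        System.out.println(field.getInt(bean)); // prints 45
    }
}
```

In a Spring application the container's own type conversion facilities would likely be a better fit than a hand-written switch; the sketch only shows where the conversion step belongs.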

Summary:

With these two articles, dynamic switching of distributed configuration through ZooKeeper is now in place.

 

 

From: https://blog.51cto.com/u_15461374/5938170
