背景:记录一次代码优化,CreateIndex中实现ApplicationListener接口导致onApplicationEvent方法多次调用,方法里重复加载该注解的类. this.applicationContext.getBeansWithAnnotation(ESMetaData.class).
排查过程:首先在服务启动run方法打断点,在springboot在加载的过程中,会发布多个事件(event),根据过往所学,实现ApplicationListener接口 onApplicationEvent方法对这些事件(event)去监听,多次加载会出现.
public ConfigurableApplicationContext run(String... args) {
long startTime = System.nanoTime();
DefaultBootstrapContext bootstrapContext = this.createBootstrapContext();
ConfigurableApplicationContext context = null;
this.configureHeadlessProperty();
//1.0 第一次调用
SpringApplicationRunListeners listeners = this.getRunListeners(args);
listeners.starting(bootstrapContext, this.mainApplicationClass);
Throwable ex;
try {
ApplicationArguments applicationArguments = new DefaultApplicationArguments(args);
//2.0 第二次调用
ConfigurableEnvironment environment = this.prepareEnvironment(listeners, bootstrapContext, applicationArguments);
this.configureIgnoreBeanInfo(environment);
Banner printedBanner = this.printBanner(environment);
context = this.createApplicationContext();
context.setApplicationStartup(this.applicationStartup);
//3.0 第三次调用
this.prepareContext(bootstrapContext, context, environment, listeners, applicationArguments, printedBanner);
this.refreshContext(context);
this.afterRefresh(context, applicationArguments);
Duration timeTakenToStartup = Duration.ofNanos(System.nanoTime() - startTime);
if (this.logStartupInfo) {
(new StartupInfoLogger(this.mainApplicationClass)).logStarted(this.getApplicationLog(), timeTakenToStartup);
}
//4.0 第四次调用
listeners.started(context, timeTakenToStartup);
this.callRunners(context, applicationArguments);
} catch (Throwable var12) {
ex = var12;
this.handleRunFailure(context, ex, listeners);
throw new IllegalStateException(ex);
}
try {
Duration timeTakenToReady = Duration.ofNanos(System.nanoTime() - startTime);
//5.0 第五次调用
listeners.ready(context, timeTakenToReady);
return context;
} catch (Throwable var11) {
ex = var11;
//6.0 第六次调用(optional)
this.handleRunFailure(context, ex, (SpringApplicationRunListeners)null);
throw new IllegalStateException(ex);
}
}
解决办法:也是在调试springboot启动的代码中,发现启动最后回调CommandLineRunner或者ApplicationRunner接口的方法,将原有的ApplicationListener接口 替换成这两个当中的一个,即可实现一次调用的效果.
接下来详细分析下这个优化过程,并提供完整的解决方案:
1.0 原始代码(存在问题的实现):
@Component
public class CreateIndex implements ApplicationListener<ApplicationEvent> {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private RestHighLevelClient client;
@Override
public void onApplicationEvent(ApplicationEvent event) {
// 问题代码:会被多次调用
try {
// 获取所有带有@ESMetaData注解的类
Map<String, Object> beansWithAnnotation =
this.applicationContext.getBeansWithAnnotation(ESMetaData.class);
for (Object bean : beansWithAnnotation.values()) {
// 处理每个类的索引创建
processIndexCreation(bean);
}
} catch (Exception e) {
log.error("Create elasticsearch index error", e);
}
}
}
2.0 优化后的代码(使用CommandLineRunner):
@Component
@Slf4j
public class ESIndexInitializer implements CommandLineRunner {
@Autowired
private ApplicationContext applicationContext;
@Autowired
private RestHighLevelClient client;
@Override
public void run(String... args) throws Exception {
initializeIndices();
}
private void initializeIndices() {
try {
log.info("Starting to initialize Elasticsearch indices...");
// 获取所有带有@ESMetaData注解的类
Map<String, Object> beansWithAnnotation =
this.applicationContext.getBeansWithAnnotation(ESMetaData.class);
for (Object bean : beansWithAnnotation.values()) {
Class<?> clazz = AopUtils.getTargetClass(bean);
ESMetaData esMetaData = clazz.getAnnotation(ESMetaData.class);
if (esMetaData != null) {
createOrUpdateIndex(clazz, esMetaData);
}
}
log.info("Elasticsearch indices initialization completed.");
} catch (Exception e) {
log.error("Failed to initialize Elasticsearch indices", e);
throw new RuntimeException("Index initialization failed", e);
}
}
private void createOrUpdateIndex(Class<?> clazz, ESMetaData esMetaData) throws IOException {
String indexName = esMetaData.indexName();
// 检查索引是否存在
boolean indexExists = client.indices()
.exists(new GetIndexRequest(indexName), RequestOptions.DEFAULT);
if (!indexExists) {
// 创建新索引
CreateIndexRequest createIndexRequest = buildCreateIndexRequest(clazz, esMetaData);
CreateIndexResponse createIndexResponse = client.indices()
.create(createIndexRequest, RequestOptions.DEFAULT);
if (createIndexResponse.isAcknowledged()) {
log.info("Successfully created index: {}", indexName);
} else {
log.warn("Failed to create index: {}", indexName);
}
} else {
// 更新现有索引的映射
updateIndexMapping(indexName, clazz);
}
}
private CreateIndexRequest buildCreateIndexRequest(Class<?> clazz, ESMetaData esMetaData) {
CreateIndexRequest request = new CreateIndexRequest(esMetaData.indexName());
// 设置索引配置
request.settings(Settings.builder()
.put("index.number_of_shards", esMetaData.shards())
.put("index.number_of_replicas", esMetaData.replicas())
.put("index.refresh_interval", "30s")
);
// 构建映射
XContentBuilder mapping = buildMapping(clazz);
request.mapping(mapping);
return request;
}
private XContentBuilder buildMapping(Class<?> clazz) throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.startObject("properties");
// 处理所有字段
for (Field field : clazz.getDeclaredFields()) {
ESField esField = field.getAnnotation(ESField.class);
if (esField != null) {
builder.startObject(field.getName());
builder.field("type", esField.type());
if (esField.analyzer().length() > 0) {
builder.field("analyzer", esField.analyzer());
}
builder.endObject();
}
}
builder.endObject();
}
builder.endObject();
return builder;
}
private void updateIndexMapping(String indexName, Class<?> clazz) throws IOException {
PutMappingRequest request = new PutMappingRequest(indexName);
request.source(buildMapping(clazz));
AcknowledgedResponse putMappingResponse = client.indices()
.putMapping(request, RequestOptions.DEFAULT);
if (putMappingResponse.isAcknowledged()) {
log.info("Successfully updated mapping for index: {}", indexName);
} else {
log.warn("Failed to update mapping for index: {}", indexName);
}
}
}
3.0 相关注解定义:
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ESMetaData {
String indexName();
int shards() default 3;
int replicas() default 1;
}
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ESField {
String type();
String analyzer() default "";
}
4.0 使用示例:
@ESMetaData(indexName = "users")
@Data
public class UserIndex {
@ESField(type = "keyword")
private String id;
@ESField(type = "text", analyzer = "ik_max_word")
private String name;
@ESField(type = "integer")
private Integer age;
@ESField(type = "date")
private LocalDateTime createTime;
}
5.0 配置类:
@Configuration
public class ElasticsearchConfig {
@Bean
public RestHighLevelClient elasticsearchClient(
@Value("${elasticsearch.hosts}") String[] hosts) {
HttpHost[] httpHosts = Arrays.stream(hosts)
.map(HttpHost::create)
.toArray(HttpHost[]::new);
return new RestHighLevelClient(
RestClient.builder(httpHosts)
.setRequestConfigCallback(requestConfigBuilder ->
requestConfigBuilder
.setConnectTimeout(5000)
.setSocketTimeout(60000))
.setMaxRetryTimeoutMillis(60000)
);
}
}
主要改进点:
- 初始化时机:
- 从ApplicationListener改为CommandLineRunner
- 确保只执行一次
- 在应用完全启动前执行
- 错误处理:
- 完善的异常处理
- 详细的日志记录
- 失败时快速失败
- 功能增强:
- 支持更新现有索引
- 灵活的映射配置
- 索引设置可配置
- 代码质量:
- 更清晰的结构
- 更好的可维护性
- 更完善的日志
启发:这个优化不仅解决了多次执行的问题,还提供了更完善的索引初始化功能,使代码更加健壮和可维护。
标签:ESMetaData,indexName,ApplicationListener,builder,onApplicationEvent,private,claz From: https://blog.csdn.net/weixin_38804310/article/details/144415943