首页 > 其他分享 >snapshot源代码

snapshot源代码

时间:2024-11-17 10:56:37 浏览次数:1
标签:indicesName String List request snapshot indices new 源代码

@ Service("snapshotService")
@Slf4j
public class SnapshotServiceImpl implements SnapshotService {
@ Autowired
private RestHighLevelClient restHighLevelClient;
@ Autowired
private ElasticSearchServiceelasticSearchService;
@ Autowired
private IndicesRestoreMapper indicesRestoreMapper;
@ Autowired
private IndicesRestoreRecordMapper indicesRestoreRecordMapper;
public boolean isRepositoryExists( String repositoryName) throws IOException{
GetRepositoriesRequest repositoriesRequest= new GetRepositoriesRequest();
repositoriesRequest. repositories( new String[]{repositoryName});
boolean flag= false;
GetRepositoriesResponse repositoriesResponse =
restHighLevelClient. snapshot(). getRepository(repositoriesRequest,
RequestOptions. DEFAULT);
if (repositoriesResponse. repositories() != null&& !repositoriesResponse. repositories(). isEmpty()) {
flag= true;
)
return flag;
}
@ Override
public RestoreSnapshotIndicesResponse
restoreSnapshotIndices(RestoreSnapshotIndicesRequest request){
RestoreSnapshotIndicesResponse response = new RestoreSnapshotIndicesResponse();
if ( request == null II request. getSnapShotList()== null 11
request. getSnapShotList(). isEmpty()){
throw new TitanException("所传参数为空!");
}
List snapShotDTOList= request. getSnapShotList();
for (SnapShotDTO snapShotDTO:snapShotDTOList){
try{
restoreSnapshot(snapShotDTO. getRepositoryName(), snapShotDTO. getSnapshotName(), snapShotDTO. getIndices(), snapShotDTO. getDelDate(), 0l);
response. setSuccess(0);
} catch (IOException ioe){
log. error("索引恢复发生异常,[快照名:{}] [RepositoryName:{}] [ indices:{}]",snapShotDTO. getSnapshotName(), snapShotDTO. getRepositoryName(),snapShotDTO. getIndices(), ioe);
response. setSuccess(-1);
}
}
return response;
}
private void restoreSnapshot( String repositoryName, String snapshotName, List< String> indicesList, Date delTime, Long id) throws IOException {
ListindicesRestores= new ArrayList<>();
List snapshotInfoList =
queryIndicesByRepoAndSnapshot(repositoryName, snapshotName);
if(!snapshotInfoList. isEmpty() && !snapshotInfoList. get(0). indices(). isEmpty()){
List< String> snapshotIndicesList =snapshotInfoList. get(0). indices();
indicesList = indicesList. stream(). filter(snapshotInfo →
snapshotIndicesList. contains(snapshotInfo)). collect( Collectors. toList());
if(!indicesList. isEmpty()){
List< String>notExistsIndicesList= indicesList. stream(). filter( index - >!elasticSearchService. isIndexExist( index)
). collect( Collectors. toList());
if( id 0 && !notExistsIndicesList. isEmpty()) {
IndicesRestoreRecord record= new IndicesRestoreRecord();
record. setRepositoryName(repositoryName);
record. setSnapshotName(snapshotName);
record. setIndicesName(JSON. toJSONString(notExistsIndicesList));
record. setIsSuccess( false);
record. setDelTime(delTime);
//插入记录
indicesRestoreRecordMapper. insert( record);
}
for ( String indices: notExistsIndicesList){
IndicesRestore restore= new IndicesRestore();
restore. setId( null);
restore. setDelFlag(0);
restore. setDelTime(delTime);
restore. setRestoredIndexName( indices + "_ restored " +LocalDate. now());
restore. setRestoredTime( new Date());
indicesRestores. add( restore);
}
if (CollectionUtils. isNotEmpty(notExistsIndicesList)){
RestoreSnapshotRequest request = new
RestoreSnapshotRequest(repositoryName, snapshotName);
request. indices(notExistsIndicesList);
log.info ("索引:{},开始恢复。",notExistsIndicesList);
Map< String, Object> map= Maps. newHashMap();
map. put("index. lifecycle. name", null);
request. indexSettings( map);
request. renamePattern("(.+)");
request. renameReplacement("$1
restored _"+LocalDate. now());
request. waitForCompletion( true);
ActionListener listener = new ActionListener(){
@ Override
public void onResponse(RestoreSnapshotResponserestoreSnapshotResponse){
log.info ("快照恢复onResponse 返回 “ ,indicesRestores);
for (IndicesRestore restore: indicesRestores){
indicesRestoreMapper. insert( restore);
}
//更新回复成功记录
String indicesName =
JSON. toJSONString(notExistsIndicesList);
Example example = newExample(IndicesRestoreRecord. class);
example. createCriteria(). andEqualTo("indicesName",indicesName);
IndicesRestoreRecord indicesRestoreRecord = new IndicesRestoreRecord();
indicesRestoreRecord. setIsSuccess( true);
indicesRestoreRecordMapper. updateByExampleSelective(indicesRestoreRecord, example);
log.info ("快照恢复成功,[indicesList:{}]",indicesRestores);
}
@ Override
public void onFailure( Exception e){
AtomicBoolean flag = new AtomicBoolean( true);
log.info ("onFailure进入,开始校验快照恢复索引是否存在。");
indicesRestores. forEach(indicesRestore->{
log.info ("校验索引{},是否存在。",indicesRestore. getRestoredIndexName());
if
(!elasticSearchService. isIndexExist(indicesRestore. getRestoredIndexName())){
log.info ("快照恢复,索引:{},恢复失败。",indicesRestore. getRestoredIndexName());
flag. set( false);
}
});
log.info ("最终恢复结果:{}",flag.get());
if ( flag. get()){
for (IndicesRestore restore:indicesRestores){
indicesRestoreMapper. insert( restore);
}
//更新回复成功记录
String indicesName =
JSON,toJSONString(notExistsIndicesList);
Example example = new Example(IndicesRestoreRecord. class);
example. createCriteria(). andEqualTo("indicesName", indicesName);
IndicesRestoreRecord indicesRestoreRecord= new
IndicesRestoreRecord();
indicesRestoreRecord. setIsSuccess( true);
indicesRestoreRecordMapper. updateByExampleSelective(indicesRestoreRecord, example);
log.info ("快照恢复成功,[indicesList:{}]",indicesRestores);
} else{
log. error("快照恢复出现异常[indicesList:{}]",indicesRestores, e);
throw new TitanException("快照恢复出现异常");
)
)
};
restHighLevelClient. snapshot(). restoreAsync( request, RequestOptions. DEFAULT, listener);
}
} else{
log. error("传入待恢复的索引列表:{}在快照备份中不存在!",indicesList);
throw new TitanException("传入待恢复的索引列表在快照备份中不存在!");
)
}
}
/**
 * Lists every index contained in the given snapshot of the given repository.
 *
 * @param request request carrying the repository name and snapshot name (both required)
 * @return response whose indices list holds the index names of all returned snapshot infos
 * @throws TitanException if a required parameter is missing or the snapshot query fails
 */
@Override
public QueryIndicesByRepoAndSnapshotResponse queryIndicesByRepoAndSnapshot(
        QueryIndicesByRepoAndSnapshotRequest request) {
    if (request == null || StringUtils.isBlank(request.getRepositoryName())
            || StringUtils.isBlank(request.getSnapshotName())) {
        throw new TitanException("传入参数不可为空");
    }
    QueryIndicesByRepoAndSnapshotResponse response = new QueryIndicesByRepoAndSnapshotResponse();
    try {
        List<SnapshotInfo> snapshotInfoList =
                queryIndicesByRepoAndSnapshot(request.getRepositoryName(), request.getSnapshotName());
        List<String> indices = new ArrayList<>();
        for (SnapshotInfo info : snapshotInfoList) {
            indices.addAll(info.indices());
        }
        response.setIndicesList(indices);
        log.info("查询索引快照成功{}", indices);
    } catch (IOException e) {
        log.error("查询索引快照异常 [repository:{}] [snapshot:{}]",
                request.getRepositoryName(), request.getSnapshotName(), e);
        throw new TitanException("查询索引快照异常!");
    }
    return response;
}
@ Override
public QueryIndicesByRepoAndSnapshotWithIndicesResponse queryIndicesByRepoAndSnapshotWithIndices(QueryIndicesByRepoAndSnapshotWithIndicesRequest request){
QueryIndicesByRepoAndSnapshotWithIndicesResponse response = new QueryIndicesByRepoAndSnapshotWithIndicesResponse();
if ( request == null || StringUtils. isBlank( request. getRepositoryName()) ||StringUtils. isBlank( request. getSnapshotName())
ll request. getIndicesNameList() == null ll
request. getIndicesNameList(). isEmpty()){
throw new TitanException("传入参数不可为空");
}
try{
List snapshotInfoList =
queryIndicesByRepoAndSnapshot( request. getRepositoryName(),
request. getSnapshotName());
List< String> indices= new ArrayList<>();
List< String>indicesList= new ArrayList<>();
request. getIndicesNameList(). forEach(indicesName->{
indicesList. add(indicesName. replace("*", ""));
});
List< String>snapIndexList=snapshotInfoList. get(0). indices();
for ( String indicesName:snapIndexList){
for ( String nIndicesName:indicesList){
if (indicesName. contains(nIndicesName)){
indices. add(indicesName);
break;
}
}
}
response. setIndicesList( indices);
log.info ("查询索引快照成功{}",indices);
} catch (IOExceptione){
log. error(" 查 询 索 引 快 照 异 常 [ repository:{}] [ snapshot:{}]",
request. getRepositoryName(), request. getSnapshotName(),e);
throw new TitanException("查询索引快照异常!");
}
return response;
}
@ Override
public QuerySnapshotByRepoResponse
querySnapshotByRepo(QuerySnapshotByRepoRequest request){
QuerySnapshotByRepoResponse repoResponse = new QuerySnapshotByRepoResponse();
if ( request
null|| StringUtils. isBlank( request. getRepositoryName())){
throw new TitanException("参数不可为空");
}
try{
repoResponse. setSnapshotNameList(querySnapshotByRepo( request. getRepositoryName()));
} catch (IOException e){
log. error(" 查 询 索 引 快 照 异 常 , [ repository:{}]",
request. getRepositoryName(), e);
throw new TitanException("查询快照列表异常");
}
return repoResponse;
}
private ListqueryIndicesByRepoAndSnapshot( String repositoryName, String snapshotName) throws IOException {
GetSnapshotsRequest request= new GetSnapshotsRequest();
request. repository(repositoryName);
request. snapshots( new String[]{snapshotName});
GetSnapshotsResponse response = restHighLevelClient. snapshot(). get( request, RequestOptions. DEFAULT);
return response. getSnapshots();
}
private List< String> querySnapshotByRepo( String repositoryName) throwsIOException {
GetSnapshotsRequest request = new GetSnapshotsRequest();
request. repository(repositoryName);
GetSnapshotsResponse response = restHighLevelClient. snapshot(). get( request, RequestOptions. DEFAULT);
List< String> list= Lists. newArrayList();
->
response. getSnapshots(). stream(). filter(snapshotInfo
SnapshotState. SUCCESS. equals(snapshotInfo. state())). forEach(snapshotInfo->{
list. add(snapshotInfo. snapshotId(). getName());
});
return list;
}
/**
 * Closes the given indices with a single close-index request.
 *
 * @param indicesList names of the indices to close
 * @return the close-index response from Elasticsearch
 * @throws IOException if the request to Elasticsearch fails
 */
private CloseIndexResponse closeIndices(List<String> indicesList) throws IOException {
    String[] indexNames = indicesList.toArray(new String[0]);
    CloseIndexRequest closeRequest = new CloseIndexRequest(indexNames);
    return restHighLevelClient.indices().close(closeRequest, RequestOptions.DEFAULT);
}
/**
 * Deletes each named index that currently exists and, once the deletion is acknowledged,
 * sets {@code delFlag = 1} on the {@code IndicesRestore} rows whose
 * {@code restoredIndexName} equals that index name.
 *
 * <p>Any exception is logged and rethrown, so the first failure stops the remaining
 * deletions.
 *
 * @param indicesNameList names of the indices to delete
 * @throws IOException if a delete request fails
 */
private void deleteByIndicesName(List<String> indicesNameList) throws IOException {
    for (String indexName : indicesNameList) {
        DeleteIndexRequest deleteRequest = new DeleteIndexRequest();
        deleteRequest.indices(indexName);
        try {
            if (!elasticSearchService.isIndexExist(indexName)) {
                continue; // nothing to delete
            }
            AcknowledgedResponse ack =
                    restHighLevelClient.indices().delete(deleteRequest, RequestOptions.DEFAULT);
            if (!ack.isAcknowledged()) {
                continue; // leave the row untouched when the cluster did not acknowledge
            }
            Example byRestoredName = new Example(IndicesRestore.class);
            byRestoredName.createCriteria().andEqualTo("restoredIndexName", indexName);
            IndicesRestore deletedMarker = new IndicesRestore();
            deletedMarker.setDelFlag(1);
            indicesRestoreMapper.updateByExampleSelective(deletedMarker, byRestoredName);
        } catch (Exception e) {
            log.error("索引:{}删除失败", indexName, e);
            throw e;
        }
    }
}
/**
 * Deletes every restored index whose deletion time has passed: selects the
 * {@code IndicesRestore} rows with {@code delFlag = 0} and {@code delTime} earlier than
 * now, then hands their restored index names to {@code deleteByIndicesName}.
 *
 * @throws IOException if deleting an index fails
 */
public void snapshotDelete() throws IOException {
    Example example = new Example(IndicesRestore.class);
    example.createCriteria().andEqualTo("delFlag", 0).andLessThan("delTime", new Date());
    List<IndicesRestore> restores = indicesRestoreMapper.selectByExample(example);
    List<String> indicesNameList = restores.stream()
            .map(IndicesRestore::getRestoredIndexName)
            .collect(Collectors.toList());
    deleteByIndicesName(indicesNameList);
}
public void restoreIndices( Long id) {
Example example = new Example(IndicesRestoreRecord. class);
Example. Criteria criteria= example. createCriteria();
if( id != null&& id!=0){
criteria. andEqualTo("id", id);
}
criteria. andEqualTo("isSuccess", false);
List records =
indicesRestoreRecordMapper. selectByExample( example);
for (IndicesRestoreRecord restoreRecord: records){
String jsonIndices = restoreRecord. getIndicesName();
List< String> indicesList =JSONArray. parseArray(jsonIndices, String. class);
List< String>notExistsIndicesList= indicesList. stream(). filter( index - >
!elasticSearchService. isIndexExist( index)
). collect( Collectors. toList());
if(!notExistsIndicesList. isEmpty()) {
try{
restoreSnapshot(restoreRecord. getRepositoryName(),
restoreRecord. getSnapshotName(), notExistsIndicesList, restoreRecord. getDelTime(), restoreRecord. getId());
} catch (IOException e){
log. error("索引恢复发生异常,[快照名:{}] [RepositoryName:{}]
[ indices:{}]", restoreRecord. getSnapshotName(), restoreRecord. getRepositoryName(),notExistsIndicesList,e);
}
}
}
}
public PageInfo search( String indexName, Boolean type, intpageIndex, int pageSize){
Example example = new Example(IndicesRestoreRecord. class);
Example. Criteria criteria= example. createCriteria();
criteria. andLike("indicesName", "%"+ indexName+"%");
if ( type != null){
criteria. andEqualTo("isSuccess", type);
}
PageHelper. startPage(pageIndex, pageSize);
PageInfo pageInfo = new
PageInfo<>(indicesRestoreRecordMapper. selectByExample( example));
return pageInfo;
}
}

标签:indicesName,String,List,request,snapshot,indices,new,源代码
From: https://www.cnblogs.com/lmzzr24/p/18550355

相关文章

  • 源代码
    import com.example.entity.Department; import com.example.mapper.DepartmentMapper; import com.example.request.DelDepartmentPermissionRequest; import com.example.response.DelDepartmentPermissionResponse; import com.example.service.DepartmentPermissionServic......
  • GoldenGate抽取进程延迟严重,论FETCHOPTIONS NOUSESNAPSHOT的重要性
    1、案例概述同事新搭建的一套GoldenGate环境,刚刚搭建时,Extract抽取进程就已经出现延迟现象,当时想着可能很快就能追平,所以最开始也没当回事。结果两天时间,延迟现象没有缓解,已经累积延迟30多个小时。通过info或者stats等命令查看进程状态信息,发现该Extract抽取进程仍然在工作,只......
  • 如何查看 SAP ABAP Kernel Module 的源代码
    StackOverflow上有个网友提问,想查看ABAP系统生成UUID方法,在KernelModulepf_create_uuid16c32里实现的源代码:https://stackoverflow.com/questions/42110195/how-to-open-kernel-module-in-abap笔者之前的文章提到,类似上图高亮这种ABAPKernelModule,通过C语......
  • 接口1源代码分析
    当然可以,下面是对 addDepartmentPermission 方法的详细解释,包括每一行代码的功能和作用:方法签名 @Override @Transactional(rollbackFor = Exception.class) public AddDepartmentPermissionResponse addDepartmentPermission(AddDepartmentPermissionRequest request) { @Over......
  • Ogre默认资源代码中加载
    //OgreResources Ogre::ResourceGroupManager::getSingleton().addResourceLocation(QString(OGRE_PLUGIN_DIR).toStdString()+"/../Media/Main","FileSystem","OgreInternal"); Ogre::ResourceGroupManager::getSingleton().addResour......
  • JAVA毕业设计198—基于Java+Springboot+vue3的健身房管理系统(源代码+数据库)
    毕设所有选题:https://blog.csdn.net/2303_76227485/article/details/131104075基于Java+Springboot+vue3的健身房管理系统(源代码+数据库)198一、系统介绍本项目前后端分离(可以改为ssm版本),分为用户、管理员两种角色1、用户:注册、登录、公告、论坛交流、健身课程购买......
  • AI带货主播背景替换与虚拟场景融合的源代码!
    AI带货主播作为新兴的直播形式,正在逐步改变我们的购物体验,借助先进的人工智能技术,带货主播可以在虚拟环境中进行直播,背景可以实时替换,场景也可以灵活变换。这不仅增强了直播的趣味性,也大大提升了观众的沉浸感,接下来,本文将分享一些关于AI带货主播背景替换与虚拟场景融合的源代......
  • 基于51单片机温湿度采集数码管手机app显示+源代码程序+proteus仿真+dht11温湿度传感器
    一、设计简介本项目是简单的物联网(IoT)应用,使用51单片机STC89C52与温湿度传感器DHT11相结合,测量温湿度数据显示在八位数码管,同时通过WiFi模块ESP8266(ESP-01S)将测得的温湿度数据发送到手机APP上,而手机APP则使用AppInventor进行开发。二、功能设计1、数码管实时显示当前温湿度......
  • 【matlab代码】3个模型的IMM例程(匀速、左转、右转),附源代码(可复制粘贴)
    文章目录3个模型的IMM源代码运行结果代码介绍总结3个模型的IMM代码实现了基于IMM(Interacting Multiple Model)算法的目标跟踪。它使用三种不同的运动模型(匀速直线运动、左转弯和右转弯)来预测目标的位置,并通过卡尔曼滤波进行状......
  • Eagle + PlantUML,轻松绘制源代码图表(下)
    快速掌握大型代码文件的对象框架结构及各对象之间的调用关系,我们通常会用到类图和时序图。如何快速理解代码,并生成UML图表呢?网友“车到山前必有路”前阵子使用Eagle辅助画图。仅用了半个小时,就完成了一张类图和一张时序图的绘制。要知道这在过去有可能会消耗掉一个专家级......