@ Service("snapshotService")
@Slf4j
public class SnapshotServiceImpl implements SnapshotService {
@ Autowired
private RestHighLevelClient restHighLevelClient;
@ Autowired
private ElasticSearchServiceelasticSearchService;
@ Autowired
private IndicesRestoreMapper indicesRestoreMapper;
@ Autowired
private IndicesRestoreRecordMapper indicesRestoreRecordMapper;
public boolean isRepositoryExists( String repositoryName) throws IOException{
GetRepositoriesRequest repositoriesRequest= new GetRepositoriesRequest();
repositoriesRequest. repositories( new String[]{repositoryName});
boolean flag= false;
GetRepositoriesResponse repositoriesResponse =
restHighLevelClient. snapshot(). getRepository(repositoriesRequest,
RequestOptions. DEFAULT);
if (repositoriesResponse. repositories() != null&& !repositoriesResponse. repositories(). isEmpty()) {
flag= true;
)
return flag;
}
@ Override
public RestoreSnapshotIndicesResponse
restoreSnapshotIndices(RestoreSnapshotIndicesRequest request){
RestoreSnapshotIndicesResponse response = new RestoreSnapshotIndicesResponse();
if ( request == null II request. getSnapShotList()== null 11
request. getSnapShotList(). isEmpty()){
throw new TitanException("所传参数为空!");
}
List
for (SnapShotDTO snapShotDTO:snapShotDTOList){
try{
restoreSnapshot(snapShotDTO. getRepositoryName(), snapShotDTO. getSnapshotName(), snapShotDTO. getIndices(), snapShotDTO. getDelDate(), 0l);
response. setSuccess(0);
} catch (IOException ioe){
log. error("索引恢复发生异常,[快照名:{}] [RepositoryName:{}] [ indices:{}]",snapShotDTO. getSnapshotName(), snapShotDTO. getRepositoryName(),snapShotDTO. getIndices(), ioe);
response. setSuccess(-1);
}
}
return response;
}
private void restoreSnapshot( String repositoryName, String snapshotName, List< String> indicesList, Date delTime, Long id) throws IOException {
List
List
queryIndicesByRepoAndSnapshot(repositoryName, snapshotName);
if(!snapshotInfoList. isEmpty() && !snapshotInfoList. get(0). indices(). isEmpty()){
List< String> snapshotIndicesList =snapshotInfoList. get(0). indices();
indicesList = indicesList. stream(). filter(snapshotInfo →
snapshotIndicesList. contains(snapshotInfo)). collect( Collectors. toList());
if(!indicesList. isEmpty()){
List< String>notExistsIndicesList= indicesList. stream(). filter( index - >!elasticSearchService. isIndexExist( index)
). collect( Collectors. toList());
if( id 0 && !notExistsIndicesList. isEmpty()) {
IndicesRestoreRecord record= new IndicesRestoreRecord();
record. setRepositoryName(repositoryName);
record. setSnapshotName(snapshotName);
record. setIndicesName(JSON. toJSONString(notExistsIndicesList));
record. setIsSuccess( false);
record. setDelTime(delTime);
//插入记录
indicesRestoreRecordMapper. insert( record);
}
for ( String indices: notExistsIndicesList){
IndicesRestore restore= new IndicesRestore();
restore. setId( null);
restore. setDelFlag(0);
restore. setDelTime(delTime);
restore. setRestoredIndexName( indices + "_ restored " +LocalDate. now());
restore. setRestoredTime( new Date());
indicesRestores. add( restore);
}
if (CollectionUtils. isNotEmpty(notExistsIndicesList)){
RestoreSnapshotRequest request = new
RestoreSnapshotRequest(repositoryName, snapshotName);
request. indices(notExistsIndicesList);
log.info ("索引:{},开始恢复。",notExistsIndicesList);
Map< String, Object> map= Maps. newHashMap();
map. put("index. lifecycle. name", null);
request. indexSettings( map);
request. renamePattern("(.+)");
request. renameReplacement("$1 restored _"+LocalDate. now());
request. waitForCompletion( true);
ActionListener
@ Override
public void onResponse(RestoreSnapshotResponserestoreSnapshotResponse){
log.info ("快照恢复onResponse 返回 “ ,indicesRestores);
for (IndicesRestore restore: indicesRestores){
indicesRestoreMapper. insert( restore);
}
//更新回复成功记录
String indicesName =
JSON. toJSONString(notExistsIndicesList);
Example example = newExample(IndicesRestoreRecord. class);
example. createCriteria(). andEqualTo("indicesName",indicesName);
IndicesRestoreRecord indicesRestoreRecord = new IndicesRestoreRecord();
indicesRestoreRecord. setIsSuccess( true);
indicesRestoreRecordMapper. updateByExampleSelective(indicesRestoreRecord, example);
log.info ("快照恢复成功,[indicesList:{}]",indicesRestores);
}
@ Override
public void onFailure( Exception e){
AtomicBoolean flag = new AtomicBoolean( true);
log.info ("onFailure进入,开始校验快照恢复索引是否存在。");
indicesRestores. forEach(indicesRestore->{
log.info ("校验索引{},是否存在。",indicesRestore. getRestoredIndexName());
if
(!elasticSearchService. isIndexExist(indicesRestore. getRestoredIndexName())){
log.info ("快照恢复,索引:{},恢复失败。",indicesRestore. getRestoredIndexName());
flag. set( false);
}
});
log.info ("最终恢复结果:{}",flag.get());
if ( flag. get()){
for (IndicesRestore restore:indicesRestores){
indicesRestoreMapper. insert( restore);
}
//更新回复成功记录
String indicesName =
JSON,toJSONString(notExistsIndicesList);
Example example = new Example(IndicesRestoreRecord. class);
example. createCriteria(). andEqualTo("indicesName", indicesName);
IndicesRestoreRecord indicesRestoreRecord= new
IndicesRestoreRecord();
indicesRestoreRecord. setIsSuccess( true);
indicesRestoreRecordMapper. updateByExampleSelective(indicesRestoreRecord, example);
log.info ("快照恢复成功,[indicesList:{}]",indicesRestores);
} else{
log. error("快照恢复出现异常[indicesList:{}]",indicesRestores, e);
throw new TitanException("快照恢复出现异常");
)
)
};
restHighLevelClient. snapshot(). restoreAsync( request, RequestOptions. DEFAULT, listener);
}
} else{
log. error("传入待恢复的索引列表:{}在快照备份中不存在!",indicesList);
throw new TitanException("传入待恢复的索引列表在快照备份中不存在!");
)
}
}
@ Override
public QueryIndicesByRepoAndSnapshotResponse
queryIndicesByRepoAndSnapshot(QueryIndicesByRepoAndSnapshotRequest request){
QueryIndicesByRepoAndSnapshotResponse response = new QueryIndicesByRepoAndSnapshotResponse();
if ( request == null || StringUtils. isBlank( request. getRepositoryName()) ||StringUtils. isBlank( request. getSnapshotName())){
throw new TitanException("传入参数不可为空");
}
try{
List
queryIndicesByRepoAndSnapshot( request. getRepositoryName(),
request. getSnapshotName());
List< String> indices= new ArrayList<>();
for (SnapshotInfo info:snapshotInfoList){
for ( String indicesName: info. indices()) {
indices. add(indicesName);
}
}
response. setIndicesList( indices);
log.info ("查询索引快照成功{}",indices);
} catch (IOException e){
log. error(" 查 询 索 引 快 照 异 常 [ repository:{}] [ snapshot:{}]", request. getRepositoryName(), request. getSnapshotName(),e);
throw new TitanException("查询索引快照异常!");
}
return response;
}
@ Override
public QueryIndicesByRepoAndSnapshotWithIndicesResponse queryIndicesByRepoAndSnapshotWithIndices(QueryIndicesByRepoAndSnapshotWithIndicesRequest request){
QueryIndicesByRepoAndSnapshotWithIndicesResponse response = new QueryIndicesByRepoAndSnapshotWithIndicesResponse();
if ( request == null || StringUtils. isBlank( request. getRepositoryName()) ||StringUtils. isBlank( request. getSnapshotName())
ll request. getIndicesNameList() == null ll
request. getIndicesNameList(). isEmpty()){
throw new TitanException("传入参数不可为空");
}
try{
List
queryIndicesByRepoAndSnapshot( request. getRepositoryName(),
request. getSnapshotName());
List< String> indices= new ArrayList<>();
List< String>indicesList= new ArrayList<>();
request. getIndicesNameList(). forEach(indicesName->{
indicesList. add(indicesName. replace("*", ""));
});
List< String>snapIndexList=snapshotInfoList. get(0). indices();
for ( String indicesName:snapIndexList){
for ( String nIndicesName:indicesList){
if (indicesName. contains(nIndicesName)){
indices. add(indicesName);
break;
}
}
}
response. setIndicesList( indices);
log.info ("查询索引快照成功{}",indices);
} catch (IOExceptione){
log. error(" 查 询 索 引 快 照 异 常 [ repository:{}] [ snapshot:{}]",
request. getRepositoryName(), request. getSnapshotName(),e);
throw new TitanException("查询索引快照异常!");
}
return response;
}
@ Override
public QuerySnapshotByRepoResponse
querySnapshotByRepo(QuerySnapshotByRepoRequest request){
QuerySnapshotByRepoResponse repoResponse = new QuerySnapshotByRepoResponse();
if ( request
throw new TitanException("参数不可为空");
}
try{
repoResponse. setSnapshotNameList(querySnapshotByRepo( request. getRepositoryName()));
} catch (IOException e){
log. error(" 查 询 索 引 快 照 异 常 , [ repository:{}]",
request. getRepositoryName(), e);
throw new TitanException("查询快照列表异常");
}
return repoResponse;
}
private List
GetSnapshotsRequest request= new GetSnapshotsRequest();
request. repository(repositoryName);
request. snapshots( new String[]{snapshotName});
GetSnapshotsResponse response = restHighLevelClient. snapshot(). get( request, RequestOptions. DEFAULT);
return response. getSnapshots();
}
private List< String> querySnapshotByRepo( String repositoryName) throwsIOException {
GetSnapshotsRequest request = new GetSnapshotsRequest();
request. repository(repositoryName);
GetSnapshotsResponse response = restHighLevelClient. snapshot(). get( request, RequestOptions. DEFAULT);
List< String> list= Lists. newArrayList();
->
response. getSnapshots(). stream(). filter(snapshotInfo
SnapshotState. SUCCESS. equals(snapshotInfo. state())). forEach(snapshotInfo->{
list. add(snapshotInfo. snapshotId(). getName());
});
return list;
}
/**
 * Closes the given indices with a single close-index request.
 *
 * @param indicesList names of the indices to close
 * @return the response returned by Elasticsearch
 * @throws IOException if the request to Elasticsearch fails
 */
private CloseIndexResponse closeIndices(List<String> indicesList) throws IOException {
    String[] indexNames = indicesList.toArray(new String[indicesList.size()]);
    CloseIndexRequest closeRequest = new CloseIndexRequest(indexNames);
    return restHighLevelClient.indices().close(closeRequest, RequestOptions.DEFAULT);
}
/**
 * Deletes each named index that currently exists and, once Elasticsearch acknowledges the
 * deletion, sets {@code delFlag} to 1 on the {@code IndicesRestore} rows whose
 * {@code restoredIndexName} matches.
 *
 * <p>Processing stops at the first failure: the error is logged and rethrown.
 *
 * @param indicesNameList names of the indices to delete
 * @throws IOException if a call to Elasticsearch fails
 */
private void deleteByIndicesName(List<String> indicesNameList) throws IOException {
    for (String indicesName : indicesNameList) {
        DeleteIndexRequest deleteRequest = new DeleteIndexRequest();
        deleteRequest.indices(indicesName);
        try {
            if (!elasticSearchService.isIndexExist(indicesName)) {
                continue;
            }
            AcknowledgedResponse acknowledged =
                    restHighLevelClient.indices().delete(deleteRequest, RequestOptions.DEFAULT);
            if (!acknowledged.isAcknowledged()) {
                continue;
            }
            // Flag the bookkeeping row(s) for this index as deleted.
            Example byRestoredName = new Example(IndicesRestore.class);
            byRestoredName.createCriteria().andEqualTo("restoredIndexName", indicesName);
            IndicesRestore deletedMarker = new IndicesRestore();
            deletedMarker.setDelFlag(1);
            indicesRestoreMapper.updateByExampleSelective(deletedMarker, byRestoredName);
        } catch (Exception e) {
            log.error("索引:{}删除失败", indicesName, e);
            throw e;
        }
    }
}
public void snapshotDelete() throws IOException {
Example example = new Example(IndicesRestore. class);
example. createCriteria(). andEqualTo("delFlag", 0). andLessThan("delTime", new Date());
List
indicesRestoreMapper. selectByExample( example);
List< String>indicesNameList= new ArrayList<>();
for (IndicesRestore restore: restores){
indicesNameList. add( restore. getRestoredIndexName());
}
deleteByIndicesName(indicesNameList);
}
public void restoreIndices( Long id) {
Example example = new Example(IndicesRestoreRecord. class);
Example. Criteria criteria= example. createCriteria();
if( id != null&& id!=0){
criteria. andEqualTo("id", id);
}
criteria. andEqualTo("isSuccess", false);
List
indicesRestoreRecordMapper. selectByExample( example);
for (IndicesRestoreRecord restoreRecord: records){
String jsonIndices = restoreRecord. getIndicesName();
List< String> indicesList =JSONArray. parseArray(jsonIndices, String. class);
List< String>notExistsIndicesList= indicesList. stream(). filter( index - >
!elasticSearchService. isIndexExist( index)
). collect( Collectors. toList());
if(!notExistsIndicesList. isEmpty()) {
try{
restoreSnapshot(restoreRecord. getRepositoryName(),
restoreRecord. getSnapshotName(), notExistsIndicesList, restoreRecord. getDelTime(), restoreRecord. getId());
} catch (IOException e){
log. error("索引恢复发生异常,[快照名:{}] [RepositoryName:{}]
[ indices:{}]", restoreRecord. getSnapshotName(), restoreRecord. getRepositoryName(),notExistsIndicesList,e);
}
}
}
}
/**
 * Paged query over {@code IndicesRestoreRecord} rows, filtered by a substring of
 * {@code indicesName} and, optionally, by {@code isSuccess}.
 *
 * NOTE(review): the method declaration below is truncated in this copy of the file. The generic
 * argument of the return type, the method name and the parameter list are missing and cannot be
 * recovered from this file alone; restore them from version control. The body reads four
 * values that must be parameters: indexName, type, pageIndex and pageSize.
 */
public PageInfo
Example example = new Example(IndicesRestoreRecord. class);
Example. Criteria criteria= example. createCriteria();
// Substring match on the stored index-name JSON.
criteria. andLike("indicesName", "%"+ indexName+"%");
// The success filter is applied only when a value is supplied.
if ( type != null){
criteria. andEqualTo("isSuccess", type);
}
// Must immediately precede the mapper query that it is meant to paginate.
PageHelper. startPage(pageIndex, pageSize);
// NOTE(review): this declaration is truncated as well; the returned variable `pageInfo` is
// presumably declared here as a PageInfo wrapping the query result — confirm.
PageInfo
PageInfo<>(indicesRestoreRecordMapper. selectByExample( example));
return pageInfo;
}
}