背景
Elasticsearch升级到8.11了,对应的客户端插件也跟着升级,以为会兼容Nest的语法,发现自己Too Young Too Simple!
没办法,只能去官网找,结果官网只有最基本的增、删、改、查,只能继续查资料,发现网上资料很少,避免大家少走弯路,这里做个简单的分享。
分享
1.ElasticsearchClient
var esSetting = _baseConfig.Es;
if (esSetting == null || string.IsNullOrEmpty(esSetting.EsHosts))
{
throw new Exception("未配置EsHosts");
}
//读取ES集群配置
var nodeUris = GetAllNodes(esSetting);
services.AddSingleton<ElasticsearchClient>(provider =>
{
//这里用的StaticNodePool,单节点和集群都支持,大家根据自己实际情况选择
var pool = new StaticNodePool(nodeUris);
var settings = new ElasticsearchClientSettings(pool);
//如果设置了账号密码
if (!string.IsNullOrEmpty(esSetting.EsUser) && !string.IsNullOrEmpty(esSetting.EsPassword))
{
settings.Authentication(new BasicAuthentication(esSetting.EsUser, esSetting.EsPassword));
}
return new ElasticsearchClient(settings);
});
return services;
//settings.EsHosts示例:http://192.168.1.100:9200;http://192.168.1.101:9200;http://192.168.1.102:9200
private static List<Uri> GetAllNodes(EsConfig settings)
{
var nodes = settings.EsHosts.Trim();
string[] nodeshost = nodes.Split(';');
var nodUris = new List<Uri>();
for (int i = 0; i < nodeshost.Length; i++)
{
if (!string.IsNullOrEmpty(nodeshost[i]))
nodUris.Add(new Uri(nodeshost[i]));
}
return nodUris;
}
2.具体使用
2.1注入_client
public class ElasticSearchForInformalEssay : IElasticSearchForInformalEssay
{
private readonly string IndexName = "informal_essay_info";
private readonly ILogger<ElasticSearchForInformalEssay> _logger;
protected ElasticsearchClient _client { get; set; }
public ElasticSearchForInformalEssay(ILogger<ElasticSearchForInformalEssay> logger, ElasticsearchClient client)
{
_logger = logger;
_client = client;
}
}
2.2增加-增加跟Nest版本基本一致
/// <summary>
/// 添加单条数据
/// </summary>
/// <param name="InformalEssay"></param>
/// <returns></returns>
public OutPutResult AddIndex(informal_essay_info InformalEssay)
{
try
{
var resp = _client.Index<informal_essay_info>(InformalEssay, x => x.Index(IndexName));
if (resp.IsValidResponse)
{
return OutPutResult.ok();
}
else
{
return OutPutResult.error(resp.DebugInformation);
}
}
catch (Exception ex)
{
return OutPutResult.error(ex.Message);
}
}
/// <summary>
/// 添加多条数据
/// </summary>
/// <param name="InformalEssays"></param>
/// <returns></returns>
public OutPutResult BulkAddIndex(List<informal_essay_info> InformalEssays)
{
try
{
//var resp = _client.BulkAll<informal_essay_info>(InformalEssays, x => x.Index(IndexName).Size(1000));
var observableBulk = _client.BulkAll(InformalEssays, f => f
.MaxDegreeOfParallelism(8)
.BackOffTime(TimeSpan.FromSeconds(10))
.BackOffRetries(2)
.Size(1000)
.RefreshOnCompleted()
.Index(IndexName)
.BufferToBulk((r, buffer) => r.IndexMany(buffer))
);
var countdownEvent = new CountdownEvent(1);
var bulkAllObserver = new BulkAllObserver(
onNext: response =>
{
//_logger.LogInformation($"索引 {response.Page} 页,每页1000条,重试 {response.Retries} 次");
},
one rror: ex =>
{
_logger.LogInformation("添加索引异常 : {0}", ex);
countdownEvent.Signal();
},
() =>
{
_logger.LogInformation("添加索引完成");
countdownEvent.Signal();
});
observableBulk.Subscribe(bulkAllObserver);
//var resp = _client.IndexMany(InformalEssays, IndexName);
return OutPutResult.ok();
}
catch (Exception ex)
{
_logger.LogInformation("批量操作异常:", ex);
return OutPutResult.error(ex.Message);
}
}
2.3删除
/// <summary>
/// Id单条删除
/// </summary>
/// <param name="InformalEssayId"></param>
/// <returns></returns>
public OutPutResult DeleteById(string InformalEssayId)
{
try
{
var resp = _client.Delete<informal_essay_info>(InformalEssayId, x => x.Index(IndexName));
if (resp.IsValidResponse)
{
return OutPutResult.ok();
}
else
{
return OutPutResult.error(resp.DebugInformation);
}
}
catch (Exception ex)
{
return OutPutResult.error(ex.Message);
}
}
/// <summary>
/// Ids批量删除
/// </summary>
/// <param name="InformalEssayIds"></param>
/// <returns></returns>
public OutPutResult BulkDeleteById(string InformalEssayIds)
{
try
{
var ids = CommonHelpers.GetStringArray(InformalEssayIds).ToList();
List<FieldValue> terms = new();
ids.ForEach(x =>
{
terms.Add(x);
});
var resp = _client.DeleteByQuery<informal_essay_info>(IndexName, x => x.
Query(q => q.Terms(t => t.Field(f => f.id).Terms(new TermsQueryField(terms))))
.Refresh(true)
);
if (resp.IsValidResponse)
{
return OutPutResult.ok();
}
else
{
return OutPutResult.error(resp.DebugInformation);
}
}
catch (Exception ex)
{
return OutPutResult.error(ex.Message);
}
}
/// <summary>
/// 根据条件批量删除
/// </summary>
/// <param name="keyword"></param>
/// <param name="start"></param>
/// <param name="end"></param>
/// <returns></returns>
public OutPutResult BulkDeleteByQuery(string keyword, DateTime? start, DateTime? end)
{
try
{
var mustQueries = new List<Action<QueryDescriptor<informal_essay_info>>>();
if (start.HasValue || end.HasValue)
{
long startDate = CommonHelpers.DateTimeToTimeStamp(start ?? DateTime.MinValue);
long endDate = CommonHelpers.DateTimeToTimeStamp(end ?? DateTime.MaxValue);
mustQueries.Add(t => t.Range(r => r.NumberRange(n => n.Field(f => f.setdate).Gte(startDate).Lte(endDate))));
}
if (!string.IsNullOrEmpty(keyword))
{
mustQueries.Add(t => t.MultiMatch(m => m.Fields(
new Field("title", 10).And(new Field("content", 5)).And(new Field("author", 5))
).Operator(Operator.And).Query(keyword)));
}
Action<QueryDescriptor<informal_essay_info>> retQuery = q => q.Bool(b => b.Filter(mustQueries.ToArray()));
var resp = _client.DeleteByQuery<informal_essay_info>(IndexName, s => s
.Query(q => q.Bool(b => b.Must(retQuery)))
);
if (resp.IsValidResponse)
{
return OutPutResult.ok();
}
else
{
return OutPutResult.error(resp.DebugInformation);
}
}
catch (Exception ex)
{
return OutPutResult.error(ex.Message);
}
}
2.4修改
public OutPutResult UpdateIndex(informal_essay_info InformalEssay)
{
try
{
var resp = _client.Index<informal_essay_info>(InformalEssay, x => x.Index(IndexName).Refresh(Refresh.True));
if (resp.IsValidResponse)
{
return OutPutResult.ok();
}
else
{
return OutPutResult.error(resp.DebugInformation);
}
}
catch (Exception ex)
{
return OutPutResult.error(ex.Message);
}
}
/// <summary>
/// 批量修改
/// </summary>
/// <param name="InformalEssays"></param>
/// <returns></returns>
public OutPutResult BulkUpdateIndex(List<informal_essay_info> InformalEssays)
{
try
{
var observableBulk = _client.BulkAll(InformalEssays, f => f
.MaxDegreeOfParallelism(8)
.BackOffTime(TimeSpan.FromSeconds(10))
.BackOffRetries(2)
.Size(1000)
.RefreshOnCompleted()
.Index(IndexName)
.BufferToBulk((r, buffer) => r.IndexMany(buffer))
);
var bulkAllObserver = new BulkAllObserver(
onNext: response =>
{
_logger.LogInformation($"索引 {response.Page} 页,每页1000条,重试 {response.Retries} 次");
},
one rror: ex =>
{
_logger.LogInformation("更新索引异常 : {0}", ex);
},
() =>
{
_logger.LogInformation("完成更新索引");
});
observableBulk.Subscribe(bulkAllObserver);
return OutPutResult.ok();
}
catch (Exception ex)
{
return OutPutResult.error(ex.Message);
}
}
/// <summary>
/// 根据条件修改(稿件发布/取消发布后,更新ES)
/// </summary>
/// <param name="id"></param>
/// <param name="status">1:发布,0:取消发布,-1:删除</param>
/// <returns></returns>
public OutPutResult UpdateInformalEssayInfoAfterPublishOrCancel(string id, int status = 1)
{
try
{
var sql = $"ctx._source.status = params.status;";
var scriptParams = new Dictionary<string, object> { { "status", status } };
var req = new UpdateByQueryRequest(IndexName)
{
Query = new TermQuery("id") { Field = "id", Value = id },
Script = new Script(new InlineScript() { Source = sql, Params = scriptParams }),
Refresh = true
};
var resp = _client.UpdateByQuery(req);
if (resp.IsValidResponse)
{
return OutPutResult.ok();
}
else
{
return OutPutResult.error(resp.DebugInformation);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "稿件发布/取消发布后,更新ES:");
return OutPutResult.error("稿件发布/取消发布后,更新ES:" + ex.Message);
}
}
2.5查询
/// <summary>
/// id单条查询
/// </summary>
/// <param name="InformalEssayId"></param>
/// <returns></returns>
public OutPutResult SerachById(string InformalEssayId)
{
try
{
var search = _client.Get<informal_essay_info>(InformalEssayId, s => s.Index(IndexName));
var data = search.Source;
return OutPutResult.ok().put("data", data);
}
catch (Exception ex)
{
return OutPutResult.error(ex.Message);
}
}
/// <summary>
/// ids多条查询
/// </summary>
/// <param name="InformalEssayIds"></param>
/// <returns></returns>
public OutPutResult SerachByIds(string InformalEssayIds)
{
try
{
var ids = CommonHelpers.GetStringArray(InformalEssayIds).ToList();
List<FieldValue> terms = new();
ids.ForEach(x =>
{
terms.Add(x);
});
var search = _client.Search<informal_essay_info>(s => s.Index(IndexName).Query(
q => q.Terms(t => t.Field(f => f.id).Terms(new TermsQueryField(terms)))
));
var data = search.Documents.ToList();
return OutPutResult.ok().put("data", data);
}
catch (Exception ex)
{
return OutPutResult.error(ex.Message);
}
}
/// <summary>
/// 列表分页查询
///(不要说深度分页性能问题,用户说不用管,哈哈,就要全部查出来,慢点没关系)
/// 其实尝试过几种方案,具体使用人员就觉得直接分页最方便,最后直接max_result_window设置了1千万
/// (用户800万左右的数据,三台16核32GB的服务器部署的ES集群+Redis集群,深度分页1秒多,完全能接受)
/// </summary>
/// <param name="category"></param>
/// <param name="keyword"></param>
/// <param name="start"></param>
/// <param name="end"></param>
/// <param name="pageindex"></param>
/// <param name="rows"></param>
/// <param name="orderField"></param>
/// <param name="order"></param>
/// <returns></returns>
public OutPutResult SerachInformalEssays(string category, string keyword, DateTime? start, DateTime? end, int pageindex = 1, int rows = 20, int string orderField = "setdate", SortOrder order = SortOrder.Desc)
{
try
{
var mustQueries = new List<Action<QueryDescriptor<informal_essay_info>>>();
//状态,0为正常,-1为删除,1为发布
mustQueries.Add(t => t.Term(f => f.status, 1));
if (!string.IsNullOrEmpty(category))
{
mustQueries.Add(t => t.MatchPhrase(f => f.Field(fd => fd.category).Query(category)));
}
if (!string.IsNullOrEmpty(keyword))
{
mustQueries.Add(t => t.MultiMatch(m => m.Fields(
new Field("title", 10).And(new Field("content", 5)).And(new Field("author", 5)).And(new Field("keyword", 5))
).Operator(Operator.And).Query(keyword)));
}
if (start.HasValue || end.HasValue)
{
long startDate = CommonHelpers.DateTimeToTimeStamp(start ?? DateTime.MinValue);
long endDate = 0;
if (end != null && end.HasValue)
{
string endtime = string.Format("{0:d}", end) + " 23:59:59";
end = Convert.ToDateTime(endtime);
endDate = CommonHelpers.DateTimeToTimeStamp(end ?? DateTime.MaxValue);
}
else
{
endDate = CommonHelpers.DateTimeToTimeStamp(end ?? DateTime.MaxValue);
}
mustQueries.Add(t => t.Range(r => r.NumberRange(n => n.Field("setdate").Gte(startDate).Lte(endDate))));
}
var search = _client.Search<informal_essay_info>(s => s
.Index(IndexName)
.Query(q => q.Bool(b => b.Must(mustQueries.ToArray())))
.From((pageindex - 1) * rows)
.Size(rows)
.Sort(s => s.Field(orderField, new FieldSort { Order = order }))
);
return OutPutResult.ok().put("total", search.Total).put("data", search.Documents);
}
catch (Exception ex)
{
_logger.LogError(ex, "查询随笔异常:");
return OutPutResult.error(ex.Message);
}
}
标签:return,Elastic,OutPutResult,Nest,Clients,ex,var,new,resp
From: https://www.cnblogs.com/libian/p/18254894