记录是否存在
# Ask Elasticsearch whether a document with this ID is already present in the
# "ncbi_gene" index; the response is kept in `res` for the caller to inspect.
res = es.exists(
    index="ncbi_gene",
    id=_id,
)
插入记录
# Index a single record, using its GeneID as the document ID.
es_res = self.es.index(
    index="ncbi_gene",
    id=my_data['GeneID'],
    body=my_data,
)

# The response's 'result' field is checked for 'created' / 'updated';
# any other value is logged as a failed insert.
outcome = es_res['result']
if outcome not in ('created', 'updated'):
    logging.error('insert into es failed:%s' % es_res)
else:
    logging.info('parse success:%s' % _id)
批量插入
import json
import logging
import os
import shutil

from elasticsearch import helpers
from elasticsearch import Elasticsearch

# Elasticsearch nodes handed to the client constructor (hosts are masked in
# this example).
ES_SERVERS = [
    "http://***:9200",
    "http://***:9200"
]


# NOTE(review): NCBIDown is not defined or imported in this snippet; it is
# expected to come from elsewhere in the project.
class INSERTES(NCBIDown):
    """Bulk-load parsed gene records from ``self.record_dir`` into Elasticsearch."""

    def in_es(self, batch_size=100):
        """Index every ``*_parse.json`` file in ``self.record_dir``.

        Files are read in batches; each file is decoded as JSON and turned
        into one bulk action targeting the ``ncbi_gene`` index, with the
        record's ``GeneID`` value used as the document ID.

        Args:
            batch_size: Number of files sent per ``helpers.bulk`` call.
                Defaults to 100, the value that was previously hard-coded.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
            Exception: Any error raised while opening or decoding a file is
                logged and re-raised unchanged.
            KeyError: If a decoded record has no ``GeneID`` key.
        """
        if batch_size < 1:
            raise ValueError('batch_size must be >= 1')

        es = Elasticsearch(ES_SERVERS)
        parsed_files = [
            name for name in os.listdir(self.record_dir)
            if name.endswith('_parse.json')
        ]

        for start in range(0, len(parsed_files), batch_size):
            actions = []
            for esfile in parsed_files[start:start + batch_size]:
                path = os.path.join(self.record_dir, esfile)
                try:
                    with open(path) as f:
                        info = json.load(f)
                except Exception as ex:
                    logging.error('load data failed: %s %s' % (esfile, str(ex)))
                    # Bare raise keeps the original traceback intact.
                    raise
                actions.append({
                    "_id": info['GeneID'],
                    "_index": 'ncbi_gene',
                    "_source": info,
                })
            helpers.bulk(es, actions, request_timeout=1000)

        logging.info('insert into es finish')


if __name__ == '__main__':
    # NOTE(review): `parser` is not defined in this snippet; it is expected to
    # be an argument parser created elsewhere.
    args = parser.parse_args()
    obj = INSERTES(args.name, args.apk_key, args.email, args.record_dir,
                   args.log_dir, args.spos, args.epos, args.step, args.num,
                   args.ids)
    obj.install_log()
    # NOTE(review): INSERTES defines in_es(), not insert2es(). Unless the
    # NCBIDown base class provides insert2es(), this call raises
    # AttributeError -- confirm which method is intended.
    obj.insert2es()
标签:files,index,示例,args,lst,使用,import,es From: https://www.cnblogs.com/testzcy/p/17916179.html