首页 > 其他分享 >ES的BulkProcessor实现批量写入

ES的BulkProcessor实现批量写入

时间:2023-02-02 10:32:30浏览次数:36  
标签:String 写入 item elasticsearch BulkProcessor org import public ES


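以下是程序读取的数据文件 zd 的内容。每行以英文逗号分隔:第 1 列未被使用,其后依次对应实体类的 name、url、price、factory、comment、keyword 字段,最后一列为 1000(对应 num)。注意:前两行的 url 列为空(共 8 列),其余各行缺少 url 列(共 7 列)。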

0,联想(Lenovo)拯救者Y7000P英特尔酷睿i7 15.6英寸游戏笔记本电脑(8核i7-10875H 16G 512G RTX2060 144Hz)灰,,7299.0,联想京东自营旗舰店,200000,电脑,1000
0,联想ThinkPad E15 2021款 酷睿版 英特尔酷睿i5/i7 轻薄笔记本电脑 人脸识别 i5-1135G7 16G 512G 1SCD,,5399.0,ThinkPad京东官方自营旗舰店,5000,电脑,1000
0,惠普(HP)战66四代 锐龙版 14英寸轻薄笔记本电脑(Zen3架构 8核 R7-5800U 16G 512G 400尼特高色域 一年上门),5099.0,惠普京东自营官方旗舰店,200000,电脑,1000
0,华硕无畏Pro14 标压锐龙版 2.8K OLED屏轻薄笔记本电脑(R7-5800H 16G 512G 133%sRGB高色域 600尼特 90Hz)银,4999.0,华硕京东自营官方旗舰店,5000,电脑,1000
0,联想YOGA 13s 2021款 锐龙版 13.3英寸全面屏超轻薄笔记本电脑(6核 R5-5600U 16G 512G 2.5K高色域屏)深空灰,4799.0,联想京东自营旗舰店,100000,电脑,1000
0,惠普(HP)星15 青春版 15.6英寸轻薄窄边框笔记本电脑(R7-4700U 16G 512GSSD UMA FHD IPS)银,4098.0,惠普京东自营官方旗舰店,20000,电脑,1000
0,宏碁(Acer)非凡S3超轻薄本 全新升级 14英寸办公笔记本电脑 高色域 Evo认证(11代酷睿i5 16G 512G 雷电4)银,4399.0,宏碁京东自营旗舰店,2000,电脑,1000
0,联想笔记本电脑ThinkPad X1 Carbon 2021款 英特尔Evo平台 14英寸 11代酷睿i5 16G 512G 高色域 /4G全时互联,9999.0,ThinkPad京东自营旗舰店,5000,电脑,1000
package com.example.Exe;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.SneakyThrows;
import org.apache.http.HttpHost;
import org.apache.logging.log4j.core.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.logging.ErrorManager;


/**
 * Bulk-loads product rows from a local comma-separated file into the Elasticsearch index
 * {@code item} through a {@link BulkProcessor}.
 */
public class Create_Mapping {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT =
            "D:\\JAVA_Workspace\\Java\\es\\src\\main\\java\\com\\example\\Exe\\zd";

    /** Index that every document is written to. */
    private static final String INDEX = "item";

    /** Shared serializer, reused for every row instead of being rebuilt per line. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Real logger; the original used a {@code null} logger, so the failure callback threw an NPE. */
    private static final org.apache.logging.log4j.Logger LOGGER =
            org.apache.logging.log4j.LogManager.getLogger(Create_Mapping.class);

    /**
     * Reads the input file, queues one index request per row, then flushes and closes everything.
     *
     * @param args optional; {@code args[0]} overrides the input file path
     */
    public static void main(String[] args) {
        Path file = Paths.get(args.length > 0 ? args[0] : DEFAULT_INPUT);
        // NOTE(review): assumes the data file is UTF-8. The original used FileReader (platform
        // default charset), which differs between machines — confirm the file's encoding.
        // Resources close in reverse order: the client first, then the reader.
        try (BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8);
             RestHighLevelClient client = newClient()) {
            BulkProcessor bulkProcessor = bulkProcessor(client);
            try {
                long queued = indexFile(reader, bulkProcessor);
                LOGGER.info("Queued {} documents for index {}", queued, INDEX);
            } finally {
                // Flush requests still buffered below the 100-action threshold and wait for
                // in-flight bulks; otherwise they would be dropped when the client closes.
                if (!bulkProcessor.awaitClose(30, TimeUnit.SECONDS)) {
                    LOGGER.warn("Bulk requests still running after 30s; some documents may be missing");
                }
            }
        } catch (IOException e) {
            LOGGER.error("Failed to read {} or communicate with Elasticsearch", file, e);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers/threads up the stack can observe it.
            Thread.currentThread().interrupt();
            LOGGER.error("Interrupted while waiting for bulk requests to finish", e);
        }
    }

    /**
     * Parses every non-blank line of {@code reader} and adds one index request per valid row.
     * Malformed rows are logged and skipped instead of aborting the whole import.
     *
     * @return number of documents queued
     * @throws IOException if reading the file or serializing a row fails
     */
    private static long indexFile(BufferedReader reader, BulkProcessor bulkProcessor) throws IOException {
        long nextId = 1;
        long lineNo = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            lineNo++;
            if (line.trim().isEmpty()) {
                continue;
            }
            Item_Detail item;
            try {
                item = parseLine(line, nextId);
            } catch (IllegalArgumentException e) { // includes NumberFormatException
                LOGGER.warn("Skipping malformed line {}: {}", lineNo, e.getMessage());
                continue;
            }
            nextId++;
            System.out.println(item);
            String json = OBJECT_MAPPER.writeValueAsString(item);
            // The document _id matches the item's own id field (the original was off by one).
            bulkProcessor.add(new IndexRequest(INDEX)
                    .id(String.valueOf(item.getId()))
                    .source(json, XContentType.JSON));
        }
        return nextId - 1;
    }

    /**
     * Converts one comma-separated row into an {@link Item_Detail}.
     *
     * <p>The sample data has two layouts: 8 fields
     * {@code ?,name,url,price,factory,comment,keyword,num} and 7 fields without the url column.
     * So the fields after the name are read from the end of the row. Names containing commas
     * are not supported (no CSV quoting).
     *
     * @param line raw row text
     * @param id   id to assign to the item
     * @throws IllegalArgumentException if the row has neither 7 nor 8 fields or a number is invalid
     */
    private static Item_Detail parseLine(String line, long id) {
        // limit -1 keeps trailing empty columns so the field count is reliable.
        String[] fields = line.split(",", -1);
        if (fields.length != 7 && fields.length != 8) {
            throw new IllegalArgumentException("expected 7 or 8 fields but got " + fields.length);
        }
        int last = fields.length - 1;
        Item_Detail item = new Item_Detail();
        item.setId(id);
        item.setName(fields[1]);
        item.setUrl(fields.length == 8 ? fields[2] : "");
        item.setPrice(Double.parseDouble(fields[last - 4].trim()));
        item.setFactory(fields[last - 3]);
        item.setComment(Long.parseLong(fields[last - 2].trim()));
        item.setKeyword(fields[last - 1]);
        item.setNum(Long.parseLong(fields[last].trim()));
        return item;
    }

    /** Creates the high-level REST client for the node at hadoop101:9200. */
    private static RestHighLevelClient newClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost("hadoop101", 9200, "http")));
    }

    /**
     * Creates a BulkProcessor backed by its own new client.
     *
     * <p>Kept for backward compatibility. The client it creates is not exposed and so can never
     * be closed by the caller; prefer {@link #bulkProcessor(RestHighLevelClient)}.
     */
    public static BulkProcessor bulkProcessor() {
        return bulkProcessor(newClient());
    }

    /**
     * Creates a BulkProcessor that sends bulks asynchronously through {@code client}. It flushes
     * once 100 requests are queued or every 10 seconds, whichever comes first.
     *
     * @param client client used for the bulk calls; the caller owns and must close it
     */
    public static BulkProcessor bulkProcessor(RestHighLevelClient client) {
        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
                (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                System.out.println("插入数据");
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                // A successful round-trip can still contain per-document failures.
                if (response.hasFailures()) {
                    LOGGER.error("Bulk {} had item failures: {}", executionId, response.buildFailureMessage());
                } else {
                    System.out.println("写入完成");
                }
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                // The whole bulk request failed (e.g. connection error).
                LOGGER.error("ES写入失败", failure);
            }
        };

        return BulkProcessor.builder(bulkConsumer, listener)
                .setBulkActions(100)                               // flush after 100 requests
                .setFlushInterval(TimeValue.timeValueSeconds(10))  // ...or every 10 seconds
                .build();
    }
}

实体类 Item_Detail:

package com.example.Exe;

/**
 * Plain data holder for one product row, with JavaBean-style getters and setters.
 */
public class Item_Detail {
    // NOTE: keep this declaration order; serializers may derive JSON property order from it.
    long id;
    long comment;
    String keyword;
    String factory;
    double price;
    String url;
    String name;
    long num;

    public long getId() {
        return this.id;
    }

    public void setId(long id) {
        this.id = id;
    }

    public long getComment() {
        return this.comment;
    }

    public void setComment(long comment) {
        this.comment = comment;
    }

    public String getKeyword() {
        return this.keyword;
    }

    public void setKeyword(String keyword) {
        this.keyword = keyword;
    }

    public String getFactory() {
        return this.factory;
    }

    public void setFactory(String factory) {
        this.factory = factory;
    }

    public double getPrice() {
        return this.price;
    }

    public void setPrice(double price) {
        this.price = price;
    }

    public String getUrl() {
        return this.url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getNum() {
        return this.num;
    }

    public void setNum(long num) {
        this.num = num;
    }

    /** Renders every field; string fields are single-quoted and {@code null} prints as "null". */
    @Override
    public String toString() {
        StringBuilder out = new StringBuilder("Item_Detail{");
        out.append("id=").append(id)
           .append(", comment=").append(comment)
           .append(", keyword='").append(keyword).append('\'')
           .append(", factory='").append(factory).append('\'')
           .append(", price=").append(price)
           .append(", url='").append(url).append('\'')
           .append(", name='").append(name).append('\'')
           .append(", num=").append(num)
           .append('}');
        return out.toString();
    }
}


标签:String,写入,item,elasticsearch,BulkProcessor,org,import,public,ES
From: https://blog.51cto.com/u_15063934/6033063

相关文章