Elasticsearch 8.x Java API: bulk-inserting tens of thousands of documents
Bulk inserting data with Elasticsearch 8.x
1. Maven dependencies
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.12.0</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.17.0</version>
</dependency>
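The code below indexes EmpiPro documents keyed by their empi field, into an index whose name is held in a constant EMPI_INDEX_NAME (its value is not shown in the original). As a point of reference, here is a minimal sketch of what such a document class might look like; apart from empi and its getEmpi() accessor, the fields are purely illustrative assumptions:
public class EmpiPro {
    // master patient index, used as the document id (getEmpi() is called by the bulk code below)
    private String empi;
    // illustrative fields only; the real class is not shown in the original post
    private String name;
    private String idCard;

    public String getEmpi() { return empi; }
    public void setEmpi(String empi) { this.empi = empi; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public String getIdCard() { return idCard; }
    public void setIdCard(String idCard) { this.idCard = idCard; }
}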
2. Bulk insert
@Autowired
ElasticsearchClient esClient;

// Number of operations per batch
public static int BATCH_SIZE = 1000;
// Maximum request size in bytes; the batch is also flushed when this limit is reached, so it must be large enough to hold BATCH_SIZE operations. 10485760 = 10 MB, 1073741824 = 1 GB
public static int BYTE_SIZE = 10485760;
// Number of concurrent requests
public static int REQUEST_SIZE = 10;

/**
 * Bulk insert
 *
 * @param list the documents to index
 * @return true if all batches were submitted, false if an exception occurred
 */
public boolean bulk2ES(List<EmpiPro> list) {
    try {
        // Log the ingester settings: concurrent requests, operations per batch, max request size
        log.info("Concurrent requests: {}, operations per batch: {}, max request size: {}", REQUEST_SIZE, BATCH_SIZE, BYTE_SIZE);
        BulkListener<String> listener = getBulkListener();
        BulkIngester<String> bulkIngester = BulkIngester.of(b -> b
                .client(esClient)
                .maxOperations(BATCH_SIZE)
                .maxSize(BYTE_SIZE)
                .maxConcurrentRequests(REQUEST_SIZE)
                .flushInterval(60, TimeUnit.SECONDS)
                .listener(listener)
        );
        for (int i = 0; i < list.size(); i++) {
            EmpiPro empiPro = list.get(i);
            IndexOperation<EmpiPro> indexOperation = new IndexOperation.Builder<EmpiPro>()
                    // target index
                    .index(EMPI_INDEX_NAME)
                    // document id
                    .id(empiPro.getEmpi())
                    // document body
                    .document(empiPro)
                    .build();
            BulkOperation bulkOperation = new BulkOperation.Builder()
                    .index(indexOperation)
                    .build();
            bulkIngester.add(bulkOperation);
        }
        // close() flushes any remaining operations and waits for in-flight requests to finish
        bulkIngester.close();
        return true;
    } catch (Exception e) {
        log.error("Bulk insert failed", e);
        return false;
    }
}
/**
 * Bulk listener
 * @return the listener attached to the BulkIngester
 */
public BulkListener<String> getBulkListener() {
    BulkListener<String> listener = new BulkListener<String>() {
        /**
         * Called before a bulk request is sent
         * @param executionId id of this request
         * @param request the bulk request about to be sent
         * @param contexts per-operation context objects
         */
        @Override
        public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
            log.info("[beforeBulk] batch [{}] carries [{}] operations", executionId, contexts.size());
        }

        /**
         * Called after a bulk request has completed
         * @param executionId id of this request
         * @param request the bulk request that was sent
         * @param contexts per-operation context objects
         * @param response the bulk response
         */
        @Override
        public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
            log.info("[afterBulk] batch [{}] submitted [{}] operations, result: [{}]", executionId, contexts.size(), response.errors() ? "failed" : "succeeded");
        }

        /**
         * Called when the bulk request could not be sent to Elasticsearch
         * @param executionId id of this request
         * @param request the bulk request that was sent
         * @param contexts per-operation context objects
         * @param failure the failure
         */
        @Override
        public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
            log.error("Bulk request " + executionId + " failed", failure);
        }
    };
    return listener;
}
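For completeness, a rough sketch of how bulk2ES might be driven from a service or scheduled job; loadEmpiPage() and the page size are hypothetical helpers, only bulk2ES comes from the code above:
public void syncAllToEs() {
    int page = 0;
    List<EmpiPro> batch;
    do {
        // loadEmpiPage() is a hypothetical data-access method returning one page of records
        batch = loadEmpiPage(page, 10000);
        if (!batch.isEmpty() && !bulk2ES(batch)) {
            log.warn("Bulk insert failed for page {}", page);
        }
        page++;
    } while (!batch.isEmpty());
}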
Additionally, the Elasticsearch configuration class:
@Configuration
public class EsConfig {

    @Value("${spring.data.elasticsearch.host}")
    private String host;
    @Value("${spring.data.elasticsearch.scheme}")
    private String scheme;
    @Value("${spring.data.elasticsearch.port}")
    private int port;
    @Value("${spring.data.elasticsearch.user}")
    private String userName;
    @Value("${spring.data.elasticsearch.pwd}")
    private String password;

    @Bean
    public ElasticsearchClient elasticsearchClient() {
        // Basic-auth credentials
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
        RestClient restClient = RestClient.builder(new HttpHost(host, port, scheme))
                // async http client configuration
                .setHttpClientConfigCallback(httpClientBuilder -> {
                    // username/password authentication
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    // keep-alive strategy for the http client
                    httpClientBuilder.setKeepAliveStrategy((response, context) -> Duration.ofMinutes(5).toMillis());
                    return httpClientBuilder;
                }).build();
        ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());
        // Create the API client
        return new ElasticsearchClient(transport);
    }
}
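The @Value placeholders above read properties along these lines from application.properties; the keys come from the class, while the values shown here are only placeholders:
spring.data.elasticsearch.host=127.0.0.1
spring.data.elasticsearch.scheme=http
spring.data.elasticsearch.port=9200
spring.data.elasticsearch.user=elastic
spring.data.elasticsearch.pwd=your-password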