springboot3.3.4 集成 elasticsearch-java 动态索引
springboot3.3.4 集成 elasticsearch-java 动态索引 和 复杂查询
·
网上很多 springboot 集成 elasticsearch 的教程但是大部分都是集成 elasticsearch-rest-high-level-client, elasticsearch8.x 之后就不推荐这个客户端了,里面的方法也弃用了..最近开源卫士检查出高危也升级不了,狠狠心把它重构成 elasticsearch-java
增删改 api 都差不多 所以不再描述
描述一下 结合 动态索引, 分组 , 过滤 , 分页 , 的复杂查询逻辑
索引是每天创建一个 类似于 index-{2024-10-16}
类似mysql selcet * from where xxx = xxx grop by xxx limit 0,10
引入依赖(因为springboot自己有版本控制所以这里不需要指定version)
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
</dependency>
配置账密连接信息
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import co.elastic.clients.util.ContentType;
import com.xx.xxx.es.autoconfigure.ElasticsearchProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.List;
/**
* @author yuanchangshuai
*/
@Configuration
@Slf4j
public class ElasticSearchConfig {
@Autowired
private ElasticsearchProperties elasticsearchProperties;
/**
* 初始化客户端
* @return
*/
@Bean
public ElasticsearchClient elasticsearchClient(){
String hostname = elasticsearchProperties.getHost();
int port = elasticsearchProperties.getPort();
String username = elasticsearchProperties.getUserName();
String password = elasticsearchProperties.getPassword();
// 基本的用户名密码认证
BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
basicCredentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClient restClient = RestClient.builder(new HttpHost(hostname, port))
.setHttpClientConfigCallback(httpClientBuilder
->httpClientBuilder.setDefaultHeaders(
List.of(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())))
.setDefaultCredentialsProvider(basicCredentialsProvider)
.addInterceptorLast((HttpResponseInterceptor) (response, context)
-> response.addHeader("X-Elastic-Product", "Elasticsearch"))).build();
ElasticsearchTransport transport = new RestClientTransport(
restClient, new JacksonJsonpMapper());
return new ElasticsearchClient(transport);
}
}
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
/**
* @PROJECT_NAME: taskcenter
* @DESCRIPTION:
* @author: yuanchangshuai
* @DATE: 2024/1/18 11:26
*/
@ConfigurationProperties(prefix = "spring.elasticsearch")
@Data
public class ElasticsearchProperties {
/**
* host
*/
private String host = "localhost";
/**
* 端口
*/
private Integer port = 9200;
/**
* 用户名
*/
private String userName = "elastic";
/**
* 密码
*/
private String password = "elastic";
/**
* 请求方式
*/
private String scheme = "http";
/**
* es集群名称
*/
private String clusterName = "single-node-cluster";
/**
* es 连接超时时间
*/
private Integer connectTimeOut = 1000;
/**
* es socket 连接超时时间
*/
private Integer socketTimeOut = 30000;
/**
* es 请求超时时间
*/
private Integer connectionRequestTimeOut = 500;
/**
* es 最大连接数
*/
private Integer maxConnectNum = 100;
/**
* es 每个路由的最大连接数
*/
private Integer maxConnectNumPerRoute = 100;
}
接下来启动项目就是已经集成成功了
新增文档:
private void createIndex(XXX xxx) throws IOException {
String indexName = esIndex + "_" + DateUtils.localDateToStr(LocalDate.now(), ElasticsearchConstant.YYYY_MM_DD);
IndexRequest<XXX> indexRequest =
new IndexRequest.Builder<XXX>()
.index(indexName) // 索引
.document(xxx).build(); // 文档内容 (product)
// 运行插入语句
IndexResponse index = elasticsearchClient.index(indexRequest);
}
查询文档:
@Autowired
private ElasticsearchClient elasticsearchClient;
public PageData<ResponseDto> page (PageDto dto) throws IOException {
//分页对象
PageData<ResponseDto> pageData = new PageData<>();
//返回值
List<ResponseDto> restList = new ArrayList<>();
//起始页
int from = (dto.getCurrent() - 1) * dto.getSize();
//构建请求对象
SearchRequest searchRequest = SearchRequest.of(s -> s
//动态索引
.index(getEsIndexs(dto))
//起始页
.from(from)
//一页数据条数
.size(dto.getSize())
// 这里相当于mysql里面的 where 条件
.query(q -> q
//一个过滤组
.bool(
builder -> {
//时间范围查询 相当于 mysql =
BoolQuery.Builder boolBuild = builder.must(mq -> mq
.range(r -> r.field("timestamp").from(dto.getStartTime()).to(dto.getEndTime()))
);
// 某个字段 模糊查询 相当于 mysql : and like % % 只是这里会分词
if (StrUtil.isNotBlank(dto.getName())) {
boolBuild. must(mq -> mq
.matchPhrase(e->e.field("name").query(dto.getName())
)
);
}
// 相当于 mysql and (xxx in ())
if (CollUtil.isNotEmpty(dto.getChannel())) {
for (String channel : dto.getChannel()) {
boolBuild.must(mq -> mq.bool(x->x.should(h -> h.matchPhrase(u -> u.field("channel").query(channel)))));
}
}
return boolBuild;
}
)
)
//构建分组 相当于 mysql : group by
.aggregations("group",
/*
为什么要.keyword?
默认情况下, Elasticsearch 对 text 类型的字段(field)禁用了 fielddata;
text 类型的字段在创建索引时会进行分词处理, 而聚合操作必须基于字段的原始值进行分析;
所以如果要对 text 类型的字段进行聚合操作, 就需要存储其原始值 —— 创建mapping时指定fielddata=true, 以便通过反转倒排索引(即正排索引)将索引数据加载至内存中.
Elasticsearch 5.x 版本开始支持通过text的内置字段keyword作精确查询、聚合分析:
*/
a -> a.terms(t -> t.field("分组字段.keyword")
//返回最大组数量
.size(Integer.MAX_VALUE)
//分组后 根据每组数量 排序一下
.order(List.of(
NamedValue.of("_count", SortOrder.Desc)
))
)
)
//这个是 查询数据集的排序:
.sort(List.of(SortOptions.of(e -> e.field(x -> x.field("timestamp")))))
// 因为 在es里面 外层 查询数据集 和分组 数据是互不影响的,所以即使 分组了也不会影响 外层数据, 所以这里对 分组的 字段 去重然后分页,不然还是会返回没分组的数据
.collapse(c -> c.field("分组字段.keyword"))
);
// 执行搜索
SearchResponse<XXX> searchResponse = elasticsearchClient.search(searchRequest, XXX.class);
//这个是获取外层去重后分组的数据
HitsMetadata<XXX> hits = searchResponse.hits();
// 处理分组结果
Aggregate aggregate = searchResponse.aggregations().get("group");
StringTermsAggregate stringTermsAggregate = (StringTermsAggregate) aggregate._get();
hits.hits().forEach(hit -> {
XXX source = hit.source();
ResponseDto convert = BeanConvertUtils.convert(source, ResponseDto.class);
stringTermsAggregate.buckets().array().forEach(bucket -> {
// 处理每个桶
FieldValue key = bucket.key();
long docCount = bucket.docCount();
//如果分组字段和分组集合里面对的上那就获取分组的总数
if (key.stringValue().equals(convert.get分组字段())){
convert.setNumber(docCount);
}
});
restList.add(convert);
});
//求分组后的总条数 这里不能用 外层的数据数据量,因为外层总数是分页后的 所以用 分了多少组其实就是 多少总数
int total = stringTermsAggregate.buckets().array().size();
// 注意:totalBuckets不是你可以对查询结果分页的“页面”数量,而是不同age值的数量。
// 要计算总页数,你应该使用totalHits和pageSize:
int totalPages = (int) Math.ceil((double) total / dto.getSize());
pageData.setRecords(restList);
pageData.setPages((long) totalPages);
pageData.setTotal(total);
pageData.setCurrent(dto.getCurrent());
pageData.setSize(dto.getSize());
return pageData;
}
private String getEsIndexs(PageDto dto) {
String index = esIndex;
if (StrUtil.isEmpty(dto.getStartTime()) || StrUtil.isEmpty(dto.getEndTime())) {
index += "_" + DateUtils.localDateToStr(LocalDate.now(), ElasticsearchConstant.YYYY_MM_DD);
} else {
index += "*";
}
return index;
}
更多推荐
所有评论(0)