网上很多  springboot  集成 elasticsearch 的教程但是大部分都是集成 elasticsearch-rest-high-level-client,   elasticsearch8.x  之后就不推荐这个客户端了,里面的方法也弃用了..最近开源卫士检查出高危也升级不了,狠狠心把它重构成 elasticsearch-java

增删改 api 都差不多 所以不再描述


描述一下 结合 动态索引, 分组 , 过滤 , 分页 , 的复杂查询逻辑

索引是每天创建一个 类似于 index-{2024-10-16}

类似mysql selcet * from where xxx = xxx  grop by xxx limit 0,10

引入依赖(因为springboot自己有版本控制所以这里不需要指定version)

<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
</dependency>

配置账密连接信息

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import co.elastic.clients.util.ContentType;
import com.xx.xxx.es.autoconfigure.ElasticsearchProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.RestClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.List;

/**
 * @author yuanchangshuai
 */
@Configuration
@Slf4j
public class ElasticSearchConfig {

    @Autowired
    private ElasticsearchProperties elasticsearchProperties;
    /**
     * 初始化客户端
     * @return
     */
    @Bean
    public  ElasticsearchClient elasticsearchClient(){
        String hostname = elasticsearchProperties.getHost();
        int port = elasticsearchProperties.getPort();
        String username = elasticsearchProperties.getUserName();
        String password = elasticsearchProperties.getPassword();
        // 基本的用户名密码认证
        BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
        basicCredentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestClient restClient = RestClient.builder(new HttpHost(hostname, port))
                .setHttpClientConfigCallback(httpClientBuilder
                        ->httpClientBuilder.setDefaultHeaders(
                                List.of(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())))
                        .setDefaultCredentialsProvider(basicCredentialsProvider)
                        .addInterceptorLast((HttpResponseInterceptor) (response, context)
                                -> response.addHeader("X-Elastic-Product", "Elasticsearch"))).build();
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }
}

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @PROJECT_NAME: taskcenter
 * @DESCRIPTION:
 * @author: yuanchangshuai
 * @DATE: 2024/1/18 11:26
 */
@ConfigurationProperties(prefix = "spring.elasticsearch")
@Data
public class ElasticsearchProperties {


    /**
     * host
     */
    private String host = "localhost";
    /**
     * 端口
     */
    private Integer port = 9200;

    /**
     * 用户名
     */
    private String userName = "elastic";
    /**
     * 密码
     */
    private String password = "elastic";
    /**
     * 请求方式
     */
    private String scheme = "http";
    /**
     * es集群名称
     */
    private String clusterName = "single-node-cluster";
    /**
     * es 连接超时时间
     */
    private Integer connectTimeOut = 1000;
    /**
     * es socket 连接超时时间
     */
    private Integer socketTimeOut = 30000;
    /**
     * es 请求超时时间
     */
    private Integer connectionRequestTimeOut = 500;
    /**
     * es 最大连接数
     */
    private Integer maxConnectNum = 100;
    /**
     * es 每个路由的最大连接数
     */
    private Integer maxConnectNumPerRoute = 100;

}

接下来启动项目就是已经集成成功了

新增文档:

   private void createIndex(XXX xxx) throws IOException {
       
            String indexName = esIndex + "_" + DateUtils.localDateToStr(LocalDate.now(), ElasticsearchConstant.YYYY_MM_DD);

            IndexRequest<XXX> indexRequest =
                    new IndexRequest.Builder<XXX>()
                            .index(indexName) // 索引
                            .document(xxx).build();  // 文档内容 (product)
            // 运行插入语句
            IndexResponse index = elasticsearchClient.index(indexRequest);
       
    }

查询文档:

    @Autowired
    private ElasticsearchClient elasticsearchClient;
 
public PageData<ResponseDto> page (PageDto dto) throws IOException {
       //分页对象
        PageData<ResponseDto> pageData = new PageData<>();
       //返回值
        List<ResponseDto> restList = new ArrayList<>();

        //起始页
        int from = (dto.getCurrent() - 1) * dto.getSize();
        //构建请求对象
        SearchRequest searchRequest = SearchRequest.of(s -> s
                //动态索引
                .index(getEsIndexs(dto))
                //起始页
                .from(from)
                //一页数据条数
                .size(dto.getSize())
                // 这里相当于mysql里面的 where 条件
                .query(q -> q
                         //一个过滤组
                        .bool(
                                builder -> {
                                   //时间范围查询 相当于 mysql =
                                    BoolQuery.Builder boolBuild = builder.must(mq -> mq
                                            .range(r -> r.field("timestamp").from(dto.getStartTime()).to(dto.getEndTime()))
                                    );
                                     // 某个字段 模糊查询 相当于 mysql : and like % % 只是这里会分词
                                    if (StrUtil.isNotBlank(dto.getName())) {
                                        boolBuild. must(mq -> mq
                                                .matchPhrase(e->e.field("name").query(dto.getName())
                                                )
                                        );
                                    }
                                    // 相当于 mysql and (xxx in ())
                                    if (CollUtil.isNotEmpty(dto.getChannel())) {
                                        for (String channel : dto.getChannel()) {
                                            boolBuild.must(mq -> mq.bool(x->x.should(h -> h.matchPhrase(u -> u.field("channel").query(channel)))));
                                        }
                                    }
                                    return boolBuild;
                                }
                        )
                )
                 //构建分组 相当于 mysql : group by
                .aggregations("group",
                        /*
                        为什么要.keyword?
                        默认情况下, Elasticsearch 对 text 类型的字段(field)禁用了 fielddata;
                        text 类型的字段在创建索引时会进行分词处理, 而聚合操作必须基于字段的原始值进行分析;
                        所以如果要对 text 类型的字段进行聚合操作, 就需要存储其原始值 —— 创建mapping时指定fielddata=true, 以便通过反转倒排索引(即正排索引)将索引数据加载至内存中.
                        Elasticsearch 5.x 版本开始支持通过text的内置字段keyword作精确查询、聚合分析:
                         */
                        
                        a -> a.terms(t -> t.field("分组字段.keyword")
                                //返回最大组数量
                                .size(Integer.MAX_VALUE)
                                //分组后 根据每组数量 排序一下 
                                .order(List.of(
                                        NamedValue.of("_count", SortOrder.Desc)

                                ))
                        )
                )
                //这个是 查询数据集的排序: 
                .sort(List.of(SortOptions.of(e -> e.field(x -> x.field("timestamp")))))
                // 因为 在es里面 外层 查询数据集 和分组 数据是互不影响的,所以即使 分组了也不会影响 外层数据, 所以这里对 分组的 字段 去重然后分页,不然还是会返回没分组的数据
                .collapse(c -> c.field("分组字段.keyword"))
        );
        // 执行搜索
        SearchResponse<XXX> searchResponse = elasticsearchClient.search(searchRequest, XXX.class);

        //这个是获取外层去重后分组的数据
        HitsMetadata<XXX> hits = searchResponse.hits();

        // 处理分组结果
        Aggregate aggregate = searchResponse.aggregations().get("group");

        StringTermsAggregate stringTermsAggregate = (StringTermsAggregate) aggregate._get();
            
        hits.hits().forEach(hit -> {
            XXX source = hit.source();
            ResponseDto convert = BeanConvertUtils.convert(source, ResponseDto.class);
            stringTermsAggregate.buckets().array().forEach(bucket -> {
                // 处理每个桶
                FieldValue key = bucket.key();
                long docCount = bucket.docCount();
                //如果分组字段和分组集合里面对的上那就获取分组的总数
                if (key.stringValue().equals(convert.get分组字段())){
                    convert.setNumber(docCount);
                }
            });
            restList.add(convert);
        });
        //求分组后的总条数 这里不能用 外层的数据数据量,因为外层总数是分页后的 所以用 分了多少组其实就是 多少总数

        int total = stringTermsAggregate.buckets().array().size();
        // 注意:totalBuckets不是你可以对查询结果分页的“页面”数量,而是不同age值的数量。
        // 要计算总页数,你应该使用totalHits和pageSize:
        int totalPages = (int) Math.ceil((double) total / dto.getSize());
        pageData.setRecords(restList);
        pageData.setPages((long) totalPages);
        pageData.setTotal(total);
        pageData.setCurrent(dto.getCurrent());
        pageData.setSize(dto.getSize());
        return pageData;
    }

    private String getEsIndexs(PageDto dto) {
        String index = esIndex;
        if (StrUtil.isEmpty(dto.getStartTime()) || StrUtil.isEmpty(dto.getEndTime())) {
            index += "_" + DateUtils.localDateToStr(LocalDate.now(), ElasticsearchConstant.YYYY_MM_DD);
        } else {
            index += "*";
        }
        return index;
    }

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐