AWS OpenSearch 搜索排序常见用法

发布于:2025-07-23 ⋅ 阅读:(21) ⋅ 点赞:(0)

背景介绍

AWS OpenSearch 是 AWS 提供的检索与分析服务。它基于开源的 Elasticsearch 7.x 分支 fork 出独立的代码仓库,由 AWS 独立维护,并加入了一些自己的优化。本文主要介绍它在 Java 客户端中的常见基础用法。

引入相关依赖

 <!-- Maven dependency on the OpenSearch Java client (opensearch-java), version 2.17.0. -->
 <dependency>
     <groupId>org.opensearch.client</groupId>
     <artifactId>opensearch-java</artifactId>
     <version>2.17.0</version>
 </dependency>

查询返回指定属性字段

按照前端要求的返回字段(“productId”, “title”, “rating”, “images”,“productTags”)进行返回,而不是返回所有字段

// Look up products whose productId is one of `values`, and restrict the returned
// _source to the five fields listed in includes(...) instead of the whole document.
SearchResponse<ProductVO> search = openSearchClient.search(
        request -> request
                .index("product_index")
                .source(src -> src.filter(fields -> fields.includes(
                        "productId", "title", "rating", "images", "productTags")))
                .query(query -> query.terms(termsQuery -> termsQuery
                        .field("productId")
                        .terms(termValues -> termValues.value(values)))),
        ProductVO.class);
                

分页查询返回

根据前端传入的分页参数,当前页(pageNo)和每页的条数(pageSize)执行查询

  // Paged search: translate the caller's pageNo/pageSize into from/size, run the
  // query, and fill a PageResult with the hits plus paging metadata.
  PageResult<ProductVO> pageResult = new PageResult<>();

  // Fall back to page 1 / 10 rows when the caller omits a value or sends a
  // non-positive one; otherwise unboxing null throws NPE and a zero page size
  // causes a division by zero in the total-page calculation below.
  Integer requestedPageNo = requestVO.getPageNo();
  Integer requestedPageSize = requestVO.getPageSize();
  int pageIndex = (requestedPageNo != null && requestedPageNo > 0) ? requestedPageNo : 1;
  int pageSize = (requestedPageSize != null && requestedPageSize > 0) ? requestedPageSize : 10;
  // Zero-based offset of the first hit of the requested page.
  // NOTE(review): very deep pages may exceed the index's result-window limit - confirm against the index settings.
  int from = (pageIndex - 1) * pageSize;

  SearchResponse<ProductVO> search = openSearchClient.search(
          s -> s.index("product_index")
                  .source(c -> c.filter(e -> e.includes("productId", "title", "rating", "images", "productTags")))
                  .from(from)
                  .size(pageSize)
                  .query(q -> q.terms(t -> t.field("productId").terms(ts -> ts.value(values)))),
          ProductVO.class);

  // Collect the deserialized documents, skipping hits that carry no _source.
  List<ProductVO> productList = Lists.newArrayList();
  if (CollectionUtils.isNotEmpty(search.hits().hits())) {
      search.hits().hits().forEach(h -> {
          ProductVO productVo = h.source();
          if (productVo != null) {
              productList.add(productVo);
          }
      });
  }

  // total() can be absent from the response; treat that as zero matches.
  long totalHits = search.hits().total() != null ? search.hits().total().value() : 0L;
  int total = Math.toIntExact(totalHits);
  // Ceiling division: one extra page when the last page is only partially filled.
  int totalPage = total / pageSize + (total % pageSize == 0 ? 0 : 1);

  pageResult.setCurrentPage(pageIndex);
  pageResult.setPageSize(pageSize);
  pageResult.setTotal(total);
  pageResult.setTotalPage(totalPage);
  pageResult.setItems(productList);

复合查询

// Compound (bool) query: documents MUST match the language filter and the rating
// range, and MUST NOT belong to any of the listed categories.
List<Query> mustQueryList = new ArrayList<>();
List<Query> mustNotQueryList = new ArrayList<>();

// Language filter values.
List<FieldValue> values = new ArrayList<>();
List<String> languages = List.of("zh");
for (String language : languages) {
    values.add(FieldValue.of(language));
}

// "4.5" is a decimal, so it has to be parsed as a double; Integer.parseInt("4.5")
// throws NumberFormatException before any query is built.
double rating = Double.parseDouble("4.5");
// Half-open range [rating - 0.25, rating + 0.75).
Query ratingQuery = RangeQuery.of(r -> r.field("rating")
        .gte(JsonData.of(rating - 0.25)).lt(JsonData.of(rating + 0.75))).toQuery();

// One FieldValue per category id; FieldValue.of wraps a single value.
List<FieldValue> categoriesValues = new ArrayList<>();
for (String categoryId : List.of("CA0001", "CA0002")) {
    categoriesValues.add(FieldValue.of(categoryId));
}
// "categories" is queried through a nested query on path "categories".
Query nestedQuery = NestedQuery.of(n -> n
        .path("categories")
        .query(q -> q
                .terms(r -> r
                        .field("categories.id")
                        .terms(t -> t.value(categoriesValues))
                        .boost(1000f)
                )
        )
).toQuery();

mustQueryList.add(TermsQuery.of(t -> t.field("language").terms(new TermsQueryField.Builder()
                .value(values).build())).toQuery());
mustQueryList.add(ratingQuery);
// NOTE(review): the nested category query is used as an exclusion (must_not), where
// its boost presumably has no effect on scoring - confirm this is the intent.
mustNotQueryList.add(nestedQuery);

Query complexQuery = BoolQuery.of(b -> b
                .must(mustQueryList)
                .mustNot(mustNotQueryList)).toQuery();
// `from` and `pageSize` are the paging values computed by the caller.
SearchResponse<ProductVO> search = openSearchClient.search(
                    s -> s.index("product_index")
                            .source(c -> c.filter(e -> e.includes("productId", "title", "rating",  "images","productTags")))
                            .from(from)
                            .size(pageSize)
                            .query(complexQuery),
                    ProductVO.class);

聚合统计

以下示例通过聚合查询统计每个评分(rating)对应的商品数量

 // Terms aggregation on "rating" (at most 1000 buckets), registered under the
 // name "ratingAgg" and evaluated over the documents matched by filterQuery.
 Aggregation aggregation = Aggregation.of(agg -> agg.terms(terms -> terms.field("rating").size(1000)));
 SearchResponse<ProductBO> search = openSearchClient.search(
         request -> request
                 .index("product_index")
                 .source(src -> src.filter(fields -> fields.includes("productId", "title", "rating")))
                 .aggregations("ratingAgg", aggregation)
                 .query(filterQuery),
         ProductBO.class);
 if (search.aggregations() != null) {
     List<FacetHit> facetHitsList = Lists.newArrayList();
     // Dispatch on the concrete aggregate kind returned for each aggregation.
     for (Aggregate aggregate : search.aggregations().values()) {
         String kind = aggregate._kind().name();
         log.info("The aggregation type is {}", kind);
         switch (kind) {
             case "Nested" -> {
                 // Nested aggregate: hand each sub-aggregate to addFacetHits.
                 for (Aggregate nestedAggregate : aggregate.nested().aggregations().values()) {
                     addFacetHits(nestedAggregate, facetHitsList);
                 }
             }
             case "Sterms" -> addFacetHits(aggregate, facetHitsList);
             case "Dterms" -> {
                 // Double-keyed buckets: the key is stringified for the facet value.
                 for (DoubleTermsBucket bucket : aggregate.dterms().buckets().array()) {
                     String facetValue = String.valueOf(bucket.key());
                     FacetHit facetHit = new FacetHit();
                     facetHit.setCount(Math.toIntExact(bucket.docCount()));
                     facetHit.setValue(facetValue);
                     facetHitsList.add(facetHit);
                 }
             }
             default -> log.warn("Unrecognized type:{} cannot be processed", kind);
         }
     }
 }


/**
 * Appends one {@code FacetHit} per string-terms bucket of the given aggregate.
 *
 * <p>The hit's count is the bucket's document count, unless the bucket has a
 * "parent_docs" sub-aggregation whose kind equals
 * {@code AggregateConstants.KIND_REVERSE_NESTED}; in that case the count is
 * replaced by that sub-aggregation's document count. The hit's value is the
 * bucket key.
 *
 * @param aggregate     aggregate read through {@code sterms()}
 * @param facetHitsList list the created hits are appended to
 */
private void addFacetHits(Aggregate aggregate, List<FacetHit> facetHitsList) {
    for (StringTermsBucket bucket : aggregate.sterms().buckets().array()) {
        String bucketKey = bucket.key();
        FacetHit hit = new FacetHit();
        hit.setCount(Math.toIntExact(bucket.docCount()));

        Aggregate parentDocs = bucket.aggregations().get("parent_docs");
        boolean hasReverseNestedParent = parentDocs != null
                && AggregateConstants.KIND_REVERSE_NESTED.equals(parentDocs._kind().name());
        if (hasReverseNestedParent) {
            ReverseNestedAggregate reverseNested = parentDocs.reverseNested();
            if (reverseNested != null) {
                // Override the bucket count with the reverse-nested document count.
                hit.setCount(Math.toIntExact(reverseNested.docCount()));
            }
        }

        hit.setValue(bucketKey);
        facetHitsList.add(hit);
    }
}

基础排序

按照定义的排序字段进行排序

// Paged search ordered by publishDate (descending), then by rating (descending).
SearchResponse<ProductVO> search = openSearchClient.search(
        request -> request
                .index("product_index")
                .source(src -> src.filter(fields -> fields.includes(
                        "productId", "title", "rating", "images", "productTags")))
                .from(from)
                .size(pageSize)
                .query(query -> query.terms(termsQuery -> termsQuery
                        .field("productId")
                        .terms(termValues -> termValues.value(values))))
                // Sort keys are applied in the order they are added.
                .sort(sortBy -> sortBy.field(fieldSort -> fieldSort.field("publishDate").order(SortOrder.Desc)))
                .sort(sortBy -> sortBy.field(fieldSort -> fieldSort.field("rating").order(SortOrder.Desc))),
        ProductVO.class);

高阶排序

将特定的一批商品排在查询结果的最前面

// Ids of the specific products to place first in the results.
List<String> topProductIdList = List.of("1", "2");
// Same paged productId lookup; the ordering comes from getHightSortOptions.
SearchResponse<ProductVO> search = openSearchClient.search(
        request -> request
                .index("product_index")
                .source(src -> src.filter(fields -> fields.includes(
                        "productId", "title", "rating", "images", "productTags")))
                .from(from)
                .size(pageSize)
                .query(query -> query.terms(termsQuery -> termsQuery
                        .field("productId")
                        .terms(termValues -> termValues.value(values))))
                .sort(getHightSortOptions(topProductIdList)),
        ProductVO.class);

/**
 * Builds the sort criteria that place a given set of products first.
 *
 * <p>When {@code topProductIdList} is non-empty, the first criterion is an
 * ascending numeric script sort: a document whose productId is in the list
 * sorts by its index in that list, and every other document gets the list
 * size as its sort value. The remaining criteria are rating (descending) and
 * publishDate (descending).
 *
 * <p>When the list is {@code null} or empty, the script sort is skipped: with
 * an empty list every document would get the same script value, and a
 * {@code null} list cannot be used as the script parameter.
 *
 * @param topProductIdList ids of the products to place first, in the desired
 *                         order; may be {@code null} or empty
 * @return the sort options in priority order
 */
private List<SortOptions> getHightSortOptions(List<String> topProductIdList) {
        List<SortOptions> sortOptions = Lists.newArrayList();
        if (topProductIdList != null && !topProductIdList.isEmpty()) {
            sortOptions.add(SortOptions.of(f -> f.script(st -> st.type(ScriptSortType.Number)
                       .script(Script.of(sf -> sf.inline(ie -> ie
                               .source("params.topProductIds.indexOf(doc['productId'].value) >= 0 ? params.topProductIds.indexOf(doc['productId'].value) : params.topProductIds.size()")
                               .lang("painless")
                               .params(Map.of("topProductIds", JsonData.of(topProductIdList)))
                       )))
                       .order(SortOrder.Asc))));
        }

        // Tie-breakers for products with the same pinned position.
        sortOptions.add(SortOptions.of(t -> t.field(f -> f.field("rating").order(SortOrder.Desc))));
        sortOptions.add(SortOptions.of(t -> t.field(f -> f.field("publishDate").order(SortOrder.Desc))));
        return sortOptions;
    }

查看OpenSearch的数据

可以通过OpenSearch dashboard查看,如下图所示:
在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到