Elasticsearch:对 Java 对象的 ES|QL 查询

发布于:2024-05-09 ⋅ 阅读:(29) ⋅ 点赞:(0)

作者:Laura Trotta

ES|QL 是 Elasticsearch 引入的一种新的查询语言,它将简化的语法与管道操作符结合起来,使用户能够直观地推断和操作数据。官方 Java 客户端的新版本 8.13.0 引入了对 ES|QL 查询的支持,提供了一个新的 API,允许轻松执行查询,并自动将结果翻译为 Java 对象。

先决条件

  • Elasticsearch 版本 >= 8.11.0
  • Java 版本 >= 17

摄取数据

在开始查询之前,我们需要有一些可用的数据:我们将使用 Java 客户端中提供的 BulkIngester 实用程序类将此 csv 文件存储到 Elasticsearch 中。 该 csv 列出了 Amazon Books Reviews 数据集中的书籍,并使用以下标题行对它们进行分类:

Title;Description;Author;Year;Publisher;Ratings

首先,我们必须创建索引以正确映射字段:

if (!client.indices().exists(ex -> ex.index("books")).value()) {
    client.indices().create(c -> c
        .index("books")
        .mappings(mp -> mp
            .properties("title", p -> p.text(t -> t))
            .properties("description", p -> p.text(t -> t))
            .properties("author", p -> p.text(t -> t))
            .properties("year", p -> p.short_(s -> s))
            .properties("publisher", p -> p.text(t -> t))
            .properties("ratings", p -> p.halfFloat(hf -> hf))
        ));
}

然后是书籍的 Java 类:

public record Book(
    String title,
    String description,
    String author,
    Integer year,
    String publisher,
    Float ratings
){}

我们将使用 Jackson 的 CSV 映射器来读取该文件,所以让我们对其进行配置:

CsvMapper csvMapper = new CsvMapper();
CsvSchema schema = CsvSchema.builder()
    .addColumn("title") // same order as in the csv
    .addColumn("description")
    .addColumn("author")
    .addColumn("year")
    .addColumn("publisher")
    .addColumn("ratings")
    .setColumnSeparator(';')
    .setSkipFirstDataRow(true)
    .build();

MappingIterator<Book> iter = csvMapper
    .readerFor(Book.class)
    .with(schema)
    .readValues(new FileReader("/path/to/file/books.csv"));

然后我们将逐行读取 csv 文件并使用 BulkIngester 优化摄取:

BulkIngester ingester = BulkIngester.of(bi -> bi
    .client(client)
    .maxConcurrentRequests(20)
    .maxOperations(5000));

boolean hasNext = true;
while (hasNext) {
    try {
        Book book = iter.nextValue();
        ingester.add(BulkOperation.of(b -> b
            .index(i -> i
            .index("books")
            .document(book))));
        hasNext = iter.hasNextValue();
    } catch (JsonParseException | InvalidFormatException e) {
        // ignore malformed data
    }
}

ingester.close();

索引编制大约需要 15 秒,但完成后,我们的图书索引将包含约 80K 文档,可供查询。

ES|QL

现在是时候从书籍数据中提取一些信息了。 假设我们想要找到阿西莫夫作品的最新重印本:

String queryAuthor =
    """
    from books
    | where author == "Isaac Asimov"
    | sort year desc
    | limit 10
    """;
List<Book> queryRes = (List<Book>) client.esql()
    .query(ObjectsEsqlAdapter.of(Book.class),queryAuthor);

感谢使用 Book.class 作为目标的 ObjectsEsqlAdapter,我们可以忽略 ES|QL 查询的 json 结果是什么,而只关注客户端自动返回的更熟悉的书籍列表。

对于那些习惯 SQL 查询和 JDBC 接口的人来说,客户端还提供了 ResultSetEsqlAdapter,可以以同样的方式使用它,而是返回一个 java.sql.ResultSet。

ResultSet resultSet = esClient.esql()
    .query(ResultSetEsqlAdapter.INSTANCE,queryAuthor);

另一个例子,我们现在想要找出企鹅图书中评分最高的书籍:

String queryPublisher =
    """
    from books
    | where publisher == "Penguin"
    | sort ratings desc
    | limit 10
    | sort title asc
    """;

queryRes = (List<Book>) client.esql()
    .query(ObjectsEsqlAdapter.of(Book.class), queryPublisher);

用于检索数据的 Java 代码保持不变,因为结果仍然是书籍列表。 当然也有例外,例如,如果查询使用 eval 命令添加新列,则应修改 Java 类以表示新结果。

本文的完整代码可以在官方客户端存储库中找到。 如有任何疑问或问题,请随时通过讨论联系。

准备好将 RAG 构建到您的应用程序中了吗? 想要尝试使用矢量数据库的不同 LLMs?
Github 上查看我们的 LangChain、Cohere 等示例 notebooks,并参加即将开始的 Elasticsearch 工程师培训

原文:ES|QL queries to Java objects — Elastic Search Labs