Elasticsearch:Spring Boot 2.x 集成 Elasticsearch 8.x

发布于:2025-06-06 ⋅ 阅读:(19) ⋅ 点赞:(0)

Elasticsearch 的安装步骤这里不再介绍,下面直接进入代码集成。

<!-- elasticsearch版本需要和你安装的版本一致 -->

<properties>
    <!-- Version applied to both org.elasticsearch:elasticsearch and
         co.elastic.clients:elasticsearch-java below; keep it equal to the
         version of the Elasticsearch server you installed. -->
    <elasticsearch.version>8.11.1</elasticsearch.version>
    <!-- Version of jakarta.json:jakarta.json-api, which is declared explicitly
         below after being excluded from elasticsearch-java. -->
    <jakarta-json.version>2.1.2</jakarta-json.version>
    <!-- Version of net.logstash.logback:logstash-logback-encoder. -->
    <logstash.version>7.2</logstash.version>
    <!-- Version of jakarta.json.bind:jakarta.json.bind-api. -->
    <jakarta-json-bind.version>3.0.0</jakarta-json-bind.version>
</properties>


<dependencies>
    <!-- Elasticsearch core artifact, pinned to ${elasticsearch.version}. -->
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>

    <!-- Elasticsearch Java API Client (co.elastic.clients.* packages used by the
         configuration class). Its jakarta.json-api dependency is excluded here
         and declared explicitly in the next dependency. -->
    <dependency>
        <groupId>co.elastic.clients</groupId>
        <artifactId>elasticsearch-java</artifactId>
        <version>${elasticsearch.version}</version>
        <exclusions>
            <exclusion>
                <artifactId>jakarta.json-api</artifactId>
                <groupId>jakarta.json</groupId>
            </exclusion>
        </exclusions>
    </dependency>

    <!-- jakarta.json API at an explicitly chosen version (replaces the excluded one). -->
    <dependency>
        <groupId>jakarta.json</groupId>
        <artifactId>jakarta.json-api</artifactId>
        <version>${jakarta-json.version}</version>
    </dependency>

    <!-- jakarta.json.bind (JSON-B) API. -->
    <dependency>
        <groupId>jakarta.json.bind</groupId>
        <artifactId>jakarta.json.bind-api</artifactId>
        <version>${jakarta-json-bind.version}</version>
    </dependency>

    <!-- Logstash encoder for Logback. -->
    <dependency>
        <groupId>net.logstash.logback</groupId>
        <artifactId>logstash-logback-encoder</artifactId>
        <version>${logstash.version}</version>
    </dependency>
</dependencies>
# These settings are bound by
# @ConfigurationProperties(prefix = "gospel.elasticsearch"), so they must be
# nested under gospel.elasticsearch; a root-level "elasticsearch:" key is not bound.
gospel:
  elasticsearch:
    # Cluster nodes as host:port pairs, comma separated.
    hosts: 127.0.0.1:9201,127.0.0.1:9202,127.0.0.1:9203

    # Basic-auth credentials (leave both empty for an unauthenticated cluster).
    username: kingxxxx
    password: xxxxx

    # API key (optional; alternative to username/password, used by the
    # clientByApiKey bean).
    apikey:

    # Single-node address. These keys are active (not commented out): the
    # configuration class reads them when `hosts` lists a single node.
    host: 127.0.0.1
    port: 9200
package com.kingbal.config;

import cn.hutool.core.util.StrUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.ClassPathResource;
import org.springframework.util.StringUtils;

import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
import java.util.stream.Stream;
import static java.util.stream.Collectors.toList;

/**
 * Spring configuration that binds the {@code gospel.elasticsearch.*} properties and builds
 * {@link ElasticsearchClient} beans on top of the low-level {@link RestClient}.
 *
 * <p>Node resolution: when {@code hosts} is blank or holds a single entry and {@code host} is
 * configured, the {@code host}/{@code port} pair is used; otherwise the comma-separated
 * {@code hosts} list is parsed. Every node is addressed over plain {@code http}.
 *
 * @program: ElasticSearchConfig
 * @Package: com.kingbal.config
 * @author: chocho
 * @date: 2025/06/04
 * @version: 1.0
 * @Copyright: 2025 www.kingbal.com Inc. All rights reserved.
 */
@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "gospel.elasticsearch")
public class ElasticSearchConfig {

    /** Port used when a node is configured without an explicit port. */
    private static final int DEFAULT_PORT = 9200;

    /** URI scheme used for every node. */
    private static final String SCHEME = "http";

    /** Adds the {@code X-Elastic-Product} header to every response before the client inspects it. */
    private static final HttpResponseInterceptor PRODUCT_HEADER_INTERCEPTOR =
            (response, context) -> response.addHeader("X-Elastic-Product", "Elasticsearch");

    /** Host name of the single node. */
    private String host;

    /** Port of the single node; values {@code <= 0} fall back to {@link #DEFAULT_PORT}. */
    private int port;

    /** Comma-separated {@code host:port} list of cluster nodes. */
    private String hosts;

    /** Basic-auth user name; empty together with {@link #password} means no authentication. */
    private String username;

    /** Basic-auth password. */
    private String password;

    /** API key sent as {@code Authorization: ApiKey <key>} by {@link #clientByApiKey()}. */
    private String apikey;

    /**
     * Primary client. Connects to the resolved nodes, with basic authentication when a user name
     * or password is configured and anonymously otherwise.
     *
     * @return the primary {@link ElasticsearchClient}
     * @throws RuntimeException if neither {@code hosts} nor {@code host} yields a usable node
     */
    @Bean
    @Primary
    public ElasticsearchClient client() {
        HttpHost[] nodes = resolveHosts();
        boolean anonymous = StrUtil.isEmpty(username) && StrUtil.isEmpty(password);
        ElasticsearchTransport transport = anonymous
                ? getElasticsearchTransport(nodes)
                : getElasticsearchTransport(username, password, nodes);
        return new ElasticsearchClient(transport);
    }

    /**
     * Client authenticated with the configured API key.
     *
     * <p>When no API key is configured this returns the primary client instead of a client
     * wrapping a {@code null} transport (which would fail on first use). The call to
     * {@link #client()} goes through Spring's {@code @Configuration} proxy, so it is expected to
     * return the shared primary bean rather than build a second connection pool.
     *
     * @return the API-key client, or the primary client when {@code apikey} is empty
     */
    @Bean
    public ElasticsearchClient clientByApiKey() {
        if (StrUtil.isEmpty(apikey)) {
            log.warn("No elasticsearch apikey configured; clientByApiKey falls back to the primary client");
            return client();
        }
        return new ElasticsearchClient(getElasticsearchTransport(apikey, resolveHosts()));
    }

    /**
     * Chooses between the single-node {@code host}/{@code port} pair and the {@code hosts} list.
     *
     * @return the nodes to connect to, never empty
     */
    private HttpHost[] resolveHosts() {
        boolean singleEntry = !StringUtils.hasText(hosts) || hosts.split(",").length == 1;
        if (singleEntry && StringUtils.hasText(host)) {
            int effectivePort = port > 0 ? port : DEFAULT_PORT;
            return new HttpHost[]{new HttpHost(host, effectivePort, SCHEME)};
        }
        return toHttpHost();
    }

    /**
     * Parses the comma-separated {@code hosts} property. Surrounding whitespace and empty entries
     * are ignored; an entry without a port uses {@link #DEFAULT_PORT}.
     *
     * @return one {@link HttpHost} per configured node
     * @throws RuntimeException if {@code hosts} is blank or contains no usable entry
     */
    private HttpHost[] toHttpHost() {
        if (!StringUtils.hasText(hosts)) {
            throw new RuntimeException("invalid elasticsearch configuration");
        }
        HttpHost[] httpHosts = Stream.of(hosts.split(","))
                .map(String::trim)
                .filter(entry -> !entry.isEmpty())
                .map(ElasticSearchConfig::parseHost)
                .toArray(HttpHost[]::new);
        if (httpHosts.length == 0) {
            throw new RuntimeException("invalid elasticsearch configuration");
        }
        return httpHosts;
    }

    /**
     * Converts one {@code host[:port]} entry into an {@link HttpHost}.
     *
     * @param entry trimmed, non-empty host entry
     * @return the parsed node address
     * @throws IllegalArgumentException if the port is not a number
     */
    private static HttpHost parseHost(String entry) {
        int colon = entry.lastIndexOf(':');
        if (colon < 0) {
            return new HttpHost(entry, DEFAULT_PORT, SCHEME);
        }
        String name = entry.substring(0, colon).trim();
        String portText = entry.substring(colon + 1).trim();
        try {
            return new HttpHost(name, Integer.parseInt(portText), SCHEME);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("invalid elasticsearch host entry: " + entry, e);
        }
    }

    /**
     * Builds a transport that authenticates with a user name and password.
     *
     * @param username basic-auth user name
     * @param password basic-auth password
     * @param hosts    nodes to connect to
     * @return transport backed by a new {@link RestClient}
     */
    private static ElasticsearchTransport getElasticsearchTransport(String username, String password, HttpHost... hosts) {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));

        // NOTE(review): NoopHostnameVerifier switches off TLS host name checks. It is kept from the
        // original code; confirm it is really wanted before pointing this at an https endpoint.
        RestClientBuilder.HttpClientConfigCallback callback = httpAsyncClientBuilder -> httpAsyncClientBuilder
                .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                .setDefaultCredentialsProvider(credentialsProvider)
                // Registered once: a second registration only duplicates the response header.
                .addInterceptorLast(PRODUCT_HEADER_INTERCEPTOR);

        RestClient client = RestClient
                .builder(hosts)
                .setHttpClientConfigCallback(callback)
                .build();

        return new RestClientTransport(client, new JacksonJsonpMapper());
    }

    /**
     * Builds a transport without any authentication.
     *
     * @param hosts nodes to connect to
     * @return transport backed by a new {@link RestClient}
     */
    private static ElasticsearchTransport getElasticsearchTransport(HttpHost... hosts) {
        RestClient client = RestClient
                .builder(hosts)
                .build();

        return new RestClientTransport(client, new JacksonJsonpMapper());
    }

    /**
     * Builds a transport that authenticates with an API key.
     *
     * @param apiKey API key value placed after the {@code ApiKey } prefix
     * @param hosts  nodes to connect to
     * @return transport backed by a new {@link RestClient}
     */
    private static ElasticsearchTransport getElasticsearchTransport(String apiKey, HttpHost... hosts) {
        Header[] headers = new Header[]{new BasicHeader(HttpHeaders.AUTHORIZATION, "ApiKey " + apiKey)};

        // NOTE(review): see the hostname-verifier remark on the user/password overload.
        RestClientBuilder.HttpClientConfigCallback callback = httpAsyncClientBuilder -> httpAsyncClientBuilder
                .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                .addInterceptorLast(PRODUCT_HEADER_INTERCEPTOR);

        RestClient client = RestClient
                .builder(hosts)
                .setHttpClientConfigCallback(callback)
                // Without the default header the API key is never sent and requests go out unauthenticated.
                .setDefaultHeaders(headers)
                .build();

        return new RestClientTransport(client, new JacksonJsonpMapper());
    }

}
package com.kingbal.common.util;

import co.elastic.clients.elasticsearch._types.Refresh;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.DeleteIndexResponse;
import co.elastic.clients.elasticsearch.indices.GetIndexResponse;
import co.elastic.clients.transport.endpoints.BooleanResponse;
import co.elastic.clients.elasticsearch._types.Result;
import com.gospel.config.ElasticSearchConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * Thin helper around the Elasticsearch Java API client for index and document operations.
 *
 * <p>Because of type erasure the methods without a {@code Class<T>} argument deserialize
 * documents as {@code Object}; use the overloads that take the document class to get typed
 * results.
 *
 * @param <T> document type
 * @program: ESUtils
 * @Package: com.kingbal.common.util
 * @author: chocho
 * @date: 2025/06/04
 * @version: 1.0
 * @Copyright: 2025 www.kingbal.com Inc. All rights reserved.
 */
@Slf4j
@Component
public class ESUtils<T> {

    /** Source of the client; every operation obtains it through {@code config.client()}. */
    @Autowired
    private ElasticSearchConfig config;

    /**
     * Creates an index with default settings.
     *
     * @param index index name
     * @throws IOException on transport failure
     */
    public void createIndex(String index) throws IOException {
        config.client().indices().create(c -> c.index(index));
    }

    /**
     * Fetches the definition of an index.
     *
     * @param index index name
     * @return the index definition response
     * @throws IOException on transport failure
     */
    public GetIndexResponse queryIndex(String index) throws IOException {
        return config.client().indices().get(i -> i.index(index));
    }

    /**
     * Checks whether an index exists.
     *
     * @param index index name
     * @return {@code true} if the index exists
     * @throws IOException on transport failure
     */
    public boolean existsIndex(String index) throws IOException {
        BooleanResponse booleanResponse = config.client().indices().exists(e -> e.index(index));
        return booleanResponse.value();
    }

    /**
     * Deletes an index.
     *
     * @param index index name
     * @return the delete response
     * @throws IOException on transport failure
     */
    public DeleteIndexResponse deleteIndex(String index) throws IOException {
        return config.client().indices().delete(d -> d.index(index));
    }

    /**
     * Indexes a single document and refreshes immediately so it is searchable on return.
     * The index is created first when it does not exist yet. Failures are propagated to the
     * caller instead of being logged and discarded.
     *
     * @param index index name
     * @param t     document to store
     * @throws IOException on transport failure
     */
    public void addDocument(String index, T t) throws IOException {
        // Act on the existence check; the result used to be ignored.
        if (!this.existsIndex(index)) {
            this.createIndex(index);
        }
        IndexResponse response = config.client().index(i -> i
                .index(index)
                .document(t)
                .refresh(Refresh.True)
        );

        if (isSuccessResponse(response.result())) {
            log.info("Document added successfully. ID: {}", response.id());
        } else {
            log.warn("Failed to add document. Result: {}", response.result());
        }
    }

    /**
     * Executes a bulk request against one index.
     *
     * @param index                  default index for operations that do not name one
     * @param bulkOperationArrayList operations to execute
     * @return the bulk response; check {@code errors()} for per-item failures
     * @throws IOException on transport failure
     */
    public BulkResponse addDocumentAll(String index, List<BulkOperation> bulkOperationArrayList) throws IOException {
        return config.client().bulk(b -> b.index(index).operations(bulkOperationArrayList));
    }

    /**
     * Partially updates a document; the response document is deserialized as {@code Object}.
     *
     * @param index index name
     * @param id    document id
     * @param t     partial document
     * @return the update response
     * @throws IOException on transport failure
     */
    public UpdateResponse<T> updateDocumentIndex(String index, String id, T t) throws IOException {
        return updateDocumentIndex(index, id, t, this.getDocumentClass());
    }

    /**
     * Partially updates a document using an explicit document class.
     *
     * @param index         index name
     * @param id            document id
     * @param t             partial document
     * @param documentClass class used to deserialize the document in the response
     * @return the update response
     * @throws IOException on transport failure
     */
    public UpdateResponse<T> updateDocumentIndex(String index, String id, T t, Class<T> documentClass) throws IOException {
        return config.client().update(u -> u.index(index).id(id).doc(t), documentClass);
    }

    /**
     * Checks whether the document with id {@code "1"} exists. Kept for existing callers; prefer
     * {@link #existDocumentIndex(String, String)}.
     *
     * @param index index name
     * @return existence response for id {@code "1"}
     * @throws IOException on transport failure
     */
    public BooleanResponse existDocumentIndex(String index) throws IOException {
        return existDocumentIndex(index, "1");
    }

    /**
     * Checks whether a document exists.
     *
     * @param index index name
     * @param id    document id
     * @return existence response
     * @throws IOException on transport failure
     */
    public BooleanResponse existDocumentIndex(String index, String id) throws IOException {
        return config.client().exists(e -> e.index(index).id(id));
    }

    /**
     * Fetches a document by id; the source is deserialized as {@code Object}.
     *
     * @param index index name
     * @param id    document id
     * @return the get response
     * @throws IOException on transport failure
     */
    public GetResponse<T> getDocumentIndex(String index, String id) throws IOException {
        return getDocumentIndex(index, id, this.getDocumentClass());
    }

    /**
     * Fetches a document by id using an explicit document class.
     *
     * @param index         index name
     * @param id            document id
     * @param documentClass class used to deserialize the source
     * @return the get response
     * @throws IOException on transport failure
     */
    public GetResponse<T> getDocumentIndex(String index, String id, Class<T> documentClass) throws IOException {
        return config.client().get(g -> g.index(index).id(id), documentClass);
    }

    /**
     * Paged search; hits are deserialized as {@code Object}.
     *
     * @param index index name
     * @param query field/value filters, see {@link #searchPage(String, Map, int, int, Class)}
     * @param page  1-based page number
     * @param size  page size
     * @return the search response
     * @throws IOException on transport failure
     */
    public SearchResponse<T> searchPage(String index, Map<String, Object> query, int page, int size) throws IOException {
        return searchPage(index, query, page, size, this.getDocumentClass());
    }

    /**
     * Paged search using an explicit document class.
     *
     * <p>Each non-null entry of {@code query} becomes a {@code match} clause on that field, and
     * all clauses must match. A {@code null} or empty map matches every document. How strictly a
     * value is compared depends on the field mapping (presumably exact for keyword fields,
     * analyzed for text fields; verify against the index mapping).
     *
     * @param index         index name
     * @param query         field/value filters; may be {@code null} or empty
     * @param page          1-based page number; values below 1 are treated as 1
     * @param size          page size
     * @param documentClass class used to deserialize the hits
     * @return the search response
     * @throws IOException on transport failure
     */
    public SearchResponse<T> searchPage(String index, Map<String, Object> query, int page, int size,
                                        Class<T> documentClass) throws IOException {
        // Clamp so that page <= 0 cannot produce a negative offset.
        int from = Math.max(page - 1, 0) * size;
        boolean matchEverything = query == null || query.isEmpty();
        return config.client().search(s -> s
                .index(index)
                .query(q -> {
                    if (matchEverything) {
                        return q.matchAll(m -> m);
                    }
                    return q.bool(b -> {
                        query.forEach((field, value) -> {
                            if (value != null) {
                                b.must(c -> c.match(m -> m.field(field).query(String.valueOf(value))));
                            }
                        });
                        return b;
                    });
                })
                .from(from)
                .size(size), documentClass);
    }

    /**
     * Deletes a document and refreshes immediately.
     *
     * @param index index name
     * @param id    document id
     * @return the delete response
     * @throws IOException on transport failure
     */
    public DeleteResponse deleteDocumentIndex(String index, String id) throws IOException {
        return config.client().delete(d -> d.index(index).id(id).refresh(Refresh.True));
    }

    /**
     * Fallback document class. The type argument is erased at runtime, so only
     * {@code Object.class} can be returned here; callers needing typed documents pass the class
     * to the overloads that accept it.
     */
    @SuppressWarnings("unchecked")
    private Class<T> getDocumentClass() {
        return (Class<T>) Object.class;
    }

    /**
     * Tells whether an index operation created or updated the document.
     */
    private boolean isSuccessResponse(Result result) {
        return result == Result.Created || result == Result.Updated;
    }

}
package com.kingbal.modules.es;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * Sample document stored in Elasticsearch by the integration test.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated by Lombok's
 * {@code @Data}; JSON properties that are not declared here are ignored on deserialization.
 *
 * @program: User
 * @Package: com.kingbal.modules.es
 * @author: chocho
 * @date: 2025/06/04
 * @version: 1.0
 * @Copyright: 2025 www.kingbal.com Inc. All rights reserved.
 */
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class EsUser implements Serializable {

    private static final long serialVersionUID = -5139036572222038978L;

    // Fields are private: state is read and written through the generated getters/setters
    // instead of being exposed as public mutable fields.

    /** Display name of the user. */
    private String userName;

    /** Age of the user. */
    private int age;

    /** No-argument constructor used when the document is deserialized. */
    public EsUser() {}

}
package com.kingbal.modules.es;

import cn.hutool.core.collection.CollUtil;
import co.elastic.clients.elasticsearch.core.BulkResponse;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.search.Hit;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.gospel.common.util.ESUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.poi.ss.formula.functions.T;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Integration test that exercises {@link ESUtils} against a running Elasticsearch instance:
 * creates the index if needed, bulk-indexes sample users, indexes one more user and searches.
 *
 * @program: TestEsTest
 * @Package: com.kingbal.modules.es
 * @author: chocho
 * @date: 2025/06/04
 * @version: 1.0
 * @Copyright: 2025 www.kingbal.com Inc. All rights reserved.
 */
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class TestEsTest {

    @Autowired
    private ESUtils<EsUser> esUtil;

    /**
     * Round trip: bulk insert, single insert, paged search.
     *
     * @throws Exception on transport failure or a failed check
     */
    @Test
    public void test() throws Exception {
        String index = "test-all";
        if (!esUtil.existsIndex(index)) {
            esUtil.createIndex(index);
        }

        // Build one bulk "index" operation per sample user.
        List<BulkOperation> bulkOperationArrayList = new ArrayList<>();
        for (int i = 0; i < 12; i++) {
            EsUser user = new EsUser();
            user.setUserName("aaa" + i);
            user.setAge(12 + i);
            bulkOperationArrayList.add(BulkOperation.of(o -> o.index(op -> op.document(user))));
        }
        BulkResponse bulkResponse = esUtil.addDocumentAll(index, bulkOperationArrayList);
        System.out.println("took:" + bulkResponse.took());
        System.out.println(bulkResponse.items());
        // A bulk call can succeed at HTTP level while individual items fail.
        if (bulkResponse.errors()) {
            throw new AssertionError("bulk indexing reported item failures");
        }

        EsUser esUser = new EsUser();
        esUser.setUserName("bbbb");
        esUser.setAge(2222);
        System.out.println("******************");
        esUtil.addDocument(index, esUser);
        System.out.println("******************");

        // Filter on the userName of the document indexed just above so at least one hit exists.
        Map<String, Object> params = Maps.newHashMap();
        params.put("userName", "bbbb");

        SearchResponse<EsUser> getResponse = esUtil.searchPage(index, params, 1, 10);
        System.out.println(getResponse);
        // Explicit checks instead of the assert keyword, which is skipped when the JVM runs without -ea.
        if (getResponse.hits().total() == null) {
            throw new AssertionError("search response carries no total hit count");
        }
        long total = getResponse.hits().total().value();
        System.out.println("总数:" + total);
        if (total <= 0) {
            throw new AssertionError("expected at least one hit in index " + index);
        }
        getResponse.hits().hits().forEach(hit -> {
            System.out.println(hit.index());
            System.out.println(hit.id());
            System.out.println(hit.source());
        });
    }

}

至此集成完成。如果应用启动时 Spring Boot 仍会加载自带的 Elasticsearch 自动配置(并读取默认的连接信息),只需在启动类上排除该自动配置即可:

@SpringBootApplication(exclude = {  

org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration.class
})

网站公告

今日签到

点亮在社区的每一天
去签到