文件上传之读取文件内容保存到ES

发布于:2025-09-08 ⋅ 阅读:(16) ⋅ 点赞:(0)

文件上传之读取文件内容保存到ES

摘要:
本文介绍了使用Tika和Tesseract实现文件内容解析并存储到Elasticsearch的技术方案。主要内容包括:1)Maven依赖配置,集成了Spring Boot WebFlux、Tika文档解析库和Tesseract OCR图片识别库;2)Tesseract的详细配置类,包含中文识别路径设置、语言包检查、引擎参数优化等;3)通过YML文件配置Tesseract数据目录。该方案支持图片和文档的内容提取,为后续存入Elasticsearch提供了基础解析能力。

前言:
1,针对图片或文档解析,读取里面的内容
2,使用的技术 图片解析Tesseract,文档解析 Tika

功能实现如下

1,maven依赖如下

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/>
    </parent>
     <!-- 统一管理jar包版本 -->
    <properties>
        <java.version>1.8</java.version>
          <elasticsearch.version>7.17.9</elasticsearch.version>
    </properties>

        <!-- Reactor Netty (WebClient 的默认实现) -->
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty</artifactId>
        </dependency>

        <!-- Spring WebFlux -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
 <!-- Tika用于文档内容提取 -->
        <dependency>
            <groupId>org.apache.tika</groupId>
            <artifactId>tika-core</artifactId>
            <version>1.28.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.tika</groupId>
            <artifactId>tika-parsers</artifactId>
            <version>1.28.4</version>
        </dependency>

        <!-- Tesseract OCR用于图片文字识别 -->
        <dependency>
            <groupId>net.sourceforge.tess4j</groupId>
            <artifactId>tess4j</artifactId>
            <version>4.5.4</version>
        </dependency>

        <!-- ES 依赖 -->
        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

        <!-- Elasticsearch 客户端 -->
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.elasticsearch.client</groupId>
                    <artifactId>elasticsearch-rest-client</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-client</artifactId>
            <version>${elasticsearch.version}</version>
        </dependency>

2,配置类

yml的配置如下

# 本地读取文档中文字描述的位置
tesseract:
  dataDir: D:\\tessdata

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
public class TessErActConfig {

    @Value("${tesseract.dataDir}")
    private String dataDir;

}

import lombok.extern.slf4j.Slf4j;
import net.sourceforge.tess4j.Tesseract;
import org.apache.tika.Tika;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.File;

/**
 * @author psd
 */
@Slf4j
@Configuration
public class ContentExtractionConfig {

    @Autowired
    TessErActConfig tessErActConfig;

    /**
     * 配置Tika Bean用于文档内容提取
     */
    @Bean
    public Tika tika() {
        return new Tika();
    }


    @Bean
    public Tesseract tesseract() {
        Tesseract tesseract = new Tesseract();

        try {
            // 使用指定的外部目录
            File tessDataDir = new File(tessErActConfig.getDataDir());

            // 检查目录是否存在
            if (!tessDataDir.exists() || !tessDataDir.isDirectory()) {
                throw new RuntimeException("Tesseract数据目录不存在或不是目录: " + tessErActConfig.getDataDir());
            }

            // 检查语言文件是否存在
            File chiSimFile = new File(tessDataDir, "chi_sim.traineddata");
            File engFile = new File(tessDataDir, "eng.traineddata");
            if (!chiSimFile.exists() || !engFile.exists()) {
                log.warn("语言文件不存在: {} 或 {}", chiSimFile.getAbsolutePath(), engFile.getAbsolutePath());
                // 可以选择抛出异常或使用默认语言
            }

            // 设置数据路径
            tesseract.setDatapath(tessErActConfig.getDataDir());

            // 设置识别语言(中文简体+英文)
            tesseract.setLanguage("chi_sim+eng");

            // 设置OCR引擎模式(使用默认值3)
            tesseract.setOcrEngineMode(3);

            // 设置页面分割模式(PSM_AUTO = 3)
            tesseract.setPageSegMode(6);
            // 设置DPI
            tesseract.setTessVariable("user_defined_dpi", "300");
            // 禁用调试输出
            tesseract.setTessVariable("debug_file", "/dev/null");
            log.info("Tesseract配置成功,使用数据路径: {}", tessErActConfig.getDataDir());

        } catch (Exception e) {
            log.error("Tesseract配置失败", e);
            throw new RuntimeException("Tesseract配置失败: " + e.getMessage(), e);
        }

        return tesseract;
    }

}

3,提取图片内容保存到ES

@Override
    public void inserTemergencyProcessingMessage(TemergencyProcessingEntity processingEntity, Set<Long> flIds) {
        log.info("开始处理ES索引,紧急处理文档ID: {}, 文件ID: {}", processingEntity.getId(), flIds);
        try {
            // 1.获取文件信息
            List<FileListEntity> fileListEntities = ossSmartFileClient.queryFileListByIds(flIds);
            
            // ... 业务逻辑
           
            // key:fileId value:FileInfo
            Map<Long, FileInfo> fileInfoMap = fileListEntities.stream()
                    .collect(Collectors.toMap(FileListEntity::getId, x -> FileInfo.builder().filePath(x.getUrl()).originalName(x.getFileName()).build()));
            // 2.异步提取所有文件的内容
            Map<Long, String> fileContentsMap = fileContentExtractorService.batchExtractContent(fileInfoMap);
            // 3.构建索引文档
            TemergencyProcessingEsDocument esDocument = buildEsDocument(processingEntity, fileListEntities, fileContentsMap);
            // 4.索引到ES
            IndexRequest indexRequest = new IndexRequest("temergencn_plans").id(processingEntity.getId().toString());
            String processingEsDocJson = objectMapper.writeValueAsString(esDocument);
            // 添加文档数据,数据转换为Json
            indexRequest.source(processingEsDocJson, XContentType.JSON);

            IndexResponse response = esClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("新增x x xES的结果是:{},id是:{}", response.getResult(), response.getId());
        } catch (IOException e) {
            log.error("新增xxxES失败,知识文库的id是:{}", processingEsDocJson.getId(), e);
        }
    }

4,解析文件方法


import xx.config.FileDownloader;
import x x.config.ImagePreprocessor;
import com.xx.xx.entity.info.FileInfo;
import com.xx.xxi.service.FileContentExtractorService;
import lombok.extern.slf4j.Slf4j;
import net.sourceforge.tess4j.Tesseract;
import org.apache.commons.io.FilenameUtils;
import org.apache.tika.Tika;
import org.apache.tika.exception.TikaException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.*;
import java.nio.file.Files;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author psd
 */
@Slf4j
@Service
public class FileContentExtractorServiceImpl implements FileContentExtractorService {

    @Autowired
    private Tika tika;

    @Autowired
    private Tesseract tesseract;

    @Resource
    private ThreadPoolExecutor threadCustomPoolExecutor;

    @Autowired
    private ImagePreprocessor imagePreprocessor;

    @Autowired
    private FileDownloader fileDownloader;

    /**
     * 限制并发OCR任务数量,避免资源竞争
     */
    private final Semaphore ocrSemaphore = new Semaphore(5);

    @Override
    public Map<Long, String> batchExtractContent(Map<Long, FileInfo> fileInfoMap) {
        log.info("开始批量解析上传文件的数据,获取中文,英文:{}",fileInfoMap);
        Map<Long, String> resultMaps = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (Map.Entry<Long, FileInfo> infoEntry : fileInfoMap.entrySet()) {
            Long fileId = infoEntry.getKey();
            FileInfo fileInfo = infoEntry.getValue();
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                try {
                    // 获取信号量许可,限制并发数
                    ocrSemaphore.acquire();
                    String context = safeExtractContent(fileId, fileInfo.getOriginalName(), fileInfo.getFilePath());
                    resultMaps.put(fileId, context);
                    log.info("成功提取文件ID: {} 的内容,长度: {}", fileId, context.length());
                } catch (Exception e) {
                    log.error("提取文件内容失败,fileId: {}", fileId, e);
                    resultMaps.put(fileId, "提取失败: " + e.getMessage());
                } finally {
                    // 释放信号量
                    ocrSemaphore.release();
                }
            }, threadCustomPoolExecutor);
            futures.add(future);
        }
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } catch (Exception e) {
            log.error("批量提取内容任务执行异常", e);
        }
        return resultMaps;
    }

    public String safeExtractContent(Long fileId, String originalName, String filePath) {
        File file = null;
        // 标记是否为临时文件
        boolean isTempFile  = false;
        try {
            String extension = FilenameUtils.getExtension(originalName).toLowerCase();
            log.info("文件 extension 扩展名是:{}" , extension);
            String content = "";
            // 3.获取文件
            // 处理网络文件
            if (filePath.startsWith("http://") || filePath.startsWith("https://")) {
                file = fileDownloader.downloadFile(filePath,originalName);
                // 标记为临时文件
                isTempFile = true;
                log.info("下载的临时文件路径: {}", file != null ? file.getAbsolutePath() : "null");
            }
            if (null == file || !file.exists()){
                log.warn("文件不存在:{}", filePath);
                return "文件不存在";
            }
            log.info("file 的文件是:{}" ,file);
            if (!file.exists()) {
                log.warn("文件不存在:{}", filePath);
                return "文件不存在";
            }
            // 4.判断文件类型
            if (Arrays.asList("jpg", "jpeg", "png", "bmp", "gif", "tiff").contains(extension)) {
                log.info("开始处理图片类型的文件:{}",file);
                content = safeExtractTextFromImage(file);
                content = originalName;
                log.info("获取到 图片类型 content 的信息是:{}" , content);
            }
            // 处理文档文件
            else if (Arrays.asList("pdf", "doc", "docx", "xls", "xlsx", "ppt", "pptx", "txt").contains(extension)) {
                log.info("开始处理文档文件 :{}" ,file);
                content = extractTextFromDocument(file);
                log.info("获取的 文档 content 的信息是:{}" , content);
            }
            // 处理音频文件
            else if (Arrays.asList("mp3", "wav", "ogg").contains(extension)) {
//                content = extractTextFromAudio(file);
                // TODO:后面调用公司大模型接口
                content = originalName;
                log.info("开始 处理音频文件 ,当前只返回音频的名字:{}" , content);
            }

            // 处理视频文件
            else if (Arrays.asList("mp4", "avi", "webm", "mov", "wmv").contains(extension)) {
//                content = extractTextFromVideo(file);
                // TODO:后面调用公司大模型接口
                content = originalName;
                log.info("开始 处理视频文件 ,当前只返回视频的名字:{}" , content);
            }
            // 其他文件类型
            else {
                // 不支持的文件类型
                content = "";
                log.info("不支持内容提现的文件类型:{}", extension);
            }

            return content.trim();
        } catch (Exception e) {
            log.error("提取文件内容失败,fileId:{}", fileId, e);
            return "";
        } finally {
            // 清理临时文件
            try {
                if (null != file && file.exists() && isTempFile){
                    boolean isDelete = file.delete();
                    if (isDelete){
                        log.info("删除临时文件成功:{}", file.getAbsolutePath());
                    } else {
                        log.warn("临时文件删除失败: {}", file.getAbsolutePath());
                        // 尝试强制删除
                        // 尝试强制删除
                        System.gc(); // 建议垃圾回收
                        // 稍等片刻
                        Thread.sleep(100);
                        if (file.delete()) {
                            log.info("临时文件强制删除成功: {}", file.getAbsolutePath());
                        } else {
                            log.error("临时文件强制删除也失败: {}", file.getAbsolutePath());
                        }
                    }
                }
            } catch (Exception e) {
                log.error("删除临时文件时发生异常: {}", file.getAbsolutePath(), e);
            }
        }
    }

    private String safeExtractTextFromImage(File imageFile) {

        File processedImage = null;
        try {
            // 预处理图片
            processedImage = imagePreprocessor.preprocessImage(imageFile);

            // 使用同步块确保线程安全
            synchronized (this) {
                // 设置Tesseract参数
                tesseract.setLanguage("chi_sim+eng");
                // 假定为统一文本块
                tesseract.setPageSegMode(6);
                // 默认引擎
                tesseract.setOcrEngineMode(3);
                // 设置DPI
                tesseract.setTessVariable("user_defined_dpi", "300");

                return tesseract.doOCR(processedImage);
            }
        } catch (Exception e) {
            log.error("OCR提取失败: {}", imageFile.getName(), e);
            // 备用方案:尝试使用原始图片和更保守的设置
            try {
                synchronized (this) {
                    // 单行文本
                    tesseract.setPageSegMode(7);
                    // 仅使用传统引擎
                    tesseract.setOcrEngineMode(1);
                    // 使用原始图片
                    String doOCR = tesseract.doOCR(imageFile);
                    log.info("备用的方案 doOCR 的数据是: {}" ,doOCR);
                    return doOCR;
                }
            } catch (Exception ex) {
                log.error("备用OCR提取也失败: {}", imageFile.getName(), ex);
                return "OCR处理失败";
            }
        } finally {
            // 清理临时文件
            if (processedImage != null && processedImage.exists()) {
                processedImage.delete();
            }
        }
    }

    /**
     * 提取音频文件内容(语音转文字)
     */
    private String extractTextFromAudio(File audioFile) {
        // 这里需要集成语音识别服务,如阿里云、腾讯云的语音识别API
        // 由于实现复杂,这里只做简单示例
        log.info("开始处理音频文件: {}", audioFile.getName());

        try {
            // 调用语音识别服务
            // return "音频内容待识别";
            return audioFile.getName();
        } catch (Exception e) {
            log.error("音频识别失败", e);
            return "";
        }
    }

    /**
     * 提取视频文件内容(包含字幕和语音识别)
     * 
     * @param videoFile
     *            视频文件
     * @return 提取内容
     */
    private String extractTextFromVideo(File videoFile) {
        log.info("开始处理视频文件: {}", videoFile.getName());

        try {
            // TODO:后面调用公司大模型接口
            return videoFile.getName();
        } catch (Exception e) {
            log.error("视频处理失败", e);
            return "";
        }
    }

    /**
     * 提取视频中的字幕
     * 
     * @param videoFile
     *            videoFile
     * @return 字幕文本
     */
    private String extractSubtitlesFromVideo(File videoFile) {
        // 使用FFmpeg提取字幕
        // 实现略,需要集成FFmpeg
        // return "";
        return videoFile.getName();
    }

    /**
     * 从视频中提取音频
     */
    private File extractAudioFromVideo(File videoFile) throws IOException, InterruptedException {
        String outputPath = videoFile.getPath() + "/audio_" + System.currentTimeMillis() + ".wav";
        File outputFile = new File(outputPath);

        // 使用FFmpeg提取音频
        ProcessBuilder pb = new ProcessBuilder("ffmpeg", "-i", videoFile.getAbsolutePath(), "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
                outputPath);

        Process process = pb.start();
        int exitCode = process.waitFor();

        if (exitCode != 0) {
            throw new IOException("FFmpeg处理失败,退出码: " + exitCode);
        }

        return outputFile;
    }

    /**
     * 使用Tika提取文档文本
     *
     * @param documentFile
     *            文档型文件
     * @return 文档内容
     */
    private String extractTextFromDocument(File documentFile) throws IOException, TikaException {
        try (InputStream stream = Files.newInputStream(documentFile.toPath());) {
            return tika.parseToString(stream);
        }
    }



}

5,下载文件

这里是因为文件在另一个微服务,不能直接访问,需要下载到本地再删除【如果可以访问服务这一步可删除】


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.Resource;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.scheduler.Schedulers;
import reactor.netty.http.client.HttpClient;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.time.Duration;

/**
 * @author psd 用于下载临时文件
 */
@Slf4j
@Component
public class FileDownloader {

    private final WebClient webClient;

    @Value("${smart.file.url}")
    private String baseUrl;

        public FileDownloader() {
        // 增加内存缓冲区的大小 512MB   【图片最大支持 100MB 视频最大512MB】
        final int bufferSize = 1024 * 1024 * 512;
        final ExchangeStrategies strategies = ExchangeStrategies.builder()
                .codecs(codecs -> codecs.defaultCodecs().maxInMemorySize(bufferSize))
                .build();

        // 配置连接超时和响应超时
        HttpClient httpClient =  HttpClient.create()
                .responseTimeout(Duration.ofSeconds(30));

        this.webClient = WebClient.builder()
                .baseUrl(baseUrl)
                // 配置缓冲区
                .exchangeStrategies(strategies)
                // 配置超时
                .clientConnector(new ReactorClientHttpConnector(httpClient))
                .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
                .build();
    }

    public File downloadFile(String fileUrl, String originalName) throws IOException {
        // 创建临时文件
        String tempFileName = "temp_" + System.currentTimeMillis() + "_" + originalName;
        File tempFile = File.createTempFile(tempFileName, null);

        try {
            webClient.get()
                    .uri(fileUrl)
                    .retrieve()
                    // 用于表示可读取的资源
                    .bodyToMono(Resource.class)
                    // 提供响应式调度的线程池
                    .subscribeOn(Schedulers.boundedElastic())
                    .map(resource -> {
                        try (InputStream inputStream = resource.getInputStream()) {
                            Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
                            log.info("文件下载成功: {}", tempFile.getAbsolutePath());
                            return tempFile;
                        } catch (IOException e) {
                            throw new RuntimeException("文件写入失败", e);
                        }
                    })
                    // 阻塞直到完成
                    .block();
            return tempFile;
        } catch (Exception e) {
            log.error("文件下载失败: {}", fileUrl, e);
            if (tempFile.exists()) {
                tempFile.delete();
            }
            throw new IOException("文件下载失败", e);
        }
    }
}

遇到的问题

1,直接访问文件的地址访问不到,需要新下载到本地
2,下载文件的大小,要重新配置下,不然大文件会报错
3,配置OCR maven是 4.5.4 ,服务器仓库没有这个版本,用的是4.x版本也是适用【要把 中文英文解析配置文件copy到config的配置目录,4.1x没这个配置】

喜欢我的文章记得点个在看,或者点赞,持续更新中ing…


网站公告

今日签到

点亮在社区的每一天
去签到