SpringBatch+Mysql+hanlp简版智能搜索

发布于:2025-06-04 ⋅ 阅读:(21) ⋅ 点赞:(0)

        资源条件有限,需要支持智搜的数据量也不大,上es搜索有点大材小用了,只好写个简版mysql的智搜,处理全文搜索,支持拼音搜索,中文分词,自定义分词断词,地图范围搜索,周边搜索,自定义多边形搜索。

      通过设置定时SpringBatch抽取需要进行检索的表,并将检索的标题,概要,内容等存入检索表中。通过检索表进行全文检索。

代码主要包:通过网盘分享的文件:mysql智搜.zip
链接: https://pan.baidu.com/s/1MMhmyVD8o56Grp1Aa5IiKQ 提取码: 0530

1、提供的接口



/**
 * REST endpoints of the global "smart search" feature.
 *
 * <p>Every handler delegates directly to {@link SearchIndexService} and wraps the result in a
 * {@code Response}.
 *
 * <p>NOTE(review): the base path contains the misspelling {@code serach}. It is left untouched
 * here because clients may already be calling it; rename only together with all callers.
 */
@RestController
@Tag(name = "智搜全局搜索API")
@RequestMapping(path = "/smart/serach")
public class SearchIndexController {

    @Resource
    private SearchIndexService searchIndexService;

    /**
     * Paged search used by the map view.
     *
     * @param searchIndexParam search text, optional GeoJSON area and paging information
     * @return one page of matching index entries
     */
    @PostMapping("/smartSearch")
    @Operation(summary = "智搜数据查询")
    @Log(title = "一张图智搜数据查询", businessType = BusinessType.QUERY)
    public Response<Page<SearchIndexDTO>> getSearchIndexDataByPage(@RequestBody SearchIndexParamDTO searchIndexParam) {
        return Response.with(searchIndexService.getSearchIndexDataByPage(searchIndexParam));
    }

    /**
     * Plain paged search over the index.
     *
     * @param searchIndexParam search text, optional GeoJSON area and paging information
     * @return one page of matching index entries
     */
    @PostMapping("/getSearchIndexByPage")
    @Operation(summary = "普通智搜内容分页查询")
    @Log(title = "普通智搜内容分页查询", businessType = BusinessType.QUERY)
    public Response<Page<SearchIndexDTO>> getSearchIndexByPage(@RequestBody SearchIndexParamDTO searchIndexParam) {
        return Response.with(searchIndexService.getSearchIndexByPage(searchIndexParam));
    }

    /**
     * Loads a single index entry by its primary key.
     *
     * @param searchIndexParam request carrying the primary key of the index entry
     * @return the index entry detail
     */
    @PostMapping("/getSearchIndexById")
    @Operation(summary = "通过主键获取检索详细信息")
    // The log title previously duplicated the paging endpoint's title; it now matches this operation.
    @Log(title = "通过主键获取检索详细信息", businessType = BusinessType.QUERY)
    public Response<SearchIndexDTO> getSearchIndexById(@RequestBody SearchIndexParamDTO searchIndexParam) {
        return Response.with(searchIndexService.getSearchIndexById(searchIndexParam));
    }

    /**
     * Returns the river line subset between two points.
     *
     * @param searchIndexParam start/end coordinates and an optional maximum distance
     * @return the result produced by the service as a string
     */
    @PostMapping("/smartSearchRiver")
    @Operation(summary = "智搜溯源河流数据查询")
    @Log(title = "一张图智搜溯源河流数据查询", businessType = BusinessType.QUERY)
    public Response<String> getSmartSearchRiver(@RequestBody SearchPointParamDTO searchIndexParam) {
        return Response.with(searchIndexService.getSmartSearchRiver(searchIndexParam));
    }

}

2、service

/**
 * Service contract for querying the global search index.
 */
public interface SearchIndexService extends IService<SearchIndex> {

    /**
     * Paged search used by the map view.
     *
     * @param searchIndexParamDTO search text, optional GeoJSON area and paging information
     * @return one page of matching index entries
     */
    Page<SearchIndexDTO> getSearchIndexDataByPage(SearchIndexParamDTO searchIndexParamDTO);

    /**
     * Plain paged search over the index.
     *
     * @param searchIndexParamDTO search text, optional GeoJSON area and paging information
     * @return one page of matching index entries
     */
    Page<SearchIndexDTO> getSearchIndexByPage(SearchIndexParamDTO searchIndexParamDTO);

    /**
     * Loads one index entry by primary key.
     *
     * @param searchIndexParamDTO request carrying the primary key of the index entry
     * @return the index entry detail
     */
    SearchIndexDTO getSearchIndexById(SearchIndexParamDTO searchIndexParamDTO);

    /**
     * Returns the river line subset between two points.
     *
     * @param searchIndexParam start/end coordinates and an optional maximum distance
     * @return the extracted subset as a string
     */
    String getSmartSearchRiver(SearchPointParamDTO searchIndexParam);

}
/**
 * Default {@link SearchIndexService} implementation backed by {@link SearchIndexMapper}.
 */
@Service
@Slf4j
@RequiredArgsConstructor
@DS("master")
public class SearchIndexServiceImpl extends ServiceImpl<SearchIndexMapper, SearchIndex> implements SearchIndexService {

    @Resource
    private SearchIndexMapper searchIndexMapper;

    @Resource
    private DataSourceService dataSourceService;

    @Resource
    private DynamicDataSourcesService dynamicDataSourcesService;

    /**
     * Plain paged search.
     *
     * <p>The search text is rewritten in place on the DTO before querying: text detected as
     * pinyin is converted with {@code KeyWordUtils.getPinyin}, anything else is replaced by the
     * keywords extracted by HanLP. A GeoJSON area, when present, is converted to WKT.
     *
     * @param searchIndexParamDTO search parameters; mutated by this method
     * @return one page of matching index entries
     */
    @Override
    public Page<SearchIndexDTO> getSearchIndexByPage(SearchIndexParamDTO searchIndexParamDTO) {
        String searchContent = searchIndexParamDTO.getSearchContent();
        if (searchContent != null) {
            if (KeyWordUtils.isContainsPinyin(searchContent)) {
                searchIndexParamDTO.setSearchContent(KeyWordUtils.getPinyin(searchContent, CommonConstant.SPACE));
            } else {
                searchIndexParamDTO.setSearchContent(HanLpAdvancedUtil.extractKeywords(searchContent));
            }
        }
        applyPolygonFilter(searchIndexParamDTO);
        return searchIndexMapper.getSearchIndexByPage(searchIndexParamDTO.build(), searchIndexParamDTO);
    }

    /**
     * Loads one index entry together with the source row it was built from.
     *
     * @param searchIndexParamDTO request carrying the primary key of the index entry
     * @return the index entry, the column list of its source table and the source row
     *         ({@code null} detail when the source row no longer exists)
     * @throws CoreException when the primary key is missing or no index entry has that key
     */
    @Override
    public SearchIndexDTO getSearchIndexById(SearchIndexParamDTO searchIndexParamDTO) {
        if (StrUtil.isEmpty(searchIndexParamDTO.getPkId())) {
            throw new CoreException(ErrorCodeEnum.SYS_ERROR, "查询参数缺失");
        }
        // Look up the index entry; an unknown id used to surface as a NullPointerException below.
        SearchIndex searchIndex = searchIndexMapper.selectById(searchIndexParamDTO.getPkId());
        if (searchIndex == null) {
            throw new CoreException(ErrorCodeEnum.SYS_ERROR, "查询数据不存在");
        }
        // Table and key-column names cannot be bound as JDBC parameters, so they are concatenated.
        // They come from the stored index entry, not from the request; only the id value is bound.
        String sql = "select * from " + searchIndex.getEntityTable() + " where  " + searchIndex.getEntityKeyColumn() + " = ?";
        // Column metadata of the source table
        DataSourceDTO dataSourceDTO = dataSourceService.findById(searchIndex.getEntitySourceId());
        List<Column> columnList = dynamicDataSourcesService.getDbColumnList(dataSourceDTO, searchIndex.getEntityTable());
        List<Object> jdbcParamValues = new ArrayList<>();
        jdbcParamValues.add(searchIndex.getEntityId());
        List<Map<String, Object>> listInfo = dynamicDataSourcesService.executeListSql(dataSourceDTO, sql, jdbcParamValues);

        SearchIndexDTO searchIndexDTO = new SearchIndexDTO();
        BeanUtilCopy.copyProperties(searchIndex, searchIndexDTO);
        searchIndexDTO.setColumnList(columnList);
        searchIndexDTO.setDetailInfo(CollUtil.isNotEmpty(listInfo) ? listInfo.get(0) : null);
        return searchIndexDTO;
    }

    /**
     * Paged search used by the map view.
     *
     * <p>Differs from {@link #getSearchIndexByPage} in two ways: pinyin input is first run through
     * keyword extraction, non-pinyin input is segmented with {@code segmentToString}; and when the
     * caller supplies no entity types the fixed set "3".."7" is applied.
     *
     * @param searchIndexParamDTO search parameters; mutated by this method
     * @return one page of matching index entries
     */
    @Override
    public Page<SearchIndexDTO> getSearchIndexDataByPage(SearchIndexParamDTO searchIndexParamDTO) {
        String searchContent = searchIndexParamDTO.getSearchContent();
        if (searchContent != null) {
            if (KeyWordUtils.isContainsPinyin(searchContent)) {
                searchIndexParamDTO.setSearchContent(KeyWordUtils.getPinyin(HanLpAdvancedUtil.extractKeywords(searchContent), CommonConstant.SPACE));
            } else {
                searchIndexParamDTO.setSearchContent(HanLpAdvancedUtil.segmentToString(searchContent, Boolean.TRUE));
            }
        }
        applyPolygonFilter(searchIndexParamDTO);
        if (CollUtil.isEmpty(searchIndexParamDTO.getEntityTypes())) {
            List<String> entityTypes = Arrays.asList("3", "4", "5", "6", "7");
            searchIndexParamDTO.setEntityTypes(entityTypes);
        }
        return searchIndexMapper.getSearchIndexByPage(searchIndexParamDTO.build(), searchIndexParamDTO);
    }

    /**
     * Returns the river line subset between two points.
     *
     * @param searchIndexParam start/end coordinates; a missing maximum distance is set to 5 (km)
     *                         on the passed object
     * @return the subset produced by {@code GeoJsonUtil.extractLineSubset}
     */
    @Override
    public String getSmartSearchRiver(SearchPointParamDTO searchIndexParam) {
        if (searchIndexParam.getMaxDistanceKm() == null) {
            searchIndexParam.setMaxDistanceKm(5D);
        }
        return GeoJsonUtil.extractLineSubset(searchIndexParam.getLongitudeStart(), searchIndexParam.getLatitudeStart(), searchIndexParam.getLongitudeEnd(), searchIndexParam.getLatitudeEnd(), searchIndexParam.getMaxDistanceKm());
    }

    /** Converts the request's GeoJSON polygon, when present, to WKT and stores it on the DTO. */
    private void applyPolygonFilter(SearchIndexParamDTO searchIndexParamDTO) {
        if (ObjectUtil.isNotEmpty(searchIndexParamDTO.getGeoJson())) {
            List<List<Double>> polygonCoordinates = GeoJsonUtil.extractPolygonCoordinates(searchIndexParamDTO.getGeoJson());
            searchIndexParamDTO.setPolygonWKT(GeoJsonUtil.convertGeoJsonToWKT(polygonCoordinates));
        }
    }
}

3、数据库表设计

数据库表
-- Source-table configuration for the global search index: each row names one source table and
-- the columns (title, summary, content, keywords, coordinates, ...) to extract from it.
-- Running this script drops any existing table of the same name first.
-- ft_search is a FULLTEXT index over the configured title/keyword column names, using the ngram parser.
DROP TABLE IF EXISTS `ads_search_index_table`;
CREATE TABLE `ads_search_index_table`  (
  `pk_id` int NOT NULL AUTO_INCREMENT COMMENT '主键',
  `entity_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '分类',
  `entity_type_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '分类名称',
  `entity_key_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '主键字段',
  `entity_table` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '实体对应的表名',
  `entity_table_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表名中文',
  `entity_source_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体所在数据源,目前只支持当前库',
  `title_column` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '标题抽入字段,逗号分割',
  `sumary_column` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '摘要信息字段,逗号分割',
  `content_column` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '内容content需要抽入的字段英文逗号分割',
  `keywords_default` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '关键词默认值',
  `keywords_column` varchar(200) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '关键词keywords要抽入的字段英文逗号分割',
  `weight_default` int NULL DEFAULT NULL COMMENT '默认权重',
  `update_flag` varchar(10) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '1' COMMENT '是否更新1是0否',
  `lng_column` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '经度字段',
  `lat_column` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '纬度字段',
  `delete_column` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '逻辑删除字段',
  `region_code_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划编码',
  `file_url_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '文件地址字段',
  `deploy_time_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据发布时间字段',
  `source_from` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据来源',
  `revision` int NULL DEFAULT 0 COMMENT '乐观锁',
  `created_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '创建人',
  `created_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
  `updated_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '更新人',
  `updated_time` datetime NULL DEFAULT NULL COMMENT '更新时间',
  `deleted` int NULL DEFAULT 0 COMMENT '删除标志:0-未删除;1-已删除',
  PRIMARY KEY (`pk_id`) USING BTREE,
  FULLTEXT INDEX `ft_search`(`title_column`, `keywords_column`) WITH PARSER `ngram`
) ENGINE = InnoDB AUTO_INCREMENT = 14 CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '全局搜索表数据来源信息' ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of ads_search_index_table
-- ----------------------------
INSERT INTO `ads_search_index_table` VALUES (8, '8', '新闻', 'pk_id', 'test_environmental_news', '新闻表', '1', 'news_title', 'news_sumary', 'news_content', '新闻', 'news_title,news_content', 80, '0', '', '', '', 'region_code', 'attachment_url', 'publish_time', '某某单位', 19, NULL, NULL, 'xxxx@qq.com', '2025-05-26 16:22:58', 0);
INSERT INTO `ads_search_index_table` VALUES (9, '9', '数据', 'PK_ID', 'ads_t_app_interface', '某接口表', '1', 'interface_name', 'interface_name', 'interface_url', '某接口', 'interface_name', 60, '0', NULL, NULL, '', NULL, NULL, 'created_time', '某信息中心', 5, NULL, NULL, 'xxxx@qq.com', '2025-05-20 17:08:28', 0);
INSERT INTO `ads_search_index_table` VALUES (10, '10', '文件', 'pk_id', 'test_environmental_news', '新闻表', '1', 'news_title', 'news_sumary', 'news_content', '文件', 'news_title,news_content', 60, '0', '', '', '', 'region_code', 'attachment_url', 'publish_time', '某某单位', 6, NULL, NULL, 'xxxx@qq.com', '2025-05-26 16:22:58', 0);
INSERT INTO `ads_search_index_table` VALUES (11, '11', '地图', 'id', 'test_dust_emission_source', '扬尘源', '1', 'project_name', 'project_name', 'project_name', '地图', 'project_name', 60, '0', 'longitude', 'latitude', '', 'region_code', '', 'create_time', '某某单位', 5, NULL, NULL, 'xxx@qq.com', '2025-05-20 17:08:28', 0);
INSERT INTO `ads_search_index_table` VALUES (12, '12', '应用', 'app_id', 'ads_t_sys_app', '应用', '1', 'name', 'name', 'description', '应用', 'name', 60, '0', '', '', '', '', '', 'created_time', '某某单位', 7, NULL, NULL, 'xxxx@qq.com', '2025-05-26 16:22:58', 0);
INSERT INTO `ads_search_index_table` VALUES (13, '13', '公告', 'pk_id', 'test_environmental_announcement', '公告表', '1', 'title', 'summary', 'content', '公告', 'title,content', 60, '0', '', '', '', 'region_code', 'attachment_url', 'publish_time', '某某单位', 5, NULL, NULL, 'xxxx@qq.com', '2025-05-26 16:22:58', 0);

-- Global search index table: one row per indexed source record, holding its title, summary,
-- content, keywords, pinyin keywords, weight and coordinates.
-- Running this script drops any existing table of the same name first.
-- ft_search is a FULLTEXT index over title, content, keywords and pingyin_keywords; no parser
-- is named here, unlike the ngram index on ads_search_index_table.
-- NOTE(review): location_point is not in the column list of the INSERT issued by
-- BatchSearchIndexWriter — confirm where it gets populated.
DROP TABLE IF EXISTS `ads_search_index`;
CREATE TABLE `ads_search_index`  (
  `pk_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '主键',
  `entity_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '分类',
  `entity_type_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '分类名称',
  `entity_key_column` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '主键字段',
  `entity_id` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '对应实体表的主键',
  `entity_table` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表名',
  `entity_table_name` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体对应的表中文名',
  `entity_source_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '实体所在数据源',
  `title` varchar(300) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '标题',
  `sumary` varchar(500) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '摘要',
  `content` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '内容',
  `keywords` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '关键词或标签',
  `pingyin_keywords` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '拼音分词',
  `weight` int NULL DEFAULT NULL COMMENT '权重或排序',
  `lng` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '经度',
  `lat` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '纬度',
  `region_code` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划编码',
  `region_name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '行政区划名称',
  `file_url` text CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL COMMENT '文件地址逗号分割',
  `deploy_time` datetime NULL DEFAULT NULL COMMENT '数据来源发布时间',
  `source_from` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '数据来源',
  `revision` int NULL DEFAULT 0 COMMENT '乐观锁',
  `created_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '创建人',
  `created_time` datetime NULL DEFAULT NULL COMMENT '创建时间',
  `updated_by` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NULL DEFAULT NULL COMMENT '更新人',
  `updated_time` datetime NULL DEFAULT NULL COMMENT '更新时间',
  `deleted` int NULL DEFAULT 0 COMMENT '删除标志:0-未删除;1-已删除',
  `location_point` point NULL,
  PRIMARY KEY (`pk_id`) USING BTREE,
  INDEX `idx_search_index`(`entity_type`, `entity_table`, `deleted`) USING BTREE,
  FULLTEXT INDEX `ft_search`(`title`, `content`, `keywords`, `pingyin_keywords`)
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_general_ci COMMENT = '全局搜索表' ROW_FORMAT = DYNAMIC;

4、SpringBatch抽取数据

依赖
 <dependency>
            <groupId>org.springframework.batch</groupId>
            <artifactId>spring-batch-core</artifactId>
            <version>4.3.10</version>
 </dependency>
<dependency>
            <groupId>com.belerweb</groupId>
            <artifactId>pinyin4j</artifactId>
            <version>2.5.1</version>
        </dependency>
        <!--地图-->
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-shapefile</artifactId>
            <version>${geotools.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.geotools</groupId>
                    <artifactId>gt-main</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-geojson</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-swing</artifactId>
            <version>${geotools.version}</version>
        </dependency>
        <dependency>
            <groupId>org.geotools</groupId>
            <artifactId>gt-epsg-hsql</artifactId>
            <version>${geotools.version}</version> <!-- 与项目中其他GeoTools版本一致 -->
        </dependency>
        <!-- JTS几何库 -->
        <dependency>
            <groupId>org.locationtech.jts</groupId>
            <artifactId>jts-core</artifactId>
            <version>1.18.1</version>
        </dependency>
        <!-- Reactor Netty -->
        <dependency>
            <groupId>io.projectreactor.netty</groupId>
            <artifactId>reactor-netty-http</artifactId>
            <version>1.0.39</version>
        </dependency>
        <!-- IK 分词器-->
        <dependency>
            <groupId>com.janeluo</groupId>
            <artifactId>ikanalyzer</artifactId>
            <version>2012_u6</version>
        </dependency>
        <dependency>
            <groupId>com.hankcs</groupId>
            <artifactId>hanlp</artifactId>
            <version>portable-1.8.4</version>
        </dependency>
分词hanlp.properties配置
# HanLP root path; the relative paths below are presumably resolved against it — confirm
#root=D:/tools/hanlp/
root=/data/hanlp/

# Core dictionary
CoreDictionaryPath=data/dictionary/CoreNatureDictionary.txt

# Custom dictionary paths (several files separated by ';')
CustomDictionaryPath=data/dictionary/custom/CustomDictionary.txt;data/dictionary/custom/hanyucidianDic.txt;data/dictionary/custom/siteDic.txt

# Stop-word dictionary path
CoreStopWordDictionaryPath=data/dictionary/stopwords.txt

# Number of segmentation threads
SegmentThreadNumber=4

# Whether to show part-of-speech tags
ShowTermNature=false

# Log level
HanLPLogLevel=WARN
SpringBatch配置
application.yml

# Root key must be lowercase and end with a colon; ScheduledBatchTask reads
# ${spring.batch.scheduler.enabled}, which would not resolve under a "Spring" key.
spring:
  batch:
    job:
      enabled: false   # do not launch jobs automatically at application start-up
    jdbc:
      initialize-schema: always
    # custom flag: whether the scheduled task actually launches the job
    scheduler:
      enabled: true
  sql:
    init:
      schema-locations: classpath:/org/springframework/batch/core/schema-mysql.sql

/**
 * Scheduled trigger for the search-index rebuild job.
 */
@Component
public class ScheduledBatchTask {

    /** Job that rebuilds the search index. */
    @Resource
    @Qualifier("searchIndexCreateTaskJob")
    private Job searchIndexCreateTaskJob;

    @Resource
    JobLauncher jobLauncher;

    /** Switch read from {@code spring.batch.scheduler.enabled}; defaults to {@code false}. */
    @Value("${spring.batch.scheduler.enabled:false}")
    private boolean schedulerEnabled;

    /**
     * Launches the index rebuild job every day at 03:00 when the scheduler switch is on.
     *
     * <p>The current timestamp is passed as the {@code time} job parameter so that every run
     * gets distinct parameters.
     *
     * @throws Exception whatever {@code JobLauncher.run} throws
     */
    @Scheduled(cron = "0 0 3 * * ?")
    public void searchIndexCreateTask() throws Exception {
        if (!schedulerEnabled) {
            return;
        }
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        // The returned JobExecution was only used for a discarded getId() call, so it is not kept.
        jobLauncher.run(searchIndexCreateTaskJob, jobParameters);
    }

}

/**
 * Spring Batch wiring for the search-index rebuild job.
 *
 * <p>The job first runs a delete step and then a partitioned master step whose worker step
 * reads, processes and writes source rows in chunks.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * General batch thread pool (5 core / 10 max threads, {@code table-processor-} prefix).
     *
     * @return the executor
     */
    @Bean("batchTaskExecutor")
    public TaskExecutor taskExecutor() {
        return newPool("table-processor-");
    }

    /**
     * Job that rebuilds the search index: delete step first, then the partitioned master step.
     *
     * @param deleteOldDataStep step that clears previously indexed data
     * @param masterStep        partitioned step that re-creates the index rows
     * @return the assembled job
     */
    @Bean("searchIndexCreateTaskJob")
    public Job searchIndexCreateTaskJob(
            @Qualifier("deleteOldDataStep") Step deleteOldDataStep,
            @Qualifier("masterSearchStep") Step masterStep
    ) {
        return jobBuilderFactory
                .get("searchIndexCreateTaskJob")
                .start(deleteOldDataStep)
                .next(masterStep)
                .listener(new BatchSearchJobListener())
                .build();
    }

    /**
     * Tasklet step that removes old index data.
     *
     * @param deleteTasklet tasklet doing the actual work
     * @return the step
     */
    @Bean
    public Step deleteOldDataStep(@Qualifier("deleteTasklet") Tasklet deleteTasklet) {
        return stepBuilderFactory
                .get("deleteOldDataStep")
                .tasklet(deleteTasklet)
                .build();
    }

    /**
     * Master step: fans the worker step out over the partitions produced by the partitioner.
     *
     * @param updateSearchIndexData      worker step executed once per partition
     * @param multiIndexTablePartitioner partitioner supplying the partitions
     * @return the partitioned step
     */
    @Bean("masterSearchStep")
    public Step masterStep(
            @Qualifier("updateSearchIndexData") Step updateSearchIndexData,
            @Qualifier("multiIndexTablePartitioner") MultiIndexTablePartitioner multiIndexTablePartitioner
    ) {
        return stepBuilderFactory
                .get("masterSearchStep")
                .partitioner(updateSearchIndexData.getName(), multiIndexTablePartitioner)
                .step(updateSearchIndexData)
                .gridSize(10)
                .taskExecutor(batchSearchTaskExecutor())
                .build();
    }

    /**
     * Thread pool used by the master step (5 core / 10 max threads, {@code search-processor-} prefix).
     *
     * @return the executor
     */
    @Bean("batchSearchTaskExecutor")
    public TaskExecutor batchSearchTaskExecutor() {
        return newPool("search-processor-");
    }

    /**
     * Worker step: reads source rows, processes them and writes index rows in chunks of 100.
     * The step is fault tolerant and uses {@code AlwaysSkipItemSkipPolicy}.
     *
     * @param myBatisPagingItemReader paging reader over the source table
     * @param batchSearchIndexWriter  writer inserting into the index table
     * @param searchIndexProcessor    processor applied to each row
     * @return the chunk-oriented step
     */
    @Bean("updateSearchIndexData")
    public Step updateSearchIndexData(
            @Qualifier("searchTableReader") MyBatisPagingItemReader<Map<String, Object>> myBatisPagingItemReader,
            @Qualifier("batchSearchIndexWriter") BatchSearchIndexWriter batchSearchIndexWriter,
            @Qualifier("searchIndexProcessor") SearchIndexProcessor searchIndexProcessor

    ) {
        return stepBuilderFactory
                .get("updateSearchIndexData")
                .<Map<String, Object>, Map<String, Object>>chunk(100)
                .reader(myBatisPagingItemReader)
                .processor(searchIndexProcessor)
                .writer(batchSearchIndexWriter)
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .build();
    }

    /**
     * Step-scoped paging reader over one source table. All arguments come from the step
     * execution context and are handed to the MyBatis query as parameters; page size is 1000.
     *
     * @return the configured reader
     */
    @Bean("searchTableReader")
    @StepScope
    public MyBatisPagingItemReader<Map<String, Object>> searchTableReader(
            @Value("#{stepExecutionContext['entityType']}") String entityType, // category
            @Value("#{stepExecutionContext['entityTypeName']}") String entityTypeName, // category name
            @Value("#{stepExecutionContext['entityKeyColumn']}") String entityKeyColumn, // primary-key column
            @Value("#{stepExecutionContext['entityTable']}") String entityTable, // table name
            @Value("#{stepExecutionContext['entityTableName']}") String entityTableName, // table display name
            @Value("#{stepExecutionContext['entitySourceId']}") String entitySourceId, // data source of the table
            @Value("#{stepExecutionContext['titleColumn']}") String titleColumn, // title column
            @Value("#{stepExecutionContext['sumaryColumn']}") String sumaryColumn, // summary column
            @Value("#{stepExecutionContext['deployTimeColumn']}") String deployTimeColumn, // publish-time column
            @Value("#{stepExecutionContext['contentColumn']}") String contentColumn, // content column
            @Value("#{stepExecutionContext['sourceFrom']}") String sourceFrom, // data origin
            @Value("#{stepExecutionContext['keywordsDefault']}") String keywordsDefault, // default keywords
            @Value("#{stepExecutionContext['keywordsColumn']}") String keywordsColumn, // keyword column
            @Value("#{stepExecutionContext['fileUrlColumn']}") String fileUrlColumn, // file-URL column
            @Value("#{stepExecutionContext['weightDefault']}") String weightDefault, // default weight
            @Value("#{stepExecutionContext['deleteColumn']}") String deleteColumn, // logical-delete column
            @Value("#{stepExecutionContext['lngColumn']}") String lngColumn, // longitude column
            @Value("#{stepExecutionContext['latColumn']}") String latColumn, // latitude column
            @Value("#{stepExecutionContext['regionCodeColumn']}") String regionCodeColumn, // region-code column
            @Value("#{stepExecutionContext['updateFlag']}") String updateFlag // whether the table needs updating
    ) {
        Map<String, Object> queryParams = new HashMap<>();
        // identity of the source table
        queryParams.put(CommonConstant.ENTITY_TYPE, entityType);
        queryParams.put(CommonConstant.ENTITY_TYPE_NAME, entityTypeName);
        queryParams.put(CommonConstant.ENTITY_KEY_COLUMN, entityKeyColumn);
        queryParams.put(CommonConstant.ENTITY_TABLE, entityTable);
        queryParams.put(CommonConstant.ENTITY_TABLE_NAME, entityTableName);
        queryParams.put(CommonConstant.ENTITY_SOURCE_ID, entitySourceId);
        queryParams.put(CommonConstant.SOURCE_FROM, sourceFrom);
        // columns to extract
        queryParams.put(CommonConstant.TITLE_COLUMN, titleColumn);
        queryParams.put(CommonConstant.SUMARY_COLUMN, sumaryColumn);
        queryParams.put(CommonConstant.CONTET_COLUMN, contentColumn);
        queryParams.put(CommonConstant.KEYWORDS_COLUMN, keywordsColumn);
        queryParams.put(CommonConstant.FILE_URL_COLUMN, fileUrlColumn);
        queryParams.put(CommonConstant.DEPLOY_TIME_COLUMN, deployTimeColumn);
        queryParams.put(CommonConstant.DELETE_COLUMN, deleteColumn);
        queryParams.put(CommonConstant.LNG_COLUMN, lngColumn);
        queryParams.put(CommonConstant.LAT_COLUMN, latColumn);
        queryParams.put(CommonConstant.REGION_CODE_COLUMN, regionCodeColumn);
        // defaults and flags
        queryParams.put(CommonConstant.KEYWORDS_DEFAULT, keywordsDefault);
        queryParams.put(CommonConstant.WEIGHT_DEFAULT, weightDefault);
        queryParams.put(CommonConstant.UPDATE_FLAG, updateFlag);

        MyBatisPagingItemReader<Map<String, Object>> pagingReader = new MyBatisPagingItemReader<>();
        pagingReader.setSqlSessionFactory(sqlSessionFactory);
        pagingReader.setQueryId("com.bigdatacd.panorama.system.mapper.SearchIndexTableMapper.selectSearchIndexTableDataByPage");
        pagingReader.setPageSize(1000);
        pagingReader.setParameterValues(queryParams);
        return pagingReader;
    }

    /** Builds a pool with 5 core and 10 max threads whose thread names start with the given prefix. */
    private static ThreadPoolTaskExecutor newPool(String threadNamePrefix) {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(5);
        pool.setMaxPoolSize(10);
        pool.setThreadNamePrefix(threadNamePrefix);
        return pool;
    }

}

/**
 * Job listener that prints the job name at start and the elapsed wall-clock time at the end.
 *
 * <p>NOTE(review): the timestamps live in instance fields, so executions that overlap while
 * sharing one listener instance would overwrite each other's start time.
 */
@Component
public class BatchSearchJobListener implements JobExecutionListener {

    private long beginTime;
    private long endTime;

    /**
     * Records the start time and prints it.
     *
     * @param jobExecution the execution about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        beginTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + "  beforeJob...... " + beginTime);
    }

    /**
     * Prints the elapsed time in milliseconds since {@link #beforeJob}.
     *
     * @param jobExecution the execution that just finished
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        // Message previously contained a duplicated character ("耗耗时").
        System.out.println(jobExecution.getJobInstance().getJobName() + "一共耗时:【" + (endTime - beginTime) + "】毫秒");
    }

}

/**
 * Item writer that inserts each chunk of processed rows into {@code ads_search_index}
 * with a single JDBC batch.
 */
@Component("batchSearchIndexWriter")
@StepScope
public class BatchSearchIndexWriter implements ItemWriter<Map<String, Object>> {

    /** Named-parameter insert statement for one index row. */
    private static final String INSERT_SQL =
            "insert into ads_search_index("
            + "pk_id,entity_type,entity_type_name,entity_key_column,entity_id,entity_source_id,entity_table,entity_table_name,title,sumary,file_url,source_from,deploy_time, "
            + "content,keywords,pingyin_keywords,weight,lng,lat,region_code,region_name,created_time,updated_time)"
            + " values "
            + "(:pkId,:entityType,:entityTypeName,:entityKeyColumn,:entityId,:entitySourceId,:entityTable,:entityTableName,:title,:sumary,:fileUrl,:sourceFrom,:deployTime, "
            + ":content,:keywords,:pingyinKeywords,:weight,:lng,:lat,:regionCode,:regionName,:createdTime,:updatedTime)";

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    /**
     * Inserts the given rows; does nothing for an empty chunk.
     *
     * @param items processed rows keyed by the {@code CommonConstant} field names
     * @throws CoreException wrapping the message of any failure during the batch insert
     */
    @Override
    public void write(List<? extends Map<String, Object>> items) {
        if (CollUtil.isEmpty(items)) {
            return;
        }
        try {
            SqlParameterSource[] batchArgs = new SqlParameterSource[items.size()];
            int position = 0;
            for (Map<String, Object> row : items) {
                batchArgs[position++] = toParameters(row);
            }
            jdbcTemplate.batchUpdate(INSERT_SQL, batchArgs);
        } catch (Exception e) {
            e.printStackTrace();
            throw new CoreException(ErrorCodeEnum.SYS_ERROR, e.getMessage());
        }
    }

    /** Maps one processed row to the named parameters of {@link #INSERT_SQL}. */
    private static MapSqlParameterSource toParameters(Map<String, Object> row) {
        MapSqlParameterSource params = new MapSqlParameterSource();
        // A fresh upper-case UUID with the dashes replaced serves as primary key.
        params.addValue(CommonConstant.PK_ID, UUID.randomUUID().toString().replaceAll("-", CommonConstant.NULL_STR).toUpperCase());
        params.addValue(CommonConstant.ENTITY_TYPE, valueOrDefault(row, CommonConstant.ENTITY_TYPE, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.ENTITY_TYPE_NAME, valueOrDefault(row, CommonConstant.ENTITY_TYPE_NAME, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.ENTITY_KEY_COLUMN, valueOrDefault(row, CommonConstant.ENTITY_KEY_COLUMN, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.ENTITY_ID, valueOrDefault(row, CommonConstant.ENTITY_ID, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.ENTITY_SOURCE_ID, valueOrDefault(row, CommonConstant.ENTITY_SOURCE_ID, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.ENTITY_TABLE, valueOrDefault(row, CommonConstant.ENTITY_TABLE, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.ENTITY_TABLE_NAME, valueOrDefault(row, CommonConstant.ENTITY_TABLE_NAME, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.TITLE, valueOrDefault(row, CommonConstant.TITLE, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.SUMARY, valueOrDefault(row, CommonConstant.SUMARY, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.FILE_URL, valueOrDefault(row, CommonConstant.FILE_URL, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.SOURCE_FROM, valueOrDefault(row, CommonConstant.SOURCE_FROM, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.DEPLOY_TIME, valueOrDefault(row, CommonConstant.DEPLOY_TIME, null));
        params.addValue(CommonConstant.CONTENT, valueOrDefault(row, CommonConstant.CONTENT, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.KEYWORDS, valueOrDefault(row, CommonConstant.KEYWORDS, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.PINGYIN_KEYWORDS, valueOrDefault(row, CommonConstant.PINGYIN_KEYWORDS, CommonConstant.NULL_STR));
        // The weight parameter is filled from the row's default-weight entry, falling back to 100.
        params.addValue(CommonConstant.WEIGHT, valueOrDefault(row, CommonConstant.WEIGHT_DEFAULT, 100));

        // Coordinates are kept only when present and accepted by the validator.
        Object lngValue;
        if (ObjectUtil.isNotEmpty(row.get(CommonConstant.LNG)) && LatLonValidator.isLongitudeValid(row.get(CommonConstant.LNG).toString())) {
            lngValue = row.get(CommonConstant.LNG);
        } else {
            lngValue = CommonConstant.ZERO;
        }
        params.addValue(CommonConstant.LNG, lngValue);

        Object latValue;
        if (ObjectUtil.isNotEmpty(row.get(CommonConstant.LAT)) && LatLonValidator.isLatitudeValid(row.get(CommonConstant.LAT).toString())) {
            latValue = row.get(CommonConstant.LAT);
        } else {
            latValue = CommonConstant.ZERO;
        }
        params.addValue(CommonConstant.LAT, latValue);

        params.addValue(CommonConstant.REGION_CODE, valueOrDefault(row, CommonConstant.REGION_CODE, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.REGION_NAME, valueOrDefault(row, CommonConstant.REGION_NAME, CommonConstant.NULL_STR));
        params.addValue(CommonConstant.CREATED_TIME, DateUtil.date());
        params.addValue(CommonConstant.UPDATED_TIME, DateUtil.date());
        return params;
    }

    /** Returns the row's value for {@code key} when it is non-empty, otherwise {@code fallback}. */
    private static Object valueOrDefault(Map<String, Object> row, String key, Object fallback) {
        if (ObjectUtil.isNotEmpty(row.get(key))) {
            return row.get(key);
        }
        return fallback;
    }
}
/**
 * Tasklet that soft-deletes the current search-index rows of every configured source table.
 *
 * <p>Synchronisation is deliberately kept simple: the existing index rows are flagged as
 * deleted and then re-added, which is acceptable because the data volume is small.
 */
@Component("deleteTasklet")
@StepScope
public class DeleteTasklet implements Tasklet {
    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    /**
     * Reads the active configuration rows ({@code deleted = 0} and {@code update_flag = '1'})
     * from {@code ads_search_index_table} and, for each of them, sets {@code deleted = '1'} on
     * the matching live rows of {@code ads_search_index}.
     *
     * @param stepContribution not used by this tasklet
     * @param chunkContext     not used by this tasklet
     * @return always {@link RepeatStatus#FINISHED}
     * @throws Exception declared by the {@link Tasklet} contract
     */
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
        // Column aliases become the keys of each result map.
        final String configSql = "SELECT "
                + "pk_id as pkId,"
                + "entity_type as entityType,"
                + "entity_type_name as entityTypeName,"
                + "entity_key_column as entityKeyColumn,"
                + "entity_table as entityTable,"
                + "entity_source_id as entitySourceId,"
                + "title_column as titleColumn,"
                + "content_column as contentColumn,"
                + "keywords_default as keywordsDefault,"
                + "keywords_column as keywordsColumn,"
                + "update_flag as updateFlag,"
                + "delete_column as deleteColumn,"
                + "lng_column as lngColumn,"
                + "lat_column as latColumn,"
                + "region_code_column as regionCodeColumn,"
                + "weight_default as weightDefault"
                + " FROM ads_search_index_table where deleted =0 and update_flag ='1' ";
        final String delSql = "update ads_search_index set deleted = '1' where deleted = '0' and entity_table = :entityTable and entity_type = :entityType";

        List<Map<String, Object>> configRows = jdbcTemplate.queryForList(configSql, new MapSqlParameterSource());
        for (Map<String, Object> configRow : configRows) {
            // Flag the index rows belonging to this (table, type) pair as deleted.
            // NOTE(review): presumably CommonConstant.ENTITY_TABLE / ENTITY_TYPE equal the
            // aliases "entityTable" / "entityType" used in both statements - confirm.
            MapSqlParameterSource params = new MapSqlParameterSource();
            params.addValue(CommonConstant.ENTITY_TABLE, configRow.get(CommonConstant.ENTITY_TABLE));
            params.addValue(CommonConstant.ENTITY_TYPE, configRow.get(CommonConstant.ENTITY_TYPE));
            jdbcTemplate.update(delSql, params);
        }
        return RepeatStatus.FINISHED;
    }
}

/**
 * Partitioner that loads the tables whose search index has to be refreshed and creates one
 * partition per configuration row.
 */
@Component
@Slf4j
public class MultiIndexTablePartitioner implements Partitioner {

    /** Result-map keys that are copied, as strings, into every partition's execution context. */
    private static final String[] CONTEXT_KEYS = {
            CommonConstant.PK_ID,
            CommonConstant.ENTITY_TYPE,
            CommonConstant.ENTITY_TYPE_NAME,
            CommonConstant.ENTITY_KEY_COLUMN,
            CommonConstant.ENTITY_TABLE,
            CommonConstant.ENTITY_TABLE_NAME,
            CommonConstant.ENTITY_SOURCE_ID,
            CommonConstant.TITLE_COLUMN,
            CommonConstant.SUMARY_COLUMN,
            CommonConstant.FILE_URL_COLUMN,
            CommonConstant.DEPLOY_TIME_COLUMN,
            CommonConstant.SOURCE_FROM,
            CommonConstant.CONTET_COLUMN,
            CommonConstant.KEYWORDS_COLUMN,
            CommonConstant.KEYWORDS_DEFAULT,
            CommonConstant.WEIGHT_DEFAULT,
            CommonConstant.UPDATE_FLAG,
            CommonConstant.DELETE_COLUMN,
            CommonConstant.LNG_COLUMN,
            CommonConstant.LAT_COLUMN,
            CommonConstant.REGION_CODE_COLUMN
    };

    private final DataSource dataSource;

    public MultiIndexTablePartitioner(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Builds one {@link ExecutionContext} per active row ({@code deleted = 0} and
     * {@code update_flag = '1'}) of {@code ads_search_index_table}.
     *
     * @param gridSize ignored; the number of partitions equals the number of rows returned
     * @return partitions keyed {@code "partition0"}, {@code "partition1"}, ... in result order
     */
    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        // Column aliases become the keys of each result map.
        final String configSql = "SELECT "
                + "pk_id as pkId,"
                + "entity_type as entityType,"
                + "entity_type_name as entityTypeName,"
                + "entity_key_column as entityKeyColumn,"
                + "entity_table as entityTable,"
                + "entity_table_name as entityTableName,"
                + "entity_source_id as entitySourceId,"
                + "title_column as titleColumn,"
                + "sumary_column as sumaryColumn,"
                + "file_url_column as fileUrlColumn,"
                + "deploy_time_column as deployTimeColumn,"
                + "source_from as sourceFrom,"
                + "content_column as contentColumn,"
                + "keywords_default as keywordsDefault,"
                + "keywords_column as keywordsColumn,"
                + "update_flag as updateFlag,"
                + "delete_column as deleteColumn,"
                + "lng_column as lngColumn,"
                + "lat_column as latColumn,"
                + "region_code_column as regionCodeColumn,"
                + "weight_default as weightDefault"
                + " FROM ads_search_index_table where deleted =0 and update_flag ='1' ";
        List<Map<String, Object>> configRows = new JdbcTemplate(dataSource).queryForList(configSql);

        Map<String, ExecutionContext> partitions = new HashMap<>();
        int index = 0;
        for (Map<String, Object> configRow : configRows) {
            partitions.put("partition" + index, toContext(configRow));
            index++;
        }
        return partitions;
    }

    /**
     * Copies the configuration values into a fresh context so the partitioned step can use
     * them when building its dynamic batch-update SQL.
     */
    private static ExecutionContext toContext(Map<String, Object> configRow) {
        ExecutionContext context = new ExecutionContext();
        for (String key : CONTEXT_KEYS) {
            // String.valueOf stores the literal text "null" when the value is missing.
            context.putString(key, String.valueOf(configRow.get(key)));
        }
        return context;
    }
}
/**
 * Item processor that prepares one search-index row: it segments the content, extracts
 * keywords, derives the pinyin keywords and resolves the administrative region.
 */
@Component("searchIndexProcessor")
@Builder
public class SearchIndexProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

    @Autowired
    private SysRegionService sysRegionService;

    /**
     * Normalises the text fields of {@code item} in place and fills in the region name.
     *
     * <p>A {@code null} title, content or keywords value is replaced by
     * {@link CommonConstant#NULL_STR} instead of raising a {@code NullPointerException};
     * every non-null value is processed exactly as before.
     *
     * @param item the row being indexed; it is modified in place
     * @return the same {@code item} instance
     */
    @Override
    public Map<String, Object> process(Map<String, Object> item) {
        // Title and summary are stored as plain strings.
        Object title = item.get(CommonConstant.TITLE);
        if (title == null) {
            item.put(CommonConstant.TITLE, CommonConstant.NULL_STR);
        } else {
            item.put(CommonConstant.TITLE, title.toString());
        }
        item.put(CommonConstant.SUMARY, ObjectUtil.isNotEmpty(item.get(CommonConstant.SUMARY))?item.get(CommonConstant.SUMARY).toString():CommonConstant.NULL_STR);

        // Content is word-segmented; a missing value skips segmentation.
        Object content = item.get(CommonConstant.CONTENT);
        if (content == null) {
            item.put(CommonConstant.CONTENT, CommonConstant.NULL_STR);
        } else {
            item.put(CommonConstant.CONTENT, HanLpAdvancedUtil.segmentToString(content.toString()));
        }

        // Keywords are extracted first, then converted to pinyin from the extracted form.
        Object rawKeywords = item.get(CommonConstant.KEYWORDS);
        if (rawKeywords == null) {
            item.put(CommonConstant.KEYWORDS, CommonConstant.NULL_STR);
            item.put(CommonConstant.PINGYIN_KEYWORDS, CommonConstant.NULL_STR);
        } else {
            item.put(CommonConstant.KEYWORDS, HanLpAdvancedUtil.extractKeywords(rawKeywords.toString()));
            Object extractedKeywords = item.get(CommonConstant.KEYWORDS);
            if (extractedKeywords == null) {
                item.put(CommonConstant.PINGYIN_KEYWORDS, CommonConstant.NULL_STR);
            } else {
                item.put(CommonConstant.PINGYIN_KEYWORDS, KeyWordUtils.getPinyin(extractedKeywords.toString(), CommonConstant.SPACE));
            }
        }

        // Administrative region handling.
        if (ObjectUtil.isNotEmpty(item.get(CommonConstant.REGION_CODE))) {
            // Codes shorter than TWELVE_INT characters are right-padded to that length.
            if (item.get(CommonConstant.REGION_CODE).toString().length() < CommonConstant.TWELVE_INT) {
                item.put(CommonConstant.REGION_CODE, StrUtil.padAfter(item.get(CommonConstant.REGION_CODE).toString(), CommonConstant.TWELVE_INT, CommonConstant.ZERO));
            }
            // Look the name up once and reuse the result.
            Object regionName = sysRegionService.getRegionNameByCode(item.get(CommonConstant.REGION_CODE).toString());
            if (ObjectUtil.isNotEmpty(regionName)) {
                item.put(CommonConstant.REGION_NAME, regionName);
            } else {
                // No name found for the code: clear both code and name.
                item.put(CommonConstant.REGION_CODE, CommonConstant.NULL_STR);
                item.put(CommonConstant.REGION_NAME, CommonConstant.NULL_STR);
            }
        }
        return item;
    }
}

效果图


网站公告

今日签到

点亮在社区的每一天
去签到