生命无罪,健康万岁,我是laity。
我曾七次鄙视自己的灵魂:
第一次,当它本可进取时,却故作谦卑;
第二次,当它在空虚时,用爱欲来填充;
第三次,在困难和容易之间,它选择了容易;
第四次,它犯了错,却借由别人也会犯错来宽慰自己;
第五次,它自由软弱,却把它认为是生命的坚韧;
第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;
第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。
基于Kafka实现动态监听topic功能
业务场景:导条根据各家接口进行数据分发其中包含动态kafka-topic,各家通过监听topic实现获取数据从而实现后续业务。
实现逻辑
pom
yaml 方案1 接收的是String
kafka:
bootstrap-servers: youKafkaIp:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
listener:
type: batch
consumer:
enable-auto-commit: false
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
auto-offset-reset: earliest
group-id: consumer-sb
producer:
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
yaml 方案2 接收的是Byte
kafka:
bootstrap-servers: youKafkaIp:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔
listener:
type: batch
consumer:
enable-auto-commit: false
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
key-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
auto-offset-reset: earliest
group-id: consumer-sb
producer:
value-serializer: org.apache.kafka.common.serialization.StringSerializer
key-serializer: org.apache.kafka.common.serialization.StringSerializer
收消息CODE
KafkaConfig.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import java.util.HashMap;
import java.util.Map;
/**
* @author laity
*/
@EnableKafka
@Configuration
public class KafkaConfig {
// 解决 Could not create message listener - MessageHandlerMethodFactory not set TODO:WWS 不好使
/*@Bean
public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationBeanPostProcessor() {
KafkaListenerAnnotationBeanPostProcessor processor = new KafkaListenerAnnotationBeanPostProcessor();
processor.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
return processor;
}*/
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> map = new HashMap<>();
map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "youKafkaIp:9092");
map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-laity");
map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<String, String>(map);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(5);
// new DefaultMessageHandlerMethodFactory()
return factory;
}
// implements KafkaListenerConfigurer + 解决 Could not create message listener - MessageHandlerMethodFactory not set
/*@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
}*/
}
KafkaListenerController.java
package cn.iocoder.yudao.server.controller.admin.szbl;
import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.server.controller.admin.szbl.common.config.kafka.MyComponent;
import cn.iocoder.yudao.server.controller.admin.szbl.vo.InitSceneVO;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.security.PermitAll;
/**
* @author laity
*/
@RestController
@RequestMapping("/kafka")
public class KafkaListenerController {
private final MyComponent component;
public KafkaListenerController(MyComponent component) {
this.component = component;
}
private String topic;
// 用于接收导条分发数据接口
@PostMapping("/reception")
@PermitAll
public CommonResult<Boolean> putAwayL(@RequestBody InitSceneVO vo) {
// …… 业务逻辑
// 去执行 监听固定的topic
component.startListening(vo.getGzTopicName());
return CommonResult.success(true);
}
}
DynamicKafkaListenerService.java
import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;
import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.Objects;
/**
* @author laity 动态管理Kafka监听器
*/
@Service
public class DynamicKafkaListenerService {
private final KafkaListenerEndpointRegistry registry;
private final ConcurrentKafkaListenerContainerFactory<String, String> factory;
@Autowired
public DynamicKafkaListenerService(KafkaListenerEndpointRegistry registry, ConcurrentKafkaListenerContainerFactory<String, String> factory) {
this.registry = registry;
this.factory = factory;
}
public void addListener(String topic, String groupId, Object bean, Method method) {
if (AopUtils.isAopProxy(bean)) {
try {
bean = ((Advised) bean).getTargetSource().getTarget();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
assert bean != null;
endpoint.setBean(bean);
endpoint.setMethod(method);
endpoint.setTopics(topic);
endpoint.setGroup(groupId);
endpoint.setId(method.getName() + "_" + LocalDateTime.now());
endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); // 之前怎么点都点不出来这个属性 突然又出来了……无语
registry.registerListenerContainer(endpoint, factory, true); // 指定容器工厂
}
public void removeListener(String beanName) {
// 断言
Objects.requireNonNull(registry.getListenerContainer(beanName)).stop();
registry.unregisterListenerContainer(beanName);
}
}
BlueKafkaConsumer.java
import org.springframework.stereotype.Component;
/**
* @author laity
*/
@Component
public class BlueKafkaConsumer {
// @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")
public void listen(Object record) {
System.out.println("======================= 接收动态KafkaTopics Received message ========================");
System.out.println(record.toString());
}
}
MyComponent.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.lang.reflect.Method;
/**
* @author laity
*/
@Component
public class MyComponent {
private final DynamicKafkaListenerService kafkaListenerService;
private final BlueKafkaConsumer blueKafkaConsumer;
@Autowired
public MyComponent(DynamicKafkaListenerService kafkaListenerService, BlueKafkaConsumer blueKafkaConsumer) {
this.kafkaListenerService = kafkaListenerService;
this.blueKafkaConsumer = blueKafkaConsumer;
}
public void startListening(String topic) {
try {
Method blueMethod = BlueKafkaConsumer.class.getMethod("listen", Object.class);
kafkaListenerService.addListener(topic, "consumer-laity", blueKafkaConsumer, blueMethod);
} catch (NoSuchMethodException e) {
throw new RuntimeException(e);
}
}
public void stopListening(String beanName) {
kafkaListenerService.removeListener(beanName);
}
// init
@PostConstruct // 这个是服务启动时调用 但我想要的时实时可变的
public void init() {
}
}
世界上最可贵的两个词,一个叫认真,一个叫坚持,认真的人改变自己,坚持的人改变命运,有些事情不是看到了希望才去坚持,而是坚持了才有希望。我是Laity,正在前进的Laity。