订阅类A
package com.hdx.master.listener;
import com.hdx.master.entity.TtPointIndicator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class AMessageListener extends MessageListenerAdapter {
@Autowired
RedisTemplate<String, Object> redisTemplate;
@SneakyThrows
@Override
public void onMessage(Message message, byte[] pattern) {
List<TtPointIndicator> ttPointIndicatorList = (List<TtPointIndicator>) redisTemplate.getValueSerializer().deserialize(message.getBody());
}
}
订阅类B
package com.hdx.master.listener;
import com.hdx.master.entity.TtPointIndicator;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.stereotype.Component;
import java.util.List;
@Slf4j
@Component
public class BMessageListener extends MessageListenerAdapter {
@Autowired
RedisTemplate<String, Object> redisTemplate;
@SneakyThrows
@Override
public void onMessage(Message message, byte[] pattern) {
List<TtPointIndicator> ttPointIndicatorList = (List<TtPointIndicator>) redisTemplate.getValueSerializer().deserialize(message.getBody());
}
}
订阅的主题的配置类
package com.hdx.master.socket.client;
import com.hdx.master.listener.AMessageListener;
import com.hdx.master.listener.BMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class OldDataHandle implements InitializingBean {
@Autowired
private RedisMessageListenerContainer container;
@Autowired
private AMessageListener aMessageListener;
@Autowired
private BMessageListener bMessageListener;
@Override
public void afterPropertiesSet() throws Exception {
subscribe();
}
public void subscribe() {
container.addMessageListener(aMessageListener, new PatternTopic("aTOPIC"));
container.addMessageListener(bMessageListener, new PatternTopic("bTOPIC"));
}
}
redis配置类
package com.hdx.master.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.repository.configuration.EnableRedisRepositories;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@EnableRedisRepositories
public class RedisConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(16);
executor.setMaxPoolSize(500);
executor.setQueueCapacity(10000);
executor.setThreadFactory(new CustomizableThreadFactory("RedisPubSub-exec-"));
executor.initialize();
return executor;
}
@Bean
public RedisMessageListenerContainer messageListenerContainer(Executor threadPoolTaskExecutor) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
container.setTaskExecutor(threadPoolTaskExecutor);
return container;
}
@Bean
@Primary
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory);
redisTemplate.setKeySerializer(keySerializer());
redisTemplate.setHashKeySerializer(keySerializer());
redisTemplate.setValueSerializer(jacksonSerializer());
redisTemplate.setHashValueSerializer(jacksonSerializer());
redisTemplate.afterPropertiesSet();
return redisTemplate;
}
public RedisSerializer<String> keySerializer() {
return new StringRedisSerializer();
}
private Jackson2JsonRedisSerializer jacksonSerializer() {
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
return jackson2JsonRedisSerializer;
}
}
测试使用mvc接口发送消息进行发布订阅
package com.hdx.master.controller;
import com.hdx.master.common.HttpResult;
import com.hdx.master.utils.RedisUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
@RequestMapping("test")
public class PublishController {
@Autowired
private RedisTemplate<String, Object> redisTemplate;
@PostMapping("/publish")
public HttpResult publish(String message) {
redisTemplate.convertAndSend("aTOPIC", message);
return HttpResult.successMsg("发布成功");
}
}