I'm hhg, a graduate student from Nanjing, China.
- 🏫 School: Hohai University
- 🌱 Learning: I'm currently learning design patterns, LeetCode, distributed systems, middleware, and so on.
- 💓 How to reach me: WeChat
- 📚 My blog: https://hhgyyds.blog.csdn.net/
- 💼 Professional skills: my dream
Preface
People keep saying RabbitMQ can be used for logging. My rough idea was: send the log object to MQ, receive it in another service, and write it into the database. That should be more or less the whole flow, so I set out to implement it.
Environment:
- RabbitMQ 3.10.7
- Erlang 25.0.4
- RuoYi 3.4.0 (I modified RuoYi directly; it is convenient to have a ready-made project to work from.)
1 Deploying RabbitMQ with Docker
1.1 The yml file
root@hhg-Lenovo-Y430P:/home/hhg/docker/rabbitmq_test2# cat docker-compose.yml
version: '3'
services:
  rabbitmq1:
    image: rabbitmq:management
    container_name: rabbitmq1
    restart: 'no'
    hostname: rabbitmq1
    ports:
      - "5675:5672"
      - "15673:15672"
    volumes:
      - ./data:/var/lib/rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=rabbitmq
      - RABBITMQ_DEFAULT_PASS=rabbitmq
Just bring it up with docker-compose (docker-compose up -d); nothing special about this step.
2 Testing in a single Spring Boot application
Since the end goal is to split this into modules, let's first get everything working in one Spring Boot project and only then move it into the modules. Starting with a pile of modules right away just gets confusing.
2.1 application.yml
server:
  port: 8021
spring:
  # give the project a name
  application:
    name: rabbitmq-provider
  # RabbitMQ server settings
  rabbitmq:
    host: 192.168.2.151
    port: 5675
    username: rabbitmq
    password: rabbitmq
    # virtual host; optional, defaults to the server's default vhost
    virtual-host: /
2.2 RabbitMQ configuration
@Configuration
public class RabbitmqConfig implements RabbitListenerConfigurer {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    // (Optional) auto-wire the Boot-provided container factory configurer
    // @Autowired
    // private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    // Container factory that the message listeners run in
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Transfer messages as JSON
        factory.setMessageConverter(producerJackson2MessageConverter());
        // factory.setConcurrentConsumers(1);    // initial number of concurrent consumers
        // factory.setMaxConcurrentConsumers(1); // maximum number of concurrent consumers
        // Number of messages each consumer prefetches
        factory.setPrefetchCount(1);
        // Turn off automatic acknowledgement
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Enable batch consumption
        factory.setBatchListener(true);
        factory.setBatchSize(3);
        factory.setConsumerBatchEnabled(true);
        return factory;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }

    // Queue named springDirectQueue
    @Bean
    public Queue springDirectQueue() {
        // durable: persist the queue to disk (default false) so it survives a broker restart.
        // exclusive: default false; usable only by the creating connection and deleted when that connection closes. Takes precedence over durable.
        // autoDelete: default false; the queue is removed once no producer or consumer uses it.
        // Usually only durability needs to be set; the other two stay at their defaults.
        return new Queue("springDirectQueue", true);
    }

    // Direct exchange named springDirectExchange
    @Bean
    public DirectExchange TestDirectExchange() {
        return new DirectExchange("springDirectExchange", true, false);
    }

    // Bind the queue to the exchange with routing key springDirectRouting1
    // (a second queue/binding with springDirectRouting2 could be added to experiment with routing keys).
    @Bean
    public Binding bindingDirectQueue1() {
        return BindingBuilder.bind(springDirectQueue()).to(TestDirectExchange()).with("springDirectRouting1");
    }
}
What these configurations do:
- They let the consumer code work with deserialized objects directly, i.e. the JSON message body is mapped straight into your domain object instead of you handling the raw Message. Why? Because serializing the object into the message body yourself and then deserializing it back out on the consumer side is rather tedious.
- They declare a simple queue and a direct exchange and bind them together.
- They enable batch consumption. If every log message were handled on its own, each message would mean one INSERT into the database. Can a few messages be pulled together into a list and written with a single batch insert instead? Batch consumption makes exactly that possible (the producer sketch right after this list sends a few messages so it can be tried out).
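To exercise the batch listener in the next section, a producer has to publish several messages. The post itself does not show one, so here is a minimal sketch: the controller, the endpoint, and the SysUser property set here are made up for illustration; only RabbitTemplate, the exchange springDirectExchange, and the routing key springDirectRouting1 come from the configuration above.
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

// Hypothetical test controller, not part of the original post.
@RestController
public class SendTestController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendBatch")
    public String sendBatch() {
        for (int i = 0; i < 3; i++) {
            SysUser user = new SysUser();        // same domain object the listeners below consume (import path depends on the project)
            user.setUserName("test-user-" + i);  // assumes SysUser exposes a userName property
            // The Jackson2JsonMessageConverter configured on the RabbitTemplate serializes the object to JSON.
            rabbitTemplate.convertAndSend("springDirectExchange", "springDirectRouting1", user);
        }
        return "sent 3 messages";
    }
}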
2.3 Test code
2.3.1 Consuming a single message, automatically converted to an object
/**
 * When consuming messages one at a time this works fine: the converted payload is injected
 * directly as a SysUser parameter, and the message is acked manually via the Channel.
 */
@RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
public void processMessage2(Message message, Channel channel, SysUser user) throws Exception {
    System.out.println(user);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
2.3.2 Consuming a batch of messages converted to objects
For batch consumption, the Spring AMQP documentation offers these listener signatures:
- the first is called with the raw, unconverted org.springframework.amqp.core.Message objects received.
- the second is called with org.springframework.messaging.Message<?> objects with converted payloads and mapped headers/properties.
- the third is called with the converted payloads only, with no access to headers/properties.
You can also add a Channel parameter, often used when using MANUAL ack mode. This is not very useful with the third example because you don't have access to the delivery_tag property.
So let's test each of them.
/**
 * Batch consumption: the converted payloads arrive directly as a List<SysUser>.
 * The drawback is that there is no Channel or delivery tag, so manual ACK is not possible.
 */
@RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
public void processMessage(List<SysUser> users) throws Exception {
    for (SysUser user : users) {
        System.out.println(user);
    }
}
/**
 * Batch consumption: the batch arrives as a List<Message>. Iterate over it, pull out each body,
 * convert it into the domain object, and ack each message manually along the way.
 */
@RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
public void processMessage3(List<Message> messages, Channel channel) throws Exception {
    List<SysUser> collect = messages.stream().map(message -> {
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return JSON.parseObject(new String(message.getBody()), SysUser.class);
    }).collect(Collectors.toList());
    System.out.println(collect);
}
This last form is my ideal: it builds the List<SysUser> and acks the messages at the same time, at the cost of a few extra lines of code.
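For what it's worth, a slightly shorter variant of the same idea (my sketch, not from the original post; the method name is made up) converts the whole batch first and then acks it with a single basicAck, using multiple = true on the last delivery tag:
@RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
public void processMessage4(List<Message> messages, Channel channel) throws Exception {
    // Convert every body in the batch into a SysUser first.
    List<SysUser> users = messages.stream()
            .map(message -> JSON.parseObject(new String(message.getBody()), SysUser.class))
            .collect(Collectors.toList());
    System.out.println(users);
    // multiple = true acknowledges all deliveries on this channel up to and including the last tag,
    // so the whole batch is acked in one call. If conversion throws, nothing is acked and the
    // batch will be redelivered.
    long lastDeliveryTag = messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag();
    channel.basicAck(lastDeliveryTag, true);
}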
3 The RuoYi part: operation logging via AOP + RabbitMQ
3.1 Modules
Modules:
- ruoyi-common-log
  - RabbitmqLogQueueConfig: declares the queue and exchange that the log messages need.
  - service: holds the method that sends the actual log message, e.g. which exchange it goes to.
- ruoyi-common-mq
  - RabbitmqConfig: holds the shared configuration (serialization and so on) so several modules can reuse it.
  - ProduceService: a generic method for sending messages to MQ, used by the other modules.
- ruoyi-modules-system
  - OperListener: the listener that receives the messages and calls this module's service to batch-insert them into the database.
3.2 ruoyi-common-mq module code
- RabbitmqConfig.java
package com.ruoyi.common.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

/**
 * @author xx
 * @date 2021/10/5 16:24
 */
@Configuration
public class RabbitmqConfig implements RabbitListenerConfigurer {

    @Autowired
    private CachingConnectionFactory connectionFactory;

    // (Optional) auto-wire the Boot-provided container factory configurer
    // @Autowired
    // private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    // Container factory that the message listeners run in
    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Transfer messages as JSON
        factory.setMessageConverter(producerJackson2MessageConverter());
        // factory.setConcurrentConsumers(1);    // initial number of concurrent consumers
        // factory.setMaxConcurrentConsumers(1); // maximum number of concurrent consumers
        // Number of messages each consumer prefetches
        factory.setPrefetchCount(1);
        // Turn off automatic acknowledgement
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Enable batch consumption
        factory.setBatchListener(true);
        factory.setBatchSize(3);
        factory.setConsumerBatchEnabled(true);
        return factory;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }
}
ruoyi-common-mq holds the common RabbitMQ configuration in one place, so the other modules can use it directly without configuring anything again.
- ProduceService.java
package com.ruoyi.common.mq.producer;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class ProduceService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public <T> void send(String exchange, String routingKey, T object) {
        // No need to build a Message by hand; the Jackson2JsonMessageConverter configured on
        // the RabbitTemplate serializes the object to JSON for us.
        rabbitTemplate.convertAndSend(exchange, routingKey, object);
    }
}
3.3 ruoyi-common-log
- LogAspect
package com.ruoyi.common.log.aspect;

import java.util.Collection;
import java.util.Map;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindingResult;
import org.springframework.web.multipart.MultipartFile;

import com.alibaba.fastjson.JSON;
import com.ruoyi.common.core.utils.ServletUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.core.utils.ip.IpUtils;
import com.ruoyi.common.log.annotation.Log;
import com.ruoyi.common.log.enums.BusinessStatus;
import com.ruoyi.common.log.service.AsyncLogService;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.system.api.domain.SysOperLog;

/**
 * Operation log handling
 *
 * @author ruoyi
 */
@Aspect
@Component
public class LogAspect
{
    private static final Logger log = LoggerFactory.getLogger(LogAspect.class);

    @Autowired
    private AsyncLogService asyncLogService;

    /**
     * Runs after the annotated request method has returned.
     *
     * @param joinPoint join point
     */
    @AfterReturning(pointcut = "@annotation(controllerLog)", returning = "jsonResult")
    public void doAfterReturning(JoinPoint joinPoint, Log controllerLog, Object jsonResult)
    {
        handleLog(joinPoint, controllerLog, null, jsonResult);
    }

    /**
     * Intercepts operations that throw an exception.
     *
     * @param joinPoint join point
     * @param e exception
     */
    @AfterThrowing(value = "@annotation(controllerLog)", throwing = "e")
    public void doAfterThrowing(JoinPoint joinPoint, Log controllerLog, Exception e)
    {
        handleLog(joinPoint, controllerLog, e, null);
    }

    protected void handleLog(final JoinPoint joinPoint, Log controllerLog, final Exception e, Object jsonResult)
    {
        try
        {
            // *======== database log ========*//
            SysOperLog operLog = new SysOperLog();
            operLog.setStatus(BusinessStatus.SUCCESS.ordinal());
            // request address
            String ip = IpUtils.getIpAddr(ServletUtils.getRequest());
            operLog.setOperIp(ip);
            operLog.setOperUrl(ServletUtils.getRequest().getRequestURI());
            String username = SecurityUtils.getUsername();
            if (StringUtils.isNotBlank(username))
            {
                operLog.setOperName(username);
            }
            if (e != null)
            {
                operLog.setStatus(BusinessStatus.FAIL.ordinal());
                operLog.setErrorMsg(StringUtils.substring(e.getMessage(), 0, 2000));
            }
            // method name
            String className = joinPoint.getTarget().getClass().getName();
            String methodName = joinPoint.getSignature().getName();
            operLog.setMethod(className + "." + methodName + "()");
            // request method
            operLog.setRequestMethod(ServletUtils.getRequest().getMethod());
            // read the parameters declared on the annotation
            getControllerMethodDescription(joinPoint, controllerLog, operLog, jsonResult);
            // save to the database
            asyncLogService.saveSysLog(operLog);
        }
        catch (Exception exp)
        {
            // record the local exception
            log.error("==exception while recording operation log==");
            log.error("exception message: {}", exp.getMessage());
            exp.printStackTrace();
        }
    }

    /**
     * Reads the method description from the @Log annotation on the controller method.
     *
     * @param log the @Log annotation
     * @param operLog the operation log record
     * @throws Exception
     */
    public void getControllerMethodDescription(JoinPoint joinPoint, Log log, SysOperLog operLog, Object jsonResult) throws Exception
    {
        // business action
        operLog.setBusinessType(log.businessType().ordinal());
        // title
        operLog.setTitle(log.title());
        // operator type
        operLog.setOperatorType(log.operatorType().ordinal());
        // whether to save the request parameters and values
        if (log.isSaveRequestData())
        {
            // record the request parameters in the log record
            setRequestValue(joinPoint, operLog);
        }
        // whether to save the response parameters and values
        if (log.isSaveResponseData() && StringUtils.isNotNull(jsonResult))
        {
            operLog.setJsonResult(StringUtils.substring(JSON.toJSONString(jsonResult), 0, 2000));
        }
    }

    /**
     * Puts the request parameters into the log record.
     *
     * @param operLog the operation log record
     * @throws Exception exception
     */
    private void setRequestValue(JoinPoint joinPoint, SysOperLog operLog) throws Exception
    {
        String requestMethod = operLog.getRequestMethod();
        if (HttpMethod.PUT.name().equals(requestMethod) || HttpMethod.POST.name().equals(requestMethod))
        {
            String params = argsArrayToString(joinPoint.getArgs());
            operLog.setOperParam(StringUtils.substring(params, 0, 2000));
        }
    }

    /**
     * Joins the method arguments into a single string.
     */
    private String argsArrayToString(Object[] paramsArray)
    {
        String params = "";
        if (paramsArray != null && paramsArray.length > 0)
        {
            for (Object o : paramsArray)
            {
                if (StringUtils.isNotNull(o) && !isFilterObject(o))
                {
                    try
                    {
                        Object jsonObj = JSON.toJSON(o);
                        params += jsonObj.toString() + " ";
                    }
                    catch (Exception e)
                    {
                    }
                }
            }
        }
        return params.trim();
    }

    /**
     * Determines whether an argument should be filtered out (not logged).
     *
     * @param o the argument
     * @return true if it should be filtered out, false otherwise
     */
    @SuppressWarnings("rawtypes")
    public boolean isFilterObject(final Object o)
    {
        Class<?> clazz = o.getClass();
        if (clazz.isArray())
        {
            return clazz.getComponentType().isAssignableFrom(MultipartFile.class);
        }
        else if (Collection.class.isAssignableFrom(clazz))
        {
            Collection collection = (Collection) o;
            for (Object value : collection)
            {
                return value instanceof MultipartFile;
            }
        }
        else if (Map.class.isAssignableFrom(clazz))
        {
            Map map = (Map) o;
            for (Object value : map.entrySet())
            {
                Map.Entry entry = (Map.Entry) value;
                return entry.getValue() instanceof MultipartFile;
            }
        }
        return o instanceof MultipartFile || o instanceof HttpServletRequest || o instanceof HttpServletResponse
                || o instanceof BindingResult;
    }
}
The point of this aspect is the @Log annotation: whenever an annotated method is invoked, the aspect runs, extracts the request information, and hands it to the log code for persistence.
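As a quick illustration (this exact method is not in the post, but it mirrors how RuoYi controllers typically use the annotation), any controller method marked with @Log is picked up by the aspect:
// Illustrative controller method fragment; userService and toAjax come from the surrounding
// RuoYi controller and are only sketched here. The aspect fires after the method returns (or
// throws), builds a SysOperLog from the request, and hands it to AsyncLogService, which now
// publishes it to RabbitMQ instead of calling the remote log service.
@Log(title = "User management", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@Validated @RequestBody SysUser user) {
    return toAjax(userService.insertUser(user));
}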
- RabbitmqLogQueueConfig.java
package com.ruoyi.common.log.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author xx
 * @date 2021/10/5 16:24
 */
@Configuration
public class RabbitmqLogQueueConfig {

    // Queue named springOperLogQueue
    @Bean
    public Queue springDirectQueue() {
        // durable: persist the queue to disk (default false) so it survives a broker restart.
        // exclusive: default false; usable only by the creating connection and deleted when that connection closes. Takes precedence over durable.
        // autoDelete: default false; the queue is removed once no producer or consumer uses it.
        // Usually only durability needs to be set; the other two stay at their defaults.
        return new Queue("springOperLogQueue", true);
    }

    // Direct exchange named springOperLogExchange
    @Bean
    public DirectExchange TestDirectExchange() {
        return new DirectExchange("springOperLogExchange", true, false);
    }

    // Bind the queue to the exchange with routing key springOperLog
    @Bean
    public Binding bindingDirectQueue1() {
        return BindingBuilder.bind(springDirectQueue()).to(TestDirectExchange()).with("springOperLog");
    }
}
This declares the queue, the exchange, and the binding that the log messages will use.
- AsyncLogService.java
package com.ruoyi.common.log.service;

import com.ruoyi.common.mq.producer.ProduceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.system.api.RemoteLogService;
import com.ruoyi.system.api.domain.SysOperLog;

/**
 * Asynchronous log service
 *
 * @author ruoyi
 */
@Service
public class AsyncLogService {

    @Autowired
    private RemoteLogService remoteLogService;

    @Autowired
    private ProduceService produceService;

    /**
     * Save an operation log record.
     */
    @Async
    public void saveSysLog(SysOperLog sysOperLog) {
        // Previously this went through the remote (OpenFeign) log service:
        // remoteLogService.saveLog(sysOperLog, SecurityConstants.INNER);
        saveSysLogList(sysOperLog);
    }

    public void saveSysLogList(SysOperLog sysOperLog) {
        produceService.send("springOperLogExchange", "springOperLog", sysOperLog);
    }
}
Previously this used remoteLogService, i.e. an OpenFeign call. Since we now go through the message queue, the remote service call is no longer needed, so we change it to publish the log to MQ instead.
3.4 ruoyi-modules-system consumer side
- OperListener
package com.ruoyi.system.rabbitmqListener;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.ruoyi.system.api.domain.SysOperLog;
import com.ruoyi.system.service.ISysOperLogService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Asynchronous operation log handling via RabbitMQ
 */
@Component
public class OperListener {

    @Autowired
    private ISysOperLogService sysOperLogService;

    @RabbitListener(queues = "springOperLogQueue", containerFactory = "singleListenerContainer")
    @Transactional(rollbackFor = Exception.class)
    public void processMessage3(List<Message> messages, Channel channel) throws Exception {
        System.out.println("start consuming");
        List<SysOperLog> collect = messages.stream().map(message -> {
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return JSON.parseObject(new String(message.getBody()), SysOperLog.class);
        }).collect(Collectors.toList());
        sysOperLogService.inserOperlogList(collect);
    }
}
This simply reuses the batch-consumption code we tested in the standalone Spring Boot project: the listener converts the batch into a List<SysOperLog> and writes it to the database in one go.
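The service method inserOperlogList itself is not shown in the post; a minimal sketch of what it could look like follows. The mapper field and method name are hypothetical; in practice the SQL behind it would be a MyBatis foreach batch INSERT (or simply a loop over the existing single-row insert).
// Hypothetical implementation inside SysOperLogServiceImpl, not code from the original post.
@Override
public void inserOperlogList(List<SysOperLog> operLogs) {
    if (operLogs == null || operLogs.isEmpty()) {
        return;
    }
    // Assumed mapper method; the real SQL would be a batch insert over the list.
    operLogMapper.batchInsertOperlog(operLogs);
}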
4 Conclusion
That wraps it up. The approach works: I tested all of the code and the log records do end up in the database. Call it my first real application of RabbitMQ; log persistence over MQ is now in place.