RabbitMQ应用:利用Rabbitmq做一个日志小框架,实现自动日志入库功能。

发布于:2023-01-04 ⋅ 阅读:(428) ⋅ 点赞:(0)

𝑰’𝒎 𝒉𝒉𝒈, 𝑰 𝒂𝒎 𝒂 𝒈𝒓𝒂𝒅𝒖𝒂𝒕𝒆 𝒔𝒕𝒖𝒅𝒆𝒏𝒕 𝒇𝒓𝒐𝒎 𝑵𝒂𝒏𝒋𝒊𝒏𝒈, 𝑪𝒉𝒊𝒏𝒂.

  • 🏫 𝑺𝒉𝒄𝒐𝒐𝒍: 𝑯𝒐𝒉𝒂𝒊 𝑼𝒏𝒊𝒗𝒆𝒓𝒔𝒊𝒕𝒚
  • 🌱 𝑳𝒆𝒂𝒓𝒏𝒊𝒏𝒈: 𝑰’𝒎 𝒄𝒖𝒓𝒓𝒆𝒏𝒕𝒍𝒚 𝒍𝒆𝒂𝒓𝒏𝒊𝒏𝒈 𝒅𝒆𝒔𝒊𝒈𝒏 𝒑𝒂𝒕𝒕𝒆𝒓𝒏, 𝑳𝒆𝒆𝒕𝒄𝒐𝒅𝒆, 𝒅𝒊𝒔𝒕𝒓𝒊𝒃𝒖𝒕𝒆𝒅 𝒔𝒚𝒔𝒕𝒆𝒎, 𝒎𝒊𝒅𝒅𝒍𝒆𝒘𝒂𝒓𝒆 𝒂𝒏𝒅 𝒔𝒐 𝒐𝒏.
  • 💓 𝑯𝒐𝒘 𝒕𝒐 𝒓𝒆𝒂𝒄𝒉 𝒎𝒆:𝑽𝑿
  • 📚 𝑴𝒚 𝒃𝒍𝒐𝒈: 𝒉𝒕𝒕𝒑𝒔://𝒉𝒉𝒈𝒚𝒚𝒅𝒔.𝒃𝒍𝒐𝒈.𝒄𝒔𝒅𝒏.𝒏𝒆𝒕/
  • 💼 𝑷𝒓𝒐𝒇𝒆𝒔𝒔𝒊𝒐𝒏𝒂𝒍 𝒔𝒌𝒊𝒍𝒍𝒔:𝒎𝒚 𝒅𝒓𝒆𝒂𝒎

前言

  之前一直都说rabbitmq可以用来做日志,我心里的思路就是把对象发送到mq里面,然后再另一个服务里面接受,入库就行,感觉整体流程应该就这样,差不多,于是开始了我的实现。

环境说明:

  • RabbitMQ 3.10.7
  • Erlang 25.0.4
  • Ruoyi 3.4.0 因为我是直接改的若以,这样方便点,有现成的项目,可以直接改。

1 rabbitmq docker部署

1.1 yml文件

root@hhg-Lenovo-Y430P:/home/hhg/docker/rabbitmq_test2# cat docker-compose.yml 
version: '3'
services:
  rabbitmq1:
    image: rabbitmq:management
    container_name: rabbitmq1
    restart: 'no'
    hostname: rabbitmq1
    ports:
      - "5675:5672"
      - "15673:15672"
    volumes:
      - ./data:/var/lib/rabbitmq
    environment:
      - RABBITMQ_DEFAULT_USER=rabbitmq
      - RABBITMQ_DEFAULT_PASS=rabbitmq

docker-compose启动就行,没什么好说的。

2 Springboot中进行测试

  因为最后要模块化,我们先在一个springboot中去测试成功了,再用进去。上来就弄很多模块给你,你会乱的。

2.1 application.yml

server:
  port: 8021
spring:
  #给项目来个名字
  application:
    name: rabbitmq-provider
  #配置rabbitMq 服务器
  rabbitmq:
    host: 192.168.2.151
    port: 5675
    username: rabbitmq
    password: rabbitmq
    #虚拟host 可以不设置,使用server默认host
    virtual-host: /

2.2 Rabbitmq配置

@Configuration
public class RabbitmqConfig implements RabbitListenerConfigurer {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自动装配消息监听器所在的容器工厂配置类实例
//    @Autowired
//    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用JSON的格式进行传输
        factory.setMessageConverter(producerJackson2MessageConverter());
//        //设置并发消费者实例的初始数量。在这里为1个
//        factory.setConcurrentConsumers(1);
//        //设置并发消费者实例的最大数量。在这里为1个
//        factory.setMaxConcurrentConsumers(1);
//        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        // 关闭自动应答
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // 设置批量消费
        factory.setBatchListener(true);
        factory.setBatchSize(3);
        factory.setConsumerBatchEnabled(true);
        return factory;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }
    //队列 起名:springDirectQueue
    @Bean
    public Queue springDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,不然你电脑关机的时候队列就会消失。
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("springDirectQueue", true);
    }

    //Direct交换机 起名:springDirectExchange
    @Bean
    public DirectExchange TestDirectExchange() {
        return new DirectExchange("springDirectExchange", true, false);
    }

    //用两个队列,将队列和交换机绑定, 并设置用于匹配键:springDirectRouting1, springDirectRouting2,用来实验routingKey的作用
    @Bean
    public Binding bindingDirectQueue1() {
        return BindingBuilder.bind(springDirectQueue()).to(TestDirectExchange()).with("springDirectRouting1");
    }

这些配置的作用:

  1. 保证rabbitmq消费者代码能够直接序列化对象,就是将消息直接封装到你的对象里面。而不是直接操作Message 对象,为什么呢?因为我们如果先将对象序列化放进message的body里面,再从message中序列化弄出来,其实挺麻烦的。
  2. 设置了一个简单的队列和exchange,然后进行了绑定。
  3. 开启了批量消费,因为日志来一个message就处理一条,就往数据库里面插一条数据,那我能不能把几条消息拉到一起,形成一个list呢?然后通过批量插入呢?批量消费就能够实现这样的事情。

2.3 测试代码

2.3.1 消费单条message 自动转成对象

    /**
     * 如果是一条一条消费,这么做是可以的,可以直接封装进对象里面,然后进行手动ACK
     * @param message
     * @param channel
     * @param user
     * @throws Exception
     */
    @RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
    public void processMessage2(Message message, Channel channel, SysUser user) throws Exception {
        System.out.println(user);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

2.3.2 消费多条message转成对象

在这里插入图片描述
官网对于批量消费,给出了这几种方案。

  • the first is called with the raw, unconverted org.springframework.amqp.core.Message s received.
  • the second is called with the org.springframework.messaging.Message<?> s with converted payloads and mapped headers/properties.
  • the third is called with the converted payloads, with no access to headers/properteis.

You can also add a Channel parameter, often used when using MANUAL ack mode. This is not very useful with the third example because you don’t have access to the delivery_tag property.
所以我们都测试一下。

    /**
     * 批量消费, 直接把消费的信息装进list里面,形成usersList,缺点就是,没办法进行手动ACK了
     *
     * @param users
     * @throws Exception
     */
    @RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
    public void processMessage(List<SysUser> users) throws Exception {
//        System.out.println(users);
        for (SysUser user : users) {
            System.out.println(user);
        }
    }
    /**
     * 批量消费,将几条消息用一个list来存储,对list进行遍历,将body提出来,转成list,同时进行手动ACK
     *
     * @param messages
     * @param channel
     * @throws Exception
     */
    @RabbitListener(queues = "springDirectQueue", containerFactory = "singleListenerContainer")
    public void processMessage3(List<Message> messages, Channel channel) throws Exception {
        List<SysUser> collect = messages.stream().map(message -> {
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return JSON.parseObject(new String(message.getBody()), SysUser.class);

        }).collect(Collectors.toList());
        System.out.println(collect);

    }

我的理想型,就是上面的这种,可以同时ACK以及,封装userList,但是需要多几行代码才行。

3 ruoyi部分:通过AOP+RabbitMQ去实现日志记录

3.1 模块说明

模块:

  • ruoyi-common-log
    • RabbitmqLogQueueConfig:把日志需要的队列和exchange的声明放在这个里面。
    • service:在service里面设置发送具体消息的方法,比如发送到哪个exchange里面。
  • ruoyi-common-mq
    • RabbitMQConfig:把通用配置,序列化等,放在这个模块里面,这样可以多个模块公用。
    • ProduceService:利用mq进行send消息的通用方法。供其他模块使用。
  • ruoyi-modules-system
    • OperListener:监听的方法,用来接收消息,并且调用本模块里面的service实现批量入库这个功能。

3.2 ruoyi-common-mq模块代码

  • RabbitmqConfig.java
package com.ruoyi.common.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory;

/**
 * @author xx
 * @date 2021/10/5 16:24
 */
@Configuration
public class RabbitmqConfig implements RabbitListenerConfigurer {
    @Autowired
    private CachingConnectionFactory connectionFactory;
    //自动装配消息监听器所在的容器工厂配置类实例
//    @Autowired
//    private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;

    @Bean(name = "singleListenerContainer")
    public SimpleRabbitListenerContainerFactory listenerContainer() {
        //定义消息监听器所在的容器工厂
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        //设置容器工厂所用的实例
        factory.setConnectionFactory(connectionFactory);
        //设置消息在传输中的格式,在这里采用JSON的格式进行传输
        factory.setMessageConverter(producerJackson2MessageConverter());
//        //设置并发消费者实例的初始数量。在这里为1个
//        factory.setConcurrentConsumers(1);
//        //设置并发消费者实例的最大数量。在这里为1个
//        factory.setMaxConcurrentConsumers(1);
//        //设置并发消费者实例中每个实例拉取的消息数量-在这里为1个
        factory.setPrefetchCount(1);
        // 关闭自动应答
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);

        // 设置批量消费
        factory.setBatchListener(true);
        factory.setBatchSize(3);
        factory.setConsumerBatchEnabled(true);
        return factory;
    }

    @Bean
    public Jackson2JsonMessageConverter producerJackson2MessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(producerJackson2MessageConverter());
        return rabbitTemplate;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
        registrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory());
    }

    @Bean
    MessageHandlerMethodFactory messageHandlerMethodFactory() {
        DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory();
        messageHandlerMethodFactory.setMessageConverter(consumerJackson2MessageConverter());
        return messageHandlerMethodFactory;
    }

    @Bean
    public MappingJackson2MessageConverter consumerJackson2MessageConverter() {
        return new MappingJackson2MessageConverter();
    }
}

ruoyi-common-mq就是一个总体的配置设置,这样其他就可以不用再配了,直接使用。

  • ProduceService.java
package com.ruoyi.common.mq.producer;


import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class ProduceService {

    @Resource
    private RabbitTemplate rabbitTemplate;

    public <T> void send(String exchange, String routingKey, T object) {
//        Map<String, String> msgMap = new HashMap<>();
//        msgMap.put("message", msg);
//        String messageJson = JSONObject.toJSONString(msgMap);
//        Message message = MessageBuilder
//                .withBody(messageJson.getBytes())
//                .setContentType(MessageProperties.CONTENT_TYPE_JSON)
//                .setContentEncoding("utf-8")
//                .setMessageId(UUID.randomUUID() + "")
//                .build();
//        log.info("生产者发送:" + message);
        rabbitTemplate.convertAndSend(exchange, routingKey, object);
    }
}

3.3 ruoyi-common-log

  • LogAspect
package com.ruoyi.common.log.aspect;

import java.util.Collection;
import java.util.Map;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpMethod;
import org.springframework.stereotype.Component;
import org.springframework.validation.BindingResult;
import org.springframework.web.multipart.MultipartFile;
import com.alibaba.fastjson.JSON;
import com.ruoyi.common.core.utils.ServletUtils;
import com.ruoyi.common.core.utils.StringUtils;
import com.ruoyi.common.core.utils.ip.IpUtils;
import com.ruoyi.common.log.annotation.Log;
import com.ruoyi.common.log.enums.BusinessStatus;
import com.ruoyi.common.log.service.AsyncLogService;
import com.ruoyi.common.security.utils.SecurityUtils;
import com.ruoyi.system.api.domain.SysOperLog;

/**
 * 操作日志记录处理
 * 
 * @author ruoyi
 */
@Aspect
@Component
public class LogAspect
{
    private static final Logger log = LoggerFactory.getLogger(LogAspect.class);
    
    @Autowired
    private AsyncLogService asyncLogService;

    /**
     * 处理完请求后执行
     *
     * @param joinPoint 切点
     */
    @AfterReturning(pointcut = "@annotation(controllerLog)", returning = "jsonResult")
    public void doAfterReturning(JoinPoint joinPoint, Log controllerLog, Object jsonResult)
    {
        handleLog(joinPoint, controllerLog, null, jsonResult);
    }

    /**
     * 拦截异常操作
     * 
     * @param joinPoint 切点
     * @param e 异常
     */
    @AfterThrowing(value = "@annotation(controllerLog)", throwing = "e")
    public void doAfterThrowing(JoinPoint joinPoint, Log controllerLog, Exception e)
    {
        handleLog(joinPoint, controllerLog, e, null);
    }

    protected void handleLog(final JoinPoint joinPoint, Log controllerLog, final Exception e, Object jsonResult)
    {
        try
        {
            // *========数据库日志=========*//
            SysOperLog operLog = new SysOperLog();
            operLog.setStatus(BusinessStatus.SUCCESS.ordinal());
            // 请求的地址
            String ip = IpUtils.getIpAddr(ServletUtils.getRequest());
            operLog.setOperIp(ip);
            operLog.setOperUrl(ServletUtils.getRequest().getRequestURI());
            String username = SecurityUtils.getUsername();
            if (StringUtils.isNotBlank(username))
            {
                operLog.setOperName(username);
            }

            if (e != null)
            {
                operLog.setStatus(BusinessStatus.FAIL.ordinal());
                operLog.setErrorMsg(StringUtils.substring(e.getMessage(), 0, 2000));
            }
            // 设置方法名称
            String className = joinPoint.getTarget().getClass().getName();
            String methodName = joinPoint.getSignature().getName();
            operLog.setMethod(className + "." + methodName + "()");
            // 设置请求方式
            operLog.setRequestMethod(ServletUtils.getRequest().getMethod());
            // 处理设置注解上的参数
            getControllerMethodDescription(joinPoint, controllerLog, operLog, jsonResult);
            // 保存数据库
            asyncLogService.saveSysLog(operLog);
        }
        catch (Exception exp)
        {
            // 记录本地异常日志
            log.error("==前置通知异常==");
            log.error("异常信息:{}", exp.getMessage());
            exp.printStackTrace();
        }
    }

    /**
     * 获取注解中对方法的描述信息 用于Controller层注解
     * 
     * @param log 日志
     * @param operLog 操作日志
     * @throws Exception
     */
    public void getControllerMethodDescription(JoinPoint joinPoint, Log log, SysOperLog operLog, Object jsonResult) throws Exception
    {
        // 设置action动作
        operLog.setBusinessType(log.businessType().ordinal());
        // 设置标题
        operLog.setTitle(log.title());
        // 设置操作人类别
        operLog.setOperatorType(log.operatorType().ordinal());
        // 是否需要保存request,参数和值
        if (log.isSaveRequestData())
        {
            // 获取参数的信息,传入到数据库中。
            setRequestValue(joinPoint, operLog);
        }
        // 是否需要保存response,参数和值
        if (log.isSaveResponseData() && StringUtils.isNotNull(jsonResult))
        {
            operLog.setJsonResult(StringUtils.substring(JSON.toJSONString(jsonResult), 0, 2000));
        }
    }

    /**
     * 获取请求的参数,放到log中
     * 
     * @param operLog 操作日志
     * @throws Exception 异常
     */
    private void setRequestValue(JoinPoint joinPoint, SysOperLog operLog) throws Exception
    {
        String requestMethod = operLog.getRequestMethod();
        if (HttpMethod.PUT.name().equals(requestMethod) || HttpMethod.POST.name().equals(requestMethod))
        {
            String params = argsArrayToString(joinPoint.getArgs());
            operLog.setOperParam(StringUtils.substring(params, 0, 2000));
        }
    }

    /**
     * 参数拼装
     */
    private String argsArrayToString(Object[] paramsArray)
    {
        String params = "";
        if (paramsArray != null && paramsArray.length > 0)
        {
            for (Object o : paramsArray)
            {
                if (StringUtils.isNotNull(o) && !isFilterObject(o))
                {
                    try
                    {
                        Object jsonObj = JSON.toJSON(o);
                        params += jsonObj.toString() + " ";
                    }
                    catch (Exception e)
                    {
                    }
                }
            }
        }
        return params.trim();
    }

    /**
     * 判断是否需要过滤的对象。
     * 
     * @param o 对象信息。
     * @return 如果是需要过滤的对象,则返回true;否则返回false。
     */
    @SuppressWarnings("rawtypes")
    public boolean isFilterObject(final Object o)
    {
        Class<?> clazz = o.getClass();
        if (clazz.isArray())
        {
            return clazz.getComponentType().isAssignableFrom(MultipartFile.class);
        }
        else if (Collection.class.isAssignableFrom(clazz))
        {
            Collection collection = (Collection) o;
            for (Object value : collection)
            {
                return value instanceof MultipartFile;
            }
        }
        else if (Map.class.isAssignableFrom(clazz))
        {
            Map map = (Map) o;
            for (Object value : map.entrySet())
            {
                Map.Entry entry = (Map.Entry) value;
                return entry.getValue() instanceof MultipartFile;
            }
        }
        return o instanceof MultipartFile || o instanceof HttpServletRequest || o instanceof HttpServletResponse
                || o instanceof BindingResult;
    }
}

  它这个AOP作用就在于@Log这个注解,只要遇到它就去执行,然后提取一些信息,通过log里面的代码去入库。

  • RabbitmqLogQueueConfig.java
package com.ruoyi.common.log.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author xx
 * @date 2021/10/5 16:24
 */
@Configuration
public class RabbitmqLogQueueConfig {

    //队列 起名:springDirectQueue
    @Bean
    public Queue springDirectQueue() {
        // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,不然你电脑关机的时候队列就会消失。
        // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
        // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。

        //一般设置一下队列的持久化就好,其余两个就是默认false
        return new Queue("springOperLogQueue", true);
    }

    //Direct交换机 起名:springDirectExchange
    @Bean
    public DirectExchange TestDirectExchange() {
        return new DirectExchange("springOperLogExchange", true, false);
    }

    //用两个队列,将队列和交换机绑定, 并设置用于匹配键:springDirectRouting1, springDirectRouting2,用来实验routingKey的作用
    @Bean
    public Binding bindingDirectQueue1() {
        return BindingBuilder.bind(springDirectQueue()).to(TestDirectExchange()).with("springOperLog");
    }

}

把要用到的队列什么的配置好。

  • AsyncLogService.java
package com.ruoyi.common.log.service;

import com.ruoyi.common.mq.producer.ProduceService;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import com.ruoyi.common.core.constant.SecurityConstants;
import com.ruoyi.system.api.RemoteLogService;
import com.ruoyi.system.api.domain.SysOperLog;

import java.util.List;

/**
 * 异步调用日志服务
 *
 * @author ruoyi
 */
@Service
public class AsyncLogService {
    @Autowired
    private RemoteLogService remoteLogService;
    @Autowired
    private ProduceService produceService;

    /**
     * 保存系统日志记录
     */
    @Async
    public void saveSysLog(SysOperLog sysOperLog) {
//        remoteLogService.saveLog(sysOperLog, SecurityConstants.INNER);
        saveSysLogList(sysOperLog);
    }

    public void saveSysLogList(SysOperLog sysOperLog) {
        produceService.send("springOperLogExchange", "springOperLog", sysOperLog);
    }
}

  之前用的remoteService,也就是openfeign,我们这里就不用了,因为我们借助了消息队列,不需要远程服务了。所以我们改一下,换成向mq中发送消息的方式。

3.4 ruoyi-modules-system消费端

  • OperListener
package com.ruoyi.system.rabbitmqListener;

import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.ruoyi.system.api.domain.SysOperLog;
import com.ruoyi.system.api.domain.SysUser;
import com.ruoyi.system.service.ISysOperLogService;
import org.apache.logging.log4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

/**
 * 利用rabbitmq实现异步log处理
 */
@Component
public class OperListener {


    @Autowired
    private ISysOperLogService sysOperLogService;


    @RabbitListener(queues = "springOperLogQueue", containerFactory = "singleListenerContainer")
    @Transactional(rollbackFor = Exception.class)
    public void processMessage3(List<Message> messages, Channel channel) throws Exception {
        System.out.println("开始消费");
        List<SysOperLog> collect = messages.stream().map(message -> {
            try {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return JSON.parseObject(new String(message.getBody()), SysOperLog.class);
        }).collect(Collectors.toList());
        sysOperLogService.inserOperlogList(collect);
    }
}

利用之前springboot中的测试代码,我们这里直接用起来,然后入库就行了。

4 结语

  到这里就结束了,这个方法是可行的,然后代码我都测试过,没有问题,最后可以入库的,算是RabbitMQ的第一次应用吧,这里成功实现了日志入库的功能。

本文含有隐藏内容,请 开通VIP 后查看

网站公告

今日签到

点亮在社区的每一天
去签到