001 rabbitmq减库存demo direct

发布于:2024-04-25 ⋅ 阅读:(33) ⋅ 点赞:(0)

Producer

这段代码定义了一个名为Producer的RESTful Web服务Controller,它提供了一个HTTP GET接口/direct/sendMsg,用于发送消息到RabbitMQ的交换机。当该接口被调用时,它会发送一个固定的消息字符串到名为myDirectExchangeAAA的交换机,并使用keyAAA作为RoutingKey。发送成功后,它会返回一个表示成功的字符串。


// 声明一个包名,用于组织和管理Java类。  
package com.example.direct;  
  
// 导入Spring框架中RabbitMQ相关的RabbitTemplate类,它提供了发送和接收消息的方法。  
import org.springframework.amqp.rabbit.core.RabbitTemplate;  
  
// 导入Spring框架的自动装配注解,用于自动注入依赖。  
import org.springframework.beans.factory.annotation.Autowired;  
  
// 导入Spring Web模块的注解,用于映射HTTP GET请求到特定的处理方法。  
import org.springframework.web.bind.annotation.GetMapping;  
  
// 导入Spring Web模块的注解,用于定义Controller类的请求映射路径。  
import org.springframework.web.bind.annotation.RequestMapping;  
  
// 导入Spring Web模块的注解,用于标识一个类为RESTful Web服务的Controller。  
import org.springframework.web.bind.annotation.RestController;  
  
// 使用@RestController注解标识该类为RESTful Web服务的Controller,  
// 意味着此类将处理HTTP请求并返回数据。  
@RestController  
  
// 使用@RequestMapping注解定义该Controller的基础路径为"direct"。  
@RequestMapping("direct")  
  
// 声明一个名为Producer的公共类。  
public class Producer {  
  
    // 使用@Autowired注解自动注入RabbitTemplate的实例,  
    // 以便在类中使用RabbitMQ的功能。  
    @Autowired  
    private RabbitTemplate rabbitTemplate;  
  
    // 使用@GetMapping注解映射HTTP GET请求到sendMsg方法,  
    // 当访问"/direct/sendMsg"路径时,将调用此方法。  
    @GetMapping("sendMsg")  
  
    // 声明一个公共的sendMsg方法,该方法不接收任何参数,并返回一个字符串。  
    public String sendMsg(){  
  
        // 定义一个要发送的消息字符串。  
        String msg = "已经生成了订单,需要减去库存1个";  
  
        // 使用RabbitTemplate的convertAndSend方法发送消息到RabbitMQ交换机。  
        // 第一个参数是交换机的名称("myDirectExchangeAAA"),  
        // 第二个参数是RoutingKey("keyAAA"),用于确定消息应该路由到哪个队列,  
        // 第三个参数是要发送的消息内容(msg)。  
        rabbitTemplate.convertAndSend("myDirectExchangeAAA","keyAAA",msg);  
  
        // 返回一个表示消息发送成功的字符串。  
        return "send msg ok";  
    }  
}


Consumer


package com.example.direct; // 声明包名为com.example.direct。  
  
import com.rabbitmq.client.Channel; // 导入RabbitMQ的Channel类,它代表了一个通信信道。  
  
import org.springframework.amqp.core.Message; // 导入Spring AMQP的Message类,表示一条消息。  
  
import org.springframework.amqp.rabbit.annotation.RabbitHandler; // 导入RabbitHandler注解,标识处理RabbitMQ消息的方法。  
  
import org.springframework.amqp.rabbit.annotation.RabbitListener; // 导入RabbitListener注解,用于监听RabbitMQ队列。  
  
import org.springframework.stereotype.Component; // 导入Spring的Component注解,标识该类为Spring的一个组件。  
  
import java.io.IOException; // 导入Java的IOException类,处理可能的输入输出异常。  
  
// 使用@Component注解将该类声明为Spring的一个组件,这样Spring会自动扫描并管理它。  
@Component   
public class Consumer { // 声明一个公共类Consumer。  
  
    // 使用@RabbitHandler注解标识该方法为处理RabbitMQ消息的方法。  
    // 使用@RabbitListener注解监听名为"queueAAA"的队列。  
    @RabbitHandler   
    @RabbitListener(queues = "queueAAA")   
    public void getMSg1(Message message, Channel channel){ // 定义一个公共方法getMSg1,接收一个Message和一个Channel作为参数。  
  
        try {  
            System.out.println("模拟库存业务处理减库存:" + message); // 打印接收到的消息。  
  
            Integer stock = 3;  
  
            int number = 10;   
  
            number -= stock; 
  
            // 消息确认:立马删除 消息。这是RabbitMQ的消息确认机制,确保消息被正确处理后可以安全删除。  
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);   
  
            System.out.println("减库存业务执行结束,队列消息已删除"); // 打印消息表示业务处理完毕且消息已被删除。  
  
        } catch (Exception e) { // 捕获所有异常。  
            try {  
                System.out.println("减库存业务有异常,消息重入队列"); // 打印异常信息。  
                // 当处理消息时发生异常,可以选择将消息重新放回队列以供后续处理。  
                // 这里basicReject的第二个参数为true,表示消息将重新入队。  
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);   
            } catch (IOException ioException) { // 捕获可能的IO异常。  
                ioException.printStackTrace(); // 打印IO异常的堆栈信息。  
            }  
            e.printStackTrace(); // 打印原始异常的堆栈信息。  
        }  
    }  
}


RabbitMQDirectConfig.java



package com.example.direct;


import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQDirectConfig {

//    1. 创建交换机
    @Bean
    public DirectExchange newDirectExchange(){
        return new DirectExchange("myDirectExchangeAAA",true,false);
    }

    //2. 创建队列
    @Bean
    public Queue newQueueA(){
        return new Queue("queueAAA",true);
    }

//3. 绑定队列到交换机中
    @Bean
    public Binding bindingA(){
        return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");
    }


}


application.yaml


server:
  servlet:
    context-path: /app
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    publisher-confirm-type: correlated  # 确认交换机已经接收到生产者的消息了
    publisher-returns: true   #  消息已经到了队列(交换机与队列绑定成功的)
    listener:
      simple:
        acknowledge-mode: manual # 手动消息确认



ServletInitializer.java


package com.example;

import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;

public class ServletInitializer extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(RabbitmqApplication.class);
    }

}


RabbitmqApplication.java


package com.example;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitmqApplication {

    public static void main(String[] args) {
        SpringApplication.run(RabbitmqApplication.class, args);
    }

}


pom.xml


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.6</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>war</packaging>
    <name>rabbitmq</name>
    <description>rabbitmq</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>

        <!-- AMQP客户端 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.4.1</version>
        </dependency>



        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>