rabbitmq--默认模式(点对点)

发布于:2025-08-01 ⋅ 阅读:(13) ⋅ 点赞:(0)

导入包:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml 

spring
 rabbitmq:
  host: localhost
  port: 5672
  username: guest
  password: guest
  virtual-host: /  # 默认虚拟主机
  listener:
    simple:
      acknowledge-mode: manual # 可根据需要设置为auto
  publisher-confirm-type: correlated

RabbitMQConfig.java
package com.example.dyreportapi.config;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@Configuration
public class RabbitMQConfig {

    public static final String DECLARE_QUEUE = "*****替换正式队列名***"; // 申报队列
    public static final String RESPONSE_QUEUE = "***替换正式队列名**"; // 回执队列
    public static final String RK_QUEUE = "***替换正式队列名**"; // 出入库队列

    @Bean
    public Queue declareQueue() {
        return new Queue(DECLARE_QUEUE, true); // durable
    }

    @Bean
    public Queue responseQueue() {
        return new Queue(RESPONSE_QUEUE, true);
    }
    public Queue rkQUEUE() {
        return new Queue(RK_QUEUE, true);
    }



    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                System.out.println("消息成功投递至 RabbitMQ Broker");
            } else {
                System.out.println("消息未投递成功:" + cause);
            }
        });
        return template;
    }

}

ResponseMessageListener.java 

import com.example.dyreportapi.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import javax.xml.parsers.ParserConfigurationException;
import org.xml.sax.SAXException;

@Component
public class ResponseMessageListener {

    @RabbitListener(queues = RabbitMQConfig.RESPONSE_QUEUE)
    public void receiveMessage(String message, Channel channel, Message amqpMessage) throws IOException {
        System.out.println("接收到回执消息: " + message);

        long deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();

        try {
            // 解析XML消息
            if (isXmlMessage(message)) {
                parseXmlMessage(message);
            }

            // 判断是否是某个业务消息的回执
            if (message.contains("03355813-a4c0-49b8-955b-edd0e2934275") || message.contains("LT072303")) {
                System.out.println("✅ 匹配到本次发送的车辆备案回执!");
                // 正常确认消息
                channel.basicAck(deliveryTag, false);
            } else {
                // 不匹配的消息,拒绝并重新入队给其他消费者处理
                channel.basicNack(deliveryTag, false, true); // requeue=true
            }
        } catch (Exception e) {
            System.err.println("处理消息时发生错误: " + e.getMessage());
            // 处理异常时,可以选择重新入队或不重新入队
            channel.basicNack(deliveryTag, false, true);
        }
    }

    /**
     * 判断消息是否为XML格式
     */
    private boolean isXmlMessage(String message) {
        return message.trim().startsWith("<") && message.trim().endsWith(">");
    }

    /**
     * 解析XML消息并提取关键标签
     */
    private void parseXmlMessage(String xmlMessage) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            DocumentBuilder builder = factory.newDocumentBuilder();
            Document document = builder.parse(new ByteArrayInputStream(xmlMessage.getBytes("UTF-8")));

            // 获取根元素
            Element root = document.getDocumentElement();
            System.out.println("根元素: " + root.getNodeName());

            // 解析常见的回执标签示例
            parseCommonResponseTags(document);

        } catch (ParserConfigurationException | SAXException | IOException e) {
            System.err.println("XML解析失败: " + e.getMessage());
        }
    }

    /**
     * 解析常见的回执标签
     */
    private void parseCommonResponseTags(Document document) {
        // 解析PRE_NO标签
        NodeList preNoNodes = document.getElementsByTagName("PRE_NO");
        if (preNoNodes.getLength() > 0) {
            String preNo = preNoNodes.item(0).getTextContent();
            System.out.println("PRE_NO: " + preNo);
        }

        // 解析MESSAGE_ID标签
        NodeList messageIdNodes = document.getElementsByTagName("MESSAGE_ID");
        if (messageIdNodes.getLength() > 0) {
            String messageId = messageIdNodes.item(0).getTextContent();
            System.out.println("MESSAGE_ID: " + messageId);
        }

        // 解析RESPONSE_CODE标签
        NodeList responseCodeNodes = document.getElementsByTagName("RESPONSE_CODE");
        if (responseCodeNodes.getLength() > 0) {
            String responseCode = responseCodeNodes.item(0).getTextContent();
            System.out.println("RESPONSE_CODE: " + responseCode);
        }

        // 解析RESPONSE_MESSAGE标签
        NodeList responseMessageNodes = document.getElementsByTagName("RESPONSE_MESSAGE");
        if (responseMessageNodes.getLength() > 0) {
            String responseMessage = responseMessageNodes.item(0).getTextContent();
            System.out.println("RESPONSE_MESSAGE: " + responseMessage);
        }

        // 解析NK_STATUS标签
        NodeList nkStatusNodes = document.getElementsByTagName("NK_STATUS");
        if (nkStatusNodes.getLength() > 0) {
            String nkStatus = nkStatusNodes.item(0).getTextContent();
            System.out.println("NK_STATUS: " + nkStatus);
        }
    }
}
package com.example.dyreportapi.controller;

import com.example.dyreportapi.service.VehicleDeclareSender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class VehicleDeclareController {

    @Autowired
    private VehicleDeclareSender sender;

    @GetMapping("/sendVehicle")
    public String sendVehicleDeclare() {
        sender.sendVehicleDeclareMessage();
        return "车辆备案报文已发送";
    }

    @GetMapping("/sendVehicleOut")
    public String sendVehicleOut() {
        sender.sendVehicleHfdMessage();
        return "车辆出厂报文已发送";
    }
}
package com.example.dyreportapi.service;

import com.example.dyreportapi.config.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.UUID;

@Service
public class VehicleDeclareSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 车辆备案报文
     */
    public void sendVehicleDeclareMessage() {
        String messageId = UUID.randomUUID().toString();
        String messageDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
  System.out.println(messageId);
  System.out.println(messageDate);


        String xml = "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +
                "<DECLARE_DATA>\n" +
                "  <MESSAGE_HEAD>\n" +
                "    <MESSAGE_TYPE>VEH101</MESSAGE_TYPE>\n" +// 报文类型
                "    <MESSAGE_ID>" + messageId + "</MESSAGE_ID>\n" +// 报文ID
                "    <MESSAGE_TYPE>VEH101</MESSAGE_TYPE>\n" +// 报文类型
                "    <PRE_NO>LT072303</PRE_NO>\n" +// 预录入编号
                "    <B2B_NO>LT072303</B2B_NO>\n" +// 预录入编号
                "    <MESSAGE_DATE>" + messageDate + "</MESSAGE_DATE>\n" +// 报文时间
                "    <SENDER_ID>3408660A06</SENDER_ID>\n" +// 发送方ID
//                "    <SEND_ADDRESS>安庆</SEND_ADDRESS>\n" +// 发送方地址
                "    <RECEIVER_ID>3329</RECEIVER_ID>\n" +// 接收方ID
                "  </MESSAGE_HEAD>\n" +
                "  <VEH101>\n" +
                "    <VEH_BSC>\n" +
                "      <VEHICLE_NO>皖A12345</VEHICLE_NO>\n" +// 车牌号
                "      <B2B_NO>皖A12345</B2B_NO>\n" +// 车牌号
                "      <VEHICLE_TYPE>3</VEHICLE_TYPE>\n" +// 车辆类型
                "      <VEHICLE_WT>12345.67</VEHICLE_WT>\n" +// 车辆重量
                "      <CUSTOMS_CODE>3329</CUSTOMS_CODE>\n" +// 关区代码 备注:3329安庆综保
//                "      <NOTE>测试车辆备案</NOTE>\n" +// 备注
                "      <DCL_TYPECD>1</DCL_TYPECD>\n" +// 申报类型 1:备案;2:变更;
//                "      <STEP_ID></STEP_ID>\n" +// 当前环节
                "      <DECLARE_CODE>3408660A06</DECLARE_CODE>\n" + // 申报公司编号
                "      <DECLARE_NAME>安庆振新汽车有限公司</DECLARE_NAME>\n" +// 申报公司名称
                "      <CREATE_BY>admin</CREATE_BY>\n" +// 创建人
                "      <CREATE_TIME>" + messageDate + "</CREATE_TIME>\n" +// 创建时间
                "      <DECLARE_BY>admin</DECLARE_BY>\n" +// 申报人
                "      <DECLARE_TIME>" + messageDate + "</DECLARE_TIME>\n" +// 申报时间
//                "      <APPROVE_TIME></APPROVE_TIME>\n" +// 审核时间
                "    </VEH_BSC>\n" +
                "  </VEH101>\n" +
                "</DECLARE_DATA>";
        System.out.println("已发送车辆备案报文:" + xml);

        rabbitTemplate.convertAndSend(RabbitMQConfig.DECLARE_QUEUE, xml);
        System.out.println("已发送车辆备案报文至队列:" + RabbitMQConfig.DECLARE_QUEUE);
    }


    /**
     * 整车出厂报文
     */
    public void sendVehicleHfdMessage() {
        String messageId = UUID.randomUUID().toString();
        String messageDate = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.println(messageId);
        System.out.println(messageDate);

/**
 * <?xml version="1.0" encoding="UTF-8" standalone="yes"?>
 * <JEERP_BZ>
 *     <MESSAGE_HEAD>
 *         <MESSAGE_TYPE>HFDB2B</MESSAGE_TYPE>
 *         <MESSAGE_ID>232ab09e-75d0-4fbe-a94e-c02f752c6cf6-20250729101240</MESSAGE_ID>
 *         <MESSAGE_TIME>2025-07-29T10:12:40</MESSAGE_TIME>
 *         <SENDER_ADDRESS></SENDER_ADDRESS>
 *         <RECEIVER_ID>JGEQ</RECEIVER_ID>
 *         <RECEIVER_ADDRESS></RECEIVER_ADDRESS>
 *         <CUSTOM_CODE>3329</CUSTOM_CODE>
 *         <SEQNO>HFD202507297719</SEQNO>
 *         <NOTE></NOTE>
 *         <POCKET_ID>null232ab09e-75d0-4fbe-a94e-c02f752c6cf6-20250729101240</POCKET_ID>
 *         <TOTAL_POCKET_QTY>1</TOTAL_POCKET_QTY>
 *         <CUR_POCKET_NO>1</CUR_POCKET_NO>
 *         <ROUTE_CODE></ROUTE_CODE>
 *     </MESSAGE_HEAD>
 *     <MESSAGE_BODY>
 *         <JG_HFD_HEAD_B2B>
 *             <ERP_NO>HFD202507297719</ERP_NO>
 *             <DCL_TYPECD>1</DCL_TYPECD>
 *             <PASSPORT_TYPECD>5</PASSPORT_TYPECD>
 *             <IO_TYPECD>I</IO_TYPECD>
 *             <MASTER_CUSCD>3329</MASTER_CUSCD>
 *             <AREAIN_ETPSNO>3408660A06</AREAIN_ETPSNO>
 *             <DCL_ETPSNO>3408660A06</DCL_ETPSNO>
 *             <INPUT_CODE>3408660A06</INPUT_CODE>
 *             <CREATE_USER>hxy0154</CREATE_USER>
 *             <VEHICLE_NO>LT072302</VEHICLE_NO>
 *             <VEHICLE_WT>10000</VEHICLE_WT>
 *             <TOTAL_GROSS_WT>560</TOTAL_GROSS_WT>
 *             <TOTAL_NET_WT>510</TOTAL_NET_WT>
 *             <CREATE_DATE>2025-07-29T10:12:40</CREATE_DATE>
 *             <TRAILER_NO>LT072302</TRAILER_NO>
 *             <PLAT_TYPECD>0</PLAT_TYPECD>
 *             <COL1>0</COL1>
 *             <IS_PP>0</IS_PP>
 *             <ETPS_PREENT_NO>HFD202507297719</ETPS_PREENT_NO>
 *         </JG_HFD_HEAD_B2B>
 *         <JG_HFD_LIST_B2B>
 *             <GUID_L>HFD202507297719_1</GUID_L>
 *             <PASSPORT_SEQNO>1</PASSPORT_SEQNO>
 *             <GDECD>LLDB1</GDECD>
 *             <GDS_NM>领料调拨</GDS_NM>
 *             <DCL_UNITCD>007</DCL_UNITCD>
 *             <DCL_QTY>30</DCL_QTY>
 *             <GROSS_WT>560</GROSS_WT>
 *             <NET_WT>510</NET_WT>
 *             <IS_GOODS>0</IS_GOODS>
 *         </JG_HFD_LIST_B2B>
 *     </MESSAGE_BODY>
 * </JEERP_BZ>
 */
        // 构建基础的XML头部和尾部
        StringBuilder xmlBuilder = new StringBuilder();
        xmlBuilder.append("<?xml version=\"1.0\" encoding=\"UTF-8\"  standalone=\"yes\" ?>\n")
                .append("<JEERP_BZ>\n")
                .append("  <MESSAGE_HEAD>\n")
                //报文类型
                .append("    <MESSAGE_TYPE>HFDB2B</MESSAGE_TYPE>\n")
                //报文ID
                .append("    <MESSAGE_ID>").append(messageId).append("</MESSAGE_ID>\n")
                //报文发送时间
                .append("    <MESSAGE_TIME>").append(messageDate).append("</MESSAGE_TIME>\n")
                //发送方地址
                .append("    <SENDER_ADDRESS></SENDER_ADDRESS>\n")
                //申报公司编号
                .append("    <RECEIVER_ID>JGEQ</RECEIVER_ID>\n")
                //接收方地址
                .append("    <RECEIVER_ADDRESS></RECEIVER_ADDRESS>\n")
               //使用您提供的安庆关区代码
                .append("    <CUSTOM_CODE>3329</CUSTOM_CODE>\n")
                //业务单号,唯一
                .append("    <SEQNO>HFD202507245559</SEQNO>\n")

               //备注
                .append("    <NOTE></NOTE>\n")
                .append("    <POCKET_ID>").append(messageId).append("</POCKET_ID>\n")
                .append("    <TOTAL_POCKET_QTY>1</TOTAL_POCKET_QTY>\n")
                .append("    <CUR_POCKET_NO>1</CUR_POCKET_NO>\n")
                .append("    <ROUTE_CODE></ROUTE_CODE>\n")
                .append("  </MESSAGE_HEAD>\n")

                .append("  <MESSAGE_BODY>\n")

                .append("  <JG_HFD_HEAD_B2B>\n")
                //业务单号,唯一
                .append("    <ERP_NO>HFD202507245559</ERP_NO>\n")

                //申报类型 1备案  3作废
                .append("    <DCL_TYPECD>1</DCL_TYPECD>\n")
                //核放单类型 5-卡口登记货物 6-空车进出区
                .append("    <PASSPORT_TYPECD>5</PASSPORT_TYPECD>\n")

                //进出标志 I-进区,E-出区
                .append("    <IO_TYPECD>E</IO_TYPECD>\n")
                //主管海关
                .append("    <MASTER_CUSCD>3329</MASTER_CUSCD>\n")
                //国内企业编码
                .append("    <AREAIN_ETPSNO>3408660A06</AREAIN_ETPSNO>\n")
                //申报单位代码
                .append("    <DCL_ETPSNO>3408660A06</DCL_ETPSNO>\n")
                //录入单位代码
                .append("    <INPUT_CODE>3408660A06</INPUT_CODE>\n")
                //创建
                .append("    <CREATE_USER>hxy0154</CREATE_USER>\n")
                //车牌号
                .append("    <VEHICLE_NO>YQY99999</VEHICLE_NO>\n")
                //车自重
                .append("    <VEHICLE_WT>1000.19</VEHICLE_WT>\n")
                //货物总毛重
                .append("    <TOTAL_GROSS_WT>560</TOTAL_GROSS_WT>\n")
                //货物总净重
                .append("    <TOTAL_NET_WT>510</TOTAL_NET_WT>\n")
                //创建时间

                .append("    <CREATE_DATE>").append(messageDate).append("</CREATE_DATE>\n")
               //挂车车牌号
                .append("    <TRAILER_NO>YQY99999</TRAILER_NO>\n")
                //是否无车辆运输货物 0 否 1是
                .append("    <PLAT_TYPECD>0</PLAT_TYPECD>\n")
                //到货确认标记
                .append("    <COL1>0</COL1>\n")
                //是否拼票业务
                .append("    <IS_PP>0</IS_PP>\n")
                //企业内部编号
                .append("    <ETPS_PREENT_NO>HFD202507245559</ETPS_PREENT_NO>\n")
                .append("  </JG_HFD_HEAD_B2B>\n");

        // 使用for循环生成多个JG_HFD_LIST_B2B项
        for (int i = 1; i <= 1; i++) {
            xmlBuilder.append("  <JG_HFD_LIST_B2B>\n")
                    // 报文ID
                    .append("    <GUID_L>HFD202507245559_").append(i).append("</GUID_L>\n")
                    // 序号 从1开始递增的正整数
                    .append("    <PASSPORT_SEQNO>1</PASSPORT_SEQNO>\n")
                    // 商品编码
                    .append("    <GDECD>LLDB1</GDECD>\n")
                    // 商品名称
                    .append("    <GDS_NM>车辆1</GDS_NM>\n")
                    // 申报计量单位
                    .append("    <DCL_UNITCD>007</DCL_UNITCD>\n")
                    // 申报数量
                    .append("    <DCL_QTY>3</DCL_QTY>\n")
                    // 申报毛重
                    .append("    <GROSS_WT>560</GROSS_WT>\n")
                    // 申报净重
                    .append("    <NET_WT>510</NET_WT>\n")
                    // 是否商品
                    .append("    <IS_GOODS>0</IS_GOODS>\n")
                    .append("  </JG_HFD_LIST_B2B>\n");
        }

        // 添加XML尾部
        xmlBuilder.append("  </MESSAGE_BODY>\n")
                .append("</JEERP_BZ>");

        String xml = xmlBuilder.toString();

        System.out.println("已生成金关核放单报文:" + xml);

      rabbitTemplate.convertAndSend(RabbitMQConfig.RK_QUEUE, xml);
      System.out.println("已发送车辆出入库消息:" + RabbitMQConfig.RK_QUEUE);
    }
}