Go微服务: 基于rocketmq:server和rocketmq:broker搭建RocketMQ环境,以及生产消息和延迟消费消息的实现

发布于:2024-06-06 ⋅ 阅读:(134) ⋅ 点赞:(0)

RocketMQ 的搭建


1 ) 配置 docker-compose.yaml 文件

# Compose stack for a local RocketMQ setup: one NameServer, one broker and the
# web console, all attached to the same bridge network "rmq".
version: '3.5'
services:
  # NameServer. The broker and the console address it as rmqnamesrv:9876.
  rmqnamesrv:
    image: foxiswho/rocketmq:server
    container_name: rmqnamesrv
    ports:
      # Quoted like the other mappings so YAML always reads HOST:CONTAINER as a string.
      - "9876:9876"
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
    networks:
      rmq:
        aliases:
          - rmqnamesrv

  # Broker, started with the configuration file mounted from ./conf/broker.conf.
  rmqbroker:
    image: foxiswho/rocketmq:broker
    container_name: rmqbroker
    ports:
      # 10911 matches listenPort in conf/broker.conf.
      # NOTE(review): 10909 is presumably the broker's VIP channel port (listenPort - 2) - confirm.
      - "10909:10909"
      - "10911:10911"
    volumes:
      - ./logs:/opt/logs
      - ./store:/opt/store
      - ./conf/broker.conf:/etc/rocketmq/broker.conf
    environment:
      NAMESRV_ADDR: "rmqnamesrv:9876"
      JAVA_OPTS: " -Duser.home=/opt"
      JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
    command: mqbroker -c /etc/rocketmq/broker.conf
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqbroker

  # Web console, pointed at the NameServer through JAVA_OPTS.
  rmqconsole:
    image: styletang/rocketmq-console-ng
    container_name: rmqconsole
    ports:
      - "8080:8080"
    environment:
      JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
    depends_on:
      - rmqnamesrv
    networks:
      rmq:
        aliases:
          - rmqconsole

networks:
  rmq:
    name: rmq
    driver: bridge

2 ) 配置文件 conf/broker.conf

# Broker configuration mounted into the rmqbroker container as /etc/rocketmq/broker.conf.
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
# Must be set explicitly and must not be 0.0.0.0, otherwise clients cannot communicate with the broker.
brokerIP1=192.168.124.6
defaultTopicQueueNums = 4
autoCreateTopicEnable = true
autoCreateSubscriptionGroup = true
# Same port that docker-compose.yaml publishes for the broker.
listenPort = 10911
deleteWhen = 04
fileReservedTime = 120
# NOTE(review): recent RocketMQ versions appear to name these properties
# mappedFileSizeCommitLog / mappedFileSizeConsumeQueue; with the spelling below
# the two keys may be silently ignored - verify against the broker version in use.
mapedfileSizeCommitLog = 1073741824
mapedfileSizeConsumeQueue=300000
diskMaxUsedSpaceRatio = 88
maxMessageSize=65536
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
  • 注意,需要指定 brokerIP1 且不能使用 0.0.0.0 也不能不指定,否则无法通信
  • fileReservedTime 为 commitlog 文件的保留时间(单位:小时);官方示例 broker.conf 中为 48(代码默认值为 72),这里设置为 120

3 ) 拉取镜像

  • $ docker pull foxiswho/rocketmq:server
  • $ docker pull foxiswho/rocketmq:broker
  • $ docker pull styletang/rocketmq-console-ng

4 ) 启动和检查

  • 启动 $ docker compose up -d
  • 检查状态 $ docker compose ps

打开 UI 界面验证

  • 访问:http://127.0.0.1:8080

界面中显示的 broker 地址与上面 broker.conf 中配置的 brokerIP1 相对应

编写程序验证生产和消费消息

  • 现在简述下场景
    • 生产5条消息
    • 10s 后进行消费(通过延迟级别 3 实现,broker 默认延迟级别中第 3 级为 10s)

代码实现

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/apache/rocketmq-client-go/v2"
	"github.com/apache/rocketmq-client-go/v2/consumer"
	"github.com/apache/rocketmq-client-go/v2/primitive"
	"github.com/apache/rocketmq-client-go/v2/producer"
)

// groupName is the group name passed to both the producer and the push
// consumer created in this file.
const groupName = "BBS_SHOP_GROUP_123"

// GetMqAddr returns the NameServer address in host:port form.
// Replace the literal with the address of your own NameServer.
func GetMqAddr() string {
	return "127.0.0.1:9876"
}

// ProduceMsg creates a producer for the NameServer at mqAddr, synchronously
// sends five delayed messages to topic, and shuts the producer down.
//
// A failure to construct the producer panics; a failure to start it or to shut
// it down terminates the process with exit status 1. A failed send is reported
// on stdout and the loop moves on to the next message.
func ProduceMsg(mqAddr string, topic string) {
	p, err := rocketmq.NewProducer( // ordinary (non-transactional) message producer
		producer.WithGroupName(groupName),
		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),
		producer.WithRetry(2),
	)
	if err != nil {
		panic(err)
	}

	if err = p.Start(); err != nil {
		// log.Fatalf prints the message and then exits with status 1. The
		// original called log.Fatal() with no arguments, which exited before
		// the error was ever printed.
		log.Fatalf("生产者错误: %v", err)
	}

	for i := 0; i < 5; i++ {
		msg := &primitive.Message{
			Topic: topic,
			Body:  []byte("Hello XProjectOrder " + strconv.Itoa(i)),
		}
		// Delay level 3 is 10s in the broker's default messageDelayLevel table.
		msg.WithDelayTimeLevel(3)

		r, err := p.SendSync(context.Background(), msg)
		if err != nil {
			fmt.Printf("发送消息错误: %v\n", err)
			continue
		}
		fmt.Println("生产消息成功: " + r.String() + "-" + r.MsgID)
	}

	if err = p.Shutdown(); err != nil {
		fmt.Printf("生产者shutdown: %v\n", err)
		os.Exit(1)
	}
}

// ComsumeMsg creates a push consumer for the NameServer at mqAddr, subscribes
// it to topic, and prints every message it receives.
//
// A failure to construct the consumer panics. If subscribing or starting
// fails, the error is printed and the function returns immediately instead of
// waiting on a consumer that is not running. Otherwise the function blocks for
// one hour so the consumer can receive messages, then shuts it down.
func ComsumeMsg(mqAddr string, topic string) {
	c, err := rocketmq.NewPushConsumer(
		consumer.WithGroupName(groupName),
		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{mqAddr})),
	)
	if err != nil {
		panic(err)
	}

	// handler prints each delivered message and acknowledges the batch.
	handler := func(ctx context.Context, msgList ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, m := range msgList {
			fmt.Printf("订阅消息,消费%v \n", m)
		}
		return consumer.ConsumeSuccess, nil
	}

	if err = c.Subscribe(topic, consumer.MessageSelector{}, handler); err != nil {
		fmt.Printf("消费消息错误: %v\n", err)
		return
	}
	if err = c.Start(); err != nil {
		fmt.Printf("开启消费者错误: %v\n", err)
		return
	}

	// Keep the process alive while the consumer receives messages in the background.
	time.Sleep(time.Hour)

	if err = c.Shutdown(); err != nil {
		fmt.Printf("shutdown消费者错误: %v\n", err)
	}
}

// main runs the demo end to end: it publishes the messages to the demo topic
// and then hands control to the consumer for the same topic.
func main() {
	const topic = "BBS_SHOP_TOPIC_123"
	addr := GetMqAddr()

	ProduceMsg(addr, topic)
	ComsumeMsg(addr, topic)
}
  • 以上是一个 demo,在真实场景,自行进行封装处理

  • 这里定义了一个 主题 BBS_SHOP_TOPIC_123 和 一个订阅组 BBS_SHOP_GROUP_123

  • 查看生产消息输出(以下输出中的 brokerName 为 broker-bbs,来自作者运行时的配置;按上文 broker.conf 的配置则应显示为 broker-a)

    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80001, offsetMsgId=C0A87C0600002A9F0000000000000000, queueOffset=0, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80001
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80002, offsetMsgId=C0A87C0600002A9F00000000000000DE, queueOffset=1, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]]-C0A87C06209F0000000017eef7e80002
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80003, offsetMsgId=C0A87C0600002A9F00000000000001BC, queueOffset=2, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]]-C0A87C06209F0000000017eef7e80003
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80004, offsetMsgId=C0A87C0600002A9F000000000000029A, queueOffset=3, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]]-C0A87C06209F0000000017eef7e80004
    生产消息成功: SendResult [sendStatus=0, msgIds=C0A87C06209F0000000017eef7e80005, offsetMsgId=C0A87C0600002A9F0000000000000378, queueOffset=4, messageQueue=MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]]-C0A87C06209F0000000017eef7e80005
    
  • 查看消费 (10s 之后)

    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 0, Flag=0, properties=map[CONSUME_START_TIME:1717572747702 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80001], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80001, OffsetMsgId=C0A87C0600002A9F0000000000000456,QueueId=1, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737685, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747694, StoreHost=192.168.124.6:10911, CommitLogOffset=1110, BodyCRC=407480418, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 1, Flag=0, properties=map[CONSUME_START_TIME:1717572747706 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:2 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80002], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80002, OffsetMsgId=C0A87C0600002A9F0000000000000533,QueueId=2, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737698, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747701, StoreHost=192.168.124.6:10911, CommitLogOffset=1331, BodyCRC=1867421940, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 2, Flag=0, properties=map[CONSUME_START_TIME:1717572747710 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:3 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80003], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80003, OffsetMsgId=C0A87C0600002A9F0000000000000610,QueueId=3, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737702, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747704, StoreHost=192.168.124.6:10911, CommitLogOffset=1552, BodyCRC=1984416078, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=0]" consumerGroup=BBS_SHOP_GROUP_123 offset=0
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 3, Flag=0, properties=map[CONSUME_START_TIME:1717572747713 DELAY:3 MAX_OFFSET:1 MIN_OFFSET:0 REAL_QID:0 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80004], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80004, OffsetMsgId=C0A87C0600002A9F00000000000006ED,QueueId=0, StoreSize=221, QueueOffset=0, SysFlag=0, BornTimestamp=1717572737706, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747709, StoreHost=192.168.124.6:10911, CommitLogOffset=1773, BodyCRC=21035480, ReconsumeTimes=0, PreparedTransactionOffset=0] 
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=1]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=2]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    INFO[0010] update offset to broker success               MessageQueue="MessageQueue [topic=BBS_SHOP_TOPIC_123, brokerName=broker-bbs, queueId=3]" consumerGroup=BBS_SHOP_GROUP_123 offset=1
    订阅消息,消费[Message=[topic=BBS_SHOP_TOPIC_123, body=Hello XProjectOrder 4, Flag=0, properties=map[CONSUME_START_TIME:1717572747717 DELAY:3 MAX_OFFSET:2 MIN_OFFSET:0 REAL_QID:1 REAL_TOPIC:BBS_SHOP_TOPIC_123 UNIQ_KEY:C0A87C06209F0000000017eef7e80005], TransactionId=], MsgId=C0A87C06209F0000000017eef7e80005, OffsetMsgId=C0A87C0600002A9F00000000000007CA,QueueId=1, StoreSize=221, QueueOffset=1, SysFlag=0, BornTimestamp=1717572737709, BornHost=172.30.0.1:46162, StoreTimestamp=1717572747711, StoreHost=192.168.124.6:10911, CommitLogOffset=1994, BodyCRC=522480763, ReconsumeTimes=0, PreparedTransactionOffset=0]
    
  • 以上就是生产和消费的主要过程

效果

总结

  • 以上是简单的环境搭建和生产消息,以及延迟消费消息的 demo 示例
  • 实际场景中,结合以上demo,对一些异步发送消息的场景进行灵活运用和升级

网站公告

今日签到

点亮在社区的每一天
去签到