RocketMQ5.0 生产者

发布于:2024-08-08 ⋅ 阅读:(125) ⋅ 点赞:(0)

生产者消息类型:

​​​​​​​

延迟队列的生产者

package main

import (
	"context"
	"fmt"
	"github.com/apache/rocketmq-clients/golang/v5"
	"github.com/apache/rocketmq-clients/golang/v5/credentials"
	errgroup2 "golang.org/x/sync/errgroup"
	"log"
	"os"
	"strconv"
	"time"
)

// Topic is the name of the topic the producer publishes to.
const Topic = "DelayTopic"

// GroupName is a group identifier; the producer code visible here does not
// reference it.
const GroupName = "testG"

// Endpoint is the host:port address passed to the client configuration.
const Endpoint = "localhost:8081"

// Region, AccessKey and SecretKey hold placeholder values ("xxxxxx").
// NOTE(review): the producer code visible here does not reference them —
// confirm whether they are meant to populate the session credentials.
const Region = "xxxxxx"
const AccessKey = "xxxxxx"
const SecretKey = "xxxxxx"

// main runs the delayed-message producer demo and exits with a non-zero
// status if it fails.
func main() {
	if err := runProducer(); err != nil {
		log.Fatal(err)
	}
}

// runProducer starts a producer for Topic and submits up to 1000 messages,
// one per second, each scheduled for delivery 10 seconds after it is built.
// At most 10 sends are in flight at a time.
//
// Failures are returned to the caller instead of calling log.Fatal at the
// failure site, so the deferred GracefulStop runs before the process exits.
// The first failed send cancels the shared context and stops the loop.
func runProducer() error {
	// log to console
	if err := os.Setenv("mq.consoleAppender.enabled", "true"); err != nil {
		return err
	}
	golang.ResetLogger()

	// new producer instance
	producer, err := golang.NewProducer(&golang.Config{
		Endpoint:    Endpoint,
		Credentials: &credentials.SessionCredentials{},
	},
		golang.WithTopics(Topic),
	)
	if err != nil {
		return err
	}
	// start producer
	if err := producer.Start(); err != nil {
		return err
	}
	// Gracefully stop the producer on every return path below; the result is
	// not inspected because the process is about to exit anyway.
	defer producer.GracefulStop()

	// ctx is cancelled as soon as any send returns an error.
	group, ctx := errgroup2.WithContext(context.Background())
	group.SetLimit(10)
	for i := 0; i < 1000 && ctx.Err() == nil; i++ {
		// Per-iteration copy: before Go 1.22 the closure would otherwise
		// share a single loop variable with every other iteration.
		seq := i
		group.Go(func() error {
			msg := &golang.Message{
				Topic: Topic,
				Body:  []byte("this is a message : " + strconv.Itoa(seq) + time.Now().Format(time.DateTime)),
			}
			// set keys and tag
			msg.SetKeys("a", "b")
			msg.SetTag("ab")
			msg.SetDelayTimestamp(time.Now().Add(time.Second * 10))

			// send message in sync
			receipts, err := producer.Send(ctx, msg)
			if err != nil {
				return fmt.Errorf("send message %d: %w", seq, err)
			}
			for _, receipt := range receipts {
				fmt.Printf("%#v\n", receipt)
			}
			return nil
		})

		// wait a moment, but stop waiting immediately once a send has failed
		select {
		case <-ctx.Done():
		case <-time.After(time.Second * 1):
		}
	}
	if err := group.Wait(); err != nil {
		return err
	}
	// NOTE(review): kept from the original; sends are synchronous, so the
	// purpose of this final pause is unclear — confirm whether it is needed.
	time.Sleep(time.Minute * 10)
	return nil
}

设置 topic 的 message.type 属性。先进入 NameServer 容器:

docker exec -it rmqnamesrv /bin/bash

sh mqadmin updateTopic -c DefaultCluster -t DelayTopic -n 127.0.0.1:9876 -a +message.type=DELAY


sh mqadmin updateTopic -n <nameserver_address> -t <topic_name> -c <cluster_name> -a +message.type=DELAY

延迟队列的消费者

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/apache/rocketmq-clients/golang"
	"github.com/apache/rocketmq-clients/golang/credentials"
)

// Topic is the name of the topic the consumer subscribes to.
const Topic = "DelayTopic"

// GroupName is a group identifier declared alongside the topic.
const GroupName = "testG"

// Endpoint is the host:port address passed to the client configuration.
const Endpoint = "localhost:8081"

// awaitDuration is the maximum waiting time for the receive call.
var awaitDuration = 5 * time.Second

// maxMessageNum is the maximum number of messages received at one time.
var maxMessageNum int32 = 16

// invisibleDuration is passed to every receive call.
// NOTE(review): the original note says this "should > 20s", yet the value is
// exactly 20s — confirm the actual lower bound required by the broker.
var invisibleDuration = 20 * time.Second

// main runs the delayed-message consumer demo: it starts a simple consumer
// subscribed to Topic, then receives and acknowledges messages in a loop for
// up to 20 minutes before shutting the consumer down.
func main() {
	// log to console
	if err := os.Setenv("mq.consoleAppender.enabled", "true"); err != nil {
		log.Fatal(err)
	}
	golang.ResetLogger()

	// new simpleConsumer instance
	simpleConsumer, err := golang.NewSimpleConsumer(&golang.Config{
		Endpoint:    Endpoint,
		Credentials: &credentials.SessionCredentials{},
		// Use the declared group constant rather than the placeholder
		// literal "string" the original passed here.
		ConsumerGroup: GroupName,
	},
		golang.WithAwaitDuration(awaitDuration),
		golang.WithSubscriptionExpressions(map[string]*golang.FilterExpression{
			Topic: golang.SUB_ALL,
		}),
	)
	if err != nil {
		log.Fatal(err)
	}
	// start simpleConsumer
	if err := simpleConsumer.Start(); err != nil {
		log.Fatal(err)
	}
	// Gracefully stop the consumer once the receive loop has finished; the
	// result is not inspected because the process is about to exit anyway.
	defer simpleConsumer.GracefulStop()

	// run for a while: the context bounds the lifetime of the receive loop
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*20)
	defer cancel()

	// done is closed when the receive goroutine exits, whether because the
	// context expired or because a panic was recovered.
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer func() {
			if r := recover(); r != nil {
				fmt.Println(r)
			}
		}()
		// receive messages in a loop
		for ctx.Err() == nil {
			fmt.Println("start recevie message")
			mvs, err := simpleConsumer.Receive(ctx, maxMessageNum, invisibleDuration)
			if err != nil {
				fmt.Println(err)
			}
			// ack message
			for _, mv := range mvs {
				// Report a failed ack instead of discarding the error.
				if err := simpleConsumer.Ack(ctx, mv); err != nil {
					fmt.Println(err)
				}
				fmt.Println(string(mv.GetBody()) + "  " + time.Now().Format(time.DateTime))
			}
			fmt.Println("wait a moment")
			fmt.Println()
			// pause between polls, but wake up immediately on expiry
			select {
			case <-ctx.Done():
			case <-time.After(time.Second * 3):
			}
		}
	}()

	// Wait for the loop to stop before the deferred shutdown runs, so the
	// consumer is not stopped while a receive or ack is still in progress.
	<-done
}


网站公告

今日签到

点亮在社区的每一天
去签到