【设计模式】12、observer 观察者模式

发布于:2024-04-30 ⋅ 阅读:(33) ⋅ 点赞:(0)

十二、observer 观察者模式

https://refactoringguru.cn/design-patterns/observer

发布订阅模式, client 都可以向 broker 注册, broker 管理所有 connection, 当 broker 收到某事件时, 广播给所有 clients.

  • 各种消息队列, 如 kafka, redis stream 都是这种结构
  • websocket server, live video streaming server 也都是这样的

12.1 subscriber

https://refactoringguru.cn/design-patterns/observer/go/example

如果有一个 websocket broker, 各种 client 都注册, 当 broker 收到消息时向各 clients 广播消息

12observer/121subscriber
├── broker.go
├── broker_test.go
├── client.go
└── readme.md

12.1.1 broker_test.go

package _21subscriber

import (
	"fmt"
	"testing"
)

/*
=== RUN   TestBroker
给ws连接通知
给ws连接通知
给http连接通知

给ws连接通知
给ws连接通知
给http连接通知
给tcp连接通知

给ws连接通知
给ws连接通知
给http连接通知
给tcp连接通知
--- PASS: TestBroker (0.00s)
PASS
*/
func TestBroker(t *testing.T) {
	b := NewBroker(&wsClient{}, &wsClient{}, &httpClient{})
	b.Broadcast()
	b.NotifySome("ws1", "ws2", "http3")
	fmt.Println()

	b.AddSubscription(&tcpClient{})
	b.Broadcast()
	fmt.Println()

	b.RemoveSubscription(&httpClient{})
	b.Broadcast()
}

12.2.2 broker.go

package _21subscriber

import (
	"slices"
)

type broker struct {
	clients []client
}

func NewBroker(clients ...client) *broker {
	return &broker{
		clients: clients,
	}
}

func (b *broker) AddSubscription(c client) {
	b.clients = append(b.clients, c)
}

func (b *broker) RemoveSubscription(c client) {
	clients := make([]client, 0)
	for _, cli := range b.clients {
		if c.ID() == cli.ID() {
			continue
		}
		clients = append(clients, cli)
	}
	b.clients = clients
}

func (b *broker) Broadcast() {
	for _, c := range b.clients {
		c.Notify()
	}
}

func (b *broker) NotifySome(ids ...string) {
	for _, c := range b.clients {
		if slices.Contains(ids, c.ID()) {
			c.Notify()
		}
	}
}

12.2.3 client.go

package _21subscriber

import (
	"fmt"
	"math/rand"
	"strconv"
)

type client interface {
	ID() string
	Notify()
}

type wsClient struct{}

func (c *wsClient) Notify() {
	fmt.Println("给ws连接通知")
}

func (c *wsClient) ID() string {
	return "ws" + strconv.Itoa(rand.Int())
}

type httpClient struct{}

func (c *httpClient) Notify() {
	fmt.Println("给http连接通知")
}

func (c *httpClient) ID() string {
	return "http" + strconv.Itoa(rand.Int())
}

type tcpClient struct{}

func (c *tcpClient) Notify() {
	fmt.Println("给tcp连接通知")
}

func (c *tcpClient) ID() string {
	return "tcp" + strconv.Itoa(rand.Int())
}

网站公告

今日签到

点亮在社区的每一天
去签到