Go语言——goflow工作流使用

发布于:2025-05-11 ⋅ 阅读:(11) ⋅ 点赞:(0)

一、引入依赖

这个很坑,他不允许连接带密码的redis,只能使用不带密码的redis,要带密码的话得自己改一下源代码,无语

go get github.com/s8sg/goflow

二、画出我们的工作流程

在这里插入图片描述

三、编写代码

package main

import (
	"encoding/json"
	"fmt"
	flow "github.com/s8sg/goflow/flow/v1"
	goflow "github.com/s8sg/goflow/v1"
	"log"
	"math/rand"
	"strconv"
)

// Input 输入一个数字
func Input(data []byte, option map[string][]string) ([]byte, error) {
	var input map[string]int
	// 获取输入的数
	if err := json.Unmarshal(data, &input); err != nil {
		return nil, err
	}
	outputInt := input["input"]
	// 将数据交给工作流处理
	return []byte(strconv.Itoa(outputInt)), nil
}

// AddOne 加上10以内的一个随机整数
func AddOne(data []byte, option map[string][]string) ([]byte, error) {
	// 获取上一个工作流的数据
	num, _ := strconv.Atoi(string(data))
	outputInt := num + rand.Intn(10) + 1
	fmt.Println("AddOne = ", outputInt)
	// 交给下一个工作流处理
	return []byte(strconv.Itoa(outputInt)), nil
}

// AddTwo 加上10以内的一个随机整数
func AddTwo(data []byte, option map[string][]string) ([]byte, error) {
	num, _ := strconv.Atoi(string(data))
	outputInt := num + rand.Intn(10) + 1
	fmt.Println("AddTwo = ", outputInt)
	return []byte(strconv.Itoa(outputInt)), nil
}

// Aggregator 聚合节点
func Aggregator(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("Aggregator = ", string(data))
	return data, nil
}

// Expand10 扩大10倍
func Expand10(data []byte, option map[string][]string) ([]byte, error) {
	num, _ := strconv.Atoi(string(data))
	outputInt := num * 10
	fmt.Println("Expand10 = ", outputInt)
	return []byte(strconv.Itoa(outputInt)), nil
}

// Expand100 扩大100倍
func Expand100(data []byte, option map[string][]string) ([]byte, error) {
	num, _ := strconv.Atoi(string(data))
	outputInt := num * 100
	fmt.Println("Expand100 = ", outputInt)
	return []byte(strconv.Itoa(outputInt)), nil
}

// Output 输出节点
func Output(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Println("Output = ", string(data))
	return data, nil
}

// 定义我们自己的一个流程
func MyFlow(workflow *flow.Workflow, context *flow.Context) error {
	// 创建DAG
	dag := workflow.Dag()
	// 创建节点
	dag.Node("input", Input)
	dag.Node("add-one", AddOne)
	dag.Node("add-two", AddTwo)
	// 这个聚合节点,就需要拿到add-one和add-two的结果
	dag.Node("aggregator", Aggregator, flow.Aggregator(func(m map[string][]byte) ([]byte, error) {
		addOneResult, _ := strconv.Atoi(string(m["add-one"]))
		addTwoResult, _ := strconv.Atoi(string(m["add-two"]))
		num := addOneResult + addTwoResult
		fmt.Println("aggregator = ", num)
		return []byte(strconv.Itoa(num)), nil
	}))
	// 这个方式是获取到节点的数据进行判断,然后返回一个字符串数组
	f1 := func(bytes []byte) []string {
		num, _ := strconv.Atoi(string(bytes))
		fmt.Println("ConditionalBranch = ", num)
		if num > 10 {
			return []string{"moreThan"}
		}
		return []string{"lessThan"}
	}
	// 这个方法就是将分支的数据返回给output
	f2 := func(m map[string][]byte) ([]byte, error) {
		if v, ok := m["moreThan"]; ok {
			i, _ := strconv.Atoi(string(v))
			fmt.Println("f2 moreThan = ", i)
			return v, nil
		}
		if v, ok := m["lessThan"]; ok {
			i, _ := strconv.Atoi(string(v))
			fmt.Println("f2 lessThan = ", i)
			return v, nil
		}
		return nil, nil
	}
	// 创建一个条件分支节点
	branches := dag.ConditionalBranch("judge", []string{"moreThan", "lessThan"}, f1, flow.Aggregator(f2))
	branches["moreThan"].Node("expand-10", Expand10)
	branches["lessThan"].Node("expand-100", Expand100)
	dag.Node("output", Output)
	// 构建关系
	dag.Edge("input", "add-one")
	dag.Edge("input", "add-two")
	dag.Edge("add-one", "aggregator")
	dag.Edge("add-two", "aggregator")
	dag.Edge("aggregator", "judge")
	dag.Edge("judge", "output")
	return nil
}

func main() {
	fs := goflow.FlowService{
		Port:              10001,
		RedisURL:          "127.0.0.1:6379",
		RedisPwd:          "p@ssw0rd",
		WorkerConcurrency: 5,
		RetryCount:        0,
	}
	if err := fs.Register("myFlow", MyFlow); err != nil {
		log.Printf("goflow register err: %v\n", err)
		return
	}
	if err := fs.Start(); err != nil {
		panic(err)
	}
}

四、Postman测试

在这里插入图片描述

在这里插入图片描述


网站公告

今日签到

点亮在社区的每一天
去签到