使用 Temporal Go SDK 构建高可用分布式工作流

解决复杂分布式事务与长耗时业务流程的可靠性问题:通过 Temporal Go SDK 实现具备状态持久化、自动重试和确定性执行的工业级工作流。

为什么需要这个技能

在构建微服务架构时,处理跨服务、长周期(可能持续数天或数月)且必须保证最终一致性的业务流程非常困难。传统的 MQ 或定时任务方案难以在面对宕机、网络分区或复杂状态回滚(如 Saga 模式)时提供强有力的保证。

Temporal 提供了“持久化执行”能力,使开发者能够编写像本地函数一样简单的代码,但其状态在集群中自动持久化。然而,Temporal 对工作流的**确定性(Determinism)**有严格要求,任何非确定性代码都会导致 Replay 失败并引发 Panic。掌握此技能可以让开发者在确保系统健壮性的同时,高效处理高并发的分布式编排。

适用场景

  • 长周期业务流程:如订单履约、用户订阅计费、入职审批流程。
  • 复杂分布式事务:需要实现 Saga 模式以在步骤失败时执行补偿操作。
  • 高可靠任务编排:需要严格重试策略、超时管理且不能丢失状态的后台任务。
  • 企业级安全集成:需要配置 mTLS 证书认证的生产级 Worker 部署。

核心工作流

  1. 上下文收集与环境定义:明确 Temporal 集群类型(云端或自建)、Namespace、任务队列(Task Queue)以及安全认证要求(如 mTLS 路径)。
  2. 确保确定性校验:在编写 Workflow 代码前,必须遵循五大禁令:
    • 禁止使用原生的 Go 协程(应使用 workflow.Go)。
    • 禁止使用原生时间函数(如 time.Now,应使用 workflow.Now)。
    • 禁止非确定性的 Map 迭代(必须先对 Key 进行排序)。
    • 禁止在 Workflow 内部直接进行网络 IO 或磁盘操作(必须放入 Activity 中)。
    • 禁止使用原生随机数。
  3. 分层实现:按照 数据定义 $\to$ Activity 实现 $\to$ Workflow 编排 $\to$ Worker 部署 的顺序递进。
  4. 生命周期管理:针对长历史记录的工作流,实现 ContinueAsNew 以防止事件历史过大导致性能下降。

代码示例

确定性版本化工作流

func SubscriptionWorkflow(ctx workflow.Context, userID string) error {
    // 1. 使用版本化管理逻辑演进,防止升级时导致旧工作流 Replay 失败
    v := workflow.GetVersion(ctx, "billing_logic", workflow.DefaultVersion, 2)

    for i := 0; i < 12; i++ {
        ao := workflow.ActivityOptions{
            StartToCloseTimeout: 5 * time.Minute,
            RetryPolicy: &temporal.RetryPolicy{MaximumAttempts: 3},
        }
        ctx = workflow.WithActivityOptions(ctx, ao)

        // 2. 执行 Activity 并处理错误
        err := workflow.ExecuteActivity(ctx, ChargePaymentActivity, userID).Get(ctx, nil)
        if err != nil {
            workflow.GetLogger(ctx).Error("Payment failed", "Error", err)
            return err
        }

        // 3. 持久化休眠(支持时间跳跃测试)
        sleepDuration := 30 * 24 * time.Hour
        if v >= 2 {
            sleepDuration = 28 * 24 * time.Hour
        }

        if err := workflow.Sleep(ctx, sleepDuration); err != nil {
            return err
        }
    }
    return nil
}

配置 mTLS 安全 Worker

func RunSecureWorker() error {
    cert, err := tls.LoadX509KeyPair("client.pem", "client.key")
    if err != nil {
        return fmt.Errorf("failed to load client keys: %w", err)
    }

    caPem, err := os.ReadFile("ca.pem")
    if err != nil {
        return fmt.Errorf("failed to read CA cert: %w", err)
    }
    certPool := x509.NewCertPool()
    certPool.AppendCertsFromPEM(caPem)

    c, err := client.Dial(client.Options{
        HostPort:  "temporal.example.com:7233",
        Namespace: "production",
        ConnectionOptions: client.ConnectionOptions{
            TLS: &tls.Config{
                Certificates: []tls.Certificate{cert},
                RootCAs:      certPool,
            },
        },
    })
    if err != nil {
        return err
    }
    defer c.Close()

    w := worker.New(c, "payment-queue", worker.Options{})
    w.RegisterWorkflow(SubscriptionWorkflow)
    return w.Run(worker.InterruptCh())
}

下载和安装

下载 temporal-golang-pro 中文版 Skill ZIP

解压后将目录放入你的 AI 工具 skills 文件夹,重启工具后即可使用。具体路径参考内附的 USAGE.zh.md

你可能还需要

暂无推荐