如何用 Saga 编排分布式事务与长流程
解决分布式事务难题:通过编排模式管理跨服务流程,在部分失败时触发预定义的补偿事务,确保数据最终一致性,适用于订单、支付等长业务场景。
为什么需要这个技能
在微服务架构中,跨多个服务完成一笔交易(如订单、支付、发货)时,每个服务拥有独立的数据库,不存在全局事务管理器,传统两阶段提交(2PC)难以适用。跨服务的 ACID 事务还会长时间持有锁、导致服务强耦合,难以扩展。
Saga 模式将长事务拆解为一系列本地短事务。每个事务成功则推进流程,失败则触发补偿事务(逆向操作)回滚已完成的步骤。本技能提供基础架构与模板,帮助你在保证最终一致性(而非强一致性)的前提下,优雅处理长流程。
适用场景
- 订单履约:库存锁定 -> 支付 -> 发货 -> 通知。任一环节失败需释放库存或退款。
- 跨服务审批流:多级审批中,前置审批通过但后置被拒时,自动撤销前置操作。
- 长周期业务流程:耗时数小时甚至跨天的作业,需检查点恢复和超时处理。
- 高可用系统:避免单点故障导致整个链路阻塞,支持重试与熔断。
核心工作流
- 定义步骤与补偿:明确每个本地事务(Action)及其对应的补偿操作(Compensation)。
- 选择编排策略:
- 编排(Orchestration):由中央调度器决定下一步,失败时自动触发回滚链。
- 协同(Choreography):服务间通过事件驱动协作,无需中央控制器。
- 管理状态机:正常路径为 Started -> Pending -> Completed;任一步骤失败时转入 Compensating,补偿全部完成后进入 Failed。
- 处理超时:为长步骤设置超时检测,防止无限等待。
代码示例
以下模板展示了基于 Python 的基础编排器实现,包含状态持久化、命令发布及补偿执行。模板本身未实现超时控制,需要自行扩展(例如定期扫描长时间停留在同一步骤的 Saga)。
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Any, Optional
from datetime import datetime
import uuid
class SagaState(Enum):
    """Lifecycle state of a saga instance.

    Happy path: STARTED -> PENDING -> COMPLETED.
    On a step failure the orchestrator moves the saga to COMPENSATING and,
    once compensation is done, to FAILED.
    """

    STARTED = "started"            # created, first step dispatched
    PENDING = "pending"            # at least one step done, more to run
    COMPENSATING = "compensating"  # a step failed; undoing completed steps
    COMPLETED = "completed"        # every step succeeded (terminal)
    FAILED = "failed"              # compensation finished (terminal)
@dataclass
class SagaStep:
    """One local transaction of a saga together with the command that undoes it.

    ``action`` and ``compensation`` are the command names the orchestrator
    publishes. ``status`` is a plain string; the orchestrator in this file
    uses "pending", "executing", "completed", "failed", "compensating" and
    "compensated".
    """

    # Static definition of the step.
    name: str
    action: str
    compensation: str

    # Runtime bookkeeping, filled in as the saga progresses.
    status: str = "pending"
    result: Optional[Dict] = field(default=None)   # payload reported on success
    error: Optional[str] = field(default=None)     # message reported on failure
    executed_at: Optional[datetime] = field(default=None)
    compensated_at: Optional[datetime] = field(default=None)
@dataclass
class Saga:
    """Persistent record of a single saga instance.

    ``current_step`` indexes into ``steps``: it points at the step currently
    being executed and reaches ``len(steps)`` once every step has completed.
    ``data`` is the business payload the orchestrator forwards with every
    command it publishes.
    """

    # Identity.
    saga_id: str
    saga_type: str

    # Progress and payload.
    state: SagaState
    data: Dict[str, Any]
    steps: List[SagaStep]
    current_step: int = 0

    # Audit timestamps; naive datetimes produced by datetime.utcnow.
    created_at: datetime = field(default_factory=datetime.utcnow)
    updated_at: datetime = field(default_factory=datetime.utcnow)
class SagaOrchestrator(ABC):
    """Base class for saga orchestrators.

    Subclasses declare the ordered steps (``define_steps``) and a unique
    ``saga_type``. The orchestrator persists sagas through ``saga_store``
    (assumed to expose awaitable ``get(saga_id)`` and ``save(saga)``) and
    drives them by awaiting ``event_publisher.publish(name, payload)``.
    Participants report back through ``handle_step_completed``,
    ``handle_step_failed`` and ``handle_compensation_completed``.

    Steps run one at a time. When a step fails, every previously completed
    step receives its compensation command; once all of them report back
    (or immediately, when nothing needs undoing) the saga ends in
    ``SagaState.FAILED``.

    Replies that do not match the saga's current position (duplicates,
    stale or unknown step names, or replies arriving after the saga left
    the relevant state) are ignored, so redelivered messages cannot skip
    steps or re-emit terminal events.

    NOTE(review): compensation failures are not modelled; a compensation
    that never reports back leaves the saga in COMPENSATING. Timestamps use
    naive ``datetime.utcnow()`` to match the ``Saga`` dataclass defaults.
    """

    def __init__(self, saga_store, event_publisher):
        self.saga_store = saga_store
        self.event_publisher = event_publisher

    @abstractmethod
    def define_steps(self, data: Dict) -> List[SagaStep]:
        """Return the ordered steps for a saga started with *data*."""

    @property
    @abstractmethod
    def saga_type(self) -> str:
        """Unique saga type identifier; also prefixes the terminal event names."""

    async def start(self, data: Dict) -> Saga:
        """Create, persist and kick off a new saga for *data*.

        The returned saga has already had its first step's command published.
        A saga with no steps is completed immediately.
        """
        saga = Saga(
            saga_id=str(uuid.uuid4()),
            saga_type=self.saga_type,
            state=SagaState.STARTED,
            data=data,
            steps=self.define_steps(data),
        )
        await self.saga_store.save(saga)
        if not saga.steps:
            # No participant will ever reply, so finish now instead of
            # leaving the saga in STARTED forever.
            saga.state = SagaState.COMPLETED
            saga.updated_at = datetime.utcnow()
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
        else:
            await self._execute_next_step(saga)
        return saga

    async def handle_step_completed(self, saga_id: str, step_name: str, result: Dict):
        """Record success of the in-flight step and advance the saga.

        Ignored unless *step_name* is the step currently awaiting a reply.
        """
        saga = await self.saga_store.get(saga_id)
        step = self._in_flight_step(saga, step_name)
        if step is None:
            # Duplicate/stale/unknown reply: advancing here would skip a step.
            return

        step.status = "completed"
        step.result = result
        step.executed_at = datetime.utcnow()
        saga.current_step += 1
        saga.updated_at = datetime.utcnow()

        if saga.current_step >= len(saga.steps):
            saga.state = SagaState.COMPLETED
            await self.saga_store.save(saga)
            await self._on_saga_completed(saga)
        else:
            saga.state = SagaState.PENDING
            await self.saga_store.save(saga)
            await self._execute_next_step(saga)

    async def handle_step_failed(self, saga_id: str, step_name: str, error: str):
        """Record failure of the in-flight step and start compensation.

        Ignored unless *step_name* is the step currently awaiting a reply,
        so a redelivered failure cannot restart compensation of a saga that
        is already compensating or finished.
        """
        saga = await self.saga_store.get(saga_id)
        step = self._in_flight_step(saga, step_name)
        if step is None:
            return

        step.status = "failed"
        step.error = error
        saga.state = SagaState.COMPENSATING
        saga.updated_at = datetime.utcnow()
        await self.saga_store.save(saga)

        # The failed step itself never committed; only steps before it are undone.
        await self._compensate(saga)

    async def _execute_next_step(self, saga: Saga):
        """Mark the step at ``current_step`` as executing and publish its action."""
        if saga.current_step >= len(saga.steps):
            return

        step = saga.steps[saga.current_step]
        step.status = "executing"
        await self.saga_store.save(saga)
        await self.event_publisher.publish(step.action, self._command_payload(saga, step))

    async def _compensate(self, saga: Saga):
        """Publish compensation commands for every completed step, newest first.

        If no step needs compensating (e.g. the very first step failed), the
        saga is marked FAILED right away, because no compensation reply will
        ever arrive to finish it.
        """
        published = False
        # Commands go out in reverse order, but they are not awaited one by
        # one, so participants may still process them concurrently.
        for step in reversed(saga.steps[:saga.current_step]):
            if step.status != "completed":
                continue
            step.status = "compensating"
            await self.saga_store.save(saga)
            await self.event_publisher.publish(
                step.compensation,
                self._command_payload(saga, step, original_result=step.result),
            )
            published = True

        if not published:
            await self._mark_failed(saga)

    async def handle_compensation_completed(self, saga_id: str, step_name: str):
        """Record that a step's compensation finished; fail the saga when all are done.

        Ignored when the saga is not compensating or no step named
        *step_name* is awaiting compensation (duplicate or late reply).
        """
        saga = await self.saga_store.get(saga_id)
        if saga.state != SagaState.COMPENSATING:
            return

        step = next(
            (s for s in saga.steps if s.name == step_name and s.status == "compensating"),
            None,
        )
        if step is None:
            return

        step.status = "compensated"
        step.compensated_at = datetime.utcnow()
        saga.updated_at = datetime.utcnow()

        # Steps that never ran ("pending") or that failed need no compensation.
        all_compensated = all(
            s.status in ("compensated", "pending", "failed")
            for s in saga.steps
        )
        if all_compensated:
            await self._mark_failed(saga)
        else:
            await self.saga_store.save(saga)

    def _in_flight_step(self, saga: Saga, step_name: str) -> Optional[SagaStep]:
        """Return the step awaiting a reply if it is named *step_name*, else None."""
        if saga.state not in (SagaState.STARTED, SagaState.PENDING):
            return None
        if saga.current_step >= len(saga.steps):
            return None
        step = saga.steps[saga.current_step]
        return step if step.name == step_name else None

    def _command_payload(self, saga: Saga, step: SagaStep, **extra) -> Dict[str, Any]:
        """Build a command payload: saga data plus routing keys that data cannot override."""
        # Routing keys come last so a "saga_id"/"step_name" key inside
        # saga.data can never redirect the participant's reply.
        return {**saga.data, "saga_id": saga.saga_id, "step_name": step.name, **extra}

    async def _mark_failed(self, saga: Saga):
        """Move *saga* to FAILED, persist it, then announce the failure."""
        saga.state = SagaState.FAILED
        saga.updated_at = datetime.utcnow()
        # Persist before publishing, matching the completion path.
        await self.saga_store.save(saga)
        await self._on_saga_failed(saga)

    async def _on_saga_completed(self, saga: Saga):
        """Hook called after the saga completes successfully; publishes ``<saga_type>Completed``."""
        await self.event_publisher.publish(
            f"{self.saga_type}Completed",
            {**saga.data, "saga_id": saga.saga_id},
        )

    async def _on_saga_failed(self, saga: Saga):
        """Hook called after compensation finishes; publishes ``<saga_type>Failed``."""
        await self.event_publisher.publish(
            f"{self.saga_type}Failed",
            {**saga.data, "saga_id": saga.saga_id, "error": "Saga failed"},
        )
下载和安装
下载 saga-orchestration 中文版 Skill ZIP
你可能还需要
暂无推荐