Appearance
使用 Python 快速集成 Azure Service Bus 企业级消息队列
解决云原生应用中的异步解耦问题:通过 Python SDK 快速实现可靠的消息队列(Queue)和发布/订阅(Topic/Subscription)模式,确保企业级通信的可靠性。
为什么需要这个技能
在构建大规模分布式系统时,同步调用会导致系统级联故障且响应缓慢。Azure Service Bus 提供了高度可靠的消息中间件,支持复杂的企业集成模式(如 FIFO 顺序处理、死信队列、延迟发送)。
通过本技能,你可以让 AI 帮你快速编写高性能的异步发送者(Sender)和接收者(Receiver),处理高并发消息流,并实现错误重试与死信处理机制,无需手动处理底层的连接管理和协议细节。
适用场景
- 服务解耦:将耗时的后台任务(如发送邮件、生成报表)从主请求流中剥离。
- 削峰填谷:在流量高峰期通过队列缓冲请求,防止后端服务崩溃。
- 事件驱动架构:利用 Topic 实现一个事件被多个下游服务同时消费的发布/订阅模式。
- 严格顺序处理:需要通过 Session ID 确保特定订单或用户的消息按顺序执行。
核心工作流
- 环境配置与认证:安装
azure-servicebus与azure-identity,使用DefaultAzureCredential实现无密钥安全登录。 - 客户端实例化:根据需求选择异步
ServiceBusClient或同步客户端,通过上下文管理器(async with)确保资源释放。 - 消息发送策略:
- 单条消息发送。
- 使用
create_message_batch()构建消息批处理,优化网络吞吐量。 - 设定
scheduled_time实现定时任务发送。
- 消息接收与结算:
- PeekLock 模式:接收后锁定,处理成功调用
complete_message(),处理失败调用abandon_message()触发重试。 - ReceiveAndDelete 模式:接收即删除,适用于对可靠性要求较低的场景。
- PeekLock 模式:接收后锁定,处理成功调用
- 异常处理(DLQ):将无法处理的“毒药消息”通过
dead_letter_message()移至死信队列,方便后续审计和手动修复。
代码实现
bash
pip install azure-servicebus azure-identitypython
import asyncio
from azure.servicebus.aio import ServiceBusClient
from azure.servicebus import ServiceBusMessage
from azure.identity.aio import DefaultAzureCredential
async def send_and_receive():
# 使用身份认证
credential = DefaultAzureCredential()
namespace = "<namespace>.servicebus.windows.net"
async with ServiceBusClient(fully_qualified_namespace=namespace, credential=credential) as client:
# 发送消息
sender = client.get_queue_sender(queue_name="myqueue")
async with sender:
await sender.send_messages(ServiceBusMessage("Hello, Service Bus!"))
# 接收并结算消息
receiver = client.get_queue_receiver(queue_name="myqueue")
async with receiver:
messages = await receiver.receive_messages(max_message_count=10, max_wait_time=5)
for msg in messages:
print(f"Processing: {str(msg)}")
await receiver.complete_message(msg) # 成功后从队列移除
asyncio.run(send_and_receive())下载和安装
下载 azure-servicebus-py 中文版 Skill ZIP
解压后将目录放入你的 AI 工具 skills 文件夹,重启工具后即可使用。具体路径参考内附的 USAGE.zh.md。
你可能还需要
暂无推荐