FastAPI SSE 入门:构建实时服务器发送事件
在现代Web应用中,实时通信已成为不可或缺的一部分。无论是聊天应用、实时通知还是数据仪表盘,用户都期望能够即时获取最新信息。实现实时通信的主流技术通常是WebSocket,但对于许多只需要服务器向客户端单向推送数据的场景,Server-Sent Events (SSE) 提供了一种更简洁、更高效的替代方案。
FastAPI 作为一款高性能的Python Web框架,凭借其异步支持和易用性,非常适合构建SSE服务。本文将带您深入了解SSE的概念,并指导您如何使用FastAPI构建一个实时服务器发送事件的应用。
1. 什么是 Server-Sent Events (SSE)?
Server-Sent Events (SSE) 是一种基于HTTP标准的单向实时通信技术。它允许服务器在初始HTTP连接建立后,持续地向客户端推送数据。与WebSocket不同,SSE是建立在HTTP协议之上的,它使用一个普通的HTTP连接,但将 Content-Type 设置为 text/event-stream。
SSE 的主要特点:
- 单向通信: 服务器向客户端推送数据,客户端不能直接向服务器发送数据(当然,可以通过独立的HTTP请求实现)。
- 基于HTTP: 使用标准的HTTP/S协议,通常更容易与现有的基础设施(如代理、防火墙)兼容。
- 自动重连: 浏览器内置的
EventSource客户端API提供了自动重连机制,当连接断开时会自动尝试重新连接。 - 简单易用: 相对于WebSocket,SSE的实现和使用都更为简单,特别是在只需要单向数据流的场景。
SSE 的数据格式:
SSE 的数据流由一系列以 data: 开头的行组成,每条消息以两个换行符 \n\n 结尾。它还支持其他字段:
data:: 消息的数据内容。可以有多行data:,它们会被连接成一条消息。event:: 事件类型。客户端可以通过监听不同的事件类型来处理消息。id:: 消息ID。客户端会记住最后接收到的消息ID,并在重连时发送给服务器,以便服务器可以从上次中断的地方继续发送。retry:: 重连时间(毫秒)。客户端在连接断开后,会等待指定的时间再尝试重连。
示例消息格式:
“`
event: message
id: 1
data: 这是第一条消息。
event: update
id: 2
data: 用户 ‘Alice’ 上线了。
data: 时间: 2023-10-26 10:00:00
“`
2. FastAPI 中的 SSE 实现
FastAPI 本身并没有内置直接的SSE装饰器,但它提供了 StreamingResponse,这是实现SSE的关键。StreamingResponse 接受一个异步生成器作为内容,非常适合持续发送数据。
2.1 准备工作
确保您已安装 FastAPI 和 Uvicorn:
bash
pip install fastapi uvicorn
2.2 基本 SSE 服务
让我们创建一个简单的 FastAPI 应用,它每秒向客户端发送一个递增的数字。
main.py:
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def generate_numbers():
“””
一个异步生成器,每秒生成一个数字并格式化为SSE事件。
“””
counter = 0
while True:
# SSE数据格式: data: [content]\n\n
message = f”data: {counter}\n\n”
yield message
counter += 1
await asyncio.sleep(1) # 等待1秒
@app.get(“/stream”)
async def stream_data(request: Request):
“””
提供 SSE 端点。
“””
return StreamingResponse(generate_numbers(), media_type=”text/event-stream”)
运行应用: uvicorn main:app –reload
“`
代码解释:
generate_numbers(): 这是一个异步生成器函数,它会无限循环,每秒递增counter并构造一个SSE格式的字符串f"data: {counter}\n\n",然后使用yield返回。asyncio.sleep(1)确保了非阻塞的等待。@app.get("/stream"): 定义了一个GET路由/stream。StreamingResponse: FastAPI 的响应类,用于流式传输数据。我们将generate_numbers()生成器传递给它。media_type="text/event-stream": 这是关键! 它告诉浏览器此响应是一个SSE流,而不是普通的HTTP响应。
2.3 运行服务器
在您的终端中,导航到 main.py 所在的目录并运行:
bash
uvicorn main:app --reload
现在,您的FastAPI SSE服务器已经在 http://127.0.0.1:8000 运行。
3. 客户端如何消费 SSE?
客户端(通常是浏览器)使用内置的 EventSource API 来接收和处理SSE流。
index.html:
“`html
FastAPI SSE 实时数据
“`
将上述 index.html 文件保存到 main.py 同一目录下。在浏览器中打开 index.html,您会看到页面上每秒显示一个递增的数字。
4. 进阶使用:自定义事件和断开处理
4.1 发送自定义事件类型
SSE 允许您定义不同的事件类型,客户端可以根据事件类型来处理不同的消息。
main.py (修改后的 generate_numbers):
“`python
import json # 导入json模块
async def generate_custom_events():
counter = 0
while True:
if counter % 3 == 0:
# 每3秒发送一个 ‘notification’ 事件
notification_data = {“user”: “System”, “message”: f”当前计数是 {counter}”}
yield f”event: notification\ndata: {json.dumps(notification_data)}\n\n”
else:
# 否则发送一个普通的 ‘message’ 事件
yield f”event: message\ndata: {counter}\n\n”
counter += 1
await asyncio.sleep(1)
@app.get(“/custom-stream”)
async def custom_stream_data(request: Request):
return StreamingResponse(generate_custom_events(), media_type=”text/event-stream”)
“`
index.html (添加事件监听):
“`html
实时通知
“`
4.2 处理客户端断开连接
在实际应用中,客户端可能会主动或被动地断开连接。服务器需要能够检测到这些断开,并停止相关的生成器或清理资源。FastAPI 的 Request 对象可以用来检测连接是否断开。
main.py (添加断开连接检测):
“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
async def generate_with_disconnect_check(request: Request):
“””
一个异步生成器,带断开连接检测。
“””
counter = 0
try:
while True:
# 检查客户端是否已断开连接
if await request.is_disconnected():
print(f”客户端 {request.client.host}:{request.client.port} 已断开连接。停止发送数据。”)
break # 客户端断开,退出循环
message = f"data: {counter}\n\n"
yield message
counter += 1
await asyncio.sleep(1)
except asyncio.CancelledError:
# 当客户端关闭连接时,uvicorn可能会取消任务
print(f"Asyncio 任务被取消。客户端可能已断开连接。")
except Exception as e:
print(f"SSE 生成器发生错误: {e}")
finally:
print("SSE 生成器清理完成。")
@app.get(“/stream-disconnect”)
async def stream_with_disconnect_check(request: Request):
return StreamingResponse(generate_with_disconnect_check(request), media_type=”text/event-stream”)
“`
解释:
await request.is_disconnected(): 这是一个关键方法,它会检查客户端连接是否仍然活跃。如果客户端断开,它将返回True。try...except asyncio.CancelledError...finally: 这是一个良好的实践,用于处理异步任务被取消的情况(例如,当Uvicorn关闭或客户端连接突然中断时),并在finally块中进行资源清理。
5. 实际应用场景
- 实时通知: 当有新消息、新评论或任何需要即时提醒用户的信息时。
- 进度更新: 长时间运行的任务(如文件上传、数据处理)可以实时向客户端报告进度。
- 实时仪表盘: 股票价格、服务器指标、物联网设备数据等。
- 简单的聊天室: 对于不需要高并发和复杂双向通信的轻量级聊天功能。
6. 总结
Server-Sent Events (SSE) 是一种强大而简单的单向实时通信技术,特别适合服务器向客户端推送数据的场景。FastAPI 凭借其异步能力和 StreamingResponse,使得构建高性能的SSE服务变得非常容易。
通过本文的介绍,您应该已经掌握了在FastAPI中实现SSE的基础知识,包括如何构建SSE端点、如何处理客户端连接以及如何在客户端消费SSE数据。在实际项目中,您可以根据需求进一步扩展这些概念,例如结合消息队列(如Redis Pub/Sub)来实现更复杂的实时数据广播。
现在,您已经准备好在您的FastAPI应用中构建引人注目的实时功能了!