FastAPI SSE 入门:构建实时服务器发送事件 – wiki大全


FastAPI SSE 入门:构建实时服务器发送事件

在现代Web应用中,实时通信已成为不可或缺的一部分。无论是聊天应用、实时通知还是数据仪表盘,用户都期望能够即时获取最新信息。实现实时通信的主流技术通常是WebSocket,但对于许多只需要服务器向客户端单向推送数据的场景,Server-Sent Events (SSE) 提供了一种更简洁、更高效的替代方案。

FastAPI 作为一款高性能的Python Web框架,凭借其异步支持和易用性,非常适合构建SSE服务。本文将带您深入了解SSE的概念,并指导您如何使用FastAPI构建一个实时服务器发送事件的应用。

1. 什么是 Server-Sent Events (SSE)?

Server-Sent Events (SSE) 是一种基于HTTP标准的单向实时通信技术。它允许服务器在初始HTTP连接建立后,持续地向客户端推送数据。与WebSocket不同,SSE是建立在HTTP协议之上的,它使用一个普通的HTTP连接,但将 Content-Type 设置为 text/event-stream

SSE 的主要特点:

  • 单向通信: 服务器向客户端推送数据,客户端不能直接向服务器发送数据(当然,可以通过独立的HTTP请求实现)。
  • 基于HTTP: 使用标准的HTTP/S协议,通常更容易与现有的基础设施(如代理、防火墙)兼容。
  • 自动重连: 浏览器内置的 EventSource 客户端API提供了自动重连机制,当连接断开时会自动尝试重新连接。
  • 简单易用: 相对于WebSocket,SSE的实现和使用都更为简单,特别是在只需要单向数据流的场景。

SSE 的数据格式:

SSE 的数据流由一系列以 data: 开头的行组成,每条消息以两个换行符 \n\n 结尾。它还支持其他字段:

  • data:: 消息的数据内容。可以有多行 data:,它们会被连接成一条消息。
  • event:: 事件类型。客户端可以通过监听不同的事件类型来处理消息。
  • id:: 消息ID。客户端会记住最后接收到的消息ID,并在重连时发送给服务器,以便服务器可以从上次中断的地方继续发送。
  • retry:: 重连时间(毫秒)。客户端在连接断开后,会等待指定的时间再尝试重连。

示例消息格式:

“`
event: message
id: 1
data: 这是第一条消息。

event: update
id: 2
data: 用户 ‘Alice’ 上线了。
data: 时间: 2023-10-26 10:00:00

“`

2. FastAPI 中的 SSE 实现

FastAPI 本身并没有内置直接的SSE装饰器,但它提供了 StreamingResponse,这是实现SSE的关键。StreamingResponse 接受一个异步生成器作为内容,非常适合持续发送数据。

2.1 准备工作

确保您已安装 FastAPI 和 Uvicorn:

bash
pip install fastapi uvicorn

2.2 基本 SSE 服务

让我们创建一个简单的 FastAPI 应用,它每秒向客户端发送一个递增的数字。

main.py:

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio

app = FastAPI()

async def generate_numbers():
“””
一个异步生成器,每秒生成一个数字并格式化为SSE事件。
“””
counter = 0
while True:
# SSE数据格式: data: [content]\n\n
message = f”data: {counter}\n\n”
yield message
counter += 1
await asyncio.sleep(1) # 等待1秒

@app.get(“/stream”)
async def stream_data(request: Request):
“””
提供 SSE 端点。
“””
return StreamingResponse(generate_numbers(), media_type=”text/event-stream”)

运行应用: uvicorn main:app –reload

“`

代码解释:

  1. generate_numbers(): 这是一个异步生成器函数,它会无限循环,每秒递增 counter 并构造一个SSE格式的字符串 f"data: {counter}\n\n",然后使用 yield 返回。asyncio.sleep(1) 确保了非阻塞的等待。
  2. @app.get("/stream"): 定义了一个GET路由 /stream
  3. StreamingResponse: FastAPI 的响应类,用于流式传输数据。我们将 generate_numbers() 生成器传递给它。
  4. media_type="text/event-stream": 这是关键! 它告诉浏览器此响应是一个SSE流,而不是普通的HTTP响应。

2.3 运行服务器

在您的终端中,导航到 main.py 所在的目录并运行:

bash
uvicorn main:app --reload

现在,您的FastAPI SSE服务器已经在 http://127.0.0.1:8000 运行。

3. 客户端如何消费 SSE?

客户端(通常是浏览器)使用内置的 EventSource API 来接收和处理SSE流。

index.html:

“`html






FastAPI SSE 客户端

FastAPI SSE 实时数据



“`

将上述 index.html 文件保存到 main.py 同一目录下。在浏览器中打开 index.html,您会看到页面上每秒显示一个递增的数字。

4. 进阶使用:自定义事件和断开处理

4.1 发送自定义事件类型

SSE 允许您定义不同的事件类型,客户端可以根据事件类型来处理不同的消息。

main.py (修改后的 generate_numbers):

“`python
import json # 导入json模块

async def generate_custom_events():
counter = 0
while True:
if counter % 3 == 0:
# 每3秒发送一个 ‘notification’ 事件
notification_data = {“user”: “System”, “message”: f”当前计数是 {counter}”}
yield f”event: notification\ndata: {json.dumps(notification_data)}\n\n”
else:
# 否则发送一个普通的 ‘message’ 事件
yield f”event: message\ndata: {counter}\n\n”

    counter += 1
    await asyncio.sleep(1)

@app.get(“/custom-stream”)
async def custom_stream_data(request: Request):
return StreamingResponse(generate_custom_events(), media_type=”text/event-stream”)
“`

index.html (添加事件监听):

“`html

实时通知

“`

4.2 处理客户端断开连接

在实际应用中,客户端可能会主动或被动地断开连接。服务器需要能够检测到这些断开,并停止相关的生成器或清理资源。FastAPI 的 Request 对象可以用来检测连接是否断开。

main.py (添加断开连接检测):

“`python
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

async def generate_with_disconnect_check(request: Request):
“””
一个异步生成器,带断开连接检测。
“””
counter = 0
try:
while True:
# 检查客户端是否已断开连接
if await request.is_disconnected():
print(f”客户端 {request.client.host}:{request.client.port} 已断开连接。停止发送数据。”)
break # 客户端断开,退出循环

        message = f"data: {counter}\n\n"
        yield message
        counter += 1
        await asyncio.sleep(1)
except asyncio.CancelledError:
    # 当客户端关闭连接时,uvicorn可能会取消任务
    print(f"Asyncio 任务被取消。客户端可能已断开连接。")
except Exception as e:
    print(f"SSE 生成器发生错误: {e}")
finally:
    print("SSE 生成器清理完成。")

@app.get(“/stream-disconnect”)
async def stream_with_disconnect_check(request: Request):
return StreamingResponse(generate_with_disconnect_check(request), media_type=”text/event-stream”)
“`

解释:

  • await request.is_disconnected(): 这是一个关键方法,它会检查客户端连接是否仍然活跃。如果客户端断开,它将返回 True
  • try...except asyncio.CancelledError...finally: 这是一个良好的实践,用于处理异步任务被取消的情况(例如,当Uvicorn关闭或客户端连接突然中断时),并在 finally 块中进行资源清理。

5. 实际应用场景

  • 实时通知: 当有新消息、新评论或任何需要即时提醒用户的信息时。
  • 进度更新: 长时间运行的任务(如文件上传、数据处理)可以实时向客户端报告进度。
  • 实时仪表盘: 股票价格、服务器指标、物联网设备数据等。
  • 简单的聊天室: 对于不需要高并发和复杂双向通信的轻量级聊天功能。

6. 总结

Server-Sent Events (SSE) 是一种强大而简单的单向实时通信技术,特别适合服务器向客户端推送数据的场景。FastAPI 凭借其异步能力和 StreamingResponse,使得构建高性能的SSE服务变得非常容易。

通过本文的介绍,您应该已经掌握了在FastAPI中实现SSE的基础知识,包括如何构建SSE端点、如何处理客户端连接以及如何在客户端消费SSE数据。在实际项目中,您可以根据需求进一步扩展这些概念,例如结合消息队列(如Redis Pub/Sub)来实现更复杂的实时数据广播。

现在,您已经准备好在您的FastAPI应用中构建引人注目的实时功能了!


滚动至顶部