FastAPI WebSocket 完整教程 (含代码示例)
FastAPI WebSocket 完整教程:从入门到实战
引言
在现代 Web 开发中,实时通信扮演着越来越重要的角色。无论是实时聊天、股票行情推送、在线游戏还是协作编辑工具,都需要服务器能够主动向客户端推送信息,而不仅仅是响应客户端的请求。WebSocket 协议正是为解决这一需求而生,它提供了一个全双工的通信通道,允许服务器和客户端在建立连接后自由地相互发送数据。
FastAPI,作为一个现代化、高性能的 Python Web 框架,凭借其异步特性、类型提示支持和强大的依赖注入系统,天然地适合构建 WebSocket 应用。它简化了 WebSocket 端点的创建和管理,让开发者能够专注于业务逻辑。
本教程将带你深入了解如何在 FastAPI 中使用 WebSocket,从基础连接、数据收发,到管理多个连接、实现广播,再到身份验证和一些进阶技巧。我们将通过具体的代码示例,一步步构建一个功能完善的 WebSocket 应用。
本教程涵盖内容:
- WebSocket 基础概念回顾
- FastAPI 环境准备
- 创建第一个 WebSocket 端点
- 发送和接收文本/JSON 数据
- 处理客户端断开连接
- 管理多个 WebSocket 连接(实现简易聊天室)
- 向特定客户端发送消息与广播消息
- 前端 JavaScript 客户端示例
- WebSocket 中的身份验证
- 进阶话题与最佳实践
目标读者:
本教程适合对 Python 和 Web 开发有一定基础,了解 FastAPI 基本用法,并希望学习如何在 FastAPI 中实现 WebSocket 功能的开发者。
1. WebSocket 基础概念回顾
在深入 FastAPI 实现之前,我们先简单回顾一下 WebSocket 的核心概念:
- 持久连接 (Persistent Connection): 与 HTTP 的请求-响应模式不同,WebSocket 在客户端和服务器之间建立一个持久性的 TCP 连接。一旦连接建立,双方都可以随时向对方发送数据,无需每次都重新建立连接。
- 全双工通信 (Full-Duplex): 数据可以同时在两个方向上传输,服务器可以主动向客户端推送信息,客户端也可以随时向服务器发送信息。
- 协议升级: WebSocket 连接通常始于一个标准的 HTTP(S) 请求(包含
Upgrade: websocket
和Connection: Upgrade
等头部信息)。如果服务器支持 WebSocket,它会响应一个 HTTP 101 Switching Protocols 状态码,随后该 TCP 连接就转变为 WebSocket 连接。 - 低延迟: 由于连接是持久的,减少了 HTTP 请求的握手和头部开销,数据传输延迟更低,非常适合实时应用场景。
- URL 模式: WebSocket URL 使用
ws://
(不加密) 或wss://
(加密,基于 TLS/SSL) 模式。
2. FastAPI 环境准备
首先,确保你已经安装了 Python (推荐 3.7+)。然后,我们需要安装 FastAPI 和一个 ASGI 服务器,例如 Uvicorn。同时,websockets
库是 FastAPI 处理 WebSocket 底层通信所依赖的,通常会作为 FastAPI 的依赖项自动安装,但显式安装可以确保版本兼容。
bash
pip install fastapi uvicorn websockets
fastapi
: 核心框架。uvicorn
: ASGI 服务器,用于运行 FastAPI 应用。websockets
: 处理 WebSocket 协议的底层库。
3. 创建第一个 WebSocket 端点
让我们从创建一个最简单的 WebSocket 端点开始。这个端点将接受客户端连接,然后简单地将接收到的任何消息原样发回(一个 "echo" 服务)。
创建一个名为 main.py
的文件:
```python
main.py
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import logging
配置日志记录
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
app = FastAPI()
提供一个简单的 HTML 页面用于测试
html = """
WebSocket Echo Test
"""
@app.get("/")
async def get():
"""提供用于测试 WebSocket 的 HTML 页面"""
return HTMLResponse(html)
核心:WebSocket 端点
@app.websocket("/ws/echo")
async def websocket_echo_endpoint(websocket: WebSocket):
"""
处理 WebSocket 连接的端点。
接收文本消息并将其回显给客户端。
"""
logger.info(f"客户端 {websocket.client.host}:{websocket.client.port} 尝试连接...")
# 1. 接受连接
await websocket.accept()
logger.info(f"客户端 {websocket.client.host}:{websocket.client.port} 连接成功")
try:
# 2. 循环接收和发送消息
while True:
# 等待接收文本消息
data = await websocket.receive_text()
logger.info(f"收到来自 {websocket.client.host}:{websocket.client.port} 的消息: {data}")
# 发送收到的文本消息回客户端
await websocket.send_text(f"服务器回显: {data}")
logger.info(f"已向 {websocket.client.host}:{websocket.client.port} 回显消息")
except WebSocketDisconnect:
# 3. 处理客户端断开连接
logger.warning(f"客户端 {websocket.client.host}:{websocket.client.port} 断开连接")
except Exception as e:
# 处理其他可能的异常
logger.error(f"发生错误: {e}")
await websocket.close(code=1011, reason=f"服务器内部错误: {e}") # 告知客户端错误并关闭
finally:
# 确保连接在任何情况下(正常断开或异常)都被尝试关闭
# 虽然 accept() 之后,FastAPI 会在函数结束或异常时自动处理关闭,
# 但显式处理有助于理解流程和进行可能的清理工作。
# 注意:如果连接已断开,再次调用 close 可能无效或抛出异常,
# 但在 finally 中尝试是安全的。
logger.info(f"清理与客户端 {websocket.client.host}:{websocket.client.port} 的连接资源")
# 通常不需要在这里显式调用 websocket.close(),除非有特殊清理逻辑
# FastAPI 会在函数退出或 WebSocketDisconnect 异常后处理关闭
```
代码解析:
@app.websocket("/ws/echo")
: 这是一个装饰器,用于定义 WebSocket 路由。类似于@app.get()
或@app.post()
,但专门用于 WebSocket 连接。路径是/ws/echo
。async def websocket_echo_endpoint(websocket: WebSocket)
: 定义处理 WebSocket 连接的异步函数。参数websocket
是一个WebSocket
类的实例,它代表了当前的 WebSocket 连接。FastAPI 会自动注入这个对象。await websocket.accept()
: 这是建立 WebSocket 连接的关键步骤。在调用此方法之前,连接尚未完全建立。调用它会发送 WebSocket 握手的响应给客户端,确认连接。必须先调用accept()
才能进行后续的send
或receive
操作。while True:
: 使用一个无限循环来持续监听来自客户端的消息。data = await websocket.receive_text()
: 异步等待并接收来自客户端的文本消息。如果客户端发送的是非文本数据(如二进制数据或 JSON),或者连接已关闭,这里可能会出错或返回特定类型的数据/异常。await websocket.send_text(f"服务器回显: {data}")
: 异步向客户端发送文本消息。try...except WebSocketDisconnect:
: FastAPI 在客户端断开连接时会引发WebSocketDisconnect
异常。我们捕获这个异常来进行相应的处理,例如记录日志或清理资源。这是处理断开的标准方式。except Exception as e
: 捕获其他可能的运行时错误。finally
: 无论连接是正常断开还是因为异常中断,finally
块中的代码都会执行,适合放置资源清理逻辑。
运行与测试:
-
在终端中运行 Uvicorn:
bash
uvicorn main:app --reload
--reload
参数使得代码更改后服务器会自动重启,方便开发。 -
打开浏览器,访问
http://localhost:8000/
。你会看到一个简单的 HTML 页面。 -
在输入框中输入消息,点击 "Send"。你应该能在页面上看到服务器的回显消息(例如,输入 "hello",会显示 "服务器回显: hello")。同时,观察运行 Uvicorn 的终端,你会看到连接、接收和发送消息的日志。
-
关闭浏览器标签页或刷新页面,终端会显示客户端断开连接的日志。
4. 发送和接收不同类型的数据
WebSocket 不仅仅能传输文本,还可以传输 JSON 和二进制数据。FastAPI 提供了方便的方法来处理这些:
-
接收数据:
await websocket.receive_text()
: 接收文本数据。await websocket.receive_bytes()
: 接收二进制数据。await websocket.receive_json()
: 接收 JSON 数据。它会自动将接收到的文本或字节解析为 Python 字典或列表。如果数据不是有效的 JSON,会引发json.JSONDecodeError
(或者在某些 FastAPI/Starlette 版本中可能包装在RuntimeError
或其他 WebSocket 异常中,需要根据实际情况捕获)。await websocket.receive()
: 更底层的接收方法,返回一个包含type
和bytes
或text
键的字典。
-
发送数据:
await websocket.send_text(data: str)
: 发送文本数据。await websocket.send_bytes(data: bytes)
: 发送二进制数据。await websocket.send_json(data: Any, mode: str = "text")
: 发送 Python 对象(如字典、列表)作为 JSON。默认以文本模式 (mode="text"
) 发送,也可以设置为mode="binary"
以二进制 UTF-8 编码发送。
示例:处理 JSON 数据
修改 /ws/echo
端点来接收和发送 JSON:
```python
main.py (部分修改)
import json
... (其他 import 和 app 定义保持不变) ...
@app.websocket("/ws/json_echo")
async def websocket_json_endpoint(websocket: WebSocket):
logger.info(f"客户端 {websocket.client.host}:{websocket.client.port} 尝试连接到 JSON Echo...")
await websocket.accept()
logger.info(f"客户端 {websocket.client.host}:{websocket.client.port} (JSON Echo) 连接成功")
try:
while True:
# 接收 JSON 数据
try:
data = await websocket.receive_json()
logger.info(f"收到来自 {websocket.client.host}:{websocket.client.port} 的 JSON: {data}")
# 检查接收到的数据是否是字典
if isinstance(data, dict):
response = {
"message": "服务器已收到您的 JSON",
"your_data": data,
"server_status": "OK"
}
else:
response = {
"message": "服务器期望收到 JSON 对象 (字典)",
"received_type": str(type(data))
}
# 发送 JSON 数据回客户端
await websocket.send_json(response)
logger.info(f"已向 {websocket.client.host}:{websocket.client.port} 回复 JSON")
except json.JSONDecodeError:
logger.error(f"客户端 {websocket.client.host}:{websocket.client.port} 发送了无效的 JSON 数据")
await websocket.send_text("错误:请发送有效的 JSON 格式数据。")
except Exception as e: # 捕获 receive_json 可能抛出的其他异常,如 RuntimeError
logger.error(f"接收 JSON 时出错: {e}, 类型: {type(e)}")
# 可以根据异常类型决定是否关闭连接
await websocket.send_text(f"服务器处理错误: {e}")
# 如果错误严重,可以选择关闭连接
# await websocket.close(code=1011)
# break # 退出循环
except WebSocketDisconnect:
logger.warning(f"客户端 {websocket.client.host}:{websocket.client.port} (JSON Echo) 断开连接")
except Exception as e:
logger.error(f"WebSocket 连接发生意外错误: {e}")
# 尝试安全关闭,即使在 accept() 之后出现问题
try:
await websocket.close(code=1011, reason=f"服务器内部错误: {e}")
except RuntimeError as re:
logger.error(f"尝试关闭已断开的连接时出错: {re}") # 可能连接已经不在可用状态
finally:
logger.info(f"清理与客户端 {websocket.client.host}:{websocket.client.port} (JSON Echo) 的连接资源")
... (HTML 和 / 路由保持不变,但 JS 需要修改以发送 JSON) ...
更新 HTML 中的 JavaScript 以发送 JSON
html_json = """
WebSocket JSON Test
Message:
Value:
Received Messages:
"""
@app.get("/json")
async def get_json_page():
"""提供用于测试 JSON WebSocket 的 HTML 页面"""
return HTMLResponse(html_json)
```
现在,运行 uvicorn main:app --reload
并访问 http://localhost:8000/json
。在输入框中输入内容,点击 "Send JSON",你会看到客户端发送的 JSON 数据和服务器返回的 JSON 响应。尝试发送非 JSON 格式的文本,服务器会返回错误提示。
5. 处理客户端断开连接 (WebSocketDisconnect)
我们已经在上面的例子中看到了如何使用 try...except WebSocketDisconnect:
来捕获客户端的断开事件。这是非常重要的,因为它允许你:
- 优雅地终止循环:
WebSocketDisconnect
异常会跳出while True
循环。 - 记录日志: 了解哪些客户端断开了连接。
- 清理资源: 如果你为该连接维护了任何状态(例如,在聊天室中将其从活动用户列表中移除),这是执行清理操作的最佳位置。
```python
... 在 WebSocket 端点函数内 ...
try:
while True:
data = await websocket.receive_text()
# ... 处理数据 ...
except WebSocketDisconnect:
logger.warning(f"客户端 {websocket.client.host}:{websocket.client.port} 断开连接")
# 在这里执行特定于该连接的清理逻辑
# 例如:从用户列表中移除 websocket 对象
finally:
# 通用的清理逻辑,或者确认连接已关闭
logger.info(f"结束与客户端 {websocket.client.host}:{websocket.client.port} 的交互")
```
6. 管理多个 WebSocket 连接(实现简易聊天室)
现实世界的应用通常需要同时处理多个 WebSocket 连接,并在它们之间共享信息,例如聊天室。FastAPI 本身不直接提供连接管理的工具,但我们可以轻松地自己实现一个。
我们将创建一个 ConnectionManager
类来跟踪所有活动的连接,并提供广播消息的方法。
```python
main.py (添加 ConnectionManager 和聊天室端点)
from typing import List
... (之前的 import) ...
class ConnectionManager:
"""管理 WebSocket 连接"""
def init(self):
# 存储活动的 WebSocket 连接
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
"""接受新的 WebSocket 连接并将其添加到活动连接列表"""
await websocket.accept()
self.active_connections.append(websocket)
logger.info(f"新连接加入: {websocket.client.host}:{websocket.client.port}. 当前连接数: {len(self.active_connections)}")
def disconnect(self, websocket: WebSocket):
"""从活动连接列表中移除 WebSocket 连接"""
if websocket in self.active_connections:
self.active_connections.remove(websocket)
logger.info(f"连接断开: {websocket.client.host}:{websocket.client.port}. 当前连接数: {len(self.active_connections)}")
else:
logger.warning(f"尝试断开一个不在活动列表中的连接: {websocket.client.host}:{websocket.client.port}")
async def send_personal_message(self, message: str, websocket: WebSocket):
"""向单个 WebSocket 连接发送消息"""
try:
await websocket.send_text(message)
logger.debug(f"向 {websocket.client.host}:{websocket.client.port} 发送个人消息: {message}")
except Exception as e:
logger.error(f"向 {websocket.client.host}:{websocket.client.port} 发送消息失败: {e}")
# 发送失败可能意味着连接已断开,可以考虑在这里也调用 disconnect
self.disconnect(websocket)
async def broadcast(self, message: str, sender: WebSocket = None):
"""向所有活动的 WebSocket 连接广播消息 (可以选择排除发送者)"""
logger.info(f"广播消息: {message} (共 {len(self.active_connections)} 个连接)")
disconnected_sockets = []
for connection in self.active_connections:
if connection != sender: # 避免给自己发送广播
try:
await connection.send_text(message)
except Exception as e:
# 如果发送失败,标记此连接为断开,后续处理
logger.error(f"广播时向 {connection.client.host}:{connection.client.port} 发送失败: {e}")
disconnected_sockets.append(connection)
# 清理广播时发现的已断开连接
for socket in disconnected_sockets:
self.disconnect(socket)
创建一个全局的 ConnectionManager 实例
manager = ConnectionManager()
聊天室 WebSocket 端点
@app.websocket("/ws/chat/{client_id}")
async def websocket_chat_endpoint(websocket: WebSocket, client_id: str):
"""
处理聊天室 WebSocket 连接。
- 接收客户端消息并广播给所有其他客户端。
- 处理新用户加入和离开的通知。
"""
# 连接客户端
await manager.connect(websocket)
# 向所有客户端广播新用户加入的消息
await manager.broadcast(f"用户 '{client_id}' 加入了聊天室", sender=websocket) # 通知其他人,但不通知自己
try:
while True:
# 接收来自客户端的消息
data = await websocket.receive_text()
logger.info(f"收到来自 '{client_id}' ({websocket.client.host}:{websocket.client.port}) 的消息: {data}")
# 构建要广播的消息,包含发送者信息
message_to_broadcast = f"'{client_id}': {data}"
# 广播消息给所有其他连接
await manager.broadcast(message_to_broadcast, sender=websocket) # 广播给除自己外的所有人
# (可选)也可以给自己回显一条确认消息
# await manager.send_personal_message(f"你发送了: {data}", websocket)
except WebSocketDisconnect:
# 客户端断开连接
manager.disconnect(websocket)
logger.warning(f"用户 '{client_id}' ({websocket.client.host}:{websocket.client.port}) 离开聊天室")
# 广播用户离开的消息
await manager.broadcast(f"用户 '{client_id}' 离开了聊天室") # 通知所有人
except Exception as e:
# 处理其他异常
logger.error(f"用户 '{client_id}' 的 WebSocket 连接发生错误: {e}")
# 尝试关闭连接并从管理器中移除
try:
await websocket.close(code=1011, reason=f"服务器错误: {e}")
except RuntimeError:
pass # 可能连接已关闭
manager.disconnect(websocket)
# 可以选择广播一个错误消息或用户异常离开的消息
await manager.broadcast(f"用户 '{client_id}' 因错误断开连接")
finally:
# 确保在任何情况下都尝试断开连接并广播离开消息(如果之前没做的话)
# 注意:如果在 disconnect 中已经处理了广播,这里可能需要避免重复
# 但放在 finally 可以作为一种保障机制
# manager.disconnect(websocket) # 确保移除
# await manager.broadcast(f"用户 '{client_id}' 离开了聊天室") # 可能重复,谨慎使用
logger.info(f"结束与用户 '{client_id}' 的交互")
提供聊天室的 HTML 页面
chat_html = """
FastAPI WebSocket Chat
"""
@app.get("/chat")
async def get_chat_page():
"""提供聊天室 HTML 页面"""
return HTMLResponse(chat_html)
```
代码解析:
ConnectionManager
类:__init__
: 初始化一个空列表active_connections
来存储WebSocket
对象。connect
: 接受连接 (websocket.accept()
) 并将WebSocket
对象添加到列表中。disconnect
: 从列表中移除指定的WebSocket
对象。send_personal_message
: 向单个WebSocket
发送消息。broadcast
: 遍历active_connections
列表,向除发送者(可选)之外的所有连接发送消息。增加了错误处理,如果在广播时发现连接已失效,则将其记录下来并在循环后统一移除。
- 全局实例
manager = ConnectionManager()
: 我们创建了一个全局的ConnectionManager
实例,这样所有对/ws/chat/{client_id}
的请求都可以访问同一个连接列表。 - 聊天端点
@app.websocket("/ws/chat/{client_id}")
:- 路径参数
client_id
用于标识不同的用户。 - 连接: 调用
manager.connect(websocket)
接受连接并添加到管理器。然后广播一条新用户加入的消息。 - 接收与广播: 在
while True
循环中,接收客户端消息 (receive_text
),然后调用manager.broadcast()
将格式化后的消息(包含发送者 ID)广播给所有其他连接。 - 断开: 在
WebSocketDisconnect
异常处理块中,调用manager.disconnect(websocket)
从管理器移除连接,并广播一条用户离开的消息。 - 错误处理: 增加了对其他异常的处理,确保即使发生错误也能尝试清理连接并通知其他用户。
- 路径参数
- 前端
chat.html
:- 增加了一个输入框让用户输入自己的名字 (
client_id
)。 - 连接按钮 (
connectWs
):获取用户输入的client_id
,构建 WebSocket URL (ws://localhost:8000/ws/chat/{client_id}
),创建 WebSocket 连接。 - 连接成功 (
onopen
) 后显示聊天框,禁用连接按钮,启用断开按钮。 - 收到消息 (
onmessage
) 后,将其添加到聊天记录列表中。 - 断开连接 (
onclose
) 后,隐藏聊天框,重置按钮状态。 - 发送消息 (
sendMessage
):将输入框的内容通过ws.send()
发送给服务器,并(可选地)在本地也显示自己发送的消息。
- 增加了一个输入框让用户输入自己的名字 (
运行与测试:
- 运行
uvicorn main:app --reload
。 - 在浏览器中打开多个标签页或窗口,都访问
http://localhost:8000/chat
。 - 在每个页面输入不同的名字,点击 "Connect"。
- 在一个页面发送消息,你会看到该消息出现在所有其他页面的聊天记录中(带有发送者名字)。你自己的页面不会收到广播(因为我们排除了
sender
),但我们在前端直接添加了 "You: ..." 的消息。 - 关闭其中一个页面,其他页面会收到该用户离开的通知。
7. 向特定客户端发送消息
ConnectionManager
类中的 send_personal_message
方法已经实现了向特定客户端发送消息的功能。如果你需要根据某些条件(例如用户 ID)找到特定的 WebSocket 连接并发送消息,你可能需要稍微扩展 ConnectionManager
。
一种常见做法是使用字典来存储连接,以用户 ID 或其他唯一标识符作为键:
```python
main.py (改进 ConnectionManager)
from typing import Dict, List
class EnhancedConnectionManager:
def init(self):
# 使用字典存储连接,键为 client_id,值为 WebSocket 对象
self.active_connections: Dict[str, WebSocket] = {}
async def connect(self, websocket: WebSocket, client_id: str):
await websocket.accept()
self.active_connections[client_id] = websocket
logger.info(f"用户 '{client_id}' ({websocket.client.host}:{websocket.client.port}) 连接. 当前连接数: {len(self.active_connections)}")
def disconnect(self, client_id: str):
if client_id in self.active_connections:
websocket = self.active_connections.pop(client_id) # 移除并获取对象
logger.info(f"用户 '{client_id}' ({websocket.client.host}:{websocket.client.port}) 断开. 当前连接数: {len(self.active_connections)}")
return websocket # 返回被移除的 websocket 对象,可能有用
else:
logger.warning(f"尝试断开一个不存在的用户 '{client_id}'")
return None
async def send_personal_message(self, message: str, client_id: str):
websocket = self.active_connections.get(client_id)
if websocket:
try:
await websocket.send_text(message)
logger.debug(f"向 '{client_id}' 发送个人消息: {message}")
except Exception as e:
logger.error(f"向 '{client_id}' 发送消息失败: {e}")
self.disconnect(client_id) # 发送失败则断开连接
else:
logger.warning(f"尝试向不存在或已断开的用户 '{client_id}' 发送消息")
async def broadcast(self, message: str, sender_id: str = None):
logger.info(f"广播消息: {message} (共 {len(self.active_connections)} 个连接)")
# 创建当前连接ID列表的副本进行迭代,防止在迭代时修改字典
client_ids = list(self.active_connections.keys())
disconnected_ids = []
for client_id in client_ids:
if client_id != sender_id:
websocket = self.active_connections.get(client_id)
if websocket: # 再次检查是否存在,因为可能在广播过程中断开
try:
await websocket.send_text(message)
except Exception as e:
logger.error(f"广播时向 '{client_id}' 发送失败: {e}")
disconnected_ids.append(client_id)
else:
# 如果在迭代开始后,连接被移除了,记录一下
logger.warning(f"广播时发现用户 '{client_id}' 已不存在")
# 清理广播时发现的已断开连接
for client_id in disconnected_ids:
self.disconnect(client_id)
使用新的管理器
enhanced_manager = EnhancedConnectionManager()
更新聊天端点以使用新的管理器和 client_id 作为键
@app.websocket("/ws/v2/chat/{client_id}")
async def websocket_chat_v2_endpoint(websocket: WebSocket, client_id: str):
# 检查 client_id 是否已被使用 (可选,但推荐)
if client_id in enhanced_manager.active_connections:
logger.warning(f"用户 ID '{client_id}' 已被占用,拒绝连接")
await websocket.close(code=1008, reason=f"User ID '{client_id}' is already in use.")
return
await enhanced_manager.connect(websocket, client_id)
await enhanced_manager.broadcast(f"用户 '{client_id}' 加入了聊天室", sender_id=client_id)
try:
while True:
data = await websocket.receive_text()
logger.info(f"收到来自 '{client_id}' 的消息: {data}")
message_to_broadcast = f"'{client_id}': {data}"
await enhanced_manager.broadcast(message_to_broadcast, sender_id=client_id)
except WebSocketDisconnect:
# 客户端正常断开
enhanced_manager.disconnect(client_id)
logger.warning(f"用户 '{client_id}' 离开聊天室")
await enhanced_manager.broadcast(f"用户 '{client_id}' 离开了聊天室")
except Exception as e:
# 异常断开
logger.error(f"用户 '{client_id}' 的 WebSocket 连接发生错误: {e}")
try:
await websocket.close(code=1011, reason=f"Server error: {e}")
except RuntimeError:
pass # Ignore errors if connection is already closed
# 确保从管理器中移除
enhanced_manager.disconnect(client_id)
await enhanced_manager.broadcast(f"用户 '{client_id}' 因错误断开连接")
finally:
logger.info(f"结束与用户 '{client_id}' 的交互")
(需要更新 HTML 中的 WebSocket URL 到 /ws/v2/chat/{client_id})
(并且可能需要更新 /chat 路由来提供这个新版本的 HTML 或调整 JS)
```
这个增强版的 ConnectionManager
使用字典存储连接,使得按 client_id
查找和发送消息变得非常高效。同时,我们在连接时添加了检查,防止重复的 client_id
连接。
8. 前端 JavaScript 客户端示例
我们在之前的 HTML 示例中已经包含了基本的 JavaScript WebSocket 客户端代码。这里总结一下关键部分:
-
创建 WebSocket 连接:
javascript
var clientId = "some_unique_id"; // 通常由用户输入或系统生成
var ws = new WebSocket(`ws://your_server_address/ws/path/${clientId}`);
// 或者使用安全连接 wss://
// var ws = new WebSocket(`wss://your_server_address/ws/path/${clientId}`); -
事件处理:
ws.onopen = function(event) { ... };
连接成功建立时触发。ws.onmessage = function(event) { var message = event.data; ... };
收到服务器消息时触发。event.data
包含消息内容(通常是字符串,如果是 JSON 需要JSON.parse()
)。ws.onclose = function(event) { ... };
连接关闭时触发。event.code
和event.reason
提供关闭信息。ws.onerror = function(event) { ... };
发生错误时触发。
-
发送消息:
javascript
ws.send("Hello Server!"); // 发送文本
ws.send(JSON.stringify({ type: "greeting", content: "Hi!" })); // 发送 JSON -
关闭连接:
javascript
ws.close(1000, "User logged out"); // code 1000 是正常关闭 -
检查连接状态:
ws.readyState
: 返回连接状态码 (0: CONNECTING, 1: OPEN, 2: CLOSING, 3: CLOSED)。
注意: 在生产环境中,强烈建议使用 wss://
(安全的 WebSocket 连接),这需要你的服务器配置 TLS/SSL 证书。
9. WebSocket 中的身份验证
保护 WebSocket 连接非常重要,特别是当它们用于传输敏感信息或执行特权操作时。FastAPI 的依赖注入系统同样适用于 WebSocket 端点,这使得实现身份验证变得相当直接。
常用的 WebSocket 身份验证方法包括:
- 通过 Query 参数传递 Token: 客户端连接时将认证令牌(如 JWT)放在 URL 的查询参数中:
wss://example.com/ws?token=YOUR_JWT_TOKEN
。服务器端在accept()
之前验证这个 token。这是最常见的方法之一,因为浏览器 WebSocket API 不直接支持设置自定义 Header。 - 通过 Cookie 传递 Token: 如果 WebSocket 连接是从已经通过身份验证的 Web 页面发起的,浏览器会自动发送与该域关联的 Cookie。服务器可以读取这些 Cookie 来验证用户身份。
- 通过 Subprotocol 传递 Token: WebSocket 支持子协议协商。可以在握手期间通过
Sec-WebSocket-Protocol
头部传递包含 token 的子协议名称,但这相对不常用且复杂。 - 连接后发送认证消息: 接受连接后,要求客户端发送的第一条消息是认证信息。在认证成功前,不处理其他业务消息。
使用 FastAPI 依赖注入进行 Token 验证 (Query 参数示例):
假设你有一个函数 get_current_user(token: str = Query(...))
用于验证 JWT 并返回用户信息,你可以将它用作 WebSocket 端点的依赖项。
```python
main.py (添加认证依赖)
from fastapi import Depends, Query, HTTPException, status
from fastapi.security import APIKeyQuery # 可以用来定义 Query 参数名
假设你有一个验证 token 并返回用户模型的函数
以下是一个非常简化的示例,你需要替换为实际的 JWT 或其他 token 验证逻辑
fake_users_db = {"user1": {"username": "user1", "id": 1}, "user2": {"username": "user2", "id": 2}}
async def get_current_user_ws(
websocket: WebSocket, # 需要 WebSocket 对象来处理连接关闭
token: str = Query(None, description="认证令牌") # 从 Query 参数获取 token
):
if token is None:
logger.warning("连接尝试缺少 token")
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Missing authentication token")
# 注意:在依赖项中关闭连接后,FastAPI 可能仍会尝试进入端点函数主体,
# 但此时 websocket.receive_ 或 send_ 会失败。更好的做法是引发 HTTPException,
# 但 WebSocket 端点默认不直接处理 HTTPExceptions 来关闭连接。
# 因此,直接关闭并返回 None 或引发一个特定异常可能更清晰。
# 或者,在端点函数开始时检查依赖项返回的是否是有效用户。
# 返回 None 或引发自定义异常通常更好
return None # 或者 raise WebSocketException("Unauthorized", code=status.WS_1008_POLICY_VIOLATION)
user = fake_users_db.get(token) # 极简化的 token -> user 查找
if user is None:
logger.warning(f"无效的 token: {token}")
await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Invalid authentication token")
return None # 或者 raise WebSocketException("Unauthorized", code=status.WS_1008_POLICY_VIOLATION)
logger.info(f"用户 '{user['username']}' 通过 token 验证")
return user # 返回用户信息字典
需要自定义一个 WebSocket 异常,如果想通过异常中断流程
class WebSocketException(Exception):
def init(self, message: str, code: int = status.WS_1008_POLICY_VIOLATION):
self.message = message
self.code = code
安全的 WebSocket 端点
@app.websocket("/ws/secure_chat")
async def secure_websocket_endpoint(
websocket: WebSocket,
# 使用 Depends 来注入验证逻辑,并获取用户信息
current_user: dict = Depends(get_current_user_ws)
):
# 如果依赖项返回 None (表示认证失败且已关闭连接),则直接返回
if current_user is None:
# 依赖项已经处理了关闭,这里不需要额外操作
logger.info("认证失败,WebSocket 连接未建立。")
return # 结束函数执行
client_id = current_user.get("username", "未知用户") # 使用认证后的用户名作为 ID
# 使用之前的 EnhancedConnectionManager (或者根据需要调整)
await enhanced_manager.connect(websocket, client_id)
await enhanced_manager.broadcast(f"安全用户 '{client_id}' 加入了聊天室", sender_id=client_id)
try:
while True:
data = await websocket.receive_text()
logger.info(f"收到来自安全用户 '{client_id}' 的消息: {data}")
message_to_broadcast = f"'{client_id}' (安全): {data}"
await enhanced_manager.broadcast(message_to_broadcast, sender_id=client_id)
except WebSocketDisconnect:
enhanced_manager.disconnect(client_id)
logger.warning(f"安全用户 '{client_id}' 离开聊天室")
await enhanced_manager.broadcast(f"安全用户 '{client_id}' 离开了聊天室")
except Exception as e:
logger.error(f"安全用户 '{client_id}' 的 WebSocket 连接发生错误: {e}")
try:
await websocket.close(code=1011, reason=f"Server error: {e}")
except RuntimeError:
pass
enhanced_manager.disconnect(client_id)
await enhanced_manager.broadcast(f"安全用户 '{client_id}' 因错误断开连接")
finally:
logger.info(f"结束与安全用户 '{client_id}' 的交互")
(前端 JS 连接时需要构
建带 token 的 URL: ws://localhost:8000/ws/secure_chat?token=user1
或 token=user2
)
(需要提供相应的 HTML 页面或修改现有页面以支持输入 token)
```
关键点:
- 依赖函数
get_current_user_ws
接收websocket: WebSocket
和token: str = Query(...)
作为参数。 - 在依赖函数内部进行 token 验证。如果验证失败,必须手动调用
await websocket.close(...)
并指定适当的关闭代码(如1008 Policy Violation
),然后返回None
或引发一个特定的异常(如果你定义了全局异常处理器来处理它)。仅仅raise HTTPException
通常不会像在 HTTP 请求中那样自动关闭 WebSocket 连接。 - 在 WebSocket 端点函数 (
secure_websocket_endpoint
) 中,通过Depends(get_current_user_ws)
注入依赖。 - 在端点函数的开始处检查
current_user
是否为None
。如果是,说明认证失败且连接已被依赖项关闭,应直接return
停止后续处理。 - 如果认证成功,
current_user
将包含用户信息,可以用于后续逻辑(例如,确定client_id
)。
这种方式将认证逻辑与 WebSocket 处理逻辑解耦,保持了代码的清晰和可维护性。
10. 进阶话题与最佳实践
- 错误处理: 在
while True
循环内部和外部都要有健壮的错误处理。记录详细的错误日志。考虑定义更具体的异常类型。 - 心跳/Ping-Pong: 长时间无数据传输的 WebSocket 连接可能会被中间代理(如防火墙、负载均衡器)断开。可以实现心跳机制:服务器定期向客户端发送 Ping 帧,客户端响应 Pong 帧(反之亦可),以保持连接活跃。
websockets
库和 Uvicorn 通常会自动处理基本的 Ping/Pong,但你可能需要根据需要进行配置或自定义。 - 二进制数据: 对于需要传输大量二进制数据(如文件、音视频流)的场景,使用
receive_bytes()
和send_bytes()
。 - 背压(Backpressure): 如果一方发送数据的速度远快于另一方处理的速度,可能会导致内存溢出。虽然 ASGI 和
websockets
库会进行一些处理,但在高负载下仍需注意。可以考虑限制消息队列的大小或实现应用层面的流控。 - 子协议 (Subprotocols): 客户端可以在连接时通过
Sec-WebSocket-Protocol
Header 声明它支持的子协议列表,服务器从中选择一个并通过同名 Header 返回。这允许在 WebSocket 之上定义更具体的应用层协议。可以在websocket.accept(subprotocol=...)
中指定服务器选择的子协议。 -
后台任务 (Background Tasks): 对于耗时的操作(如数据库查询、调用外部 API),不要阻塞 WebSocket 的事件循环。使用 FastAPI 的
BackgroundTasks
或asyncio.create_task
将这些操作放到后台执行。
```python
from fastapi import BackgroundTasks@app.websocket("/ws/long_task/{item_id}")
async def ws_long_task(websocket: WebSocket, item_id: str, background_tasks: BackgroundTasks):
await websocket.accept()
async def perform_long_task(ws: WebSocket, item: str):
# 模拟耗时操作
await asyncio.sleep(5)
result = f"Task for item {item} completed."
try:
await ws.send_text(result)
except WebSocketDisconnect:
logger.info("Client disconnected before task completion.")
except Exception as e:
logger.error(f"Failed to send task result: {e}")# 将耗时任务添加到后台执行 background_tasks.add_task(perform_long_task, websocket, item_id) # 可以立即给客户端一个反馈 await websocket.send_text(f"后台任务已启动,处理项目: {item_id}") # 注意:这里不能在 try...except WebSocketDisconnect 中等待任务完成 # 你需要设计好如何在任务完成后通知客户端(如果它仍然连接着) # 上面的例子是任务完成后直接尝试发送 # 保持连接开放以接收其他指令或等待断开 try: while True: # 可以接收客户端的其他指令,或者只是保持连接 data = await websocket.receive_text() if data == "status?": await websocket.send_text("Task is running in the background.") # ... 其他逻辑 ... except WebSocketDisconnect: logger.info(f"Client {item_id} disconnected.") finally: # 清理逻辑 pass
``
proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade";
* **扩展性:** 对于需要处理大量并发连接的应用,单个 FastAPI 实例可能成为瓶颈。可以考虑:
* **水平扩展:** 运行多个 FastAPI 实例,并使用支持 WebSocket 的负载均衡器(如 Nginx 配置)。
ConnectionManager` 中进行。需要引入外部消息中间件(如 Redis Pub/Sub, RabbitMQ, Kafka)。每个 FastAPI 实例连接到消息队列,接收到的消息发布到队列,所有实例订阅队列并将消息推送给它们管理的本地 WebSocket 连接。
* **消息队列/发布订阅:** 当需要跨多个实例广播消息时,不能简单地在单个实例的
结论
FastAPI 为构建 WebSocket 应用提供了强大而简洁的工具。通过利用其异步特性、依赖注入系统和对标准 WebSocket 协议的良好支持,我们可以轻松创建从简单的 Echo 服务到复杂的实时聊天室,乃至需要身份验证和后台处理的高级应用。
掌握 WebSocket
对象的核心方法 (accept
, receive_*
, send_*
),理解 WebSocketDisconnect
异常的处理,并学会使用连接管理器来处理多个客户端,是构建健壮 WebSocket 应用的关键。同时,不要忘记考虑安全性(身份验证、WSS)、错误处理和在高并发场景下的扩展性问题。
希望本教程能帮助你全面理解 FastAPI 中的 WebSocket,并为你构建下一代实时 Web 应用打下坚实的基础。祝你编码愉快!





赶快来坐沙发