top
本文目录
引言
1. WebSocket 基础概念回顾
2. FastAPI 环境准备
3. 创建第一个 WebSocket 端点
4. 发送和接收不同类型的数据
Received Messages:
5. 处理客户端断开连接 (WebSocketDisconnect)
6. 管理多个 WebSocket 连接(实现简易聊天室)
Chat Messages:
7. 向特定客户端发送消息
8. 前端 JavaScript 客户端示例
9. WebSocket 中的身份验证
10. 进阶话题与最佳实践
结论

FastAPI WebSocket 完整教程 (含代码示例)


FastAPI WebSocket 完整教程:从入门到实战

引言

在现代 Web 开发中,实时通信扮演着越来越重要的角色。无论是实时聊天、股票行情推送、在线游戏还是协作编辑工具,都需要服务器能够主动向客户端推送信息,而不仅仅是响应客户端的请求。WebSocket 协议正是为解决这一需求而生,它提供了一个全双工的通信通道,允许服务器和客户端在建立连接后自由地相互发送数据。

FastAPI,作为一个现代化、高性能的 Python Web 框架,凭借其异步特性、类型提示支持和强大的依赖注入系统,天然地适合构建 WebSocket 应用。它简化了 WebSocket 端点的创建和管理,让开发者能够专注于业务逻辑。

本教程将带你深入了解如何在 FastAPI 中使用 WebSocket,从基础连接、数据收发,到管理多个连接、实现广播,再到身份验证和一些进阶技巧。我们将通过具体的代码示例,一步步构建一个功能完善的 WebSocket 应用。

本教程涵盖内容:

  1. WebSocket 基础概念回顾
  2. FastAPI 环境准备
  3. 创建第一个 WebSocket 端点
  4. 发送和接收文本/JSON 数据
  5. 处理客户端断开连接
  6. 管理多个 WebSocket 连接(实现简易聊天室)
  7. 向特定客户端发送消息与广播消息
  8. 前端 JavaScript 客户端示例
  9. WebSocket 中的身份验证
  10. 进阶话题与最佳实践

目标读者:

本教程适合对 Python 和 Web 开发有一定基础,了解 FastAPI 基本用法,并希望学习如何在 FastAPI 中实现 WebSocket 功能的开发者。

1. WebSocket 基础概念回顾

在深入 FastAPI 实现之前,我们先简单回顾一下 WebSocket 的核心概念:

  • 持久连接 (Persistent Connection): 与 HTTP 的请求-响应模式不同,WebSocket 在客户端和服务器之间建立一个持久性的 TCP 连接。一旦连接建立,双方都可以随时向对方发送数据,无需每次都重新建立连接。
  • 全双工通信 (Full-Duplex): 数据可以同时在两个方向上传输,服务器可以主动向客户端推送信息,客户端也可以随时向服务器发送信息。
  • 协议升级: WebSocket 连接通常始于一个标准的 HTTP(S) 请求(包含 Upgrade: websocketConnection: Upgrade 等头部信息)。如果服务器支持 WebSocket,它会响应一个 HTTP 101 Switching Protocols 状态码,随后该 TCP 连接就转变为 WebSocket 连接。
  • 低延迟: 由于连接是持久的,减少了 HTTP 请求的握手和头部开销,数据传输延迟更低,非常适合实时应用场景。
  • URL 模式: WebSocket URL 使用 ws:// (不加密) 或 wss:// (加密,基于 TLS/SSL) 模式。

2. FastAPI 环境准备

首先,确保你已经安装了 Python (推荐 3.7+)。然后,我们需要安装 FastAPI 和一个 ASGI 服务器,例如 Uvicorn。同时,websockets 库是 FastAPI 处理 WebSocket 底层通信所依赖的,通常会作为 FastAPI 的依赖项自动安装,但显式安装可以确保版本兼容。

bash
pip install fastapi uvicorn websockets

  • fastapi: 核心框架。
  • uvicorn: ASGI 服务器,用于运行 FastAPI 应用。
  • websockets: 处理 WebSocket 协议的底层库。

3. 创建第一个 WebSocket 端点

让我们从创建一个最简单的 WebSocket 端点开始。这个端点将接受客户端连接,然后简单地将接收到的任何消息原样发回(一个 "echo" 服务)。

创建一个名为 main.py 的文件:

```python

main.py

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse
import logging

配置日志记录

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)

app = FastAPI()

提供一个简单的 HTML 页面用于测试

html = """




Chat

WebSocket Echo Test





"""

@app.get("/")
async def get():
"""提供用于测试 WebSocket 的 HTML 页面"""
return HTMLResponse(html)

核心:WebSocket 端点

@app.websocket("/ws/echo")
async def websocket_echo_endpoint(websocket: WebSocket):
"""
处理 WebSocket 连接的端点。
接收文本消息并将其回显给客户端。
"""
logger.info(f"客户端 {websocket.client.host}:{websocket.client.port} 尝试连接...")
# 1. 接受连接
await websocket.accept()
logger.info(f"客户端 {websocket.client.host}:{websocket.client.port} 连接成功")

try:
    # 2. 循环接收和发送消息
    while True:
        # 等待接收文本消息
        data = await websocket.receive_text()
        logger.info(f"收到来自 {websocket.client.host}:{websocket.client.port} 的消息: {data}")

        # 发送收到的文本消息回客户端
        await websocket.send_text(f"服务器回显: {data}")
        logger.info(f"已向 {websocket.client.host}:{websocket.client.port} 回显消息")

except WebSocketDisconnect:
    # 3. 处理客户端断开连接
    logger.warning(f"客户端 {websocket.client.host}:{websocket.client.port} 断开连接")
except Exception as e:
    # 处理其他可能的异常
    logger.error(f"发生错误: {e}")
    await websocket.close(code=1011, reason=f"服务器内部错误: {e}") # 告知客户端错误并关闭
finally:
    # 确保连接在任何情况下(正常断开或异常)都被尝试关闭
    # 虽然 accept() 之后,FastAPI 会在函数结束或异常时自动处理关闭,
    # 但显式处理有助于理解流程和进行可能的清理工作。
    # 注意:如果连接已断开,再次调用 close 可能无效或抛出异常,
    # 但在 finally 中尝试是安全的。
    logger.info(f"清理与客户端 {websocket.client.host}:{websocket.client.port} 的连接资源")
    # 通常不需要在这里显式调用 websocket.close(),除非有特殊清理逻辑
    # FastAPI 会在函数退出或 WebSocketDisconnect 异常后处理关闭

```

代码解析:

  1. @app.websocket("/ws/echo"): 这是一个装饰器,用于定义 WebSocket 路由。类似于 @app.get()@app.post(),但专门用于 WebSocket 连接。路径是 /ws/echo
  2. async def websocket_echo_endpoint(websocket: WebSocket): 定义处理 WebSocket 连接的异步函数。参数 websocket 是一个 WebSocket 类的实例,它代表了当前的 WebSocket 连接。FastAPI 会自动注入这个对象。
  3. await websocket.accept(): 这是建立 WebSocket 连接的关键步骤。在调用此方法之前,连接尚未完全建立。调用它会发送 WebSocket 握手的响应给客户端,确认连接。必须先调用 accept() 才能进行后续的 sendreceive 操作。
  4. while True:: 使用一个无限循环来持续监听来自客户端的消息。
  5. data = await websocket.receive_text(): 异步等待并接收来自客户端的文本消息。如果客户端发送的是非文本数据(如二进制数据或 JSON),或者连接已关闭,这里可能会出错或返回特定类型的数据/异常。
  6. await websocket.send_text(f"服务器回显: {data}"): 异步向客户端发送文本消息。
  7. try...except WebSocketDisconnect:: FastAPI 在客户端断开连接时会引发 WebSocketDisconnect 异常。我们捕获这个异常来进行相应的处理,例如记录日志或清理资源。这是处理断开的标准方式。
  8. except Exception as e: 捕获其他可能的运行时错误。
  9. finally: 无论连接是正常断开还是因为异常中断,finally 块中的代码都会执行,适合放置资源清理逻辑。

运行与测试:

  1. 在终端中运行 Uvicorn:
    bash
    uvicorn main:app --reload

    --reload 参数使得代码更改后服务器会自动重启,方便开发。

  2. 打开浏览器,访问 http://localhost:8000/。你会看到一个简单的 HTML 页面。

  3. 在输入框中输入消息,点击 "Send"。你应该能在页面上看到服务器的回显消息(例如,输入 "hello",会显示 "服务器回显: hello")。同时,观察运行 Uvicorn 的终端,你会看到连接、接收和发送消息的日志。

  4. 关闭浏览器标签页或刷新页面,终端会显示客户端断开连接的日志。

4. 发送和接收不同类型的数据

WebSocket 不仅仅能传输文本,还可以传输 JSON 和二进制数据。FastAPI 提供了方便的方法来处理这些:

  • 接收数据:

    • await websocket.receive_text(): 接收文本数据。
    • await websocket.receive_bytes(): 接收二进制数据。
    • await websocket.receive_json(): 接收 JSON 数据。它会自动将接收到的文本或字节解析为 Python 字典或列表。如果数据不是有效的 JSON,会引发 json.JSONDecodeError (或者在某些 FastAPI/Starlette 版本中可能包装在 RuntimeError 或其他 WebSocket 异常中,需要根据实际情况捕获)。
    • await websocket.receive(): 更底层的接收方法,返回一个包含 typebytestext 键的字典。
  • 发送数据:

    • await websocket.send_text(data: str): 发送文本数据。
    • await websocket.send_bytes(data: bytes): 发送二进制数据。
    • await websocket.send_json(data: Any, mode: str = "text"): 发送 Python 对象(如字典、列表)作为 JSON。默认以文本模式 (mode="text") 发送,也可以设置为 mode="binary" 以二进制 UTF-8 编码发送。

示例:处理 JSON 数据

修改 /ws/echo 端点来接收和发送 JSON:

```python

main.py (部分修改)

import json

... (其他 import 和 app 定义保持不变) ...

@app.websocket("/ws/json_echo")
async def websocket_json_endpoint(websocket: WebSocket):
logger.info(f"客户端 {websocket.client.host}:{websocket.client.port} 尝试连接到 JSON Echo...")
await websocket.accept()
logger.info(f"客户端 {websocket.client.host}:{websocket.client.port} (JSON Echo) 连接成功")

try:
    while True:
        # 接收 JSON 数据
        try:
            data = await websocket.receive_json()
            logger.info(f"收到来自 {websocket.client.host}:{websocket.client.port} 的 JSON: {data}")

            # 检查接收到的数据是否是字典
            if isinstance(data, dict):
                response = {
                    "message": "服务器已收到您的 JSON",
                    "your_data": data,
                    "server_status": "OK"
                }
            else:
                 response = {
                    "message": "服务器期望收到 JSON 对象 (字典)",
                    "received_type": str(type(data))
                }

            # 发送 JSON 数据回客户端
            await websocket.send_json(response)
            logger.info(f"已向 {websocket.client.host}:{websocket.client.port} 回复 JSON")

        except json.JSONDecodeError:
            logger.error(f"客户端 {websocket.client.host}:{websocket.client.port} 发送了无效的 JSON 数据")
            await websocket.send_text("错误:请发送有效的 JSON 格式数据。")
        except Exception as e: # 捕获 receive_json 可能抛出的其他异常,如 RuntimeError
            logger.error(f"接收 JSON 时出错: {e}, 类型: {type(e)}")
            # 可以根据异常类型决定是否关闭连接
            await websocket.send_text(f"服务器处理错误: {e}")
            # 如果错误严重,可以选择关闭连接
            # await websocket.close(code=1011)
            # break # 退出循环

except WebSocketDisconnect:
    logger.warning(f"客户端 {websocket.client.host}:{websocket.client.port} (JSON Echo) 断开连接")
except Exception as e:
    logger.error(f"WebSocket 连接发生意外错误: {e}")
    # 尝试安全关闭,即使在 accept() 之后出现问题
    try:
        await websocket.close(code=1011, reason=f"服务器内部错误: {e}")
    except RuntimeError as re:
         logger.error(f"尝试关闭已断开的连接时出错: {re}") # 可能连接已经不在可用状态
finally:
    logger.info(f"清理与客户端 {websocket.client.host}:{websocket.client.port} (JSON Echo) 的连接资源")

... (HTML 和 / 路由保持不变,但 JS 需要修改以发送 JSON) ...

更新 HTML 中的 JavaScript 以发送 JSON

html_json = """



JSON Chat

WebSocket JSON Test

Message:
Value:

Received Messages:



    """

    @app.get("/json")
    async def get_json_page():
    """提供用于测试 JSON WebSocket 的 HTML 页面"""
    return HTMLResponse(html_json)
    ```

    现在,运行 uvicorn main:app --reload 并访问 http://localhost:8000/json。在输入框中输入内容,点击 "Send JSON",你会看到客户端发送的 JSON 数据和服务器返回的 JSON 响应。尝试发送非 JSON 格式的文本,服务器会返回错误提示。

    5. 处理客户端断开连接 (WebSocketDisconnect)

    我们已经在上面的例子中看到了如何使用 try...except WebSocketDisconnect: 来捕获客户端的断开事件。这是非常重要的,因为它允许你:

    1. 优雅地终止循环: WebSocketDisconnect 异常会跳出 while True 循环。
    2. 记录日志: 了解哪些客户端断开了连接。
    3. 清理资源: 如果你为该连接维护了任何状态(例如,在聊天室中将其从活动用户列表中移除),这是执行清理操作的最佳位置。

    ```python

    ... 在 WebSocket 端点函数内 ...

    try:
    while True:
    data = await websocket.receive_text()
    # ... 处理数据 ...
    except WebSocketDisconnect:
    logger.warning(f"客户端 {websocket.client.host}:{websocket.client.port} 断开连接")
    # 在这里执行特定于该连接的清理逻辑
    # 例如:从用户列表中移除 websocket 对象
    finally:
    # 通用的清理逻辑,或者确认连接已关闭
    logger.info(f"结束与客户端 {websocket.client.host}:{websocket.client.port} 的交互")
    ```

    6. 管理多个 WebSocket 连接(实现简易聊天室)

    现实世界的应用通常需要同时处理多个 WebSocket 连接,并在它们之间共享信息,例如聊天室。FastAPI 本身不直接提供连接管理的工具,但我们可以轻松地自己实现一个。

    我们将创建一个 ConnectionManager 类来跟踪所有活动的连接,并提供广播消息的方法。

    ```python

    main.py (添加 ConnectionManager 和聊天室端点)

    from typing import List

    ... (之前的 import) ...

    class ConnectionManager:
    """管理 WebSocket 连接"""
    def init(self):
    # 存储活动的 WebSocket 连接
    self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """接受新的 WebSocket 连接并将其添加到活动连接列表"""
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info(f"新连接加入: {websocket.client.host}:{websocket.client.port}. 当前连接数: {len(self.active_connections)}")
    
    def disconnect(self, websocket: WebSocket):
        """从活动连接列表中移除 WebSocket 连接"""
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
            logger.info(f"连接断开: {websocket.client.host}:{websocket.client.port}. 当前连接数: {len(self.active_connections)}")
        else:
             logger.warning(f"尝试断开一个不在活动列表中的连接: {websocket.client.host}:{websocket.client.port}")
    
    
    async def send_personal_message(self, message: str, websocket: WebSocket):
        """向单个 WebSocket 连接发送消息"""
        try:
            await websocket.send_text(message)
            logger.debug(f"向 {websocket.client.host}:{websocket.client.port} 发送个人消息: {message}")
        except Exception as e:
             logger.error(f"向 {websocket.client.host}:{websocket.client.port} 发送消息失败: {e}")
             # 发送失败可能意味着连接已断开,可以考虑在这里也调用 disconnect
             self.disconnect(websocket)
    
    
    async def broadcast(self, message: str, sender: WebSocket = None):
        """向所有活动的 WebSocket 连接广播消息 (可以选择排除发送者)"""
        logger.info(f"广播消息: {message} (共 {len(self.active_connections)} 个连接)")
        disconnected_sockets = []
        for connection in self.active_connections:
            if connection != sender: # 避免给自己发送广播
                try:
                    await connection.send_text(message)
                except Exception as e:
                    # 如果发送失败,标记此连接为断开,后续处理
                    logger.error(f"广播时向 {connection.client.host}:{connection.client.port} 发送失败: {e}")
                    disconnected_sockets.append(connection)
    
        # 清理广播时发现的已断开连接
        for socket in disconnected_sockets:
            self.disconnect(socket)
    

    创建一个全局的 ConnectionManager 实例

    manager = ConnectionManager()

    聊天室 WebSocket 端点

    @app.websocket("/ws/chat/{client_id}")
    async def websocket_chat_endpoint(websocket: WebSocket, client_id: str):
    """
    处理聊天室 WebSocket 连接。
    - 接收客户端消息并广播给所有其他客户端。
    - 处理新用户加入和离开的通知。
    """
    # 连接客户端
    await manager.connect(websocket)
    # 向所有客户端广播新用户加入的消息
    await manager.broadcast(f"用户 '{client_id}' 加入了聊天室", sender=websocket) # 通知其他人,但不通知自己

    try:
        while True:
            # 接收来自客户端的消息
            data = await websocket.receive_text()
            logger.info(f"收到来自 '{client_id}' ({websocket.client.host}:{websocket.client.port}) 的消息: {data}")
    
            # 构建要广播的消息,包含发送者信息
            message_to_broadcast = f"'{client_id}': {data}"
            # 广播消息给所有其他连接
            await manager.broadcast(message_to_broadcast, sender=websocket) # 广播给除自己外的所有人
    
            # (可选)也可以给自己回显一条确认消息
            # await manager.send_personal_message(f"你发送了: {data}", websocket)
    
    except WebSocketDisconnect:
        # 客户端断开连接
        manager.disconnect(websocket)
        logger.warning(f"用户 '{client_id}' ({websocket.client.host}:{websocket.client.port}) 离开聊天室")
        # 广播用户离开的消息
        await manager.broadcast(f"用户 '{client_id}' 离开了聊天室") # 通知所有人
    except Exception as e:
        # 处理其他异常
        logger.error(f"用户 '{client_id}' 的 WebSocket 连接发生错误: {e}")
        # 尝试关闭连接并从管理器中移除
        try:
            await websocket.close(code=1011, reason=f"服务器错误: {e}")
        except RuntimeError:
            pass # 可能连接已关闭
        manager.disconnect(websocket)
        # 可以选择广播一个错误消息或用户异常离开的消息
        await manager.broadcast(f"用户 '{client_id}' 因错误断开连接")
    finally:
        # 确保在任何情况下都尝试断开连接并广播离开消息(如果之前没做的话)
        # 注意:如果在 disconnect 中已经处理了广播,这里可能需要避免重复
        # 但放在 finally 可以作为一种保障机制
        # manager.disconnect(websocket) # 确保移除
        # await manager.broadcast(f"用户 '{client_id}' 离开了聊天室") # 可能重复,谨慎使用
        logger.info(f"结束与用户 '{client_id}' 的交互")
    

    提供聊天室的 HTML 页面

    chat_html = """




    FastAPI Chat


    FastAPI WebSocket Chat







    """

    @app.get("/chat")
    async def get_chat_page():
    """提供聊天室 HTML 页面"""
    return HTMLResponse(chat_html)

    ```

    代码解析:

    1. ConnectionManager 类:
      • __init__: 初始化一个空列表 active_connections 来存储 WebSocket 对象。
      • connect: 接受连接 (websocket.accept()) 并将 WebSocket 对象添加到列表中。
      • disconnect: 从列表中移除指定的 WebSocket 对象。
      • send_personal_message: 向单个 WebSocket 发送消息。
      • broadcast: 遍历 active_connections 列表,向除发送者(可选)之外的所有连接发送消息。增加了错误处理,如果在广播时发现连接已失效,则将其记录下来并在循环后统一移除。
    2. 全局实例 manager = ConnectionManager(): 我们创建了一个全局的 ConnectionManager 实例,这样所有对 /ws/chat/{client_id} 的请求都可以访问同一个连接列表。
    3. 聊天端点 @app.websocket("/ws/chat/{client_id}"):
      • 路径参数 client_id 用于标识不同的用户。
      • 连接: 调用 manager.connect(websocket) 接受连接并添加到管理器。然后广播一条新用户加入的消息。
      • 接收与广播: 在 while True 循环中,接收客户端消息 (receive_text),然后调用 manager.broadcast() 将格式化后的消息(包含发送者 ID)广播给所有其他连接。
      • 断开: 在 WebSocketDisconnect 异常处理块中,调用 manager.disconnect(websocket) 从管理器移除连接,并广播一条用户离开的消息。
      • 错误处理: 增加了对其他异常的处理,确保即使发生错误也能尝试清理连接并通知其他用户。
    4. 前端 chat.html:
      • 增加了一个输入框让用户输入自己的名字 (client_id)。
      • 连接按钮 (connectWs):获取用户输入的 client_id,构建 WebSocket URL (ws://localhost:8000/ws/chat/{client_id}),创建 WebSocket 连接。
      • 连接成功 (onopen) 后显示聊天框,禁用连接按钮,启用断开按钮。
      • 收到消息 (onmessage) 后,将其添加到聊天记录列表中。
      • 断开连接 (onclose) 后,隐藏聊天框,重置按钮状态。
      • 发送消息 (sendMessage):将输入框的内容通过 ws.send() 发送给服务器,并(可选地)在本地也显示自己发送的消息。

    运行与测试:

    1. 运行 uvicorn main:app --reload
    2. 在浏览器中打开多个标签页或窗口,都访问 http://localhost:8000/chat
    3. 在每个页面输入不同的名字,点击 "Connect"。
    4. 在一个页面发送消息,你会看到该消息出现在所有其他页面的聊天记录中(带有发送者名字)。你自己的页面不会收到广播(因为我们排除了 sender),但我们在前端直接添加了 "You: ..." 的消息。
    5. 关闭其中一个页面,其他页面会收到该用户离开的通知。

    7. 向特定客户端发送消息

    ConnectionManager 类中的 send_personal_message 方法已经实现了向特定客户端发送消息的功能。如果你需要根据某些条件(例如用户 ID)找到特定的 WebSocket 连接并发送消息,你可能需要稍微扩展 ConnectionManager

    一种常见做法是使用字典来存储连接,以用户 ID 或其他唯一标识符作为键:

    ```python

    main.py (改进 ConnectionManager)

    from typing import Dict, List

    class EnhancedConnectionManager:
    def init(self):
    # 使用字典存储连接,键为 client_id,值为 WebSocket 对象
    self.active_connections: Dict[str, WebSocket] = {}

    async def connect(self, websocket: WebSocket, client_id: str):
        await websocket.accept()
        self.active_connections[client_id] = websocket
        logger.info(f"用户 '{client_id}' ({websocket.client.host}:{websocket.client.port}) 连接. 当前连接数: {len(self.active_connections)}")
    
    def disconnect(self, client_id: str):
        if client_id in self.active_connections:
            websocket = self.active_connections.pop(client_id) # 移除并获取对象
            logger.info(f"用户 '{client_id}' ({websocket.client.host}:{websocket.client.port}) 断开. 当前连接数: {len(self.active_connections)}")
            return websocket # 返回被移除的 websocket 对象,可能有用
        else:
            logger.warning(f"尝试断开一个不存在的用户 '{client_id}'")
            return None
    
    async def send_personal_message(self, message: str, client_id: str):
        websocket = self.active_connections.get(client_id)
        if websocket:
            try:
                await websocket.send_text(message)
                logger.debug(f"向 '{client_id}' 发送个人消息: {message}")
            except Exception as e:
                logger.error(f"向 '{client_id}' 发送消息失败: {e}")
                self.disconnect(client_id) # 发送失败则断开连接
        else:
            logger.warning(f"尝试向不存在或已断开的用户 '{client_id}' 发送消息")
    
    async def broadcast(self, message: str, sender_id: str = None):
        logger.info(f"广播消息: {message} (共 {len(self.active_connections)} 个连接)")
        # 创建当前连接ID列表的副本进行迭代,防止在迭代时修改字典
        client_ids = list(self.active_connections.keys())
        disconnected_ids = []
    
        for client_id in client_ids:
            if client_id != sender_id:
                websocket = self.active_connections.get(client_id)
                if websocket: # 再次检查是否存在,因为可能在广播过程中断开
                    try:
                        await websocket.send_text(message)
                    except Exception as e:
                        logger.error(f"广播时向 '{client_id}' 发送失败: {e}")
                        disconnected_ids.append(client_id)
                else:
                    # 如果在迭代开始后,连接被移除了,记录一下
                     logger.warning(f"广播时发现用户 '{client_id}' 已不存在")
    
    
        # 清理广播时发现的已断开连接
        for client_id in disconnected_ids:
            self.disconnect(client_id)
    

    使用新的管理器

    enhanced_manager = EnhancedConnectionManager()

    更新聊天端点以使用新的管理器和 client_id 作为键

    @app.websocket("/ws/v2/chat/{client_id}")
    async def websocket_chat_v2_endpoint(websocket: WebSocket, client_id: str):
    # 检查 client_id 是否已被使用 (可选,但推荐)
    if client_id in enhanced_manager.active_connections:
    logger.warning(f"用户 ID '{client_id}' 已被占用,拒绝连接")
    await websocket.close(code=1008, reason=f"User ID '{client_id}' is already in use.")
    return

    await enhanced_manager.connect(websocket, client_id)
    await enhanced_manager.broadcast(f"用户 '{client_id}' 加入了聊天室", sender_id=client_id)
    
    try:
        while True:
            data = await websocket.receive_text()
            logger.info(f"收到来自 '{client_id}' 的消息: {data}")
            message_to_broadcast = f"'{client_id}': {data}"
            await enhanced_manager.broadcast(message_to_broadcast, sender_id=client_id)
    
    except WebSocketDisconnect:
        # 客户端正常断开
        enhanced_manager.disconnect(client_id)
        logger.warning(f"用户 '{client_id}' 离开聊天室")
        await enhanced_manager.broadcast(f"用户 '{client_id}' 离开了聊天室")
    except Exception as e:
        # 异常断开
        logger.error(f"用户 '{client_id}' 的 WebSocket 连接发生错误: {e}")
        try:
            await websocket.close(code=1011, reason=f"Server error: {e}")
        except RuntimeError:
             pass # Ignore errors if connection is already closed
        # 确保从管理器中移除
        enhanced_manager.disconnect(client_id)
        await enhanced_manager.broadcast(f"用户 '{client_id}' 因错误断开连接")
    finally:
        logger.info(f"结束与用户 '{client_id}' 的交互")
    

    (需要更新 HTML 中的 WebSocket URL 到 /ws/v2/chat/{client_id})

    (并且可能需要更新 /chat 路由来提供这个新版本的 HTML 或调整 JS)

    ```

    这个增强版的 ConnectionManager 使用字典存储连接,使得按 client_id 查找和发送消息变得非常高效。同时,我们在连接时添加了检查,防止重复的 client_id 连接。

    8. 前端 JavaScript 客户端示例

    我们在之前的 HTML 示例中已经包含了基本的 JavaScript WebSocket 客户端代码。这里总结一下关键部分:

    1. 创建 WebSocket 连接:
      javascript
      var clientId = "some_unique_id"; // 通常由用户输入或系统生成
      var ws = new WebSocket(`ws://your_server_address/ws/path/${clientId}`);
      // 或者使用安全连接 wss://
      // var ws = new WebSocket(`wss://your_server_address/ws/path/${clientId}`);

    2. 事件处理:

      • ws.onopen = function(event) { ... }; 连接成功建立时触发。
      • ws.onmessage = function(event) { var message = event.data; ... }; 收到服务器消息时触发。event.data 包含消息内容(通常是字符串,如果是 JSON 需要 JSON.parse())。
      • ws.onclose = function(event) { ... }; 连接关闭时触发。event.codeevent.reason 提供关闭信息。
      • ws.onerror = function(event) { ... }; 发生错误时触发。
    3. 发送消息:
      javascript
      ws.send("Hello Server!"); // 发送文本
      ws.send(JSON.stringify({ type: "greeting", content: "Hi!" })); // 发送 JSON

    4. 关闭连接:
      javascript
      ws.close(1000, "User logged out"); // code 1000 是正常关闭

    5. 检查连接状态:

      • ws.readyState: 返回连接状态码 (0: CONNECTING, 1: OPEN, 2: CLOSING, 3: CLOSED)。

    注意: 在生产环境中,强烈建议使用 wss:// (安全的 WebSocket 连接),这需要你的服务器配置 TLS/SSL 证书。

    9. WebSocket 中的身份验证

    保护 WebSocket 连接非常重要,特别是当它们用于传输敏感信息或执行特权操作时。FastAPI 的依赖注入系统同样适用于 WebSocket 端点,这使得实现身份验证变得相当直接。

    常用的 WebSocket 身份验证方法包括:

    • 通过 Query 参数传递 Token: 客户端连接时将认证令牌(如 JWT)放在 URL 的查询参数中:wss://example.com/ws?token=YOUR_JWT_TOKEN。服务器端在 accept() 之前验证这个 token。这是最常见的方法之一,因为浏览器 WebSocket API 不直接支持设置自定义 Header。
    • 通过 Cookie 传递 Token: 如果 WebSocket 连接是从已经通过身份验证的 Web 页面发起的,浏览器会自动发送与该域关联的 Cookie。服务器可以读取这些 Cookie 来验证用户身份。
    • 通过 Subprotocol 传递 Token: WebSocket 支持子协议协商。可以在握手期间通过 Sec-WebSocket-Protocol 头部传递包含 token 的子协议名称,但这相对不常用且复杂。
    • 连接后发送认证消息: 接受连接后,要求客户端发送的第一条消息是认证信息。在认证成功前,不处理其他业务消息。

    使用 FastAPI 依赖注入进行 Token 验证 (Query 参数示例):

    假设你有一个函数 get_current_user(token: str = Query(...)) 用于验证 JWT 并返回用户信息,你可以将它用作 WebSocket 端点的依赖项。

    ```python

    main.py (添加认证依赖)

    from fastapi import Depends, Query, HTTPException, status
    from fastapi.security import APIKeyQuery # 可以用来定义 Query 参数名

    假设你有一个验证 token 并返回用户模型的函数

    以下是一个非常简化的示例,你需要替换为实际的 JWT 或其他 token 验证逻辑

    fake_users_db = {"user1": {"username": "user1", "id": 1}, "user2": {"username": "user2", "id": 2}}

    async def get_current_user_ws(
    websocket: WebSocket, # 需要 WebSocket 对象来处理连接关闭
    token: str = Query(None, description="认证令牌") # 从 Query 参数获取 token
    ):
    if token is None:
    logger.warning("连接尝试缺少 token")
    await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Missing authentication token")
    # 注意:在依赖项中关闭连接后,FastAPI 可能仍会尝试进入端点函数主体,
    # 但此时 websocket.receive_ 或 send_ 会失败。更好的做法是引发 HTTPException,
    # 但 WebSocket 端点默认不直接处理 HTTPExceptions 来关闭连接。
    # 因此,直接关闭并返回 None 或引发一个特定异常可能更清晰。
    # 或者,在端点函数开始时检查依赖项返回的是否是有效用户。
    # 返回 None 或引发自定义异常通常更好
    return None # 或者 raise WebSocketException("Unauthorized", code=status.WS_1008_POLICY_VIOLATION)

    user = fake_users_db.get(token) # 极简化的 token -> user 查找
    if user is None:
        logger.warning(f"无效的 token: {token}")
        await websocket.close(code=status.WS_1008_POLICY_VIOLATION, reason="Invalid authentication token")
        return None # 或者 raise WebSocketException("Unauthorized", code=status.WS_1008_POLICY_VIOLATION)
    
    logger.info(f"用户 '{user['username']}' 通过 token 验证")
    return user # 返回用户信息字典
    

    需要自定义一个 WebSocket 异常,如果想通过异常中断流程

    class WebSocketException(Exception):

    def init(self, message: str, code: int = status.WS_1008_POLICY_VIOLATION):

    self.message = message

    self.code = code

    安全的 WebSocket 端点

    @app.websocket("/ws/secure_chat")
    async def secure_websocket_endpoint(
    websocket: WebSocket,
    # 使用 Depends 来注入验证逻辑,并获取用户信息
    current_user: dict = Depends(get_current_user_ws)
    ):
    # 如果依赖项返回 None (表示认证失败且已关闭连接),则直接返回
    if current_user is None:
    # 依赖项已经处理了关闭,这里不需要额外操作
    logger.info("认证失败,WebSocket 连接未建立。")
    return # 结束函数执行

    client_id = current_user.get("username", "未知用户") # 使用认证后的用户名作为 ID
    # 使用之前的 EnhancedConnectionManager (或者根据需要调整)
    await enhanced_manager.connect(websocket, client_id)
    await enhanced_manager.broadcast(f"安全用户 '{client_id}' 加入了聊天室", sender_id=client_id)
    
    try:
        while True:
            data = await websocket.receive_text()
            logger.info(f"收到来自安全用户 '{client_id}' 的消息: {data}")
            message_to_broadcast = f"'{client_id}' (安全): {data}"
            await enhanced_manager.broadcast(message_to_broadcast, sender_id=client_id)
    
    except WebSocketDisconnect:
        enhanced_manager.disconnect(client_id)
        logger.warning(f"安全用户 '{client_id}' 离开聊天室")
        await enhanced_manager.broadcast(f"安全用户 '{client_id}' 离开了聊天室")
    except Exception as e:
        logger.error(f"安全用户 '{client_id}' 的 WebSocket 连接发生错误: {e}")
        try:
            await websocket.close(code=1011, reason=f"Server error: {e}")
        except RuntimeError:
            pass
        enhanced_manager.disconnect(client_id)
        await enhanced_manager.broadcast(f"安全用户 '{client_id}' 因错误断开连接")
    finally:
         logger.info(f"结束与安全用户 '{client_id}' 的交互")
    

    (前端 JS 连接时需要构

    建带 token 的 URL: ws://localhost:8000/ws/secure_chat?token=user1token=user2)

    (需要提供相应的 HTML 页面或修改现有页面以支持输入 token)

    ```

    关键点:

    • 依赖函数 get_current_user_ws 接收 websocket: WebSockettoken: str = Query(...) 作为参数。
    • 在依赖函数内部进行 token 验证。如果验证失败,必须手动调用 await websocket.close(...) 并指定适当的关闭代码(如 1008 Policy Violation),然后返回 None 或引发一个特定的异常(如果你定义了全局异常处理器来处理它)。仅仅 raise HTTPException 通常不会像在 HTTP 请求中那样自动关闭 WebSocket 连接。
    • 在 WebSocket 端点函数 (secure_websocket_endpoint) 中,通过 Depends(get_current_user_ws) 注入依赖。
    • 在端点函数的开始处检查 current_user 是否为 None。如果是,说明认证失败且连接已被依赖项关闭,应直接 return 停止后续处理。
    • 如果认证成功,current_user 将包含用户信息,可以用于后续逻辑(例如,确定 client_id)。

    这种方式将认证逻辑与 WebSocket 处理逻辑解耦,保持了代码的清晰和可维护性。

    10. 进阶话题与最佳实践

    • 错误处理:while True 循环内部和外部都要有健壮的错误处理。记录详细的错误日志。考虑定义更具体的异常类型。
    • 心跳/Ping-Pong: 长时间无数据传输的 WebSocket 连接可能会被中间代理(如防火墙、负载均衡器)断开。可以实现心跳机制:服务器定期向客户端发送 Ping 帧,客户端响应 Pong 帧(反之亦可),以保持连接活跃。websockets 库和 Uvicorn 通常会自动处理基本的 Ping/Pong,但你可能需要根据需要进行配置或自定义。
    • 二进制数据: 对于需要传输大量二进制数据(如文件、音视频流)的场景,使用 receive_bytes()send_bytes()
    • 背压(Backpressure): 如果一方发送数据的速度远快于另一方处理的速度,可能会导致内存溢出。虽然 ASGI 和 websockets 库会进行一些处理,但在高负载下仍需注意。可以考虑限制消息队列的大小或实现应用层面的流控。
    • 子协议 (Subprotocols): 客户端可以在连接时通过 Sec-WebSocket-Protocol Header 声明它支持的子协议列表,服务器从中选择一个并通过同名 Header 返回。这允许在 WebSocket 之上定义更具体的应用层协议。可以在 websocket.accept(subprotocol=...) 中指定服务器选择的子协议。
    • 后台任务 (Background Tasks): 对于耗时的操作(如数据库查询、调用外部 API),不要阻塞 WebSocket 的事件循环。使用 FastAPI 的 BackgroundTasksasyncio.create_task 将这些操作放到后台执行。
      ```python
      from fastapi import BackgroundTasks

      @app.websocket("/ws/long_task/{item_id}")
      async def ws_long_task(websocket: WebSocket, item_id: str, background_tasks: BackgroundTasks):
      await websocket.accept()
      async def perform_long_task(ws: WebSocket, item: str):
      # 模拟耗时操作
      await asyncio.sleep(5)
      result = f"Task for item {item} completed."
      try:
      await ws.send_text(result)
      except WebSocketDisconnect:
      logger.info("Client disconnected before task completion.")
      except Exception as e:
      logger.error(f"Failed to send task result: {e}")

      # 将耗时任务添加到后台执行
      background_tasks.add_task(perform_long_task, websocket, item_id)
      # 可以立即给客户端一个反馈
      await websocket.send_text(f"后台任务已启动,处理项目: {item_id}")
      # 注意:这里不能在 try...except WebSocketDisconnect 中等待任务完成
      # 你需要设计好如何在任务完成后通知客户端(如果它仍然连接着)
      # 上面的例子是任务完成后直接尝试发送
      
      # 保持连接开放以接收其他指令或等待断开
      try:
          while True:
              # 可以接收客户端的其他指令,或者只是保持连接
              data = await websocket.receive_text()
              if data == "status?":
                   await websocket.send_text("Task is running in the background.")
              # ... 其他逻辑 ...
      except WebSocketDisconnect:
          logger.info(f"Client {item_id} disconnected.")
      finally:
          # 清理逻辑
          pass
      

      ``
      * **扩展性:** 对于需要处理大量并发连接的应用,单个 FastAPI 实例可能成为瓶颈。可以考虑:
      * **水平扩展:** 运行多个 FastAPI 实例,并使用支持 WebSocket 的负载均衡器(如 Nginx 配置
      proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade";)。
      * **消息队列/发布订阅:** 当需要跨多个实例广播消息时,不能简单地在单个实例的
      ConnectionManager` 中进行。需要引入外部消息中间件(如 Redis Pub/Sub, RabbitMQ, Kafka)。每个 FastAPI 实例连接到消息队列,接收到的消息发布到队列,所有实例订阅队列并将消息推送给它们管理的本地 WebSocket 连接。

    结论

    FastAPI 为构建 WebSocket 应用提供了强大而简洁的工具。通过利用其异步特性、依赖注入系统和对标准 WebSocket 协议的良好支持,我们可以轻松创建从简单的 Echo 服务到复杂的实时聊天室,乃至需要身份验证和后台处理的高级应用。

    掌握 WebSocket 对象的核心方法 (accept, receive_*, send_*),理解 WebSocketDisconnect 异常的处理,并学会使用连接管理器来处理多个客户端,是构建健壮 WebSocket 应用的关键。同时,不要忘记考虑安全性(身份验证、WSS)、错误处理和在高并发场景下的扩展性问题。

    希望本教程能帮助你全面理解 FastAPI 中的 WebSocket,并为你构建下一代实时 Web 应用打下坚实的基础。祝你编码愉快!


    THE END
    icon
    0
    icon
    打赏
    icon
    分享
    icon
    二维码
    icon
    海报
    发表评论
    评论列表

    赶快来坐沙发