FastAPI 中的并发/并行问题总结与源码阅读

tada-zako

Python 与 FastAPI 中关于并发/并行相关问题总结

进程 (Process)、线程 (Thread)、协程 (Coroutine)

定义与区别

特性 进程 (Process) 线程 (Thread) 协程 (Coroutine)
定义 操作系统资源分配的最小单位,独立内存空间。 CPU 调度最小单位,共享进程内存空间。 用户态的轻量级线程,程序(非操作系统)控制调度。
调度 操作系统抢占式调度。 操作系统抢占式调度。 程序(事件循环)协作式调度。
开销 创建、销毁、切换 创建、销毁、切换较小 创建、销毁、切换极小
并行 可在多核 CPU 上实现真并行(不同进程可在不同核上同时运行)。 Python 中受 GIL 限制纯 CPU 密集型任务无法真并行。I/O 密集型任务可并发。 单线程内实现并发,I/O 密集型任务效率极高。
适用 CPU 密集型任务(需利用多核)。 I/O 密集型任务(如网络请求、文件读写)。 高并发 I/O 密集型任务
Python multiprocessing 模块。 threading 模块。 asyncio 模块,async/await 关键字。

协程和线程的区别

  • 协程:在单线程内通过 async/await 实现的轻量级并发,适合 I/O 密集型任务。协程通过事件循环调度,避免了线程切换的开销。
  • 线程:操作系统级别的并发,适合 I/O 密集型任务,但受 GIL 限制,无法实现 CPU 密集型任务的真并行。线程可以在多核 CPU 上运行,但切换开销较大。

Python GIL (Global Interpreter Lock) 的影响

  • GIL 的本质:它是一个互斥锁,确保了同一时刻只有一个 Python 线程在执行 Python 字节码
  • 对 CPU 密集型任务的影响:由于 GIL 存在,即使在多核 CPU 上,同一个 Python 进程内的多个线程也无法同时执行纯 Python 的 CPU 计算代码。它们会轮流执行,无法实现真并行,甚至可能因线程切换开销而降低性能。
  • 对 I/O 密集型任务的影响GIL 的存在并不妨碍 I/O 并发! 当一个 Python 线程执行 I/O 操作(如等待网络响应、读写文件、等待定时器)时,它会主动释放 GIL,允许其他线程运行。这就是为什么多线程在 Python 中仍然常用于 I/O 密集型应用。

协程 (async/await) 与 asyncio

  • 工作原理:协程运行在单线程asyncio 事件循环上。当一个协程遇到 await 关键字,它会主动暂停自身执行,让出 CPU 控制权并释放 GIL。事件循环会立即调度其他准备就绪的协程运行。
  • 解决的问题高效处理高并发的 I/O 密集型任务。 当大量请求需要等待外部资源(如数据库、外部 API、网络延迟)时,CPU 不会空闲等待,而是可以切换去处理其他请求,极大提高了吞吐量响应速度
  • time.sleep() vs asyncio.sleep()
    • time.sleep(N)同步阻塞的,它会占用 CPU 和 GIL,导致整个事件循环卡住 N 秒,其他任何请求都无法处理。
    • await asyncio.sleep(N)异步非阻塞的,它会暂停协程,释放 CPU 和 GIL,让事件循环去处理其他任务。这使得多个 asyncio.sleep() 操作可以“同时”进行(因为它们各自的等待由操作系统并行计时),从而实现并发响应。
  • 对 CPU 密集型任务的局限:协程本身不能使 CPU 密集型任务并行。一个纯 CPU 密集型任务会一直霸占 CPU,直到完成,从而阻塞整个事件循环(因为没有 await 的机会让出控制权)。
    • 解决方案:对于 async def 函数中的 CPU 密集型任务,应使用 await asyncio.to_thread(sync_blocking_function) (Python 3.9+) 或 loop.run_in_executor() 将其放入单独的线程池中执行,从而不阻塞主事件循环。

FastAPI 的本质与并发机制

  • 本质:FastAPI 是一个用于构建 API 的现代、快速的 Web 框架。它基于 Python 类型提示,利用 Starlette(Web 部分)和 Pydantic(数据验证),并且是异步框架
  • 服务器:FastAPI 本身不是服务器,它需要一个 ASGI 服务器(如 Uvicorn)来运行。Uvicorn 负责监听网络请求并将其传递给 FastAPI 应用。
  • Uvicorn 的并发
    • Uvicorn 的每个工作进程内部都运行一个单线程的 asyncio 事件循环
    • 对于 async def 定义的路由函数,Uvicorn 能高效地调度协程,利用 await 实现高并发 I/O。
    • 对于 def 定义的同步路由函数,Uvicorn 会自动将它们放到一个内部的线程池中执行,以避免阻塞主事件循环,从而提供并发处理能力(但线程池有大小限制)。
    • 多进程模式:为了充分利用多核 CPU,生产环境中通常会启动多个 Uvicorn 工作进程 (uvicorn --workers N),每个进程都是一个独立的 Python 进程,各自拥有独立的事件循环和 GIL,从而实现真并行处理。
  • 测试并发性:手动刷新浏览器测试并发可能不准确,因为浏览器对同一域名的并发连接有限制。推荐使用 curl 或其他命令行工具、并发测试脚本来验证 FastAPI 的并发能力。

AI 模型训练与 Python 并发

  • 核心计算:AI 模型训练的核心计算(如矩阵乘法、卷积)是在 GPU(或 TPU/NPU) 等专用硬件上进行的。
  • GIL 的规避:当 Python 代码调用像 PyTorch、TensorFlow 这样的深度学习框架的底层 C++/CUDA 函数时,GIL 会被释放。这意味着 GPU 可以在后台并行计算,而 Python 线程可以同时处理其他任务(如数据 I/O)。
  • I/O 与预处理:数据加载和预处理(通常是 I/O 密集型)可以通过 Python 的多线程或异步 I/O 进行并发处理,甚至可以使用多进程 (multiprocessing) 来并行化数据加载,避免 GPU 等待数据。
  • Python 的优势:丰富的生态系统、高开发效率和强大的社区支持,使得 Python 成为 AI 领域的首选语言。

个人总结

python 程序的并发与并行

python 程序由于 GIL 的存在,会导致在同一时刻,只有一个 python 线程执行 python 字节码。因此,对于 CPU 来说,单个 python 程序应用是无法实现并行的。但是对于 I/O 密集型任务,或是网络通信等任务,在执行时,python 能够释放 GIL,允许其他线程运行,从而允许 python 程序通过多线程或协程的方式实现并发。
async 包装一个函数,这个函数就变成了一个协程函数,协程函数在执行时会遇到 await 关键字时主动让出控制权,允许其他协程运行,从而实现并发。而 asyncio 能够通过事件循环来调度协程的执行,它可以通过 asyncio.to_thread()loop.run_in_executor() 包装一个 def 同步函数,使其能够在单独的线程中执行,从而避免阻塞事件循环。但是如果这个被包装的函数是一个 CPU 密集型任务,由于它无法释放 GIL,那么它仍然会占用 CPU,导致事件循环无法调度其他协程。
对于 python 中协程最好的理解方式,就是将事件循环想象成一个摩天轮,每个协程就像是摩天轮上的一个座位,而每个车厢就相当于一个任务,用来包装这个协程。只有当协程/车厢运行到底部时,才真正进入主线程执行。而每台车厢虽然不由主线程执行,但它们可以通过其它线程来执行,来执行 IO 密集型,或是 http 请求等任务(这些可以真正并行的操作)。至于 CPU 密集型任务的多个事件,则是并发进行的。

协程辨析

需要注意的是,python 中并不会主动自发地将由 async defawait 关键字包装的协程函数放入线程池中运行,其本质是调用了 asyncio 提供的真正能调度任务、切换协程、执行非阻塞操作的能力。

反面案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class LocalFileClass:
def __init__(self, file):
self.file = file
self.path = data_root / "local" / file
self.ctime = time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(os.path.getctime(self.path))
)
self.size = os.path.getsize(self.path)

async def read(self):
return open(self.path, "rb")

async def write(self, data):
with open(self.path, "w") as f:
f.write(data)

async def delete(self):
os.remove(self.path)

async def exists(self):
return os.path.exists(self.path)

上述的代码,虽然看起来使用了 async def,但实际上并没有真正实现异步操作。因为 open()os.path.getctime()os.path.getsize() 等函数都是同步阻塞的操作,它们在运行时仍然会占用 GIL,而阻塞事件循环,导致无法实现真正的并发。

Python 中的线程间通信

而不同线程之间的通信,可以通过如下方式实现:

  • 共享变量:多个线程可以访问同一个全局变量,但需要注意线程安全问题,可以使用 threading.Lock 来保护共享变量。

    1
    2
    3
    4
    5
    from threading import Lock
    state_lock = Lock()

    with state_lock:
    shared_state["key"] = ch
  • 队列:使用 queue.Queue 来实现线程间的消息传递,线程可以将数据放入队列中,其他线程从队列中取出数据进行处理。

    1
    2
    3
    4
    5
    6
    7
    from queue import Queue
    queue = Queue()

    # 在一个线程中放入数据
    queue.put(data)
    # 在另一个线程中取出数据
    data = queue.get()
  • 事件:使用 threading.Event 来实现线程间的通知机制,一个线程可以设置事件,其他线程可以等待这个事件的发生。

    1
    2
    3
    4
    5
    6
    7
    from threading import Event
    event = Event()

    # 在一个线程中设置事件
    event.set()
    # 在另一个线程中等待事件
    event.wait()

asyncio 的常用函数

  • asyncio.run(coro): 在同步上下文中运行一个协程函数,方便的语法糖;但过多的使用会丢失异步的优势。
  • asyncio.to_thread(func, *args, **kwargs): 将一个同步函数放入线程池中执行,返回一个协程对象,适用于需要在协程中调用阻塞的同步函数。
  • asyncio.create_task(coro): 在事件循环中创建一个任务,相当于将这个任务扔到了后台执行,运行前台进行执行其它操作,而等到需要这个任务的结果时再去获取。
  • asyncio.gather(*coros): 并发运行多个协程,等待所有协程完成并返回结果。
  • asyncio.sleep(delay): 异步睡眠函数,非阻塞的等待指定时间,允许事件循环处理其他任务。

FastAPI 的并发与并行

FastAPI 本身是一个异步框架,但是后端服务本身并不由 FastAPI 提供,而是由 Uvicorn 这样的 ASGI 服务器提供。
在实际测试时发现,如果在 fastapi 的路由函数中设置一个 CPU 密集型任务,例如 time.sleep(10),会导致整个 fastapi 服务在执行该任务时无法响应其他请求。

1
2
3
4
5
6
7
8
9
10
@app.get("/items/{item_id}")
async def read_item(item_id: str):
# 记录请求时间
request_time = datetime.now().isoformat()
time.sleep(5)
if item_id not in items:
raise HTTPException(status_code=404, detail="Item not found")
# 记录返回时间
response_time = datetime.now().isoformat()
return {"item_id": item_id, "request_time": request_time, "response_time": response_time}

经过测试得到的响应结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 第一次请求
{
"item_id": "foo",
"request_time": "2025-05-29T17:31:30.202113",
"response_time": "2025-05-29T17:31:35.207623"
}
// 第二次请求
{
"item_id": "bar",
"request_time": "2025-05-29T17:31:35.209146",
"response_time": "2025-05-29T17:31:40.221867"
}

可以看到,第二次请求的响应时间是第一次请求的响应时间加上 5 秒,这说明在执行第一个请求时,fastapi 服务无法响应其他请求。
而如果将 time.sleep(5) 替换为 await asyncio.sleep(5),则可以实现并发响应。

1
2
3
4
5
6
7
8
9
10
11
@app.get("/items/{item_id}")
async def read_item(item_id: str):
# 记录请求时间
request_time = datetime.now().isoformat()
# time.sleep(5)
await asyncio.sleep(5)
if item_id not in items:
raise HTTPException(status_code=404, detail="Item not found")
# 记录返回时间
response_time = datetime.now().isoformat()
return {"item_id": item_id, "request_time": request_time, "response_time": response_time}

经过测试得到的响应结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
// 第一次请求
{
"item_id": "foo",
"request_time": "2025-05-29T17:32:24.459306",
"response_time": "2025-05-29T17:32:29.466886"
}
// 第二次请求
{
"item_id": "bar",
"request_time": "2025-05-29T17:32:25.904128",
"response_time": "2025-05-29T17:32:30.911707"
}

axios 通信规范

后端响应结构

axios 请求后端,响应的原始结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
// 1. 响应数据(服务器返回的实际数据)
data: { Response[T] },

// 2. 响应状态码(HTTP 状态码)
status: 200,

// 3. 响应状态信息(与状态码对应的文本描述)
statusText: "OK",

// 4. 响应头信息(服务器返回的HTTP头)
headers: {
"content-type": "application/json",
"date": "Wed, 01 Jan 2024 00:00:00 GMT",
// 其他头信息...
},

// 5. 请求配置信息(发送请求时的配置对象)
config: {
url: "/api/data",
method: "get",
headers: { ... }, // 请求头
params: { ... }, // URL参数
data: { ... }, // 请求体数据(POST/PUT等)
timeout: 0,
// 其他配置...
},

...
}

也就是说后端的路由函数中返回的实际内容是 response.data,首先会经过 axios 的响应拦截器,在拦截器中处理 2xx 之外的错误状态码。正常响应直接返回 response.data,也就是 Response[T]

其中 Response[T] 结构如下:

1
2
3
4
5
{
code: int = 200
message: str = "success"
data: T | None = None
}

api 层和 services 层规范

  • api 层直接调用后端接口,返回 response 对象,也就是 Response[T]
  • services 层通过调用 api 层获取后端数据,处理数据后返回给 UI 层。

后端数据库

当年我心高气傲,以为学了 SQLModel 就能平定后端数据库 ORM,事实证明,只掌握 SQLModel 文档中的内容是远远不够的。因为 SQLModel 只是一个轻量级的 ORM 框架,功能相对简单,很多复杂的数据库操作并没有直接支持。

SQLModel 和 SQLAlchemy 的区别

首先,SQLModel 是基于 SQLAlchemy 和 Pydantic 构建的一个轻量级 ORM 框架,主要设计理念是为了简化数据库模型的定义和操作。

在使用 SQLAlchemy 时,除了定义一个 ORM 模型类之外,往往还需要定义一个相应的 Pydantic 数据模型类,用于数据验证和序列化。而 SQLModel 则将这两者结合在一起,它所定义的继承于 SQLModel 的类既是 ORM 模型类,也是 Pydantic 数据模型类,从而简化了代码量。

但实际情况往往是,后端的 API 并不应该直接返回 SQLModel 定义的类实例,因为这些类实例通常包含了数据库的元信息(如主键、外键等),而这些信息并不适合直接暴露给前端。因此,实际开发中,往往还是需要定义单独的 Pydantic 模型类,例如 UserBase、UserCreate、UserOut 等等,用于 API 的请求和响应。所以,SQLModel 并没有完全替代 SQLAlchemy 和 Pydantic 的必要性。

异步数据库操作

除了上述的问题,我之所以说 SQLModel 不能满足真正的后端数据库需求,是因为 SQLModel 并没有提供对异步数据库操作的简化,这就导致了一个高并发的后端,往往实际使用的还是 SQLAlchemy 的异步操作。

高并发后端与无状态设计

我发现,就算学习了很多后端知识,以及各种优秀的后端架构,但是真正要设计一个高并发的后端系统,还是会遇到很多问题。

就当前我所掌握的知识水平,在设计后端时,往往会忽略掉无状态设计的重要性。而一个高并发的后端系统,往往需要设计成无状态的,这样才能更好地进行水平扩展。

什么是无状态设计

就拿当前我在实践的 AI chat 后端来说,目前的 data pipeline 设计是这样的:database <-> context <-> chat manager <-> providers

如果只是一个本地应用,上述的 pipeline 设计是够用的。但是,如果使用的用户很多,并且请求量很大,那么上述的设计就会暴露出很多问题。

  • 共享状态污染:由于当前的设计是单实例模式,也就是全局共享同一个 chat manager 实例,这就导致了不同用户的请求会共享同一个状态,从而导致状态污染的问题。例如,A用户选择的provider是google,但是在他想调用chat_manager.chat()时,B用户先一步操作,修改了chat_manager.self.provider为openai,导致A用户实际chat的provider变成了openai。

  • 扩展性差:由于当前的设计是单实例模式,这就导致了后端无法进行水平扩展,也就是无法通过增加服务器数量来提升系统的处理能力。

当时如果引入了无状态设计(stateless design)的理念,那么上述的问题就可以得到很好的解决。所谓的无状态设计,就是指每一个请求都是独立的,并且不依赖于之前的请求状态。也就是说,每一个请求都应该包含所有必要的信息,从而使得后端能够独立处理每一个请求,而不需要依赖于之前的请求状态。

最典型的一个示例,就是大多数学过后端都会设计的用户登录服务。后端并不会真正存储当前登录的用户对象,而是每次前端用户请求登录时,都会传递所有验证用户身份的必要数据,例如用户名和密码,后端通过验证这些数据来确定用户的身份,而不是依赖于之前的登录状态,并且在验证成功后,也不会改变后端本身的状态,只负责返回 token 等必要的信息。

如何实现无状态设计

要实现无状态设计,首先需要明确每一个请求所需要的所有信息,并且确保这些信息都是由前端传递过来的,而不是依赖于后端的状态。其次,需要设计好请求的处理流程,确保每一个请求都是独立的,并且不依赖于之前的请求状态。

以当前的 AI chat 后端为例,假设要实现无状态设计,那么可以将 chat manager 的状态信息全部放在请求中,例如:

1
2
3
4
5
6
7
{
"user_id": "12345",
"provider": "openai",
"model": "gpt-4",
"query": "你好,世界!",
"conversation_id": "uuid-xxxx-xxxx-xxxx"
}

所谓必要信息,就是指能够使得后端独立处理这个请求所需要的所有信息。这样一来,后端就不需要依赖于之前的请求状态,从而实现了无状态设计。但是也不是说所有状态信息都由前端传递,不然每次传输的数据量会很大,影响通信性能;而且更多的状态信息,也意味着更高的安全风险。前端应该传输的数据,只要能够使得后端独立处理请求所需要的最小信息即可。例如 tokenconversation_id 等设计。

至于在 AI chat 后端中,每个用户所对应的 conversation context 实际上是存储在数据库中的,前端通过传递的 user_idconversation_id 来唯一标识一个会话,从而让后端自己在数据库中加载对应的 conversation context,而不是依赖于后端的状态。

当然,实现一个无状态设计的后端系统,并不是一件容易的事情,需要考虑很多方面的问题,例如后端异步处理、IO效率、数据一致性等等。就像前文所说的,无状态后端,将会有很多的操作需要依赖于数据库的读写,而数据库的读写往往是后端系统的瓶颈,因此全异步的后端设计、以及高效的数据库设计,都是实现无状态设计的关键。

后端异常处理

在实际开发中,后端异常处理是一个非常重要的环节。一个健壮的后端系统,必须能够有效地处理各种异常情况,并且向前端返回有意义的错误信息。

最基本的异常处理,需要做到对所有可能出现的异常情况进行捕获,并且返回相应的错误信息。例如,数据库连接失败、数据验证失败、权限不足等等。如果异常能够被内部正常处理,就在后端内部解决,并打印日志记录;如果异常无法被后端内部解决,需要响应给前端用户,但是注意,不要暴露过多的内部信息,避免安全风险。

Python 哲学——Protocol/ABC

  • 之后补充完善

Protocol 作为接口使用,可以让代码更加灵活和可扩展。而 ABC 则更适合用于定义抽象基类,提供默认实现和强制子类实现某些方法。

Protocol 更加轻量级,适合用于定义简单的接口,而 ABC 则更加强大,适合用于存在复用代码需求的场景。

FastAPI 源码阅读

理解

说实话,其实感觉fastapi内部并没有什么特殊的魔法代码,大多数的代码都是很平实的实现,没有眼花缭乱的炫技,也没有让人读不懂的硬编码设计。

本质上fastapi框架主要负责的就是Router对象的创建,将用户的路径操作函数实例化为route对象并解析路径操作函数的各项参数;以及depends的解析执行,通过将多个依赖资源注册为Depends反向注入,并最终解析为Dependant依赖树,反向递归执行,最终生成Response响应结果;以及生成可视化的API文档,通过内部解析路径操作函数,生成对应的openapi.json,并结合swagger UI生成最终的UI文档

fastapi内部将整个API函数解析为对应的ASGIApp,并依赖starlette对接ASGI服务器,实现整体的ASGI功能;可视化文档则是由于内部实现了openapi协议,合理对接swaggerUI实现了对应功能。

内部大多数实现,不同于我自己主要的开发领域,更像是一种“更高级的解释器“;像是json库一样,不过fastapi解析的对象是路径操作函数,生成一个执行流程,主要依赖AsyncExitStack提供Depends的反向注入执行能力,内部将Depends解析为AsyncContextManager执行;以及解析API为openapi.json。更多体现的是其”编译解释”的能力。

反思

说实话,读完fastapi的源码,确实让我对于python的底层实现有了更加深刻的理解,对于fastapi内部的函数反射实现也有了具体的理解。

但是说实话,这些技巧大多数非常“高级”,但是又非常“底层”,如果是应用开发,感觉很少会有这种直接上手操作函数的需求,大多数也就是平铺实现,毕竟需要抽象的对象较少。

fastapi内部为了实现抽象,所以使用了大量高级的实现,特别像是AsyncExitStack来管理lifespan, depends;递归执行路径操作函数等;

虽然说,我读完源码不是没有收获,但总感觉这份收获有点”脱离“?我主要的方向依旧是web应用开发,而fastAPI的内容更加贴合库的实现,我依旧缺少对于”高级项目架构设计“的能力;此外,由于fastapi本质的ASGI能力是依赖于starlette实现的,所以我对于ASGI的理解也就只止步于ASGIApp对象生成了,对于scope, receive, send等的理解其实并不多,它们更像是某种黑盒,虽然我也确实目前没有深入理解他们的必要…

  • Title: FastAPI 中的并发/并行问题总结与源码阅读
  • Author: tada-zako
  • Created at : 2025-05-29 00:00:00
  • Updated at : 2026-03-05 14:37:43
  • Link: https://blog.tada-zako.top/2025/技术/fastapi_background/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments