FileCodeBox 项目源码阅读记录

tada-zako

项目介绍

  • 项目名:FileCodeBox
  • 项目定位:基于 Fastapi 的文件快递柜项目,带有管理面板

项目结构

FileCodeBox 项目目录结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
├─apps
│ │ __init__.py
│ │
│ ├─admin
│ │ dependencies.py # 管理面板依赖定义 鉴权逻辑实现
│ │ schemas.py # 管理面板 Pydantic 模型定义
│ │ services.py # 管理员文件/配置/本地文件服务实现
│ │ views.py # 管理员面板路由定义
│ │ __init__.py
│ │
│ └─base
│ │ dependencies.py # 基础的IP访问频率限制实现
│ │ models.py # 数据库模型定义
│ │ schemas.py # 主页面 Pydantic 模型定义
│ │ utils.py # 工具函数
│ │ views.py # 主页面路由定义
│ │ __init__.py
│ │
│ └─migrations # 数据库迁移脚本目录(自定义脚本 现代化推荐使用 Alembic 自动化工具)
│ migrations_001.py
│ migrations_002.py

├─core
│ database.py # 创建数据库 执行数据库迁移脚本
│ logger.py # 日志实例配置
│ response.py # 统一响应体封装
│ settings.py # 全局配置管理
│ storage.py # 文件存储接口定义和实现
│ tasks.py # 后台运行任务调用
│ utils.py # 部分工具函数
│ __init__.py

├─docs
│ │ changelog.md
│ │ contributing.md
│ │ index.md
│ │ package.json
│ │ pnpm-lock.yaml
│ │
│ ├─.vitepress
│ │ ...
│ │
│ ├─api
│ │ index.md
│ │
│ ├─en
│ │ ...
│ │
│ ├─guide
│ │ ...
│ │
│ └─public
|
├─main.py # FastAPI 应用入口 使用 SPA 单例模式传输前端

封装设计理念

统一响应体封装

核心思想

  • 基于 Pydantic 实现类型安全的响应体
  • 结合 response_model 自动进行响应内容的序列化和 api 文档生成
  • 使用异常处理器统一处理错误响应
  • 采用泛型(Generics)实现通用响应模型

定义带有泛型的统一响应模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
from typing import Generic, TypeVar, Any
from pydantic import BaseModel

T = TypeVar('T')

class ApiResponse(BaseModel, Generic[T]):
code: int = 200
message: str = "success"
data: T | None = None

# 添加静态方法简化创建响应体的过程
@classmethod
def success(cls, data: T | None = None, message: str = "success") -> "ApiResponse[T]":
return cls(code=200, message=message, data=data)

@classmethod
def error(cls, code: int, message: str = "error", data: Any = None) -> "ApiResponse[Any]":
return cls(code=code, message=message, data=data)

# 定义具体的响应模型
class UserBase(BaseModel):
id: int
username: str
email: str

class UserPublic(UserBase):
pass

路径操作函数设置 response_model

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from fastapi import APIRouter, Depends, HTTPException

from app.core.response import ApiResponse, UserPublic
from app.db.models.user import UserInDB
from app.crud.user_crud import get_user_by_id, get_all_users

router = APIRouter(prefix="/users", tags=["users"])

@router.get("/{user_id}", response_model=ApiResponse[UserPublic], summary="获取用户信息")
async def get_user(user_id: int):
# 这里返回的是 UserInDB 模型实例
user_db = await get_user_by_id(user_id)
if not user_db:
raise HTTPException(status_code=404, detail="User not found")
# # ERROR_1
# return UserPublic.model_validate(user_db)
# 注意:这里必须直接返回一个 ApiResponse 实例
# response_model 可以过滤 pydantic 模型中多余的属性,但是不能直接将 UserInDB 实例转换为 UserPublic 实例
# # ERROR_2
# return ApiResponse.success(data=UserPublic.model_validate(user_db))
# 这里必须先将 UserInDB 实例转换为 UserPublic 实例
# user_public = UserPublic.model_validate(user_db)
# model_validate 并不能用于将 Pydantic 模型实例转换为另一个模型实例
# 而是用于将字典、JSON或同类数据转换为 Pydantic 模型实例
user_public = UserPublic(**user_db.model_dump()) # pydantic.v2 会自动忽略多余的属性
return ApiResponse.success(data=user_public)


@router.get("/", response_model=ApiResponse[list[UserPublic]], summary="获取所有用户")
async def get_users():
users_db = await get_all_users()
users_public = [UserPublic(**user_db.model_dump()) for user_db in users_db]
return ApiResponse.success(data=users_public)

统一处理异常

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
from fastapi import FastAPI, Request, status, HTTPException
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError

from app.core.response import ApiResponse

app = FastAPI()


@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
return JSONResponse(
status_code=exc.status_code,
content=ApiResponse.error(
code=exc.status_code,
content=ApiResponse.error(
code=exc.status_code,
message=exc.detail
).model_dump()
)
)


@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
first_error_mst = exc.errors()[0].get("msg", "Validation error")
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=ApiResponse.error(
code=status.HTTP_422_UNPROCESSABLE_ENTITY,
message=f"Validation error: {first_error_mst}",
data=exc.errors()
).model_dump()
)


@app.exception_handler(Exception)
async def generic_exception_handler(request: Request, exc: Exception):
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=ApiResponse.error(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=ApiResponse.error(
code=status.HTTP_500_INTERNAL_SERVER_ERROR,
message="服务器内部错误"
).model_dump()
)
)

管理全局系统配置属性

Gemini 提供的回答:

你对 Settings 类的分析非常准确,并且你指出的几个缺点(缺乏 IDE 提示、无法防止引入多余属性、没有类型验证)都非常到位。

分析这个 Settings 类的设计

这个 Settings 类使用 __getattr____setattr__ 魔术方法,实现了一种 动态属性访问 的方式。它尝试提供一种灵活的、类似字典又类似对象的配置访问方式,并且支持默认值和用户配置的合并。

优点:

  • 灵活的属性访问:可以通过 settings.some_key 像访问对象属性一样访问配置,而不是 settings['some_key']
  • 默认值与用户配置合并:优雅地处理了默认配置和用户自定义配置的层级关系,用户配置会覆盖默认配置。
  • 一定程度的封装:将配置的存储和访问逻辑封装在一个类中。

缺点(你提出的,也是关键问题):

  1. 缺乏 IDE 内联提示 (Autocompletion):这是最大的痛点。IDE 无法静态分析 __getattr____setattr__ 内部的逻辑,因此在输入 settings. 后,无法提供可用的配置项列表。这极大地降低了开发效率,并增加了出错的可能性(比如拼写错误)。
  2. 无法防止引入多余属性:由于 __setattr__ 会将所有非 default_configuser_config 的属性直接写入 user_config 字典,这意味着你可以随意设置 settings.foobar = "something",即使 foobar 不是一个合法的配置项。这可能导致配置项的混乱和难以排查的错误。
  3. 没有类型验证:配置项的值没有类型约束。你可以将期望是整数的配置项赋值为字符串,例如 settings.port = "8000",这会引发运行时错误,而不是在开发阶段就被捕获。
  4. 运行时错误风险:如果尝试访问一个不存在的配置项,只有在运行时才会抛出 AttributeError,而不是在编码阶段得到提示。
这是否是一个合理的设计?

对于一个现代的、需要高可维护性、可测试性、团队协作以及避免运行时错误的 Python 后端项目来说,这个 Settings 不是一个合理的设计

尤其在 FastAPI 这样的框架中,它强调类型提示、数据验证和自动文档生成。这个 Settings 类的设计与 FastAPI 推崇的现代 Python 开发范式是相悖的。

更好的替代方案:Pydantic Settings

FastAPI 的作者创建了 Pydantic Settings (现在已经集成到 Pydantic v2 的 pydantic-settings 模块中),它就是为解决这类问题而生,并且是现代 FastAPI 项目中管理配置的标准和推荐方式

Pydantic Settings 的优势完美解决了你提出的所有问题:

  1. IDE 内联提示 (Autocompletion):Pydantic 模型是基于类的定义,IDE 可以完美识别其属性,提供全面的代码补全。
  2. 防止引入多余属性:Pydantic 模型默认是严格的,你不能随意添加未在类中定义的属性。尝试设置一个未定义的属性会报错。
  3. 强大的类型验证:Pydantic 会在启动时(或加载配置时)自动验证所有配置项的类型。如果环境变量或配置文件中的值类型不匹配,会立即报错,而不是在运行时引发意外。
  4. 默认值管理:可以直接在 Pydantic 模型中定义默认值。
  5. 多来源配置加载:可以从环境变量、.env 文件、INI 文件、JSON 文件等多种来源加载配置,并支持优先级覆盖。
  6. 文档生成:Pydantic 模型本身就是自我文档化的。
  7. 与 FastAPI 无缝集成:FastAPI 的依赖注入可以直接使用 Pydantic Settings 实例。

Pydantic Settings 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# app/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import Field # 可以用来设置字段的额外信息

class Settings(BaseSettings):
# 配置项及其类型和默认值
APP_NAME: str = "My FastAPI App"
DEBUG_MODE: bool = False
DATABASE_URL: str = "sqlite:///./sql_app.db"
SECRET_KEY: str = "your-super-secret-key" # 建议从环境变量加载

# 如果需要更复杂的配置,比如端口可以从环境变量PORT加载,如果没有则默认为8000
PORT: int = Field(8000, env="PORT")

# 配置 Pydantic Settings 如何加载环境变量
model_config = SettingsConfigDict(
env_file=".env", # 从 .env 文件加载环境变量
env_file_encoding='utf-8', # .env 文件的编码
extra='ignore' # 忽略 .env 文件中未在模型中定义的变量
)

# 创建一个 Settings 实例,在应用启动时加载
settings = Settings()

# ----------------- 在其他模块中使用 -----------------
# 例如在 app/main.py
# from app.core.config import settings

# @app.get("/config")
# async def get_app_config():
# return {
# "app_name": settings.APP_NAME,
# "debug_mode": settings.DEBUG_MODE
# }

# 或者通过依赖注入
# from fastapi import Depends
# from app.core.config import Settings
# def get_settings() -> Settings:
# return Settings() # 每次请求都创建一个新实例或使用缓存的实例

# @app.get("/some_route")
# async def some_route(settings: Settings = Depends(get_settings)):
# return {"db_url": settings.DATABASE_URL}

配置项 & 权限的实现

简单的 IP 访问频率限制

实现思路

  • 将请求访问的 IP 记录到内存中,并记录每个 IP 的访问次数和访问时间戳
  • 每次请求时检查该 IP 在一定时间窗口内的访问次数,若访问次数是超过限制,则拒绝访问
  • 通过 FastAPI 的依赖注入机制,对于发送请求的 IP 地址进行检查

代码实现

IP 访问限制类实现
IPRateLimit 类的实现,基于 python3.9+ 版本的实现方式,适合于小型项目的 IP 访问频率限制。它使用内存字典来存储 IP 访问记录,并提供了检查和添加 IP 的方法。每次请求时,都会检查该 IP 是否超过访问限制,并在必要时抛出 HTTP 异常。
无法真正解决分布式 DDOS 攻击问题,但可以防止单个 IP 的恶意刷请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
from typing import Dict, Union
from datetime import datetime, timedelta
from fastapi import HTTPException, Request


class IPRateLimit:
def __init__(self, count: int, minutes: int):
"""初始化 IP 访问频率限制器"""
"""
ips: Dict[str, Dict[str, Union[int, datetime]]]
{
"ip_address": {
"count": int, # 访问次数
"time": datetime # 最后访问时间
}
}
"""
self.ips: Dict[str, Dict[str, Union[int, datetime]]] = {}
self.count = count
self.minutes = minutes

def check_ip(self, ip: str) -> bool:
"""检查 IP 是否超过访问限制"""
if ip not in self.ips:
# 如果 IP 不在记录中
return True

ip_info = self.ips[ip]

# 检查 IP 的访问时间是否在限制范围内
if ip_info["time"] + timedelta(minutes=self.minutes) < datetime.now():
self.ips.pop(ip)
return True

if ip_info["count"] >= self.count:
return False

# 如果 IP 在限制时间内且访问次数未超过限制
return True

def add_ip(self, ip: str) -> int:
"""添加 IP 访问记录"""
ip_info = self.ips.get(ip, {"count": 0, "time": datetime.now()})
ip_info["count"] += 1
self.ips[ip] = ip_info
return ip_info["count"]

async def remove_expired_ip(self) -> None:
"""移除过期的 IP 访问记录"""
now = datetime.now()
expiration = timedelta(minutes=self.minutes)
self.ips = {
ip: info
for ip, info in self.ips.items()
if info["time"] + expiration >= now
}

def __call__(self, request: Request) -> str:
ip = (
request.headers.get("X-Real-IP")
or request.headers.get("X-Forwarded-For")
or request.client.host
)
if not self.check_ip(ip):
raise HTTPException(status_code=423, detail="请求次数过多,请稍后再试")
return ip

使用示例
在实际使用 IPRateLimit 时,可以将其实例化并作为 FastAPI 的依赖注入到需要限制访问的路由中:

1
2
3
4
5
6
ip_limit = IPRateLimit(count=10, minutes=1)  # 每分钟最多允许 10 次请求

@app.post("/upload")
async def upload_file(ip: str = Depends(ip_limit)):
# 这时 __call__ 会被 FastAPI 自动调用
pass

这里的 ip_limitIPRateLimit 的实例,可以在 FastAPI 的依赖注入中使用。
由于 FastAPI 的核心机制:对于依赖注入传入的类实例,会自动调用其 __call__ 方法,进而可以在 __call__ 方法中获取请求的 IP 地址,并进行访问频率限制检查。

动态配置项管理

实现思路

动态配置项管理的实现目的是允许前端通过管理面板来修改和管理系统配置项,而不是直接修改代码或配置文件。

  • 使用 Pydantic 模型定义配置项
  • 使用数据库持久化存储配置项
  • 提供 API 接口供前端管理配置项
    相较于部署时固定所有配置项,部署时固定更推荐 .env 文件的方式来管理配置项。

代码实现

这里就不直接给出对应代码了,只是给出一个大概的思路和示例代码。

  • 配置项模型定义
1
2
3
4
5
6
7
8
9
10
11
from pydantic import BaseSettings

class Settings(BaseSettings):
app_name: str = "FileCodeBox"
debug_mode: bool = False
database_url: str = "sqlite:///./sql_app.db"
secret_key: str = "your-super-secret-key"

class Config:
env_file = ".env" # 从 .env 文件加载环境变量
env_file_encoding = 'utf-8' # .env 文件的编码

代码实现

响应 & 错误处理

变量类型检查

错误现象

有些时候,python 提供的变量类型检查并不足够智能,在经过某些逻辑判断之后,类型检查器可能不会按照预期自动收窄,例如如下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
async def get_code_file_by_code(code, check=True) -> tuple[bool, Union[FileCodes, str]]:
file_code = await FileCodes.filter(code=code).first()
if not file_code:
return False, "文件不存在"
if await file_code.is_expired() and check:
return False, "文件已过期"
return True, file_code


async def update_file_usage(file_code):
...


@share_api.get("/select/")
async def get_code_file(code: str, ip: str = Depends(ip_limit["error"])):
file_storage: FileStorageInterface = storages[settings.file_storage]()
has, file_code = await get_code_file_by_code(code)
if not has:
ip_limit["error"].add_ip(ip)

return APIResponse(code=404, detail=file_code)

await update_file_usage(file_code)
return await file_storage.get_file_response(file_code)

在上述代码中,get_code_file_by_code 函数返回的 file_code 可能是 FileCodes 实例,也可能是一个字符串(错误信息)。在 get_code_file 函数中,我们使用了 if not has: 来确保 file_storage.get_file_response(file_code) 这一行中将要使用的 file_code 是一个 FileCodes 实例。
然而,类型检查器可能无法正确收窄变量的类型,并报错:无法将“FileCodes | str”类型的参数分配给函数“get_file_response”中类型为“FileCodes”的参数“file_code”。但作为开发者,我们这里已经控制了流程,所以这里的报错是多余的。

常用解决办法

  1. 使用 assert 类型断言:在调用 get_file_response 之前,手动断言 file_code 的类型。
1
2
3
4
5
6
7
8
9
10
11
12
@share_api.get("/select/")
async def get_code_file(code: str, ip: str = Depends(ip_limit["error"])):
file_storage: FileStorageInterface = storages[settings.file_storage]()
has, file_code = await get_code_file_by_code(code)
if not has:
ip_limit["error"].add_ip(ip)

return APIResponse(code=404, detail=file_code)

await update_file_usage(file_code)
assert isinstance(file_code, FileCodes), "file_code must be an instance of FileCodes"
return await file_storage.get_file_response(file_code)

但是由于 assert 语句的设计初衷是为了 调试和开发阶段的内部一致性检查,所以在生产环境下,assert 语会有几个缺点:

  • 被优化掉:在 Python 的优化模式下(使用 -O 选项运行),所有的 assert 语句都会被忽略,这可能导致在生产环境中无法捕获到错误。
  • 不抛出异常:如果 assert 失败,它会抛出 AssertionError,这不是一个明确的类型错误,可能导致错误信息不够清晰。
  • 性能开销:虽然 assert 的性能开销通常很小,但在高性能要求的代码中,频繁使用 assert 可能会影响性能。
  1. 使用 typing.cast():使用 typing.cast 可以明确告诉类型检查器变量的实际类型。
1
2
3
4
5
6
7
8
9
10
11
12
@share_api.get("/select/")
async def get_code_file(code: str, ip: str = Depends(ip_limit["error"])):
file_storage: FileStorageInterface = storages[settings.file_storage]()
has, file_code_union = await get_code_file_by_code(code)
if not has:
ip_limit["error"].add_ip(ip)

return APIResponse(code=404, detail=file_code_union)

file_code: FileCodes = cast(FileCodes, file_code_union)
await update_file_usage(file_code)
return await file_storage.get_file_response(file_code)

使用 typing.cast() 的好处是:

  • 明确类型:它可以清晰地告诉类型检查器变量的实际类型,而不需要依赖 assert 语句。
  • 不影响运行时cast() 只在类型检查阶段起作用,在运行时不会引入额外的开销或逻辑。
  • 兼容性好:它可以在生产环境中正常工作,不会被优化掉。
  1. 使用 # type: ignore 注释(不推荐):如果确定某个变量的类型是正确的,但类型检查器仍然报错,可以使用 # type: ignore 注释来忽略该行的类型检查错误。

当然,如果追求更加严格的类型检查和代码质量,还是应该通过显式的类型检查和条件判断来确保变量的类型正确,而不是简单地忽略类型检查错误。

文件传输/下载相关

切片上传/文件秒传/断点续传

基础概念

  • 切片上传:将大文件分割成多个小块(切片)进行上传,避免一次性上传大文件导致的网络不稳定或超时问题。
  • 文件秒传:通过计算文件的哈希值(如 MD5、SHA256)来判断文件是否已经存在于服务器上,如果存在则直接返回已存在的文件信息,而不需要重新上传。
  • 断点续传:在上传过程中,如果网络中断或其他原因导致上传失败,可以从上次上传的切片位置继续上传,而不是从头开始。

UML时序图

代码实现

请求切片上传文件

首先由前端发起切片上传的请求,后端接受请求后,判断请求的文件是否已经存在于服务器上,如果存在则直接返回已存在的文件信息,否则返回已经存在的切片信息(如果有的话),并开始上传新的切片。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
@chunk_api.post("/upload/init/", dependencies=[Depends(share_required_login)])
async def init_chunk_upload(data: InitChunkUploadModel):
# # 秒传检查
# existing = await FileCodes.filter(file_hash=data.file_hash).first()
# if existing:
# if await existing.is_expired():
# file_storage: FileStorageInterface = storages[settings.file_storage](
# )
# await file_storage.delete_file(existing)
# await existing.delete()
# else:
# return APIResponse(detail={
# "code": existing.code,
# "existed": True,
# "name": f'{existing.prefix}{existing.suffix}'
# })

# 创建上传会话
upload_id = uuid.uuid4().hex
total_chunks = (data.file_size + data.chunk_size - 1) // data.chunk_size
await UploadChunk.create(
upload_id=upload_id,
chunk_index=-1,
total_chunks=total_chunks,
file_size=data.file_size,
chunk_size=data.chunk_size,
chunk_hash=data.file_hash,
file_name=data.file_name,
)
# 获取已上传的分片列表
uploaded_chunks = await UploadChunk.filter(
upload_id=upload_id,
completed=True
).values_list('chunk_index', flat=True)
return APIResponse(detail={
"existed": False,
"upload_id": upload_id,
"chunk_size": data.chunk_size,
"total_chunks": total_chunks,
"uploaded_chunks": uploaded_chunks
})

上述被注释的代码实现了文件的「秒传功能」,通过上传文件的哈希值(file_hash)来判断文件是否已经存在于服务器上。如果文件存在且未过期,则直接返回已存在的文件信息。
如果文件不存在或已过期,则创建一个新的上传会话,并返回上传会话的 ID(upload_id)、切片大小(chunk_size)、总切片数(total_chunks)以及已上传的切片索引列表(uploaded_chunks)。

上传切片

前端在确认后端返回的切片上传响应后,经过调整:跳过已经上传的片段或是确认不需要重复上传,开始向后端发送切片文件上传的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@chunk_api.post("/upload/chunk/{upload_id}/{chunk_index}", dependencies=[Depends(share_required_login)])
async def upload_chunk(
upload_id: str,
chunk_index: int,
chunk: UploadFile = File(...),
):
# 获取上传会话信息
chunk_info = await UploadChunk.filter(upload_id=upload_id, chunk_index=-1).first()
if not chunk_info:
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="上传会话不存在")

# 检查分片索引有效性
if chunk_index < 0 or chunk_index >= chunk_info.total_chunks:
raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="无效的分片索引")

# 读取分片数据并计算哈希
chunk_data = await chunk.read()
chunk_hash = hashlib.sha256(chunk_data).hexdigest()

# 更新或创建分片记录
await UploadChunk.update_or_create(
upload_id=upload_id,
chunk_index=chunk_index,
defaults={
'chunk_hash': chunk_hash,
'completed': True,
'file_size': chunk_info.file_size,
'total_chunks': chunk_info.total_chunks,
'chunk_size': chunk_info.chunk_size,
'file_name': chunk_info.file_name
}
)
# 获取文件路径
_, _, _, _, save_path = await get_chunk_file_path_name(chunk_info.file_name, upload_id)
# 保存分片到存储
storage = storages[settings.file_storage]()
await storage.save_chunk(upload_id, chunk_index, chunk_data, chunk_hash, save_path)
return APIResponse(detail={"chunk_hash": chunk_hash})

前端循环请求上述 api 接口,上传每个切片的文件数据。

完成切片上传

当所有切片上传完成后,前端会发送一个完成上传的请求,后端会检查所有切片是否都已上传完成,并将切片合并成一个完整的文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@chunk_api.post("/upload/complete/{upload_id}", dependencies=[Depends(share_required_login)])
async def complete_upload(upload_id: str, data: CompleteUploadModel, ip: str = Depends(ip_limit["upload"])):
# 获取上传基本信息
chunk_info = await UploadChunk.filter(upload_id=upload_id, chunk_index=-1).first()
if not chunk_info:
raise HTTPException(status.HTTP_404_NOT_FOUND, detail="上传会话不存在")

storage = storages[settings.file_storage]()
# 验证所有分片
completed_chunks = await UploadChunk.filter(
upload_id=upload_id,
completed=True
).count()
if completed_chunks != chunk_info.total_chunks:
raise HTTPException(status.HTTP_400_BAD_REQUEST, detail="分片不完整")
# 获取文件路径
path, suffix, prefix, _, save_path = await get_chunk_file_path_name(chunk_info.file_name, upload_id)
# 合并文件并计算哈希
await storage.merge_chunks(upload_id, chunk_info, save_path)
# 创建文件记录
expired_at, expired_count, used_count, code = await get_expire_info(data.expire_value, data.expire_style)
await FileCodes.create(
code=code,
file_hash=chunk_info.chunk_hash,
is_chunked=True,
upload_id=upload_id,
size=chunk_info.file_size,
expired_at=expired_at,
expired_count=expired_count,
used_count=used_count,
file_path=path,
uuid_file_name=f"{prefix}{suffix}",
prefix=prefix,
suffix=suffix
)
# 清理临时文件
await storage.clean_chunks(upload_id, save_path)
return APIResponse(detail={"code": code, "name": chunk_info.file_name})

要点分辨

文件的断点续传,分为下载端断点续传上传端断点续传。其中,上述代码体现的是上传端断点续传的实现方式。
而对于下载端断点续传,则是通过 HTTP 协议的 Range 请求头来实现的。后端需要支持 Range 请求,并根据请求头返回指定范围的文件内容。fastapi.responses.FileResponse 默认支持 Range 请求,能自动实现断点续传、流式传输等功能。前端推荐直接使用浏览器的原生下载功能 window.open() 或者 a 标签的 download 属性来自动实现断点续传;手动通过 fetchXMLHttpRequest 实现下载时,难度较大,需要手动处理 Range 请求和响应。

服务器本地 Web 文件存储(Local File Storage)

方案设计

  • 存储方式:将文件存储在服务器本地的指定目录下。
  • 文件存储:带日期分层 + UUID 随机命名目录 + 保留原始文件名

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
async def get_file_path_name(file: UploadFile) -> Tuple[str, str, str, str, str]:
"""获取文件路径和文件名"""
today = datetime.datetime.now()
storage_path = settings.storage_path.strip("/") # 移除开头和结尾的斜杠
file_uuid = uuid.uuid4().hex
filename = await sanitize_filename(file.filename)
# 使用 UUID 作为子目录名
base_path = f"share/data/{today.strftime('%Y/%m/%d')}/{file_uuid}"

# 如果设置了存储路径,将其添加到基础路径中
path = f"{storage_path}/{base_path}" if storage_path else base_path

prefix, suffix = os.path.splitext(filename)
# 保持原始文件名
save_path = f"{path}/{filename}"
return path, suffix, prefix, filename, save_path

文件路径示例:share/data/2025/06/12/a1b2c3d4e5f67890abcdef1234567890/test.txt

S3 兼容存储(S3 Compatible Storage)

方案设计

  • 存储方式:使用 S3 兼容的对象存储服务(如 MinIO、AWS S3 等)。
  • 文件存储:带日期分层 + UUID 随机命名目录 + 保留原始文件名
  • 文件上传:对于轻量级文件上传操作,使用 s3 提供的 aioboto3 库内部的 put_object 方法进行文件异步上传。
  • 切片文件上传:对于大文件上传,使用 s3 提供的 aioboto3 库内部的 create_multipart_upload 方法进行分片上传。
  • 文件下载:使用 s3 提供的 aioboto3 库内部的 get_object 方法进行文件异步下载。
  • 手动流式输出:对于大文件下载,使用 get_object 方法获取文件流,并通过 FastAPI 的 StreamingResponse 进行流式输出。
  • 手动分片传输:对于大文件下载,还需要支持文件分片传输,对于前端请求中的 Range 请求头进行处理,返回指定范围的文件内容。

代码实现

小文件上传

对于体积不大的文件,S3 兼容存储可以直接使用 put_object 方法进行异步上传。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import aioboto3

async def save_file(self, file: UploadFile, save_path: str):
async with self.session.client(
"s3",
endpoint_url=self.endpoint_url,
aws_session_token=self.aws_session_token,
region_name=self.region_name,
config=Config(signature_version=self.signature_version),
) as s3:
await s3.put_object(
Bucket=self.bucket_name,
Key=save_path,
Body=await file.read(),
ContentType=file.content_type,
)

此处 s3.put_object 方法中的 Key 参数指定了文件在 S3 中的存储路径,并且存储路径的结构同上述的 get_file_path_name 方法生成的路径一致。

大文件上传(切片上传)

对于大文件上传,S3 兼容存储需要使用分片上传的方式。首先创建一个多部分上传会话,然后逐个上传切片,最后完成上传。

  • 创建分片上传会话

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    async def init_upload_chunk(self, save_path: str):
    async with self.session.client(
    "s3",
    endpoint_url=self.endpoint_url,
    aws_session_token=self.aws_session_token,
    region_name=self.region_name,
    config=Config(signature_version=self.signature_version),
    ) as s3:
    response = await s3.create_multipart_upload(
    Bucket=self.bucket_name,
    Key=save_path,
    )
    return response['UploadId']
  • 上传切片

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    async def upload_chunk(self, upload_id: str, chunk_index: int, chunk_data: bytes, save_path: str):
    async with self.session.client(
    "s3",
    endpoint_url=self.endpoint_url,
    aws_session_token=self.aws_session_token,
    region_name=self.region_name,
    config=Config(signature_version=self.signature_version),
    ) as s3:
    response = await s3.upload_part(
    Bucket=self.bucket_name,
    Key=save_path,
    PartNumber=chunk_index + 1, # S3 的分片编号从 1 开始
    UploadId=upload_id,
    Body=chunk_data,
    )
    # 记录分片信息
    self.parts.append({
    'ETag': response['ETag'],
    'PartNumber': chunk_index + 1
    })
  • 完成分片上传

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    async def complete_upload_chunk(self, upload_id: str, save_path: str):
    async with self.session.client(
    "s3",
    endpoint_url=self.endpoint_url,
    aws_session_token=self.aws_session_token,
    region_name=self.region_name,
    config=Config(signature_version=self.signature_version),
    ) as s3:
    await s3.complete_multipart_upload(
    Bucket=self.bucket_name,
    Key=save_path,
    UploadId=upload_id,
    MultipartUpload={'Parts': self.parts}
    )

上述展示的文件分片上传代码只是简单演示,实际使用时需要考虑更多的异常处理和边界情况,例如:

  • 分片上传失败时的重试机制
  • 分片上传过程中网络中断的处理
    FileCodeBox 项目中,S3 兼容存储的分片上传对于「中断处理」是在 api 调用时处理的,上述代码只负责切片上传的相关逻辑。分片上传的记录和状态管理是通过 UploadChunk 模型来实现的。

注意分辨要点:基于 AI 的生成式回复,S3 兼容存储的分片上传是需要通过调用 create_multipart_upload 方法来创建上传会话的,且不支持手动设置 UploadId 参数以调用 upload_part 进行分片上传的。所以,在 FileCodeBox 项目中,S3 兼容存储的分片上传相关逻辑可能存在错误。

手动流式输出/分片传输配置

由于 FileCodeBox 项目中的 S3 兼容存储文件下载逻辑只简单的通过

1
2
3
async with aiohttp.ClientSession() as session:
async with session.get(link) as resp:
tmp.write(await resp.read())

来获取文件内容并写入临时文件,然后通过 Response 返回文件内容,没有实现流式输出和分片传输,并且由于一次性将所有的文件内容读取到内存中,可能会导致内存占用过高的问题。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def get_file_response(self, file_code: FileCodes, request: Request = None):
try:
async with self.session.client(
"s3",
endpoint_url=self.endpoint_url,
aws_session_token=self.aws_session_token,
region_name=self.region_name,
config=Config(signature_version=self.signature_version),
) as s3:
# 获取文件元数据
obj_head = await s3.head_object(
Bucket=self.bucket_name,
Key=await file_code.get_file_path(),
)
file_size = obj_head['ContentLength']

# 创建变量
start = 0
end = file_size - 1
status_code = 200
content_range_header = None
s3_range_param = None

# 检查请求头中的 Range 字段
range_header = request.headers.get('Range')
range_match = re.match(r'bytes=(\d+)-(\d*)', range_header or "")

if range_match:
start = int(range_match.group(1))
end = int(range_match.group(2)) if range_match.group(2) else file_size - 1
# 检查范围是否有效
if start < 0 or start >= file_size or start > end:
raise HTTPException(status_code=416, detail="Requested Range Not Satisfiable")
if end >= file_size:
end = file_size - 1

content_range_header = f'bytes {start}-{end}/{file_size}'
status_code = 206 # Partial Content
s3_range_param = f'bytes={start}-{end}'

response = await s3.get_object(
Bucket=self.bucket_name,
Key=await file_code.get_file_path(),
Range=s3_range_param # 如果有 Range 参数,则传递给 S3
)

file = response['Body']

# 创建可迭代的异步生成器
async def file_stream():
while True:
chunk = await file.read(8192) # 每次读取 8KB
if not chunk:
break
yield chunk

# 设置响应头
headers = {
'Content-Disposition': f'attachment; filename="{file_code.uuid_file_name}"',
'Accept-Ranges': 'bytes',
}

if content_range_header:
headers['Content-Range'] = content_range_header
headers['Content-Length'] = str(end - start + 1)
else:
headers['Content-Length'] = str(file_size)

return StreamingResponse(
file_stream(),
status_code=status_code,
media_type="application/octet-stream",
headers=headers,
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"文件下载失败: {str(e)}")

由于需要处理 Range 请求头,所以上述方法在实现时,需要外部额外传入 request 参数,以获取请求头中的 Range 字段。在 FileCodeBox 项目中,由于面向接口实现的底层设计,导致无法直接传入 request 参数。

测试代码

架构设计

架构模式

  1. 单体架构(Monolithic Architecture):
    • 特点:所有功能打包在一个应用中。
    • 优点:开发简单,部署方便,初期成本低。
    • 缺点:难以扩展,维护复杂,团队协作困难,技术栈锁定。
    • 适用场景:初期小项目、创业公司,需求不复杂且变化不频繁。
  2. 微服务架构(Microservices Architecture):
    • 特点:将应用拆分成多个小型、独立部署的服务,通过 API 通信。
    • 优点:高内聚低耦合,易于扩展和维护,技术栈自由选择,团队独立开发。
    • 缺点:分布式系统复杂性高(数据一致性、服务发现、通信),运维成本高。
    • 适用场景:复杂的大型项目、需要高并发和高扩展性、大型团队协作。
  3. 前后端分离(Separate Frontend/Backend):
    • 特点:前端负责 UI 和交互,后端负责数据和业务逻辑,通过 API 通信。
    • 优点:团队并行开发,技术栈独立,部署独立,便于多端支持。
    • 缺点:增加协调成本,部署和调试可能更复杂。
    • 适用场景:几乎所有现代 Web 应用,尤其适用于需要丰富交互和多端支持的场景。
  4. SPA (Single-Page Application) / MPA (Multi-Page Application):
    • SPA:单页面应用,首次加载后,后续只更新部分内容。
    • MPA:多页面应用,每次导航都重新加载整个页面。
    • 选择:如果你注重用户体验、流畅交互,倾向 SPA;如果注重 SEO、首次加载速度(无 SSR),内容型网站,倾向 MPA 或 SSR/SSG。
  5. SSR (Server-Side Rendering) / SSG (Static Site Generation) / CSR (Client-Side Rendering):
    • CSR(客户端渲染):纯前端渲染,SEO 和首次加载可能差。
    • SSR(服务端渲染):服务器预渲染 HTML,利于 SEO 和首屏加载,但服务器压力大。
    • SSG(静态站点生成):构建时生成所有 HTML 文件,部署到 CDN,极致性能和 SEO。
    • 选择:取决于对 SEO、性能、实时性、开发复杂度的权衡。

FileCodeBox 前后端部署集成模式

前言

FileCodeBox 项目采用了 SPA + 后端静态文件托管 + API 服务 的架构模式。

架构代码

FileCodeBox 项目通过 FastAPI 的静态文件托管功能,将前端构建后的静态资源(HTML、CSS、JavaScript)部署在后端服务器上。用户通过访问后端服务路径,得到前端应用入口页面(index.html),并通过 fastapi 的 StaticFiles 提供静态资源(例如 /assets
内的 .js, .css)服务。
后端服务根路径操作函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@app.exception_handler(404)
@app.get("/")
async def index(request=None, exc=None):
return HTMLResponse(
content=open(
BASE_DIR / f"{settings.themesSelect}/index.html", "r", encoding="utf-8"
)
.read()
.replace("{{title}}", str(settings.name))
.replace("{{description}}", str(settings.description))
.replace("{{keywords}}", str(settings.keywords))
.replace("{{opacity}}", str(settings.opacity))
.replace('"/assets/', '"assets/')
.replace("{{background}}", str(settings.background)),
media_type="text/html",
headers={"Cache-Control": "no-cache"},
)

通过 .replace() 方法替换 HTML 模板中的变量,生成最终的 HTML 内容。

前端静态资源托管

1
2
3
4
5
6
app.mount(
"/assets",
StaticFiles(directory=f"./{settings.themesSelect}/assets"),
name="assets",
)

前端构建后的静态资源目录(例如 ./themes/default/assets)挂载到 /assets 路径下,用户可以通过访问 /assets/xxx.js/assets/xxx.css 来获取前端资源。

特殊技巧

数据库迁移

Alembic 的使用

使用 Alembic 进行数据库迁移(脚本执行操作)

CI/CD 流程中的数据库迁移

单例模式

使用 __new__ 实现单例模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class SingletonMeta:
_instance = None

def __new__(cls, *args, **kwargs):
if not cls._instance:
cls._instance = super().__new__(cls)
return cls._instance

def __init__(self, name="Default Singleton"):
if not hasattr(self, '_initialized'):
self.name = name
self._initialized = True
else:
pass

使用装饰器实现单例模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import threading

def singleton(cls):
_instance = {}
_lock = threading.Lock()

def get_instance(*args, **kwargs):
with _lock:
if cls not in _instance:
_instance[cls] = cls(*args, **kwargs)
return _instance[cls]
return get_instance

@singleton
class MySingleton:
def __init__(self, name="Default Singleton"):
self.name = name

python 异步 IO 操作

python 内置的文件操作是同步的,会阻塞主进程的进行,而使用 asyncio 或者 aiofiles 可以实现异步的文件操作。

使用 asyncio.to_thread() 实现异步文件操作

asyncio.to_thread() 的使用,本质上是将同步操作放入线程池中运行,同时基于 IO 操作的并行能力,实现文件不阻塞主线程的异步操作。

1
2
3
4
5
6
7
8
import asyncio

async def write_file_async(filename: str, content: str):
await asyncio.to_thread(lambda: open(filename, "w").write(content))

async def read_file_async(filename: str) -> str:
content = await asyncio.to_thread(lambda: open(filename, "r").read())
return content

使用 aiofiles 实现异步文件操作

1
2
3
4
5
6
7
8
9
10
import aiofiles

async def write_file_async(filename: str, content: str):
async with aiofiles.open(filename, "w") as f:
await f.write(content)

async def read_file_async(filename: str) -> str:
async with aiofiles.open(filename, "r") as f:
content = await f.read()
return content

现代文件路径操作模块 pathlib

pathlib 是 Python 3.4 引入的一个模块,用于处理文件系统路径。它提供了面向对象的方式来操作文件路径,支持跨平台的路径操作。

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from pathlib import Path

# 创建一个 Path 对象
p = Path("/path/to/directory")

# 检查路径是否存在
if p.exists():
print("路径存在")
else:
print("路径不存在")

# 检查是否是文件或目录
if p.is_file():
print("这是一个文件")
elif p.is_dir():
print("这是一个目录")

# 获取文件名和扩展名
print("文件名:", p.name)
print("文件扩展名:", p.suffix)

# 拼接路径
new_file = get_fake_path() / p / "new_file.txt"

# 创建新目录
new_dir = p / "new_directory"
new_dir.mkdir(exist_ok=True) # 如果目录已存在则不会报错

# 删除文件
if new_file.exists():
new_file.unlink() # 删除文件

后端响应文件下载

注意点

  • 异常处理:如果文件不存在或已过期,抛出 HTTPException
  • 文件名编码:在 Content-Disposition 头中使用 quote 函数对文件名进行编码,以支持非 ASCII 字符。
  • 响应类型:使用 FileResponse 返回文件内容,确保正确设置响应头(即 Content-Disposition)。
  • Content-Disposition 格式
    • 使用 filename* 可以支持非 ASCII 字符的文件名。
    • 格式为 attachment; filename*=UTF-8''<encoded_filename>filename* 的格式是 charset'lang'encoded-value,其中 charset 设置成 UTF-8 一般够用, <encoded_filename> 是经过 URL 编码的文件名,可以调用 urllib.parse.quote 进行编码。

演示示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
async def get_file_response(self, file_code: FileCodes):
# 1. 获取文件在服务器上的实际物理路径
# 确保 file_code.get_file_path() 返回的是相对于 self.root_path 的安全路径
relative_path_from_code = await file_code.get_file_path()
file_path = self.root_path / relative_path_from_code

# 2. 检查文件是否存在
if not file_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="文件已过期或不存在"
)

# 3. 准备下载文件名
filename_for_download = f"{file_code.prefix}{file_code.suffix}"
# 确保文件名在 Content-Disposition 中正确编码,支持非ASCII字符
encoded_filename_for_headers = quote(filename_for_download, safe='')
content_disposition = f"attachment; filename*=UTF-8''{encoded_filename_for_headers}"

# 4. 返回 FileResponse
return FileResponse(
file_path,
headers={"Content-Disposition": content_disposition}
# filename=filename_for_download # 可以选择不传,避免冗余
)

PS: 该文内容实际上是很久之前写的了,可能存在一些理解错误和有误导的实现方式,后续也不一定会进行更新和修正😋。还望读者仔细甄别,千万不要被我带偏了…

  • Title: FileCodeBox 项目源码阅读记录
  • Author: tada-zako
  • Created at : 2025-05-26 00:00:00
  • Updated at : 2025-06-13 13:19:24
  • Link: https://blog.tada-zako.top/2025/源码/file-code-box/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments