第一个 FastAPI 应用

你的第一个 FastAPI 应用:从零开始

让我们一步步创建一个完整、可运行的 FastAPI 应用!🚀


📁 第一步:项目结构

my_first_fastapi/
├── main.py          # 主应用文件
├── requirements.txt # 依赖文件
└── README.md       # 项目说明

📦 第二步:安装依赖

# 创建项目目录
mkdir my_first_fastapi
cd my_first_fastapi

# 创建虚拟环境
python -m venv venv

# 激活虚拟环境
# Windows:
venv\Scripts\activate
# macOS/Linux:
source venv/bin/activate

# 安装 FastAPI 和 Uvicorn
pip install fastapi uvicorn[standard]

# 生成 requirements.txt
pip freeze > requirements.txt

🚀 第三步:创建主文件 main.py

"""
我的第一个 FastAPI 应用 🎉
功能:用户管理 API(CRUD)
"""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

# 创建 FastAPI 应用实例
app = FastAPI(
    title="我的第一个 FastAPI 应用",
    description="一个简单的用户管理系统 API",
    version="1.0.0",
    docs_url="/docs",  # Swagger UI 地址
    redoc_url="/redoc"  # ReDoc 地址
)

# Pydantic 模型 - 数据验证和序列化
class UserCreate(BaseModel):
    """创建用户时使用的模型"""
    username: str
    email: str
    age: int
    bio: Optional[str] = None

class UserResponse(BaseModel):
    """返回用户数据时的模型"""
    id: int
    username: str
    email: str
    age: int
    bio: Optional[str] = None
    created_at: datetime

    class Config:
        from_attributes = True  # 支持 ORM 模式

class UserUpdate(BaseModel):
    """更新用户时使用的模型"""
    username: Optional[str] = None
    email: Optional[str] = None
    age: Optional[int] = None
    bio: Optional[str] = None

# 模拟数据库 - 内存存储
fake_database: List[dict] = []
user_counter = 1

# 路由:首页
@app.get("/", tags=["首页"])
def read_root():
    """
    欢迎页面 - 你的第一个 FastAPI 端点!
    """
    return {
        "message": "🎉 欢迎来到你的第一个 FastAPI 应用!",
        "docs": "访问 /docs 查看交互式 API 文档",
        "version": "1.0.0",
        "endpoints": [
            "GET / - 当前页面",
            "GET /users/ - 获取所有用户",
            "POST /users/ - 创建新用户",
            "GET /users/{user_id} - 获取单个用户",
            "PUT /users/{user_id} - 更新用户",
            "DELETE /users/{user_id} - 删除用户"
        ]
    }

# 用户路由组
@app.get("/users/", tags=["用户"], response_model=List[UserResponse])
def get_users(skip: int = 0, limit: int = 10):
    """
    获取用户列表

    - **skip**: 跳过前 N 条记录(分页)
    - **limit**: 每页显示 N 条记录(默认 10)
    """
    users = fake_database[skip:skip + limit]
    return [UserResponse(**user) for user in users]

@app.post("/users/", tags=["用户"], response_model=UserResponse, status_code=201)
def create_user(user: UserCreate):
    """
    创建新用户

    **请求体示例**:
    ```json
    {
        "username": "alice",
        "email": "alice@example.com",
        "age": 25,
        "bio": "热爱编程的程序员"
    }
    ```
    """
    global user_counter

    # 检查用户名是否已存在
    if any(u["username"] == user.username for u in fake_database):
        raise HTTPException(
            status_code=400, 
            detail=f"用户名 '{user.username}' 已存在"
        )

    # 检查邮箱是否已存在
    if any(u["email"] == user.email for u in fake_database):
        raise HTTPException(
            status_code=400, 
            detail=f"邮箱 '{user.email}' 已注册"
        )

    # 创建新用户
    new_user = {
        "id": user_counter,
        "username": user.username,
        "email": user.email,
        "age": user.age,
        "bio": user.bio,
        "created_at": datetime.now()
    }

    fake_database.append(new_user)
    user_counter += 1

    return UserResponse(**new_user)

@app.get("/users/{user_id}", tags=["用户"], response_model=UserResponse)
def get_user(user_id: int):
    """
    获取单个用户信息

    - **user_id**: 用户 ID
    """
    for user in fake_database:
        if user["id"] == user_id:
            return UserResponse(**user)

    raise HTTPException(
        status_code=404, 
        detail=f"用户 ID {user_id} 不存在"
    )

@app.put("/users/{user_id}", tags=["用户"], response_model=UserResponse)
def update_user(user_id: int, user_update: UserUpdate):
    """
    更新用户信息

    **请求体示例**:
    ```json
    {
        "username": "alice_updated",
        "age": 26
    }
    ```
    """
    # 查找用户
    for user in fake_database:
        if user["id"] == user_id:
            # 更新字段(仅更新提供的字段)
            update_data = user_update.dict(exclude_unset=True)
            for field, value in update_data.items():
                if value is not None:
                    user[field] = value

            # 更新时间戳
            user["updated_at"] = datetime.now()
            return UserResponse(**user)

    raise HTTPException(
        status_code=404, 
        detail=f"用户 ID {user_id} 不存在"
    )

@app.delete("/users/{user_id}", tags=["用户"], status_code=204)
def delete_user(user_id: int):
    """
    删除用户

    返回 204 No Content(无响应体)
    """
    for i, user in enumerate(fake_database):
        if user["id"] == user_id:
            fake_database.pop(i)
            return None

    raise HTTPException(
        status_code=404, 
        detail=f"用户 ID {user_id} 不存在"
    )

# 健康检查端点
@app.get("/health", tags=["系统"])
def health_check():
    """系统健康检查"""
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "users_count": len(fake_database)
    }

# 404 处理
@app.exception_handler(404)
async def not_found_handler(request, exc):
    return {
        "error": "Not Found",
        "message": "请求的资源不存在",
        "path": request.url.path,
        "suggestion": "请检查 API 文档: /docs"
    }

# 启动信息(仅开发环境)
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)

🎯 第四步:运行应用

# 开发模式(推荐)
uvicorn main:app --reload

# 或者直接运行 Python 文件
python main.py

成功输出

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [xxxxx] using WatchFiles
INFO:     Started server process [xxxxx]
INFO:     Waiting for application startup.
INFO:     Application startup complete.

🌐 第五步:测试你的 API

1. 访问首页

URL: http://127.0.0.1:8000
预期响应:

{
  "message": "🎉 欢迎来到你的第一个 FastAPI 应用!",
  "docs": "访问 /docs 查看交互式 API 文档",
  "version": "1.0.0",
  "endpoints": [
    "GET / - 当前页面",
    "GET /users/ - 获取所有用户",
    "POST /users/ - 创建新用户",
    "GET /users/{user_id} - 获取单个用户",
    "PUT /users/{user_id} - 更新用户",
    "DELETE /users/{user_id} - 删除用户"
  ]
}

2. 交互式文档(推荐!)

Swagger UI: http://127.0.0.1:8000/docs
ReDoc: http://127.0.0.1:8000/redoc

🎉 点击 “Try it out” 按钮,直接在浏览器中测试 API!

3. 使用 curl 测试

# 创建用户
curl -X POST "http://127.0.0.1:8000/users/" \
     -H "Content-Type: application/json" \
     -d '{
       "username": "alice",
       "email": "alice@example.com",
       "age": 25,
       "bio": "热爱编程的程序员"
     }'

# 获取所有用户
curl -X GET "http://127.0.0.1:8000/users/"

# 获取单个用户(假设 ID=1)
curl -X GET "http://127.0.0.1:8000/users/1"

# 更新用户
curl -X PUT "http://127.0.0.1:8000/users/1" \
     -H "Content-Type: application/json" \
     -d '{
       "age": 26,
       "bio": "FastAPI 爱好者"
     }'

# 删除用户
curl -X DELETE "http://127.0.0.1:8000/users/1"

🛠️ 第六步:项目文件完善

requirements.txt

fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0

README.md

# 我的第一个 FastAPI 应用

一个简单的用户管理系统,展示 FastAPI 的核心功能。

## 快速开始

bash

克隆项目

git clone

创建虚拟环境

python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate

安装依赖

pip install -r requirements.txt

启动应用

uvicorn main:app –reload

## API 文档

- Swagger UI: http://127.0.0.1:8000/docs
- ReDoc: http://127.0.0.1:8000/redoc

## 功能

- ✅ 用户创建 (POST /users/)
- ✅ 用户列表 (GET /users/)
- ✅ 用户详情 (GET /users/{id})
- ✅ 用户更新 (PUT /users/{id})
- ✅ 用户删除 (DELETE /users/{id})
- ✅ 自动 API 文档生成
- ✅ 数据验证和错误处理

🎉 恭喜!你已完成第一个 FastAPI 应用!

你学到了什么:

特性代码位置
应用创建app = FastAPI(...)
路径参数/users/{user_id}
查询参数skip: int = 0, limit: int = 10
请求体user: UserCreate
响应模型response_model=UserResponse
数据验证Pydantic 模型
错误处理HTTPException
自动文档/docs/redoc
标签分组tags=["用户"]

下一步学习建议:

主题关键词
数据库集成(SQLAlchemy + PostgreSQL)数据库
用户认证(JWT)认证
Docker 部署docker
测试(Pytest)测试
前端集成(HTML 模板)前端

现在,去浏览器访问 http://127.0.0.1:8000/docs,体验交互式 API 吧! 🎊

需要我解释某个特定功能的实现细节吗?或者想学习数据库集成?告诉我你的下一步!

类似文章

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注