Python ContextVar 与 asyncio:从 Go 开发者视角理解

2026-03-28 • 2.6分钟 • 1.0千字

作为一个 Go 开发者,初次接触 Python 的 asyncio 和 ContextVar 时,最大的困惑是:为什么 Python 的日志库不需要传 context.Context 参数就能串联 trace ID?这篇文章从 Go 的视角解释 Python 的并发模型和上下文传递机制。

Python 的”单线程”是什么意思

常听说 Python 是单线程,指的是 GIL (Global Interpreter Lock)

Go:     1 进程 → N 个 OS 线程 → M 个 goroutine(真并行)
Python: 1 进程 → N 个 OS 线程(但同一时刻只有 1 个执行 Python 字节码)

Python 可以创建多线程,但 GIL 保证同一时刻只有一个线程在执行 Python 代码。类比 Go:想象所有 goroutine 共享一把全局互斥锁,每执行一条指令都要持锁。

所以: - CPU 密集型:多线程没用,GIL 串行化了 - IO 密集型:有用,线程等 IO 时会释放 GIL

Python Web 服务的并发模型

FastAPI + uvicorn 走的是 asyncio 模型:

Go:
1 个进程 → M 个 OS 线程 → N 个 goroutine (M:N 模型,真并行)

Python asyncio:
1 个进程 → 1 个 OS 线程 → N 个 asyncio Task (1:N 模型,协作式切换)

关键理解:1 个线程 → 1 个 event loop → N 个 asyncio Task(通常 N = 成百上千)

实际验证

import asyncio
import threading

async def worker(name):
    print(f"{name}: thread_id = {threading.current_thread().ident}")
    await asyncio.sleep(0.1)
    print(f"{name}: still thread_id = {threading.current_thread().ident}")

async def main():
    # 创建 1000 个 Task
    tasks = [asyncio.create_task(worker(f"Task-{i}")) for i in range(1000)]
    await asyncio.gather(*tasks)

asyncio.run(main())

输出:所有 Task 的 thread_id 都相同 — 1000 个并发请求 = 1000 个 asyncio Task = 1 个线程

对比图

Go 的 goroutine:
┌─────────────────────────────────────┐
│  OS Thread 1  │  OS Thread 2  │ ... │  ← 多个线程真并行
├───────────────┼───────────────┼─────┤
│ goroutine A   │ goroutine D   │     │
│ goroutine B   │ goroutine E   │     │
│ goroutine C   │ goroutine F   │     │
└─────────────────────────────────────┘

Python asyncio:
┌─────────────────────────────────────┐
│         OS Thread 1 (唯一)           │  ← 只有 1 个线程
├─────────────────────────────────────┤
│ Task A ──► Task B ──► Task C ──► ...│  ← 串行切换
│   ↑await      ↑await      ↑await   │
└─────────────────────────────────────┘

关键区别: - async def = 定义 coroutine(类似 goroutine,但不是抢占式) - await = 主动让出执行权(类似 goroutine 遇到 channel/IO 时让出) - goroutine 是抢占式调度,coroutine 是协作式调度

ContextVar 的原理

ContextVar 不是绑定在线程上,而是绑定在 asyncio Task 上

与 Go context 的对比

// Go: context 必须显式传递
func handler(ctx context.Context) {
    traceID := ctx.Value("trace_id")
    doSomething(ctx)  // 必须传 ctx
}
# Python: ContextVar 隐式绑定到当前 Task
_trace_id = ContextVar("trace_id")

async def handler():
    # 不需要参数,直接读取当前 Task 的值
    trace_id = _trace_id.get()
    await do_something()  # 不需要传 ctx

asyncio Task 如何持有 ContextVar

每个 asyncio Task 内部持有一个 Context 对象(不可变字典):

# 简化版伪代码(真实实现在 CPython C 层)
class Context:
    """不可变字典,存储 ContextVar → value 映射"""
    def __init__(self, parent=None):
        # copy-on-write:从父 Context 拷贝
        self._data = parent._data.copy() if parent else {}

    def get(self, var):
        return self._data.get(var, var.default)

    def set(self, var, value):
        # 返回新 Context(不可变,copy-on-write)
        new_ctx = Context(parent=self)
        new_ctx._data[var] = value
        return new_ctx


class Task:
    """asyncio Task 对象"""
    def __init__(self, coro):
        self._coro = coro
        # 创建时捕获当前 Context(继承父 Task)
        self._context = contextvars.copy_context()  # ← 关键

    def __step(self):
        # 执行 coroutine 的每一步时,激活这个 Task 的 Context
        old_context = _current_context.get()
        _current_context.set(self._context)  # ← 切换上下文
        try:
            self._coro.send(None)  # 执行一步
        finally:
            _current_context.set(old_context)  # 恢复


class ContextVar:
    def __init__(self, name, default=None):
        self.name = name
        self.default = default

    def get(self):
        ctx = _current_context.get()  # 获取当前激活的 Context
        return ctx.get(self)

    def set(self, value):
        ctx = _current_context.get()
        new_ctx = ctx.set(self, value)
        _current_context.set(new_ctx)  # 更新当前 Context


# 全局变量:当前正在执行的 Context
_current_context: ContextVar[Context] = ContextVar("current_context")

核心机制: 1. 每个 Task 创建时,copy_context() 拷贝父 Task 的 Context 字典 2. event loop 调度 Task 时,切换 _current_context 指向该 Task 的 Context 3. ContextVar.get()_current_context 读值 4. ContextVar.set() 创建新 Context(copy-on-write)

执行时间线

同一个线程,微观串行:

t1: event loop 调度 Task A
    → _current_context = Task A._context ({"_trace_id": "abc"})
    → Task A 执行: _trace_id.get() → 读到 "abc"
    → Task A 遇到 await,让出 CPU

t2: event loop 调度 Task B
    → _current_context = Task B._context ({"_trace_id": "def"})  ← 切换了
    → Task B 执行: _trace_id.get() → 读到 "def"
    → Task B 遇到 await,让出 CPU

t3: event loop 再次调度 Task A
    → _current_context = Task A._context ({"_trace_id": "abc"})  ← 切回来
    → Task A 继续: _trace_id.get() → 还是 "abc"

ContextVar 是全局变量,但值是 per-Task 的

# config/logger.py — 模块级定义,进程内唯一
_ctx_trace_id: ContextVar[str] = ContextVar("trace_id", default=None)

这个变量本身是全局的(所有代码都能 import),但它存储的是 per-Task 的。

类比 Go(虽然 Go 没有直接等价物):

// 想象有个全局 map,但 key 是 goroutine ID
var _trace_id = &sync.Map{}

func Get() string {
    return _trace_id.Get(currentGoroutineID())  // 自动用当前 goroutine ID 查
}
func Set(v string) {
    _trace_id.Set(currentGoroutineID(), v)
}

本质_ctx_trace_id 是双重身份: - 作为 Python 对象:它是全局变量,谁都能访问 - 作为 Context 的 key:它用来从当前 Task 的字典里查对应的值

# 等价于:
Task_A._context = { _ctx_trace_id: "abc", _ctx_user_id: "100" }
Task_B._context = { _ctx_trace_id: "def", _ctx_user_id: "200" }

# _ctx_trace_id.get() 就是:
# asyncio.current_task()._context[_ctx_trace_id]

Copy-on-Write 机制

子 Task 修改 ContextVar 时,不会影响父 Task:

async def parent():
    _trace_id.set("parent-trace")
    task = asyncio.create_task(child())
    await task
    print(_trace_id.get())  # → "parent-trace" ← 没被子改掉

async def child():
    print(_trace_id.get())  # → "parent-trace" ← 继承了父的值
    _trace_id.set("child-trace")  # 触发 copy-on-write
    print(_trace_id.get())  # → "child-trace"

底层发生了什么:

1. child Task 创建时:
   child._context = parent._context.copy()  # 浅拷贝字典
   # child._context = {"_trace_id": "parent-trace"}

2. child 调用 _trace_id.set("child-trace"):
   new_ctx = Context(parent=child._context)  # 创建新 Context
   new_ctx._data = child._context._data.copy()  # 拷贝字典
   new_ctx._data[_trace_id] = "child-trace"  # 修改副本
   child._context = new_ctx  # 替换 Task 的 Context

3. parent Task 的 Context 没有变化:
   parent._context = {"_trace_id": "parent-trace"}  # 还是原值

这就是 copy-on-write:读时共享,写时拷贝。

实际应用:trace ID 串联

1. 中间件设置 trace_id

# middleware/logging_middleware.py
from opentelemetry import trace
from config.logger import set_log_context

class LoggingMiddleware:
    async def __call__(self, scope, receive, send):
        with tracer.start_as_current_span(f"{method} {path}") as span:
            # 生成 trace_id 并设置到 ContextVar
            trace_id = format_trace_id(span.get_span_context().trace_id)
            set_log_context(trace_id=trace_id)  # _ctx_trace_id.set(trace_id)

            await self.app(scope, receive, send)

2. LogContextFilter 自动注入

# config/logger.py
_ctx_trace_id: ContextVar[Optional[str]] = ContextVar("trace_id", default=None)

class LogContextFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        trace_id = _ctx_trace_id.get()  # 从当前 Task 读取
        if trace_id:
            record.msg = f"[trace={trace_id}] {record.msg}"
        return True

# 注册到 root logger(全局生效)
logging.getLogger().addFilter(LogContextFilter())

3. 业务代码无感知

# 任何地方调用 logger,自动携带 trace_id
async def service_a():
    logger.info("Processing image")  # → [trace=abc123] Processing image

async def service_b():
    logger.info("Uploading to BFS")  # → [trace=abc123] Uploading to BFS

为什么不需要传参: - _ctx_trace_id 是全局变量,所有模块都能 import - _ctx_trace_id.get() 内部调用 asyncio.current_task()._context[_ctx_trace_id] - 每个 Task 有自己的 Context 字典,所以值是隔离的

跨线程传递(executor 场景)

ContextVar 不会自动跨线程传递,因为 executor 线程没有 asyncio Task:

asyncio event loop (主线程)          executor 线程池中的线程
┌──────────────────────┐           ┌──────────────────────┐
│ Task A               │           │ OS Thread            │
│  _trace_id = "abc"   │           │  _trace_id = ???     │
│                      │  ──────►  │                      │
│  run_in_executor()   │           │  这是另一个线程,       │
│                      │           │  没有 asyncio Task,   │
│                      │           │  ContextVar 读不到值   │
└──────────────────────┘           └──────────────────────┘

需要手动复制和恢复:

from config.logger import copy_log_context, restore_log_context

async def handler():
    # 复制当前 Task 的上下文
    _log_ctx = copy_log_context()

    def _run_sync():
        # 在 executor 线程中恢复
        restore_log_context(_log_ctx)
        return sync_function()  # 现在 logger 能读到 trace_id

    result = await loop.run_in_executor(None, _run_sync)

实现:

def copy_log_context() -> Dict[str, Optional[str]]:
    """从当前 Task 的 Context 读取并序列化"""
    return {
        "trace_id": _ctx_trace_id.get(),
        "user_id": _ctx_user_id.get(),
    }

def restore_log_context(ctx: Dict[str, Optional[str]]) -> None:
    """在新线程中恢复(直接 set,不依赖 Task)"""
    if ctx.get("trace_id"):
        _ctx_trace_id.set(ctx["trace_id"])
    if ctx.get("user_id"):
        _ctx_user_id.set(ctx["user_id"])

Python 如何真正多线程

1. run_in_executor(线程池)

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def main():
    loop = asyncio.get_event_loop()

    # 默认线程池(通常 5 个线程)
    result = await loop.run_in_executor(None, sync_function)

    # 自定义线程池
    executor = ThreadPoolExecutor(max_workers=10)
    result = await loop.run_in_executor(executor, sync_function)

用途: - 调用不支持 async 的第三方库(如 OpenAI SDK 的同步版本) - CPU 密集型计算(图片处理、加密)

2. threading.Thread(手动创建)

import threading

def worker():
    print(f"Running in thread {threading.current_thread().ident}")

thread = threading.Thread(target=worker)
thread.start()
thread.join()

注意:手动创建的线程没有 asyncio Task,ContextVar 需要手动传递。

3. 多进程部署(生产环境)

单进程只用 1 个 CPU 核,生产部署用多进程

# gunicorn 起 4 个 worker 进程
gunicorn -w 4 -k uvicorn.workers.UvicornWorker app:app
CPU Core 1 ← Worker 进程 1 (1 线程, N Tasks)
CPU Core 2 ← Worker 进程 2 (1 线程, N Tasks)
CPU Core 3 ← Worker 进程 3 (1 线程, N Tasks)
CPU Core 4 ← Worker 进程 4 (1 线程, N Tasks)

每个进程完全独立: - 自己的 event loop - 自己的内存空间 - 自己的 ContextVar - 进程间不共享内存(类比 Go 起了 4 个独立程序,nginx 负载均衡)

4. multiprocessing(进程池)

from multiprocessing import Pool

def cpu_intensive(x):
    return x ** 2

with Pool(processes=4) as pool:
    results = pool.map(cpu_intensive, range(1000))

用途:CPU 密集型任务,绕过 GIL 限制。

什么必须 async

# ❌ 错误:同步阻塞会卡死整个 event loop
async def handler():
    time.sleep(5)  # 💀 5 秒内所有请求都卡住
    return "ok"

# ✅ 正确:异步让出 CPU
async def handler():
    await asyncio.sleep(5)  # ✅ 其他 Task 继续执行
    return "ok"
操作类型 必须 async 原因
IO 操作 否则阻塞 event loop
- HTTP 请求 await httpx.get() 不能用 requests.get()
- 数据库查询 await db.execute() 不能用同步 driver
- Redis await redis.get() 不能用同步 client
- 文件读写 await aiofiles.open() 或用 run_in_executor
CPU 密集 run_in_executor 否则阻塞
- 图片处理 await loop.run_in_executor(None, resize)
- 加密计算 await loop.run_in_executor(None, hash)
纯计算 不需要 async 微秒级无所谓
- 字符串拼接 name = f"{first} {last}"
- 简单逻辑 if x > 10: ...

灾难示例

import requests  # 同步 HTTP 库

async def handler():
    # 这个请求耗时 2 秒,整个服务卡死 2 秒
    resp = requests.get("https://slow-api.com")
    # 期间所有其他请求都在等待
    return resp.json()

实际影响:

t0: 请求 A 到达 → 开始执行 requests.get() (阻塞 2s)
t1: 请求 B 到达 → 等待...
t2: 请求 C 到达 → 等待...
t3: 请求 A 的 requests.get() 完成 → 请求 B 才开始执行

验证代码

import asyncio
from contextvars import ContextVar

_trace_id = ContextVar("trace_id")

async def worker(name, trace):
    _trace_id.set(trace)
    print(f"{name} set trace_id to {trace}")
    await asyncio.sleep(0.1)  # 模拟 IO,让出 CPU

    # 恢复后读取,应该还是自己的值
    value = _trace_id.get()
    print(f"{name} read trace_id: {value}")
    assert value == trace

async def main():
    # 同一个线程,3 个 Task 并发
    await asyncio.gather(
        worker("Task-A", "trace-111"),
        worker("Task-B", "trace-222"),
        worker("Task-C", "trace-333"),
    )

asyncio.run(main())

输出:

Task-A set trace_id to trace-111
Task-B set trace_id to trace-222
Task-C set trace_id to trace-333
Task-A read trace_id: trace-111  ← 没有串
Task-B read trace_id: trace-222  ← 没有串
Task-C read trace_id: trace-333  ← 没有串

对比总结

特性 Go context.Context Python ContextVar
传递方式 显式传参 func(ctx) 隐式(自动继承)
跨 goroutine/Task 必须显式传递 子 Task 自动继承
跨线程 必须显式传递 需要手动 copy/restore
隔离单位 goroutine asyncio Task
调度方式 抢占式,多核并行 协作式,单线程切换
值存储 ctx.Value(key) ContextVar.get()
修改影响 子不影响父 子不影响父(copy-on-write)
跨任务读取 ❌ 不能 ❌ 不能
底层实现 goroutine 栈空间隔离 Task 持有 Context 字典

真实 CPython 实现

实际是用 C 实现的 HAMT (Hash Array Mapped Trie),性能优化的不可变字典:

// Include/internal/pycore_context.h
typedef struct {
    PyObject_HEAD
    PyHamtObject *ctx_vars;  // HAMT 存储 ContextVar → value
    PyObject *ctx_weakreflist;
} PyContext;

// 每个 Task 持有一个 PyContext 指针
typedef struct {
    ...
    PyContext *task_context;  // Task 的 Context
} PyTaskObject;

HAMT 是持久化数据结构,支持高效的 copy-on-write: - 读操作 O(log n) - 写操作返回新树,共享未修改的节点 - 类似 Git 的 commit tree

小结

类比 Go: - Task ≈ goroutine - ContextVar ≈ goroutine-local storage - 区别:Go 靠栈隔离,Python 靠 Context 字典隔离

这就是 Python asyncio 能用单线程处理高并发的原因——虽然微观串行,但每个请求的上下文是隔离的。对于 IO 密集型的 Web 服务,asyncio 的性能与 Go 差距不大。