Async Deep Dive

The marketplace is built on Python's asyncio event loop from top to bottom. This page explains how every async-related piece fits together — and why certain design choices were made.

Why asyncio matters here

Each actuarial model can take 5–30 seconds to execute. A synchronous HTTP server would block the entire process for that duration, making the service unusable under concurrent load.

With asyncio:

  • The event loop can handle hundreds of concurrent connections while a model runs
  • await asyncio.sleep() inside function.py yields control back to the loop without blocking
  • WebSocket polling loops run concurrently with other HTTP requests
  • Database queries (via asyncpg) don't block while waiting for PostgreSQL

The mental model: there is one thread, but many coroutines taking turns on the event loop. Any await is a yield point where other work can run.
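The turn-taking described above can be seen in a few lines of plain asyncio. This is a minimal sketch, not code from the marketplace: `worker` and `main` are hypothetical names, and `asyncio.sleep(0)` stands in for any real await such as a database call.

```python
import asyncio


async def worker(name: str, steps: int, log: list[str]) -> None:
    """Append one entry per step, yielding to the event loop between steps."""
    for i in range(steps):
        log.append(f"{name}:{i}")
        # Yield point: the loop may switch to another ready coroutine here.
        await asyncio.sleep(0)


async def main() -> list[str]:
    log: list[str] = []
    # Both coroutines share one thread; gather() schedules them on the same loop.
    await asyncio.gather(worker("a", 3, log), worker("b", 3, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Neither worker finishes before the other starts: each `await` hands control back to the loop, which then resumes whichever coroutine is ready next.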

BackgroundTasks vs asyncio.create_task

The backend uses FastAPI's BackgroundTasks to schedule job execution:

@router.post("/apps/{app_id}/run")
async def trigger_run(
    app_id: int,
    body: RunRequest,
    background_tasks: BackgroundTasks,
    db: AsyncSession = Depends(get_db),
):
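    # Look up the app so that app.function_path is defined below.
    # Assumption: `App` is the ORM model for registered apps, and
    # HTTPException is imported from fastapi.
    app = await db.get(App, app_id)
    if app is None:
        raise HTTPException(status_code=404, detail="App not found")

    run = JobRun(user_id=1, app_id=app_id, inputs=body.inputs)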
    db.add(run)
    await db.commit()
    await db.refresh(run)

    background_tasks.add_task(_execute_job, run.id, app.function_path, body.inputs)

    return RunResponse(run_id=run.id, status=run.status)

BackgroundTasks.add_task schedules _execute_job to run after the response has been sent. Because _execute_job is an async def, it is awaited on the same event loop as the request handlers, not in a separate thread or process. (A plain def passed to add_task would instead be run in a worker thread.)

Why not asyncio.create_task?

asyncio.create_task would also work, but BackgroundTasks integrates with FastAPI's lifecycle:

  • Tasks are awaited before the server shuts down (graceful shutdown)
  • Errors in background tasks are logged via FastAPI's error handling
  • The dependency injection context (like db) is properly cleaned up

The practical difference is small for this use case, but BackgroundTasks is the idiomatic FastAPI approach.

Warning

BackgroundTasks run on the main event loop, in the main process. A CPU-intensive model that never yields (no await) blocks the event loop and freezes every other request while it runs. A single await is not enough: a long computation has to yield repeatedly (for example await asyncio.sleep(0) inside its main loop), or be moved off the event loop entirely with asyncio.to_thread or a process pool.
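One way to keep the loop responsive is to run the blocking function in a worker thread with asyncio.to_thread (Python 3.9+). The sketch below is illustrative only: `heavy_model` and `heartbeat` are hypothetical, and the heartbeat exists just to show that the loop keeps serving other coroutines while the model runs.

```python
import asyncio
import time


def heavy_model(n: int) -> int:
    """Synchronous, CPU-bound stand-in for a model that never awaits."""
    total = 0
    for i in range(n):
        total += i * i
    return total


async def heartbeat(ticks: list[float], stop: asyncio.Event) -> None:
    # Records a timestamp on each pass to show the loop is still being served.
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main() -> tuple[int, int]:
    ticks: list[float] = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    # Run the blocking function in a worker thread; the event loop stays free.
    result = await asyncio.to_thread(heavy_model, 2_000_000)
    stop.set()
    await beat
    return result, len(ticks)
```

Pure-Python CPU work in a thread still competes for the GIL, so for heavy numerical models a process pool (loop.run_in_executor with a ProcessPoolExecutor) may be the better fit.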

The _execute_job coroutine

async def _execute_job(run_id: int, function_path: str, inputs: dict) -> None:
    from database import AsyncSessionLocal

    # Phase 1: mark RUNNING
    async with AsyncSessionLocal() as db:
        run = await db.get(JobRun, run_id)
        run.status = JobStatus.RUNNING
        await db.commit()

    # Phase 2: execute the model
    try:
        fn = load_function(function_path)
        payload = await fn(inputs)
    except Exception:
        async with AsyncSessionLocal() as db:
            run = await db.get(JobRun, run_id)
            run.status = JobStatus.FAILED
            run.finished_at = datetime.utcnow()
            await db.commit()
        return

    # Phase 3: persist SUCCESS
    async with AsyncSessionLocal() as db:
        run = await db.get(JobRun, run_id)
        run.status = JobStatus.SUCCESS
        run.finished_at = datetime.utcnow()
        result = JobResult(run_id=run_id, payload=payload)
        db.add(result)
        await db.commit()

Three key design decisions:

  1. Session-per-phase: each phase opens and closes its own AsyncSessionLocal. This is necessary because _execute_job runs outside the HTTP request lifecycle — the original request session is already closed when this coroutine starts.

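  2. Exception safety: wrapping the fn(inputs) call in try/except Exception ensures any failure in user code is caught and the run is marked FAILED rather than left stuck in RUNNING. Note that the snippet records only the status; the exception itself is neither logged nor stored, so add logging in the except block if you need to diagnose failures.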

  3. No shared state: run_id, function_path, and inputs are passed as arguments rather than captured from a closure, which makes the coroutine safe to run after the request context is torn down.

WebSocket polling loop

The WebSocket handler polls the database on a fixed 3-second interval:

@router.websocket("/ws/runs/{run_id}")
async def ws_run_status(websocket: WebSocket, run_id: int):
    await websocket.accept()
    try:
        while True:
            async with AsyncSessionLocal() as db:
                result = await db.execute(
                    select(JobRun)
                    .where(JobRun.id == run_id)
                    .options(selectinload(JobRun.result))
                )
                run = result.scalar_one_or_none()

            if not run:
                await websocket.send_json({"status": "NOT_FOUND"})
                break

            await websocket.send_json({
                "status": run.status.value,
                "run_id": run.id,
                "result": run.result.payload if run.result else None,
            })

            if run.status.value in ("SUCCESS", "FAILED"):
                break

            await asyncio.sleep(3)
    except WebSocketDisconnect:
        pass
    finally:
        try:
            await websocket.close()
        except Exception:
            pass

await asyncio.sleep(3) is the critical line — it yields control to the event loop for 3 seconds, during which other WebSocket connections, HTTP requests, and the job's own await expressions can all progress concurrently.

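The selectinload(JobRun.result) option eagerly loads the related job_results row with a second SELECT ... IN query issued as part of the same db.execute call. This matters in async code: run.result is read after the session has closed, and a lazy load at that point would fail instead of silently issuing another query.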

Tip

For latency-critical production deployments, replace asyncio.sleep(3) with a PostgreSQL LISTEN/NOTIFY pattern. The handler subscribes to a channel, and _execute_job issues a NOTIFY on status change. This reduces result latency from up to 3 seconds to near-zero.
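A rough sketch of that pattern follows. It assumes `conn` is a dedicated asyncpg connection (not one borrowed from SQLAlchemy's pool); the channel name `run_status` and the helpers `notify_status` and `wait_for_status` are hypothetical. It relies on asyncpg's add_listener and remove_listener, whose callbacks receive (connection, pid, channel, payload).

```python
import asyncio
import json

CHANNEL = "run_status"  # hypothetical channel name


async def notify_status(conn, run_id: int, status: str) -> None:
    """Publish a status change; conn is assumed to be an asyncpg connection."""
    payload = json.dumps({"run_id": run_id, "status": status})
    # pg_notify() is the function form of NOTIFY and accepts bind parameters.
    await conn.execute("SELECT pg_notify($1, $2)", CHANNEL, payload)


async def wait_for_status(conn, run_id: int, timeout: float = 30.0) -> str:
    """Wait for the next notification about run_id and return its status."""
    loop = asyncio.get_running_loop()
    done = loop.create_future()

    def on_notify(connection, pid, channel, payload) -> None:
        # asyncpg calls listeners with (connection, pid, channel, payload).
        message = json.loads(payload)
        if message["run_id"] == run_id and not done.done():
            done.set_result(message["status"])

    await conn.add_listener(CHANNEL, on_notify)
    try:
        return await asyncio.wait_for(done, timeout)
    finally:
        await conn.remove_listener(CHANNEL, on_notify)
```

A handler built on this should still read the current status from the database once after subscribing, because a job may finish before the LISTEN takes effect and that notification would otherwise be missed.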

importlib dynamic loading

The backend loads function.py from each registered app at runtime using Python's standard importlib:

import importlib.util
from pathlib import Path


def load_function(function_path: str):
    mod_path = Path(function_path) / "function.py"
    if not mod_path.exists():
        raise FileNotFoundError(f"function.py not found at {mod_path}")

    spec = importlib.util.spec_from_file_location("function", mod_path)
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    return mod.run

spec_from_file_location creates a module spec from an absolute file path, bypassing Python's normal package import machinery. exec_module(mod) executes the file in the new module's namespace, making run accessible as mod.run.

Implications:

  • The module is re-executed on every call to load_function — there is no caching
  • Module-level code in function.py (e.g. loading a CSV into a DataFrame) runs on every job execution
  • The module name is always "function" regardless of the app — don't rely on __name__
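  • No sys.modules entry is created, so module-level state in function.py does not carry over between runs (modules that function.py itself imports are still shared through sys.modules as usual)

Note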

Because exec_module re-runs the module on every job, expensive initialisation (model loading, data loading) adds latency to every run. For production models with heavy initialisation, consider caching with a module-level dict keyed by function_path.
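A minimal sketch of such a cache is shown below. The names `load_function_cached` and `_function_cache` are hypothetical, and the modification-time check is an added assumption so that an edited function.py is picked up without restarting the server.

```python
import importlib.util
from pathlib import Path
from typing import Any, Callable

# Hypothetical cache: maps function_path to (file mtime, loaded run callable).
_function_cache: dict[str, tuple[float, Callable[..., Any]]] = {}


def load_function_cached(function_path: str) -> Callable[..., Any]:
    """Like load_function, but re-executes function.py only when the file changes."""
    mod_path = Path(function_path) / "function.py"
    if not mod_path.exists():
        raise FileNotFoundError(f"function.py not found at {mod_path}")

    mtime = mod_path.stat().st_mtime
    cached = _function_cache.get(function_path)
    if cached is not None and cached[0] == mtime:
        return cached[1]  # cache hit: module-level code is not run again

    spec = importlib.util.spec_from_file_location("function", mod_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot build an import spec for {mod_path}")
    mod = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(mod)
    _function_cache[function_path] = (mtime, mod.run)
    return mod.run
```

The trade-off is that a cached module keeps its module-level state between runs, so the isolation described in the list above no longer holds for cached apps.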

asyncpg session management

SQLAlchemy 2's async support wraps asyncpg in a familiar ORM interface:

# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

engine = create_async_engine(DATABASE_URL)  # DATABASE_URL uses postgresql+asyncpg://
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

Two session patterns are used:

  1. Request-scoped via get_db(): created when a request handler starts, yielded to the handler as a dependency, and closed when the dependency exits. The session does not commit on its own: handlers call await db.commit() explicitly, and any uncommitted work is rolled back on close. Used in all HTTP route handlers.

  2. Manual async with AsyncSessionLocal(): used in _execute_job and the WebSocket handler, which run outside the request lifecycle. Each async with block creates a new session, which checks out a pooled connection on first use and releases it on exit. Commits are explicit here too; leaving the block without committing discards pending changes.

The engine maintains a connection pool. This is SQLAlchemy's own pool of asyncpg connections, not asyncpg's built-in pool. Each session borrows a connection from it and returns it on exit, so connections are reused across concurrent coroutines.

Warning

expire_on_commit=False is set on AsyncSessionLocal so that SQLAlchemy does not expire loaded attributes after a commit. With the default (True), the next attribute access would trigger an implicit reload from the database. In asyncio that implicit I/O happens outside an await and raises an error (typically MissingGreenlet) instead of loading the value.
