
Batch Task State Machine

This page is the authoritative reference for how batch tasks move through the system — from initial API request to final result. It covers the Task-level status transitions, the reconciler loop, per-job lifecycles, and the exact shape of the provider_state JSONB column for every supported provider.

Audience: contributors adding new batch services or debugging stuck/failed tasks.


Two-Phase Design

Batch tasks are intentionally split into two phases to decouple HTTP request latency from long-running provider operations.

| Phase | Where | What happens |
|---|---|---|
| Submit | app/tasks/{type}/batch/task_handler.py | Validate inputs, call the provider API to start the job(s), write provider_state to the DB, and mark the task IN_PROGRESS. Returns immediately. |
| Poll & Finalize | core/background_tasks/reconciler/ | Every 120 s, query all IN_PROGRESS tasks, ask each provider whether the job finished, and update provider_state. On completion, write the output file and flip the task to a terminal status. |

This means the Pub/Sub handler is not responsible for finalizing the task. It only submits and exits. The reconciler owns every status transition after IN_PROGRESS.
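To make the split concrete, here is a minimal sketch of the submit phase only. Everything in it is hypothetical: the Task dataclass, handle_submit, the gs:// validation rule and the submit_job callable stand in for the real handler in app/tasks/{type}/batch/task_handler.py. Setting task.status stands in for mark_task_in_progress and mark_task_failed, and provider_state is trimmed to a few fields.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Task:
    # Hypothetical in-memory stand-in for the task row.
    id: str
    status: str = "PENDING"
    provider_state: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


def handle_submit(task: Task, urls: List[str], submit_job: Callable[[str], str]) -> None:
    """Phase 1 (hypothetical sketch): validate, submit, persist provider_state, return."""
    # Hypothetical validation rule: only gs:// URIs are accepted.
    valid = [u for u in urls if u.startswith("gs://")]
    invalid = [u for u in urls if not u.startswith("gs://")]
    try:
        # submit_job starts one provider job and returns its external ID.
        jobs = [{"external_job_id": submit_job(u), "status": "SUBMITTED"} for u in valid]
    except Exception as exc:
        # Stands in for mark_task_failed: a submission error ends the task as FAILED.
        task.status = "FAILED"
        task.error = str(exc)
        return
    task.provider_state = {"task_type": "transcription", "invalid_urls": invalid, "jobs": jobs}
    # Stands in for mark_task_in_progress. No polling here: the reconciler takes over.
    task.status = "IN_PROGRESS"
```

The handler never waits on the provider, so the Pub/Sub acknowledgement latency is bounded by submission time alone.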


Task-Level State Machine

flowchart TD
    A["API Request"] --> B["create_task_checkpoint\nstatus = PENDING"]
    B --> C["Pub/Sub delivers message\nto task handler"]
    C -->|"submission succeeds"| D["mark_task_in_progress\nPENDING → IN_PROGRESS\nprovider_state written"]
    C -->|"submission fails"| E["mark_task_failed\n→ FAILED"]
    D --> F["Reconciler loop\nevery 120 s"]
    F -->|"provider_state is NULL"| E
    F -->|"task age > 26 h"| E
    F -->|"all jobs terminal,\n≥1 succeeded"| G["finalize_batch_checkpoint\n→ COMPLETED or PARTIAL_COMPLETE"]
    F -->|"all jobs FAILED"| E
    F -->|"jobs still running"| F

Status values

| Status | Who sets it | Meaning |
|---|---|---|
| PENDING | create_task_checkpoint | Task created; waiting for Pub/Sub delivery |
| IN_PROGRESS | mark_task_in_progress | Jobs submitted to provider; reconciler polling |
| COMPLETED | finalize_batch_checkpoint | All jobs succeeded |
| PARTIAL_COMPLETE | finalize_batch_checkpoint | At least one job succeeded; some failed |
| FAILED | mark_task_failed | All jobs failed, submission error, timeout (task age > 26 h), or NULL provider_state |
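The diagram and status table together imply a small transition map. The sketch below encodes it as a hypothetical guard; ALLOWED_TRANSITIONS and check_transition are illustrative names, and this page does not say the codebase enforces transitions this way.

```python
# Hypothetical transition map derived from the diagram above; not taken from the codebase.
# "jobs still running" is a self-loop on IN_PROGRESS with no status write, so it is omitted.
ALLOWED_TRANSITIONS = {
    "PENDING": frozenset({"IN_PROGRESS", "FAILED"}),
    "IN_PROGRESS": frozenset({"COMPLETED", "PARTIAL_COMPLETE", "FAILED"}),
    "COMPLETED": frozenset(),
    "PARTIAL_COMPLETE": frozenset(),
    "FAILED": frozenset(),
}

# Terminal statuses are the ones with no outgoing transitions.
TERMINAL_STATUSES = frozenset(s for s, nxt in ALLOWED_TRANSITIONS.items() if not nxt)


def check_transition(current: str, new: str) -> None:
    """Raise ValueError if moving from `current` to `new` is not allowed."""
    if new not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition {current} -> {new}")
```

Deriving TERMINAL_STATUSES from the map keeps the two definitions from drifting apart.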

Idempotency

mark_task_in_progress performs a conditional update (UPDATE ... WHERE id = ? AND status = 'PENDING'). If Pub/Sub redelivers the message, the second handler invocation sees rows_updated = 0 and skips without submitting duplicate jobs.
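The conditional update behaves like a compare-and-set on the status column. A minimal sketch using the standard-library sqlite3 module in place of the real database; the tasks table layout and mark_task_in_progress_sketch are assumptions for illustration, and cursor.rowcount plays the role of rows_updated.

```python
import sqlite3


def mark_task_in_progress_sketch(conn: sqlite3.Connection, task_id: str, provider_state_json: str) -> bool:
    """Return True only for the invocation that actually moved the task out of PENDING."""
    cur = conn.execute(
        # The status predicate turns the update into a compare-and-set: a redelivered
        # message finds status already IN_PROGRESS and matches zero rows.
        "UPDATE tasks SET status = 'IN_PROGRESS', provider_state = ? "
        "WHERE id = ? AND status = 'PENDING'",
        (provider_state_json, task_id),
    )
    conn.commit()
    return cur.rowcount == 1  # rows_updated
```

Calling it twice for the same task returns True and then False; the database, not the handler, decides which delivery wins.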


Reconciler Loop

File: core/background_tasks/reconciler/loop.py

batch_polling_loop
  └─ reconcile_in_progress_tasks   (called every POLL_INTERVAL_SECONDS)
       ├─ get_in_progress_tasks()
       ├─ guard: provider_state NULL → mark_task_failed
       ├─ guard: age > TASK_TIMEOUT_HOURS → mark_task_failed
       └─ dispatch on provider_state["task_type"]
            ├─ "transcription" → reconcile_transcription_task
            └─ "completion"    → reconcile_completion_task

Knobs

| Constant | Value | Effect |
|---|---|---|
| POLL_INTERVAL_SECONDS | 120 | Sleep between reconcile cycles |
| TASK_TIMEOUT_HOURS | 26 | Age at which a stuck IN_PROGRESS task is force-failed |
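The loop structure above can be sketched with asyncio. This mirrors the tree but is not the code in loop.py: reconcile_once, polling_loop, the injected handlers/mark_failed callables and the created_at field on each task row are all assumptions made so the logic can run in isolation.

```python
import asyncio
from datetime import datetime, timedelta, timezone
from typing import Any, Awaitable, Callable, Dict, List, Optional

POLL_INTERVAL_SECONDS = 120
TASK_TIMEOUT_HOURS = 26

TaskRow = Dict[str, Any]  # hypothetical: {"id", "created_at", "provider_state", ...}
Handler = Callable[[TaskRow], Awaitable[None]]
MarkFailed = Callable[[TaskRow, str], Awaitable[None]]


async def reconcile_once(
    tasks: List[TaskRow],
    handlers: Dict[str, Handler],
    mark_failed: MarkFailed,
    now: Optional[datetime] = None,
) -> None:
    now = now or datetime.now(timezone.utc)
    for task in tasks:
        state = task.get("provider_state")
        # Guard 1: a task with no provider_state can never be polled.
        if state is None:
            await mark_failed(task, "provider_state is NULL")
            continue
        # Guard 2: force-fail tasks stuck longer than the timeout.
        if now - task["created_at"] > timedelta(hours=TASK_TIMEOUT_HOURS):
            await mark_failed(task, "timed out")
            continue
        # Dispatch on provider_state["task_type"]; unknown types are skipped in this sketch.
        handler = handlers.get(state.get("task_type"))
        if handler is not None:
            await handler(task)


async def polling_loop(
    get_in_progress_tasks: Callable[[], Awaitable[List[TaskRow]]],
    handlers: Dict[str, Handler],
    mark_failed: MarkFailed,
) -> None:
    while True:
        await reconcile_once(await get_in_progress_tasks(), handlers, mark_failed)
        await asyncio.sleep(POLL_INTERVAL_SECONDS)
```

Injecting `now` keeps the timeout guard testable without waiting 26 hours.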

Per-Job Lifecycle — Transcription

File: core/background_tasks/reconciler/transcription.py

Each entry in provider_state["jobs"] moves through these statuses independently:

flowchart LR
    SUBMITTED --> Polling["polling\nprovider-specific status"]
    Polling -->|"provider reports done"| COMPLETED
    Polling -->|"provider reports failure"| FAILED
    Polling -->|"transient ERROR, attempt < 3"| Polling
    Polling -->|"transient ERROR, attempt = 3"| FAILED

Transient errors: If the provider poll call returns status = "ERROR", the job's transient_failures counter is incremented. The job is retried on the next reconciler cycle. After 3 transient errors the job is forced to FAILED.
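The transient-error rule can be sketched as a pure function over one job dict. apply_poll_result and MAX_TRANSIENT_FAILURES are hypothetical names; the sketch assumes the poll result has the {"status", "result", "error"} shape used by the poll functions, and that a still-running job keeps its stored status. It reads transient_failures with a default of 0 because the AZURE schema below does not list that field.

```python
MAX_TRANSIENT_FAILURES = 3  # matches "After 3 transient errors"
TERMINAL_JOB_STATUSES = {"COMPLETED", "FAILED"}


def apply_poll_result(job: dict, poll: dict) -> dict:
    """Hypothetical: fold one poll result into a provider_state job entry (mutates job)."""
    if job["status"] in TERMINAL_JOB_STATUSES:
        return job  # terminal jobs are not polled again
    status = poll["status"]
    if status == "ERROR":
        # Transient: count it and retry next cycle, unless the budget is spent.
        job["transient_failures"] = job.get("transient_failures", 0) + 1
        if job["transient_failures"] >= MAX_TRANSIENT_FAILURES:
            job["status"] = "FAILED"
            job["error"] = poll.get("error") or "too many transient errors"
    elif status == "COMPLETED":
        job["status"] = "COMPLETED"
        job["result"] = poll.get("result")
    elif status == "FAILED":
        job["status"] = "FAILED"
        job["error"] = poll.get("error")
    # Any other value is a provider-specific running status: leave the job as is.
    return job
```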

Finalization: Once every job is in a terminal state:

  • All jobs FAILED → mark_task_failed (task = FAILED)
  • At least one job COMPLETED → upload merged result, call finalize_batch_checkpoint:
      • All succeeded → COMPLETED
      • Mixed → PARTIAL_COMPLETE
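The finalization rules reduce to a small decision function. transcription_task_outcome is a hypothetical name; it returns None while any job is still non-terminal, which is the "keep polling" case.

```python
from typing import List, Optional


def transcription_task_outcome(jobs: List[dict]) -> Optional[str]:
    """Hypothetical: map per-job statuses to the task's final status, or None if not done."""
    statuses = [job["status"] for job in jobs]
    if any(s not in ("COMPLETED", "FAILED") for s in statuses):
        return None  # at least one job still running: poll again next cycle
    if all(s == "FAILED" for s in statuses):
        return "FAILED"  # mark_task_failed
    if all(s == "COMPLETED" for s in statuses):
        return "COMPLETED"  # finalize_batch_checkpoint
    return "PARTIAL_COMPLETE"  # mixed: finalize_batch_checkpoint
```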

Result merging by provider:

| Provider | Merge strategy |
|---|---|
| GOOGLE | Per-model dict: {model: {"transcriptions": result}} |
| AZURE, SARVAM, AWS | Flat list: {"transcriptions": [...]} |
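A sketch of the two merge strategies. merge_transcription_results is a hypothetical name, and the sketch assumes that only COMPLETED jobs contribute and that each job's result is a list, as the schema examples below suggest.

```python
from typing import Dict, List


def merge_transcription_results(provider: str, jobs: List[dict]) -> Dict:
    """Hypothetical: build the merged output from the COMPLETED jobs in provider_state."""
    done = [job for job in jobs if job["status"] == "COMPLETED"]
    if provider == "GOOGLE":
        # One job per model, so results are keyed by model name.
        return {job["model"]: {"transcriptions": job["result"]} for job in done}
    # AZURE, SARVAM, AWS: concatenate every job's result list.
    merged: List[dict] = []
    for job in done:
        merged.extend(job["result"])
    return {"transcriptions": merged}
```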

Per-Job Lifecycle — Completion

File: core/background_tasks/reconciler/completion.py

Completion delegates status checking to batch_completion.check_status, which returns is_complete plus updated per-model outcomes. The reconciler:

  1. Calls provider_state_to_outcomes to deserialize the stored state.
  2. Calls check_status(outcomes, models) — one pass, no retry loop.
  3. Writes outcomes_to_provider_state(updated_outcomes) back to the DB.
  4. If is_complete, fetches results and finalizes.
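The four steps can be sketched as a single async function. The real helpers are injected as parameters because their exact signatures are not documented here; in particular, treating check_status as async and returning an (is_complete, outcomes) tuple, and the save_provider_state and finalize callables, are assumptions.

```python
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Outcome = Dict[str, Any]  # hypothetical per-model outcome record


async def reconcile_completion_once(
    task: Dict[str, Any],
    provider_state_to_outcomes: Callable[[Dict[str, Any]], List[Outcome]],
    check_status: Callable[[List[Outcome], List[str]], Awaitable[Tuple[bool, List[Outcome]]]],
    outcomes_to_provider_state: Callable[[List[Outcome]], Dict[str, Any]],
    save_provider_state: Callable[[str, Dict[str, Any]], Awaitable[None]],
    finalize: Callable[[Dict[str, Any], List[Outcome]], Awaitable[None]],
) -> bool:
    state = task["provider_state"]
    # 1. Deserialize the stored provider_state.
    outcomes = provider_state_to_outcomes(state)
    models = [job["model"] for job in state["jobs"]]
    # 2. One status check per reconciler cycle; no retry loop here.
    is_complete, updated = await check_status(outcomes, models)
    # 3. Persist the updated outcomes back into provider_state.
    await save_provider_state(task["id"], outcomes_to_provider_state(updated))
    # 4. Only a complete batch fetches results and finalizes.
    if is_complete:
        await finalize(task, updated)
    return is_complete
```

Because state is saved on every pass, a crash between cycles loses at most one poll's worth of progress.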

Terminal status aggregation:

| Condition | Final status |
|---|---|
| All models COMPLETED | COMPLETED |
| Not all COMPLETED, but at least one model COMPLETED or PARTIAL_COMPLETE | PARTIAL_COMPLETE |
| No model succeeded | FAILED |
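The aggregation table, read top to bottom, is a short precedence chain. aggregate_completion_status is a hypothetical name for illustration.

```python
from typing import List


def aggregate_completion_status(model_statuses: List[str]) -> str:
    """Hypothetical: collapse per-model outcomes into the task's final status."""
    if model_statuses and all(s == "COMPLETED" for s in model_statuses):
        return "COMPLETED"
    if any(s in ("COMPLETED", "PARTIAL_COMPLETE") for s in model_statuses):
        return "PARTIAL_COMPLETE"
    return "FAILED"  # no model succeeded
```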

provider_state JSONB Schema

provider_state is written by the task handler when the task moves to IN_PROGRESS and updated in place by the reconciler on each poll cycle.

Transcription — AZURE

{
  "task_type": "transcription",
  "provider": "AZURE",
  "dataset_ids": ["<dataset-uuid>"],
  "invalid_urls": ["<url-that-failed-validation>"],
  "jobs": [
    {
      "external_job_id": "<azure-transcription-job-url>",
      "status": "SUBMITTED",
      "result": [{ ... }],
      "error": "<set on failure>"
    }
  ]
}

Transcription — SARVAM

{
  "task_type": "transcription",
  "provider": "SARVAM",
  "dataset_ids": ["<dataset-uuid>"],
  "invalid_urls": [],
  "gcs_map": { "<job-id>": "<gcs-uri>" },
  "jobs": [
    {
      "external_job_id": "<sarvam-job-id>",
      "status": "SUBMITTED",
      "result": [{ ... }],
      "error": "<set on failure>",
      "transient_failures": 0
    }
  ]
}

gcs_map is passed to poll_sarvam_once so it can resolve the source GCS URI for a given job ID.

Transcription — AWS

{
  "task_type": "transcription",
  "provider": "AWS",
  "dataset_ids": ["<dataset-uuid>"],
  "invalid_urls": [],
  "jobs": [
    {
      "external_job_id": "<aws-transcription-job-name>",
      "gcs_uri": "<source-gcs-uri>",
      "filename": "<original-filename>",
      "status": "SUBMITTED",
      "result": [{ ... }],
      "error": "<set on failure>",
      "transient_failures": 0
    }
  ]
}

Transcription — GOOGLE

{
  "task_type": "transcription",
  "provider": "GOOGLE",
  "dataset_ids": ["<dataset-uuid>"],
  "invalid_urls": [],
  "jobs": [
    {
      "model": "long",
      "external_job_id": "<google-lro-operation-name>",
      "status": "SUBMITTED",
      "result": [{ ... }],
      "error": "<set on failure>",
      "transient_failures": 0
    }
  ]
}

One job object per model (e.g. long, short, telephony).

Completion

{
  "task_type": "completion",
  "input_dataset_ids": ["<dataset-uuid>"],
  "jobs": [
    {
      "model": "gemini-2.5-flash",
      "status": "SUBMITTED",
      "external_job_id": "<provider-batch-job-id>",
      "output_uri": "<gcs-uri-to-results>",
      "error": "<set on failure>",
      "transient_failures": 0
    }
  ]
}

One job object per model. output_uri is populated by check_status once the provider reports the batch is done.
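The examples above can be summarized as TypedDicts plus a light key check. This is a hypothetical sketch: which keys count as required is inferred from the examples on this page, not from a published schema, and missing_keys is an illustrative name.

```python
from typing import Any, Dict, List, TypedDict


class JobState(TypedDict, total=False):
    external_job_id: str
    status: str
    model: str  # GOOGLE transcription and completion
    gcs_uri: str  # AWS only
    filename: str  # AWS only
    output_uri: str  # completion only
    result: List[Dict[str, Any]]
    error: str
    transient_failures: int


class ProviderState(TypedDict, total=False):
    task_type: str
    provider: str  # transcription only
    dataset_ids: List[str]  # transcription only
    input_dataset_ids: List[str]  # completion only
    invalid_urls: List[str]  # transcription only
    gcs_map: Dict[str, str]  # SARVAM only
    jobs: List[JobState]


def missing_keys(state: Dict[str, Any]) -> List[str]:
    """Hypothetical check; required keys are inferred from the examples above."""
    required = ["task_type", "jobs"]
    if state.get("task_type") == "transcription":
        required += ["provider", "dataset_ids", "invalid_urls"]
    elif state.get("task_type") == "completion":
        required += ["input_dataset_ids"]
    if state.get("provider") == "SARVAM":
        required.append("gcs_map")
    missing = [key for key in required if key not in state]
    needs_model = state.get("provider") == "GOOGLE" or state.get("task_type") == "completion"
    job_keys = ["external_job_id", "status"] + (["model"] if needs_model else [])
    if state.get("provider") == "AWS":
        job_keys = job_keys + ["gcs_uri", "filename"]
    for i, job in enumerate(state.get("jobs", [])):
        missing += [f"jobs[{i}].{key}" for key in job_keys if key not in job]
    return missing
```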


Adding a New Batch Service to the Reconciler

When onboarding a new batch service (see Service Onboarding for the full checklist), the reconciler needs two additions:

  1. A poll function: implement _poll_<service>_job_once(provider, job, provider_state) -> dict that returns {"status": "COMPLETED"|"FAILED"|"ERROR"|<running-status>, "result": ..., "error": ...}. Reserve "ERROR" for transient poll failures, which the reconciler counts in transient_failures and retries on the next cycle. Place it in an existing or new module under core/background_tasks/reconciler/.

  2. A route in reconcile_in_progress_tasks — add a branch in loop.py:

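# Inside reconcile_in_progress_tasks, after the existing "completion" branch:
elif task_type == "translation":  # example new task type
    await reconcile_translation_task(task)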

The task handler must write provider_state["task_type"] = "<your-type>" so the reconciler can route it correctly.