Batch Task State Machine
This page is the authoritative reference for how batch tasks move through the system — from initial API request to final result. It covers the Task-level status transitions, the reconciler loop, per-job lifecycles, and the exact shape of the provider_state JSONB column for every supported provider.
Audience: contributors adding new batch services or debugging stuck/failed tasks.
Two-Phase Design
Batch tasks are intentionally split into two phases to decouple HTTP request latency from long-running provider operations.
| Phase | Where | What happens |
|---|---|---|
| Submit | app/tasks/{type}/batch/task_handler.py | Validate inputs, call the provider API to start the job(s), write provider_state to the DB, mark task IN_PROGRESS. Returns immediately. |
| Poll & Finalize | core/background_tasks/reconciler/ | Every 120 s, query all IN_PROGRESS tasks, ask each provider if the job finished, update provider_state, and on completion write the output file and flip the task to a terminal status. |
This means the Pub/Sub handler is not responsible for finalizing the task. It only submits and exits. The reconciler owns every status transition after IN_PROGRESS.
Task-Level State Machine
flowchart TD
A["API Request"] --> B["create_task_checkpoint\nstatus = PENDING"]
B --> C["Pub/Sub delivers message\nto task handler"]
C -->|"submission succeeds"| D["mark_task_in_progress\nPENDING → IN_PROGRESS\nprovider_state written"]
C -->|"submission fails"| E["mark_task_failed\n→ FAILED"]
D --> F["Reconciler loop\nevery 120 s"]
F -->|"provider_state is NULL"| E
F -->|"task age > 26 h"| E
F -->|"all jobs terminal,\n≥1 succeeded"| G["finalize_batch_checkpoint\n→ COMPLETED or PARTIAL_COMPLETE"]
F -->|"all jobs FAILED"| E
F -->|"jobs still running"| F
Status values
| Status | Who sets it | Meaning |
|---|---|---|
| PENDING | create_task_checkpoint | Task created; waiting for Pub/Sub delivery |
| IN_PROGRESS | mark_task_in_progress | Jobs submitted to provider; reconciler polling |
| COMPLETED | finalize_batch_checkpoint | All jobs succeeded |
| PARTIAL_COMPLETE | finalize_batch_checkpoint | At least one job succeeded; some failed |
| FAILED | mark_task_failed | All jobs failed, submission error, timeout, or NULL state |
Idempotency
mark_task_in_progress performs a guarded update with the compound condition WHERE id = ? AND status = 'PENDING'. If Pub/Sub redelivers the message, the task is no longer PENDING, so the second handler invocation sees rows_updated = 0 and skips without submitting duplicate jobs.
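The guarded update can be illustrated with a minimal sketch. It uses an in-memory SQLite database and a hypothetical tasks table purely for demonstration; the real schema, database driver, and function signature are not shown on this page and are assumptions here.

```python
import sqlite3


def mark_task_in_progress(conn: sqlite3.Connection, task_id: str, provider_state: str) -> bool:
    """Flip PENDING -> IN_PROGRESS; return False when no row matched.

    Table and column names are assumptions made for this sketch.
    """
    cur = conn.execute(
        "UPDATE tasks SET status = 'IN_PROGRESS', provider_state = ? "
        "WHERE id = ? AND status = 'PENDING'",
        (provider_state, task_id),
    )
    conn.commit()
    # rowcount is the number of rows the UPDATE changed (the rows_updated value).
    return cur.rowcount == 1


def demo() -> tuple[bool, bool]:
    """Simulate a first delivery followed by a Pub/Sub redelivery."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE tasks (id TEXT PRIMARY KEY, status TEXT, provider_state TEXT)")
    conn.execute("INSERT INTO tasks VALUES ('t1', 'PENDING', NULL)")
    first = mark_task_in_progress(conn, "t1", '{"jobs": []}')
    second = mark_task_in_progress(conn, "t1", '{"jobs": []}')  # redelivery
    return first, second
```

In this sketch the first call returns True and the redelivered call returns False, which is the signal a handler would use to skip its work.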
Reconciler Loop
File: core/background_tasks/reconciler/loop.py
batch_polling_loop
└─ reconcile_in_progress_tasks (called every POLL_INTERVAL_SECONDS)
   ├─ get_in_progress_tasks()
   ├─ guard: provider_state NULL → mark_task_failed
   ├─ guard: age > TASK_TIMEOUT_HOURS → mark_task_failed
   └─ dispatch on provider_state["task_type"]
      ├─ "transcription" → reconcile_transcription_task
      └─ "completion" → reconcile_completion_task
Knobs
| Constant | Value | Effect |
|---|---|---|
| POLL_INTERVAL_SECONDS | 120 | Sleep between reconcile cycles |
| TASK_TIMEOUT_HOURS | 26 | Age at which a stuck IN_PROGRESS task is force-failed |
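The two guards and the dispatch step can be sketched as a single routing function. This is an illustration under assumptions, not the code in loop.py: the task dict shape, the created_at field used to compute age, the injected callbacks, and the "skipped" outcome for an unknown task_type are all hypothetical.

```python
from datetime import datetime, timedelta
from typing import Callable

POLL_INTERVAL_SECONDS = 120  # value from the Knobs table
TASK_TIMEOUT_HOURS = 26      # value from the Knobs table


def route_task(
    task: dict,
    now: datetime,
    handlers: dict[str, Callable[[dict], None]],
    mark_failed: Callable[[dict, str], None],
) -> str:
    """Apply the NULL-state and timeout guards, then dispatch on task_type."""
    state = task.get("provider_state")
    if state is None:
        mark_failed(task, "provider_state is NULL")
        return "failed"
    # Assumption: task age is measured from a created_at timestamp.
    if now - task["created_at"] > timedelta(hours=TASK_TIMEOUT_HOURS):
        mark_failed(task, "task exceeded timeout")
        return "failed"
    handler = handlers.get(state.get("task_type"))
    if handler is None:
        # Assumption: unknown task types are left alone in this sketch.
        return "skipped"
    handler(task)
    return "dispatched"
```

Passing the handlers and the failure callback in as arguments keeps the sketch testable without a database; the real loop presumably calls its own helpers directly.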
Per-Job Lifecycle — Transcription
File: core/background_tasks/reconciler/transcription.py
Each entry in provider_state["jobs"] moves through these statuses independently:
flowchart LR
SUBMITTED --> Polling["polling\nprovider-specific status"]
Polling -->|"provider reports done"| COMPLETED
Polling -->|"provider reports failure"| FAILED
Polling -->|"transient ERROR, attempt < 3"| Polling
Polling -->|"transient ERROR, attempt = 3"| FAILED
Transient errors: If the provider poll call returns status = "ERROR", the job's transient_failures counter is incremented. The job is retried on the next reconciler cycle. After 3 transient errors the job is forced to FAILED.
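A minimal sketch of how a single poll result might be folded into a job entry, including the transient error counter. The function name apply_poll_result and the fallback error message are hypothetical; only the three-error limit and the status names come from the text above.

```python
MAX_TRANSIENT_FAILURES = 3  # "After 3 transient errors the job is forced to FAILED"


def apply_poll_result(job: dict, poll: dict) -> dict:
    """Update one provider_state job entry in place from one poll result."""
    status = poll["status"]
    if status == "ERROR":
        # Transient error: count it and leave the job for the next cycle
        # unless the limit has been reached.
        job["transient_failures"] = job.get("transient_failures", 0) + 1
        if job["transient_failures"] >= MAX_TRANSIENT_FAILURES:
            job["status"] = "FAILED"
            job["error"] = poll.get("error") or "too many transient errors"
    elif status == "COMPLETED":
        job["status"] = "COMPLETED"
        job["result"] = poll.get("result")
    elif status == "FAILED":
        job["status"] = "FAILED"
        job["error"] = poll.get("error")
    # Any other status means the provider is still running; nothing changes.
    return job
```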
Finalization: Once every job is in a terminal state:
- All jobs FAILED → mark_task_failed (task = FAILED)
- At least one job COMPLETED → upload merged result, call finalize_batch_checkpoint
  - All succeeded → COMPLETED
  - Mixed → PARTIAL_COMPLETE
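The finalization rules above can be expressed as a small pure function. This is a sketch: the function name is hypothetical, and treating an empty job list as "not ready" is an assumption the page does not address.

```python
from typing import Optional

TERMINAL_JOB_STATUSES = {"COMPLETED", "FAILED"}


def aggregate_task_status(jobs: list[dict]) -> Optional[str]:
    """Return the task's terminal status, or None while any job is still running."""
    statuses = [job["status"] for job in jobs]
    # Assumption: a task with no jobs is never finalized by this helper.
    if not statuses or any(s not in TERMINAL_JOB_STATUSES for s in statuses):
        return None
    if all(s == "FAILED" for s in statuses):
        return "FAILED"
    if all(s == "COMPLETED" for s in statuses):
        return "COMPLETED"
    return "PARTIAL_COMPLETE"
```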
Result merging by provider:
| Provider | Merge strategy |
|---|---|
| GOOGLE | Per-model dict: {model: {"transcriptions": result}} |
| AZURE, SARVAM, AWS | Flat list: {"transcriptions": [...]} |
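The two merge shapes might look like the following sketch. Two assumptions are made: the per-model strategy applies to GOOGLE (the only provider whose job entries carry a model field), and the flat list is built by concatenating each completed job's result list. The function name merge_results is hypothetical.

```python
def merge_results(provider: str, jobs: list[dict]) -> dict:
    """Merge the results of completed jobs into one output payload."""
    completed = [job for job in jobs if job["status"] == "COMPLETED"]
    if provider == "GOOGLE":
        # Assumption: one output entry per model, keyed by the job's model name.
        return {job["model"]: {"transcriptions": job["result"]} for job in completed}
    # Assumption: other providers concatenate per-job result lists.
    merged: list = []
    for job in completed:
        merged.extend(job["result"])
    return {"transcriptions": merged}
```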
Per-Job Lifecycle — Completion
File: core/background_tasks/reconciler/completion.py
Completion delegates status checking to batch_completion.check_status, which returns is_complete plus updated per-model outcomes. The reconciler:
- Calls provider_state_to_outcomes to deserialize the stored state.
- Calls check_status(outcomes, models) in a single pass, with no retry loop.
- Writes outcomes_to_provider_state(updated_outcomes) back to the DB.
- If is_complete, fetches results and finalizes.
Terminal status aggregation:
| Condition | Final status |
|---|---|
| All models COMPLETED | COMPLETED |
| At least one COMPLETED or PARTIAL_COMPLETE | PARTIAL_COMPLETE |
| No model succeeded | FAILED |
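Reading the table rows top to bottom gives the following sketch. The function name and the flat list of per-model status strings are assumptions; the actual outcome objects returned by check_status are not described on this page.

```python
def aggregate_completion_status(model_statuses: list[str]) -> str:
    """Collapse per-model terminal statuses into one task status."""
    if model_statuses and all(s == "COMPLETED" for s in model_statuses):
        return "COMPLETED"
    if any(s in ("COMPLETED", "PARTIAL_COMPLETE") for s in model_statuses):
        return "PARTIAL_COMPLETE"
    # No model produced any usable output.
    return "FAILED"
```

The order of the checks matters: the all-COMPLETED case must be tested first, because it also satisfies the "at least one" condition.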
provider_state JSONB Schema
provider_state is written by the task handler when the task moves to IN_PROGRESS and updated in-place by the reconciler on each poll cycle.
Transcription — AZURE
{
  "task_type": "transcription",
  "provider": "AZURE",
  "dataset_ids": ["<dataset-uuid>"],
  "invalid_urls": ["<url-that-failed-validation>"],
  "jobs": [
    {
      "external_job_id": "<azure-transcription-job-url>",
      "status": "SUBMITTED",
      "result": [{ ... }],
      "error": "<set on failure>"
    }
  ]
}
Transcription — SARVAM
{
  "task_type": "transcription",
  "provider": "SARVAM",
  "dataset_ids": ["<dataset-uuid>"],
  "invalid_urls": [],
  "gcs_map": { "<job-id>": "<gcs-uri>" },
  "jobs": [
    {
      "external_job_id": "<sarvam-job-id>",
      "status": "SUBMITTED",
      "result": [{ ... }],
      "error": "<set on failure>",
      "transient_failures": 0
    }
  ]
}
gcs_map is passed to poll_sarvam_once so it can resolve the source GCS URI for a given job ID.
Transcription — AWS
{
  "task_type": "transcription",
  "provider": "AWS",
  "dataset_ids": ["<dataset-uuid>"],
  "invalid_urls": [],
  "jobs": [
    {
      "external_job_id": "<aws-transcription-job-name>",
      "gcs_uri": "<source-gcs-uri>",
      "filename": "<original-filename>",
      "status": "SUBMITTED",
      "result": [{ ... }],
      "error": "<set on failure>",
      "transient_failures": 0
    }
  ]
}
Transcription — GOOGLE
{
  "task_type": "transcription",
  "provider": "GOOGLE",
  "dataset_ids": ["<dataset-uuid>"],
  "invalid_urls": [],
  "jobs": [
    {
      "model": "long",
      "external_job_id": "<google-lro-operation-name>",
      "status": "SUBMITTED",
      "result": [{ ... }],
      "error": "<set on failure>",
      "transient_failures": 0
    }
  ]
}
One job object per model (e.g. long, short, telephony).
Completion
{
  "task_type": "completion",
  "input_dataset_ids": ["<dataset-uuid>"],
  "jobs": [
    {
      "model": "gemini-2.5-flash",
      "status": "SUBMITTED",
      "external_job_id": "<provider-batch-job-id>",
      "output_uri": "<gcs-uri-to-results>",
      "error": "<set on failure>",
      "transient_failures": 0
    }
  ]
}
One job object per model. output_uri is populated by check_status once the provider reports the batch is done.
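When debugging a stuck task, a quick structural check of the stored JSON can help. The helper below is a hypothetical sketch, not part of the codebase: it checks only the keys that appear in every example schema above (task_type, provider for transcription, and status plus external_job_id on each job).

```python
import json

REQUIRED_JOB_KEYS = {"status", "external_job_id"}


def validate_provider_state(raw: str) -> list[str]:
    """Return a list of human-readable problems found in a provider_state JSON string."""
    state = json.loads(raw)
    problems: list[str] = []
    task_type = state.get("task_type")
    if task_type not in ("transcription", "completion"):
        problems.append("unknown task_type")
    # Every transcription example above carries a provider key.
    if task_type == "transcription" and "provider" not in state:
        problems.append("missing provider")
    for index, job in enumerate(state.get("jobs", [])):
        missing = REQUIRED_JOB_KEYS - job.keys()
        if missing:
            problems.append(f"jobs[{index}] missing {sorted(missing)}")
    return problems
```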
Adding a New Batch Service to the Reconciler
When onboarding a new batch service (see Service Onboarding for the full checklist), the reconciler needs two additions:
- A poll function: implement _poll_<service>_job_once(provider, job, provider_state) -> dict that returns {"status": "COMPLETED"|"FAILED"|"ERROR"|<running-status>, "result": ..., "error": ...}. Add it in an existing or new module under core/background_tasks/reconciler/.
- A route in reconcile_in_progress_tasks: add a branch in loop.py that dispatches on the new task type.
The task handler must write provider_state["task_type"] = "<your-type>" so the reconciler can route it correctly.