
Single Task State Machine

This page is the authoritative reference for how single (sync) tasks move through the system — from initial API request to direct HTTP response. It covers status transitions, the full dispatch flowchart, file upload pipeline, per-handler provider routing, and the dispatcher registry.

Audience: contributors adding new single services or debugging failed single tasks.


One-Phase Design

Single tasks run entirely inside the HTTP request — there is no Pub/Sub queue and no background reconciler.

| | Batch | Single |
|---|---|---|
| Execution | Background (Pub/Sub + reconciler) | In-request (synchronous) |
| Status during processing | IN_PROGRESS | PENDING |
| Result delivery | Polling / webhook | Direct HTTP response |
| File handling | Dataset IDs pre-exist | Files uploaded inline |

Task-Level State Machine

flowchart TD
    A["API Request"] --> B["create_task_checkpoint\nstatus = PENDING"]
    B --> C["_execute_task"]
    C -->|"success"| D["mark_task_completed\nPENDING → COMPLETED"]
    C -->|"failure"| E["mark_task_failed\nPENDING → FAILED"]
    D --> F["HTTP 200\n{task_id, status, result}"]
    E --> G["HTTP 4xx/5xx"]
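
The transitions in the flowchart can be sketched as a small transition table. This is a minimal illustration, not the project's code: `ALLOWED_TRANSITIONS` and `transition` are hypothetical names, and only the status values and the two edges out of PENDING come from this page.

```python
from typing import Literal

# Status values used by the single task pipeline (per this page).
SingleTaskStatus = Literal["PENDING", "COMPLETED", "FAILED"]

# Hypothetical transition table: PENDING is the only non-terminal state.
ALLOWED_TRANSITIONS: dict[str, set[str]] = {
    "PENDING": {"COMPLETED", "FAILED"},
    "COMPLETED": set(),
    "FAILED": set(),
}


def transition(current: SingleTaskStatus, target: SingleTaskStatus) -> SingleTaskStatus:
    """Return the new status, or raise if the edge is not in the flowchart."""
    if target not in ALLOWED_TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {target}")
    return target
```

Because COMPLETED and FAILED have no outgoing edges, any attempt to move a finished task to another status raises instead of silently overwriting it.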

Idempotency

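TASK_SEMAPHORE (max 50 concurrent, from core/task_semaphores.py) gates all single task executions. This caps in-flight work so the server is not overwhelmed by bursts of requests, including duplicate requests for the same resource. The semaphore only limits concurrency: nothing in the flow described on this page deduplicates requests, so it should not be read as an idempotency guarantee.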
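
The gating behaviour can be sketched with `asyncio.Semaphore`. This assumes TASK_SEMAPHORE is an asyncio semaphore, which this page does not confirm; `run_gated` and `demo` are hypothetical helpers, and the demo uses a small limit so the cap is easy to observe.

```python
import asyncio


async def run_gated(semaphore: asyncio.Semaphore, work):
    """Run `work()` only while holding a slot in the semaphore."""
    async with semaphore:
        return await work()


async def demo(limit: int = 3, requests: int = 10) -> int:
    """Fire `requests` concurrent jobs and return the peak number running at once."""
    # The page states a limit of 50 for TASK_SEMAPHORE; 3 is used here for illustration.
    semaphore = asyncio.Semaphore(limit)
    running = 0
    peak = 0

    async def work():
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)  # stand-in for a provider call
        running -= 1

    await asyncio.gather(*(run_gated(semaphore, work) for _ in range(requests)))
    return peak
```

Calling `asyncio.run(demo())` shows that no more than `limit` jobs overlap. Every job still runs eventually, including identical ones, which is why a semaphore bounds load but does not deduplicate.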


Full Dispatch Flowchart

File: app/controllers/single_task_dispatcher.py

flowchart LR
    A["Route\nPOST /task/{type}"] --> B["Acquire TASK_SEMAPHORE\nmax 50 concurrent"]
    B --> C["_validate_payload\nproject access + Pydantic\n+ task preprocessor"]
    C -->|"has files"| D["File Upload Pipeline\n_process_and_upload_files"]
    C -->|"no files"| E["create_task_checkpoint\nstatus = PENDING"]
    D --> E
    E --> F["_execute_task"]
    F -->|"success"| G["mark_task_completed"]
    F -->|"failure"| H["mark_task_failed"]
    G --> I["Return\n{task_id, status, result}"]
    H --> J["HTTP error"]
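
The order of steps in the flowchart can be sketched as follows. This is a simplified illustration under stated assumptions: the function signatures, the in-memory `TASKS` store, and `dispatch_single_task` are hypothetical, the file upload branch is omitted, and the validation step is reduced to a placeholder check.

```python
import asyncio
import uuid

# In-memory stand-in for the task table (assumption; the real store is not shown here).
TASKS: dict[str, dict] = {}


def create_task_checkpoint(task_type: str, payload: dict) -> str:
    """Record a new task in PENDING and return its id."""
    task_id = str(uuid.uuid4())
    TASKS[task_id] = {"type": task_type, "status": "PENDING", "result": None}
    return task_id


def mark_task_completed(task_id: str, result) -> None:
    TASKS[task_id].update(status="COMPLETED", result=result)


def mark_task_failed(task_id: str, error: str) -> None:
    TASKS[task_id].update(status="FAILED", result={"error": error})


async def dispatch_single_task(task_type, payload, handler, semaphore):
    """Acquire slot -> validate -> checkpoint -> execute -> mark -> respond."""
    async with semaphore:
        if not isinstance(payload, dict):  # placeholder for _validate_payload
            raise ValueError("invalid payload")
        task_id = create_task_checkpoint(task_type, payload)
        try:
            result = await handler(payload)  # placeholder for _execute_task
        except Exception as exc:
            mark_task_failed(task_id, str(exc))
            raise  # the route layer would turn this into an HTTP error
        mark_task_completed(task_id, result)
        return {"task_id": task_id, "status": "COMPLETED", "result": result}
```

Validation runs before the checkpoint is created, so a rejected payload leaves no task row behind. A handler failure happens after the checkpoint, so it leaves a FAILED row.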

File Upload Sub-Flow

Function: _process_and_upload_files

flowchart TD
    A["UploadFile list"] --> B["validate_and_process_files\nfile_validators_map"]
    B -->|"invalid"| C["HTTPException 400"]
    B -->|"ok"| D["create_dataset_checkpoint\nstatus = PENDING"]
    D --> E["Upload each file\nto Azure Blob / GCS"]
    E -->|"upload error"| F["update_dataset_status FAILED\nHTTPException"]
    E -->|"all uploaded"| G["create_file_records_checkpoint\ndataset COMPLETED"]
    G --> H["Attach dataset_id to payload"]

| Task type | Files required? |
|---|---|
| transcription | Always |
| completion | Optional |
| translation | Never |
| text_to_speech | Never |
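
The upload sub-flow can be sketched with in-memory stand-ins. Everything here is illustrative: `process_and_upload_files` takes simplified `(name, bytes)` pairs rather than `UploadFile` objects, `DATASETS` and `BLOBS` replace the database and Azure Blob / GCS, and the two exception classes stand in for the HTTPException branches.

```python
import uuid

DATASETS: dict[str, dict] = {}  # stand-in for dataset rows
BLOBS: dict[str, bytes] = {}    # stand-in for Azure Blob / GCS


class InvalidFileError(Exception):
    """Stand-in for the HTTPException 400 branch."""


class UploadError(Exception):
    """Stand-in for the upload-error HTTPException branch."""


def process_and_upload_files(files, validator, payload, upload=BLOBS.__setitem__):
    """Validate, checkpoint, upload, finalize, then attach dataset_id to the payload."""
    # 1. Validate every file before anything is persisted.
    for name, data in files:
        if not validator(name, data):
            raise InvalidFileError(name)

    # 2. Dataset checkpoint starts in PENDING.
    dataset_id = str(uuid.uuid4())
    DATASETS[dataset_id] = {"status": "PENDING", "files": []}

    # 3. Upload each file; any error marks the dataset FAILED.
    try:
        for name, data in files:
            upload(f"{dataset_id}/{name}", data)
    except Exception as exc:
        DATASETS[dataset_id]["status"] = "FAILED"
        raise UploadError(str(exc)) from exc

    # 4. Record the files and mark the dataset COMPLETED.
    DATASETS[dataset_id]["files"] = [name for name, _ in files]
    DATASETS[dataset_id]["status"] = "COMPLETED"

    # 5. Attach the dataset id without mutating the caller's payload.
    return {**payload, "dataset_id": dataset_id}
```

As in the flowchart, an invalid file is rejected before a dataset row exists, while an upload error leaves a FAILED dataset behind for debugging.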

Dispatcher Registry

File: app/controllers/single_task_dispatcher.py

| Task type | Payload validator | File validator | Handler |
|---|---|---|---|
| transcription | validate_transcription_payload | validate_transcription_audios | async_transcription_handler |
| completion | validate_chat_completion_payload | validate_upload_files | async_chat_completion_task |
| translation | validate_translation_payload | — (no files) | async_translation_handler |
| text_to_speech | validate_text_to_speech_payload | — (no files) | async_text_to_speech_handler |

Per-Handler Provider Routing

Transcription

flowchart LR
    A["async_transcription_handler"] -->|"AZURE"| B["Azure Batch Transcription API"]
    A -->|"SARVAM"| C["Sarvam API"]
    A -->|"AWS"| D["AWS Transcribe"]
    A -->|"GOOGLE"| E["Google Speech-to-Text\none job per model"]

Completion

flowchart LR
    A["async_chat_completion_task"] -->|"ANTHROPIC"| B["Anthropic API"]
    A -->|"GOOGLE"| C["Vertex AI"]
    A -->|"AZURE"| D["Azure OpenAI"]

Translation

flowchart LR
    A["async_translation_handler"] -->|"AZURE"| B["Azure Translator"]
    A -->|"GOOGLE"| C["Google Translate"]
    A -->|"SARVAM"| D["Sarvam Translate"]
    A -->|"NLLB / KARYA_LOCAL"| E["NLLB model"]

Text-to-Speech

flowchart LR
    A["async_text_to_speech_handler"] -->|"GOOGLE"| B["Google TTS"]
    A -->|"AZURE"| C["Azure TTS"]
    A -->|"SARVAM"| D["Sarvam TTS"]

Status Values

Reference: the TaskStatus literal in core/db/model.py.

| Status | Who sets it | Meaning |
|---|---|---|
| PENDING | create_task_checkpoint | Task created; handler executing |
| COMPLETED | mark_task_completed | Task finished successfully |
| FAILED | mark_task_failed | Task encountered an error |

Note

IN_PROGRESS and PARTIAL_COMPLETE are batch-only statuses and are never set by the single task pipeline.


Adding a New Single Service

When onboarding a new single service (see Service Onboarding for the full checklist), update these three maps in app/controllers/single_task_dispatcher.py:

  1. payload_validators_map — register your payload validator. If the service accepts files, also register the file validator in file_validators_map.
  2. _task_handlers — register your handler function.
  3. request_map — register your request model.