Single Task State Machine¶
This page is the authoritative reference for how single (sync) tasks move through the system — from initial API request to direct HTTP response. It covers status transitions, the full dispatch flowchart, file upload pipeline, per-handler provider routing, and the dispatcher registry.
Audience: contributors adding new single services or debugging failed single tasks.
One-Phase Design¶
Single tasks run entirely inside the HTTP request — there is no Pub/Sub queue and no background reconciler.
| Batch | Single | |
|---|---|---|
| Execution | Background (Pub/Sub + reconciler) | In-request (synchronous) |
| Status during processing | IN_PROGRESS |
PENDING |
| Result delivery | Polling / webhook | Direct HTTP response |
| File handling | Dataset IDs pre-exist | Files uploaded inline |
Task-Level State Machine¶
flowchart TD
A["API Request"] --> B["create_task_checkpoint\nstatus = PENDING"]
B --> C["_execute_task"]
C -->|"success"| D["mark_task_completed\nPENDING → COMPLETED"]
C -->|"failure"| E["mark_task_failed\nPENDING → FAILED"]
D --> F["HTTP 200\n{task_id, status, result}"]
E --> G["HTTP 4xx/5xx"]
Idempotency¶
TASK_SEMAPHORE (max 50 concurrent, from core/task_semaphores.py) gates all single task executions. This prevents runaway concurrency and ensures the server is not overwhelmed by duplicate in-flight requests for the same resource.
Full Dispatch Flowchart¶
File: app/controllers/single_task_dispatcher.py
flowchart LR
A["Route\nPOST /task/{type}"] --> B["Acquire TASK_SEMAPHORE\nmax 50 concurrent"]
B --> C["_validate_payload\nproject access + Pydantic\n+ task preprocessor"]
C -->|"has files"| D["File Upload Pipeline\n_process_and_upload_files"]
C -->|"no files"| E["create_task_checkpoint\nstatus = PENDING"]
D --> E
E --> F["_execute_task"]
F -->|"success"| G["mark_task_completed"]
F -->|"failure"| H["mark_task_failed"]
G --> I["Return\n{task_id, status, result}"]
H --> J["HTTP error"]
File Upload Sub-Flow¶
Function: _process_and_upload_files
flowchart TD
A["UploadFile list"] --> B["validate_and_process_files\nfile_validators_map"]
B -->|"invalid"| C["HTTPException 400"]
B -->|"ok"| D["create_dataset_checkpoint\nstatus = PENDING"]
D --> E["Upload each file\nto Azure Blob / GCS"]
E -->|"upload error"| F["update_dataset_status FAILED\nHTTPException"]
E -->|"all uploaded"| G["create_file_records_checkpoint\ndataset COMPLETED"]
G --> H["Attach dataset_id to payload"]
| Task type | Files required? |
|---|---|
transcription |
Always |
completion |
Optional |
translation |
Never |
text_to_speech |
Never |
Dispatcher Registry¶
File: app/controllers/single_task_dispatcher.py
| Task type | Payload validator | File validator | Handler |
|---|---|---|---|
transcription |
validate_transcription_payload |
validate_transcription_audios |
async_transcription_handler |
completion |
validate_chat_completion_payload |
validate_upload_files |
async_chat_completion_task |
translation |
validate_translation_payload |
— (no files) | async_translation_handler |
text_to_speech |
validate_text_to_speech_payload |
— (no files) | async_text_to_speech_handler |
Per-Handler Provider Routing¶
Transcription¶
flowchart LR
A["async_transcription_handler"] -->|"AZURE"| B["Azure Batch Transcription API"]
A -->|"SARVAM"| C["Sarvam API"]
A -->|"AWS"| D["AWS Transcribe"]
A -->|"GOOGLE"| E["Google Speech-to-Text\none job per model"]
Completion¶
flowchart LR
A["async_chat_completion_task"] -->|"ANTHROPIC"| B["Anthropic API"]
A -->|"GOOGLE"| C["Vertex AI"]
A -->|"AZURE"| D["Azure OpenAI"]
Translation¶
flowchart LR
A["async_translation_handler"] -->|"AZURE"| B["Azure Translator"]
A -->|"GOOGLE"| C["Google Translate"]
A -->|"SARVAM"| D["Sarvam Translate"]
A -->|"NLLB / KARYA_LOCAL"| E["NLLB model"]
Text-to-Speech¶
flowchart LR
A["async_text_to_speech_handler"] -->|"GOOGLE"| B["Google TTS"]
A -->|"AZURE"| C["Azure TTS"]
A -->|"SARVAM"| D["Sarvam TTS"]
Status Values¶
Reference: core/db/model.py TaskStatus literal.
| Status | Who sets it | Meaning |
|---|---|---|
PENDING |
create_task_checkpoint |
Task created; handler executing |
COMPLETED |
mark_task_completed |
Task finished successfully |
FAILED |
mark_task_failed |
Task encountered an error |
Note
IN_PROGRESS and PARTIAL_COMPLETE are batch-only statuses and are never set by the single task pipeline.
Adding a New Single Service¶
When onboarding a new single service (see Service Onboarding for the full checklist), update these three maps in app/controllers/single_task_dispatcher.py:
payload_validators_map— register your payload validator. If the service accepts files, also register the file validator infile_validators_map._task_handlers— register your handler function.request_map— register your request model.