Language Server: Service Onboarding and Workflow Guide
This document provides a clear overview of the Language Server's internal task processing workflows and outlines the essential steps for integrating new services (e.g., translation).
Each service must support two pipelines:
- Async (Batch) → Goes through Pub/Sub for background processing
- Sync (Single) → Processes directly in the request cycle without Pub/Sub
A. Language Server Task Processing Workflow
The following flowcharts illustrate the journey of a user request through the Language Server, from initial input to the final result.
1. Async (Batch) Task Workflow
The journey of a batch request through the Language Server:
```mermaid
flowchart LR
    A["User Input (task_name, dataset_ids, payload)"] --> B["Payload & Model Validation"]
    B --> C["Create DB Entries\nstatus = PENDING"]
    C --> D["Build Task Message"]
    D --> E["Publish to Pub/Sub"]
    E --> F["Subscriber chooses Handler based on task_name"]
    F --> G["Fetch File Metadata from Dataset"]
    G --> H["File preprocessing (if needed)"]
    H --> I["Handler submits jobs to provider\nmark_task_in_progress → IN_PROGRESS"]
    I --> J["Background Reconciler\nevery 120 s"]
    J -->|"all jobs terminal"| K["Finalize: upload result\nCOMPLETED / PARTIAL_COMPLETE / FAILED"]
    J -->|"jobs still running"| J
```
Two-phase design
The Pub/Sub handler only submits jobs to the provider and exits. The Background Reconciler polls provider APIs every 120 seconds and writes the final result. See Batch Task State Machine for the full status lifecycle and provider_state schema.
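The finalize step can be illustrated with a minimal sketch of how a reconciler pass might collapse per-job statuses into the final task status. The sketch rests on several assumptions. Each entry in `provider_state["jobs"]` is assumed to carry a `status`. `COMPLETED`, `FAILED`, and `ERROR` are taken to be the terminal job statuses (the ones step 7 lists for poll functions). A mix of successes and failures is assumed to map to `PARTIAL_COMPLETE`. The helper `finalize_status` is hypothetical, and the Batch Task State Machine page remains the authority on the real rules.

```python
from typing import Optional

# Assumed terminal job statuses (step 7 lists these for poll functions);
# any other status means the job is still running.
TERMINAL = {"COMPLETED", "FAILED", "ERROR"}


def finalize_status(provider_state: dict) -> Optional[str]:
    """Collapse job statuses into a final task status, or None to keep polling.

    Hypothetical helper mirroring the flowchart's "all jobs terminal" branch;
    it is not the reconciler's actual code.
    """
    statuses = [job["status"] for job in provider_state.get("jobs", [])]
    if not statuses:
        return "FAILED"  # assumption: nothing was submitted, so nothing can succeed
    if any(s not in TERMINAL for s in statuses):
        return None  # at least one job is still running; check again next cycle
    succeeded = sum(1 for s in statuses if s == "COMPLETED")
    if succeeded == len(statuses):
        return "COMPLETED"
    if succeeded == 0:
        return "FAILED"
    return "PARTIAL_COMPLETE"


state = {
    "task_type": "translation",
    "jobs": [
        {"external_job_id": "a", "status": "COMPLETED"},
        {"external_job_id": "b", "status": "FAILED"},
    ],
}
print(finalize_status(state))  # PARTIAL_COMPLETE
```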
2. Sync (Single) Task Workflow
The journey of a single request through the Language Server:
```mermaid
flowchart LR
    A["Route\nPOST /task/{type}"] --> B["Acquire TASK_SEMAPHORE\nmax 50 concurrent"]
    B --> C["_validate_payload\nproject access + Pydantic\n+ task preprocessor"]
    C -->|"has files"| D["File Upload Pipeline\n_process_and_upload_files"]
    C -->|"no files"| E["create_task_checkpoint\nstatus = PENDING"]
    D --> E
    E --> F["_execute_task"]
    F -->|"success"| G["mark_task_completed"]
    F -->|"failure"| H["mark_task_failed"]
    G --> I["Return\n{task_id, status, result}"]
    H --> J["HTTP error"]
```
One-phase design
Single tasks run entirely inside the HTTP request: no Pub/Sub, no background polling. See Single Task State Machine for the full status lifecycle, file pipeline, and dispatcher registry.
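The semaphore step in the sync flowchart bounds how many single tasks execute at once. Below is a minimal stdlib sketch of that pattern. It assumes an in-memory status dict in place of the database and uses a hypothetical `run_single_task` wrapper. Comments mark where the real code would call `create_task_checkpoint`, `_execute_task`, `mark_task_completed`, and `mark_task_failed`.

```python
import asyncio
import uuid
from typing import Dict

# Cap from the flowchart: at most 50 single tasks execute concurrently.
TASK_SEMAPHORE = asyncio.Semaphore(50)

# Stand-in for the task table; the real flow writes to the database.
TASK_STATUS: Dict[str, str] = {}


async def run_single_task(handler, payload: dict) -> dict:
    """Hypothetical wrapper: checkpoint, execute, then record the outcome."""
    async with TASK_SEMAPHORE:
        task_id = str(uuid.uuid4())
        TASK_STATUS[task_id] = "PENDING"  # create_task_checkpoint
        try:
            result = await handler(payload)  # _execute_task
        except Exception:
            TASK_STATUS[task_id] = "FAILED"  # mark_task_failed
            raise  # in the real route this becomes an HTTP error
        TASK_STATUS[task_id] = "COMPLETED"  # mark_task_completed
        return {"task_id": task_id, "status": "COMPLETED", "result": result}


async def echo_handler(payload: dict) -> str:
    await asyncio.sleep(0)  # simulate awaiting a provider call
    return payload["text"].upper()


async def main() -> None:
    results = await asyncio.gather(
        *(run_single_task(echo_handler, {"text": f"hi {i}"}) for i in range(3))
    )
    print([r["result"] for r in results])  # ['HI 0', 'HI 1', 'HI 2']


asyncio.run(main())
```

The semaphore is acquired before the checkpoint is written. Requests beyond the cap therefore wait without creating task rows.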
Key Features:
- ✅ Immediate Processing: Returns results directly without queuing
- ⚡ Concurrent Execution: Runs async handlers concurrently, capped at 50 in-flight tasks by `TASK_SEMAPHORE`
- 📁 File Management: Automatically uploads files to cloud storage (Azure/GCS)
- 💾 Database Updates: Updates task status and response atomically
B. Onboarding a New Service: A Step-by-Step Guide
This section provides a detailed, code-oriented guide for developers to onboard a new service into the Language Server. We will cover both asynchronous (batch) and synchronous (single) pipelines.
Folder Structure for a New Service
When adding a new service (for example, `translation`), create the following directory structure under `app/`:
```text
.
├── tasks
│   └── translation
│       ├── batch
│       │   ├── task_handler.py      # 1. Async task handler
│       │   └── preprocessor.py      # 2. Async payload preprocessor
│       ├── single
│       │   ├── task_handler.py      # 3. Sync task handler
│       │   └── preprocessor.py      # 4. Sync payload preprocessor
│       └── utils                    # 5. Shared utilities
│           └── validate_translation_files.py
```
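If you prefer to scaffold these files rather than create them by hand, a small `pathlib` script can do it. This is a sketch only: `scaffold_service` and `SKELETON` are hypothetical names, and the script creates empty files. Add `__init__.py` files yourself if your packages need them.

```python
from pathlib import Path
from typing import List

# Files from the layout above, relative to app/tasks/<service>/.
SKELETON = [
    "batch/task_handler.py",
    "batch/preprocessor.py",
    "single/task_handler.py",
    "single/preprocessor.py",
    "utils/validate_{service}_files.py",
]


def scaffold_service(app_root: Path, service: str) -> List[Path]:
    """Create empty module files for a new service; existing files are left untouched."""
    created = []
    base = app_root / "tasks" / service
    for rel in SKELETON:
        path = base / rel.format(service=service)
        path.parent.mkdir(parents=True, exist_ok=True)
        if not path.exists():
            path.touch()
            created.append(path)
    return created
```

For example, `scaffold_service(Path("app"), "translation")` creates the five files shown in the tree. Running it again creates nothing.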
File Descriptions:
- `app/tasks/{service_name}/batch/task_handler.py`: Triggered by a Pub/Sub message. Validates inputs, submits jobs to the provider, writes `provider_state` to the database, and calls `mark_task_in_progress`. It does not finalize the task; the background reconciler does that.
- `app/tasks/{service_name}/batch/preprocessor.py`: Contains payload preprocessing and validation logic for your async service, run before the task is published to the queue.
- `app/tasks/{service_name}/single/task_handler.py`: Holds the core business logic for the synchronous (single) version of your service. It is invoked within the API request (through the single-task dispatcher) to process the request in real time and return a response to the user.
- `app/tasks/{service_name}/single/preprocessor.py`: Contains payload preprocessing and validation logic for your sync service, run before the handler executes the task.
- `app/tasks/{service_name}/utils/`: Utility functions or configuration specific to the service, such as file validation logic (`validate_translation_files.py` for `translation`).
I. Onboarding an Async (Batch) Service
Here's how to add a new asynchronous service, like batch_translation.
1. Define the Request Payload
- What: Define the structure of the request payload for your service.
- Where: `common/types/request_types.py`
- How: Create a new Pydantic model that inherits from `BaseModel`.
Example for BatchTranslationRequest:
```python
from typing import List

from pydantic import BaseModel


class BatchTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    dataset_ids: List[str]
    provider: str = "GOOGLE"
    models: List[str]
    source_language: str
    target_language: str
```
2. Create the Endpoint
- What: Define the API endpoint that will receive the batch task request.
- Where: `app/routes/v1/root.py`
- How: Add a new POST endpoint. This endpoint calls `handle_async_task_request` to perform initial processing, then publishes a message to the Pub/Sub queue.
Example endpoint:
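```python
from typing import Any, Dict

from fastapi import Depends, Request
from sqlalchemy.orm import Session  # assumed ORM session type

# router, get_db, handle_async_task_request, TranslationMessage, and queue
# are provided by the Language Server codebase.


@router.post("/task/batch/translation")
async def create_batch_translation(
    request: Request,
    payload: Dict[str, Any],
    db: Session = Depends(get_db),
) -> Dict[str, Any]:
    # Validate and preprocess the payload, then create the PENDING task.
    task, payload, dataset_ids = handle_async_task_request(
        request, db, payload, "translation"
    )
    # Build the Pub/Sub message and publish it for the subscriber.
    message_data = TranslationMessage(
        job_id=task.id,
        dataset_ids=dataset_ids,
        # ... other fields from payload
    ).model_dump()
    queue.publish_task(message_data)
    return {
        "task_id": task.id,
        "dataset_ids": dataset_ids,
        "status": "PENDING",
    }
```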
3. Implement Payload Preprocessing
- What: Add service-specific validation for the request payload.
- Where: `app/controllers/async_task_request.py`, plus a new preprocessor file at `app/tasks/{service_name}/batch/preprocessor.py`.
- How:
  - Add your new request type to `request_type_map` in `app/controllers/async_task_request.py`.
  - Create a preprocessor function in `app/tasks/{service_name}/batch/preprocessor.py` to handle validation.
  - Add your preprocessor to `task_preprocessor_map` in `app/controllers/async_task_request.py`.
In app/controllers/async_task_request.py:
```python
# Add to request_type_map
request_type_map = {
    "transcription": BatchTranscriptionRequest,
    "completion": BatchLLMRequest,
    "translation": BatchTranslationRequest,  # Add this
}

# Add to task_preprocessor_map
task_preprocessor_map = {
    "transcription": validate_transcription_payload,
    "completion": preprocess_batch_completion,
    "translation": preprocess_batch_translation,  # Add this
}
```
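Both maps are plain dictionary lookups keyed by task name. The stdlib-only sketch below shows how a controller might consume them. `preprocess_payload`, the dataclass stand-in for `BatchTranslationRequest`, and the preprocessor rule are all hypothetical. The real models are Pydantic, and `handle_async_task_request` may apply the maps differently.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class BatchTranslationRequest:  # stdlib stand-in for the Pydantic model
    project_name: str
    user_email: str
    dataset_ids: List[str]
    models: List[str]
    source_language: str
    target_language: str
    provider: str = "GOOGLE"


def preprocess_batch_translation(req: BatchTranslationRequest) -> BatchTranslationRequest:
    # Illustrative rule only: source and target languages must differ.
    if req.source_language == req.target_language:
        raise ValueError("source_language and target_language must differ")
    return req


request_type_map: Dict[str, type] = {"translation": BatchTranslationRequest}
task_preprocessor_map: Dict[str, Callable[[Any], Any]] = {
    "translation": preprocess_batch_translation,
}


def preprocess_payload(task_name: str, payload: Dict[str, Any]) -> Any:
    """Hypothetical: look up the request model and preprocessor, then apply both."""
    if task_name not in request_type_map:
        raise KeyError(f"unknown task: {task_name}")
    # Constructing the model rejects missing or unknown fields
    # (TypeError for this dataclass; ValidationError with Pydantic).
    request = request_type_map[task_name](**payload)
    preprocessor = task_preprocessor_map.get(task_name)
    return preprocessor(request) if preprocessor else request
```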
4. Define the Pub/Sub Message
- What: Define the structure of the message that will be published to the Pub/Sub queue.
- Where: `common/types/message_types.py`
- How: Create a new Pydantic model for your message.
Example message:
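Here is a hedged sketch of such a message model. It is built from the fields the endpoint in step 2 passes (`job_id`, `dataset_ids`) plus the translation fields from `BatchTranslationRequest`. The `task_name` field and the `str` type of `job_id` are assumptions; match them to your subscriber and your `Task.id` type. It uses the Pydantic v2 API (`model_dump`), as the endpoint does.

```python
from typing import List

from pydantic import BaseModel


class TranslationMessage(BaseModel):
    task_name: str = "translation"  # assumed: the subscriber routes on this key
    job_id: str  # assumed string ID; match your Task.id type
    dataset_ids: List[str]
    project_name: str
    user_email: str
    provider: str = "GOOGLE"
    models: List[str]
    source_language: str
    target_language: str


message = TranslationMessage(
    job_id="task-123",
    dataset_ids=["ds-1"],
    project_name="demo",
    user_email="dev@example.com",
    models=["nmt"],
    source_language="en",
    target_language="de",
)
data = message.model_dump()  # plain dict handed to queue.publish_task
```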
5. Implement the Task Handler
- What: Submit jobs to the provider, persist state, and mark the task `IN_PROGRESS`. Do not finalize here; the reconciler handles that.
- Where: `app/tasks/{service_name}/batch/task_handler.py`
- How: Create a handler function that validates inputs, calls the provider API, builds a `provider_state` dict (including `"task_type": "<your-type>"`), and calls `mark_task_in_progress(task_id, provider_state)`. Return `{"status": "IN_PROGRESS"}` on success.
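```python
# Sketch of a batch handler; the signature and submit_translation_job()
# are placeholders for your provider integration.
async def handle_batch_translation(task_id, provider, dataset_ids):
    # Submit the job to the provider and keep its external ID.
    job_id = await submit_translation_job(provider, dataset_ids)

    provider_state = {
        "task_type": "translation",  # required: the reconciler uses this to route
        "provider": provider,
        "dataset_ids": dataset_ids,
        "jobs": [{"external_job_id": job_id, "status": "SUBMITTED"}],
    }
    # Persist provider_state and move the task to IN_PROGRESS; do not finalize here.
    await mark_task_in_progress(task_id, provider_state)
    return {"status": "IN_PROGRESS"}
```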
6. Route the Task to the Handler
- What: Map the `task_name` to your new handler function.
- Where: `app/handlers/handler_router.py`
- How: Add an entry to the `MESSAGE_TYPE_HANDLERS` dictionary.
Example routing:
```python
MESSAGE_TYPE_HANDLERS = {
    "transcription": handle_transcription,
    "completion": handle_batch_completion,
    "translation": handle_batch_translation,  # Add this
}
```
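On the subscriber side, the router picks a handler by `task_name`. Below is a minimal sketch of that lookup. It assumes the message arrives as JSON bytes with a `task_name` key and that handlers are coroutines taking the decoded dict. `route_message` and the stub handler are hypothetical; they are not the code in `handler_router.py`.

```python
import asyncio
import json
from typing import Any, Awaitable, Callable, Dict

Handler = Callable[[Dict[str, Any]], Awaitable[Dict[str, Any]]]


async def handle_batch_translation(message: Dict[str, Any]) -> Dict[str, Any]:
    # Stub: the real handler submits provider jobs and marks IN_PROGRESS.
    return {"status": "IN_PROGRESS", "job_id": message["job_id"]}


MESSAGE_TYPE_HANDLERS: Dict[str, Handler] = {
    "translation": handle_batch_translation,
}


async def route_message(raw: bytes) -> Dict[str, Any]:
    """Decode a Pub/Sub payload and dispatch it to the registered handler."""
    message = json.loads(raw)
    handler = MESSAGE_TYPE_HANDLERS.get(message.get("task_name"))
    if handler is None:
        raise ValueError(f"no handler for task_name={message.get('task_name')!r}")
    return await handler(message)


raw = json.dumps({"task_name": "translation", "job_id": "task-123"}).encode()
print(asyncio.run(route_message(raw)))  # {'status': 'IN_PROGRESS', 'job_id': 'task-123'}
```

An unregistered `task_name` fails loudly instead of being silently dropped. That makes a forgotten `MESSAGE_TYPE_HANDLERS` entry easy to spot in logs.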
7. Add Reconciler Support
- What: Teach the background reconciler how to poll your service's provider jobs and finalize the task.
- Where: `core/background_tasks/reconciler/`
- How:
  - Implement a poll function (e.g. `_poll_translation_job_once(provider, job, provider_state) -> dict`) that returns `{"status": "COMPLETED"|"FAILED"|"ERROR"|<running-status>, "result": ..., "error": ...}`.
  - Either add your `task_type` to an existing reconciler module, or create a new `translation.py` module alongside `transcription.py` and `completion.py`.
  - Add a dispatch branch for your `task_type` in `reconcile_in_progress_tasks` in `core/background_tasks/reconciler/loop.py`.
See Batch Task State Machine for the full provider_state schema conventions and finalization rules.
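The dispatch branch can be as small as a lookup on `provider_state["task_type"]`. Below is a hedged sketch that assumes poll functions with the `(provider, job, provider_state) -> dict` shape described above. `POLLERS`, `poll_task_jobs`, and the fake provider call are hypothetical. The real `reconcile_in_progress_tasks` may branch with `if`/`elif` instead of a dict.

```python
from typing import Callable, Dict, List

TERMINAL = {"COMPLETED", "FAILED", "ERROR"}


def _poll_translation_job_once(provider: str, job: dict, provider_state: dict) -> dict:
    # Stand-in for a provider API call; pretends every job has finished.
    return {
        "status": "COMPLETED",
        "result": f"translated:{job['external_job_id']}",
        "error": None,
    }


# task_type -> poll function; add one entry per onboarded service.
POLLERS: Dict[str, Callable[[str, dict, dict], dict]] = {
    "translation": _poll_translation_job_once,
}


def poll_task_jobs(provider_state: dict) -> List[dict]:
    """Poll every non-terminal job once and record its latest status in place."""
    poll = POLLERS.get(provider_state.get("task_type"))
    if poll is None:
        raise ValueError(f"no poller for task_type={provider_state.get('task_type')!r}")
    for job in provider_state["jobs"]:
        if job["status"] in TERMINAL:
            continue  # already settled on an earlier cycle
        outcome = poll(provider_state["provider"], job, provider_state)
        job["status"] = outcome["status"]
        job["result"] = outcome.get("result")
        job["error"] = outcome.get("error")
    return provider_state["jobs"]
```

Once every job is terminal, the reconciler finalizes the task and uploads the result, following the rules on the Batch Task State Machine page.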
II. Onboarding a Sync (Single) Service
Here's how to add a new synchronous service, like single_translation.
1. Define the Request Payload
- What: Define the payload structure for the sync service.
- Where: `common/types/request_types.py`
- How: Create a new Pydantic model.
Example payload:
```python
from pydantic import BaseModel


class SingleTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    provider: str = "GOOGLE"
    # ... other fields
```
2. Create the Endpoint
- What: Define the API endpoint for the single task.
- Where: `app/routes/v1/root.py`
- How: Add a new POST endpoint that calls `dispatch_single_task`. The dispatcher handles file upload, task creation, and execution.
3. Implement Payload Preprocessing
- What: Add service-specific validation for the request payload.
- Where: `app/controllers/single_task_dispatcher.py` and `app/tasks/{service_name}/single/preprocessor.py`
- How: Update `request_map` and `payload_validators_map` in `app/controllers/single_task_dispatcher.py`.
4. Implement File Validation
- What: Create a file validation function for your service.
- Where: `app/tasks/{service_name}/utils/validate_{service_name}_files.py`
- How: Create a validation function and add it to `file_validators_map` in `app/controllers/single_task_dispatcher.py`.
Example registration in `file_validators_map`:
```python
file_validators_map = {
    "transcription": validate_transcription_audios,
    "completion": validate_upload_files,
    "translation": validate_translation_files,  # Add this
}
```
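A file validator typically rejects unsupported uploads before anything is stored. Below is a minimal sketch of what `validate_translation_files` could look like. The signature (a list of `(filename, size_in_bytes)` pairs), the allowed extensions, and the size limit are all hypothetical. Align them with how `validate_transcription_audios` and `validate_upload_files` are called in `single_task_dispatcher.py`.

```python
from pathlib import PurePath
from typing import Iterable, List, Tuple

# Hypothetical limits for translation inputs.
ALLOWED_EXTENSIONS = {".txt", ".docx", ".pdf"}
MAX_FILE_BYTES = 10 * 1024 * 1024  # 10 MiB


def validate_translation_files(files: Iterable[Tuple[str, int]]) -> List[str]:
    """Return the accepted filenames, raising ValueError on the first invalid file."""
    accepted = []
    for name, size in files:
        suffix = PurePath(name).suffix.lower()
        if suffix not in ALLOWED_EXTENSIONS:
            raise ValueError(f"{name}: unsupported file type {suffix or '(none)'}")
        if size > MAX_FILE_BYTES:
            raise ValueError(f"{name}: exceeds {MAX_FILE_BYTES} bytes")
        accepted.append(name)
    return accepted
```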
5. Implement the Task Handler
- What: Write the core business logic for your single service.
- Where: `app/tasks/{service_name}/single/task_handler.py`
- How: Create an async handler function that takes `(db, task_id, payload_dict, dataset_id)` and returns the result. The dispatcher awaits it and then records the outcome (`mark_task_completed` on success, `mark_task_failed` on failure).
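Below is a minimal sketch of a handler with the `(db, task_id, payload_dict, dataset_id)` signature described above. The translation call is a hypothetical stub, and `db` goes unused here. The `text`, `source_language`, and `target_language` payload keys and the result dict shape are assumptions. The point is that the handler returns a result or raises, and leaves status updates to the dispatcher.

```python
import asyncio
from typing import Any, Dict, Optional


async def _translate(text: str, source: str, target: str) -> str:
    # Hypothetical stub standing in for the provider SDK call.
    await asyncio.sleep(0)
    return f"[{source}->{target}] {text}"


async def async_translation_handler(
    db: Any, task_id: str, payload_dict: Dict[str, Any], dataset_id: Optional[str]
) -> Dict[str, Any]:
    """Return the task result; raise on failure so the dispatcher marks the task FAILED."""
    text = payload_dict.get("text")
    if not text:
        raise ValueError("payload_dict must include non-empty 'text'")
    translated = await _translate(
        text, payload_dict["source_language"], payload_dict["target_language"]
    )
    return {"task_id": task_id, "dataset_id": dataset_id, "translation": translated}


result = asyncio.run(async_translation_handler(
    None, "task-1", {"text": "hello", "source_language": "en", "target_language": "fr"}, None
))
print(result["translation"])  # [en->fr] hello
```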
6. Register the Handler
- What: Map your task type to the handler function.
- Where: `app/controllers/single_task_dispatcher.py`
- How: Add your handler to the `_task_handlers` dictionary.
C. Maps Reference Guide
When onboarding a new service, you need to update several maps across different files. Here's a complete reference:
Async (Batch) Service Maps

| Map Name | File Location | Purpose | Example Entry |
|---|---|---|---|
| `request_type_map` | `app/controllers/async_task_request.py` | Maps task names to request models | `"translation": BatchTranslationRequest` |
| `task_preprocessor_map` | `app/controllers/async_task_request.py` | Maps task names to preprocessor functions | `"translation": preprocess_batch_translation` |
| `MESSAGE_TYPE_HANDLERS` | `app/handlers/handler_router.py` | Maps task names to handler functions | `"translation": handle_batch_translation` |
Sync (Single) Service Maps

| Map Name | File Location | Purpose | Example Entry |
|---|---|---|---|
| `request_map` | `app/controllers/single_task_dispatcher.py` | Maps task names to request models | `"translation": SingleTranslationRequest` |
| `payload_validators_map` | `app/controllers/single_task_dispatcher.py` | Maps task names to payload validators | `"translation": validate_translation_payload` |
| `file_validators_map` | `app/controllers/single_task_dispatcher.py` | Maps task names to file validators | `"translation": validate_translation_files` |
| `_task_handlers` | `app/controllers/single_task_dispatcher.py` | Maps task names to handler functions | `"translation": async_translation_handler` |
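A sync service must appear in all four sync maps, so a missing entry only surfaces at request time. One way to catch it earlier is a startup check that compares the maps' keys. `check_maps_consistent` is a hypothetical helper, and the placeholder map values stand in for the real models and functions.

```python
from typing import Dict, List, Mapping


def check_maps_consistent(maps: Dict[str, Mapping]) -> None:
    """Raise if any task name is registered in some maps but not in others."""
    all_names = set().union(*(set(m) for m in maps.values()))
    missing: Dict[str, List[str]] = {}
    for map_name, m in maps.items():
        absent = sorted(all_names - set(m))
        if absent:
            missing[map_name] = absent
    if missing:
        raise RuntimeError(f"task names missing from maps: {missing}")


# Placeholder values; in the codebase these are models and functions.
_stub = object()
request_map = {"translation": _stub}
payload_validators_map = {"translation": _stub}
file_validators_map = {"translation": _stub}
_task_handlers = {"translation": _stub}

check_maps_consistent({
    "request_map": request_map,
    "payload_validators_map": payload_validators_map,
    "file_validators_map": file_validators_map,
    "_task_handlers": _task_handlers,
})  # passes: every map registers "translation"
```

The same check works for the async maps if you pass `request_type_map`, `task_preprocessor_map`, and `MESSAGE_TYPE_HANDLERS`.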
D. New Service Onboarding Checklists
To ensure a smooth onboarding process, please use the following checklists.
1. Async (Batch) Service Onboarding

| Step | Task | File/Location | Done (✓) |
|---|---|---|---|
| Define Payloads | Create the Pydantic model for the batch request payload | `common/types/request_types.py` | ☐ |
| | Create the Pydantic model for the Pub/Sub message | `common/types/message_types.py` | ☐ |
| Preprocessing | Implement the service-specific payload preprocessor function | `app/tasks/{service_name}/batch/preprocessor.py` | ☐ |
| | Register the new request type and preprocessor in the controller maps | `app/controllers/async_task_request.py` | ☐ |
| Endpoint | Create the new API endpoint for the batch service | `app/routes/v1/root.py` | ☐ |
| | In the endpoint, call `handle_async_task_request` and publish the message | `app/routes/v1/root.py` | ☐ |
| Core Logic | Implement the task handler: submit jobs and call `mark_task_in_progress` with `provider_state` | `app/tasks/{service_name}/batch/task_handler.py` | ☐ |
| | Set `provider_state["task_type"]` to your service name so the reconciler can route it | `app/tasks/{service_name}/batch/task_handler.py` | ☐ |
| | Register the new task handler in the handler router | `app/handlers/handler_router.py` | ☐ |
| Reconciler | Implement a poll function that checks provider job status | `core/background_tasks/reconciler/` | ☐ |
| | Add a dispatch branch for your `task_type` in `reconcile_in_progress_tasks` | `core/background_tasks/reconciler/loop.py` | ☐ |
| Utilities | (Optional) Create file validation or other utility functions | `app/tasks/{service_name}/utils/` | ☐ |
2. Sync (Single) Service Onboarding

| Step | Task | File/Location | Done (✓) |
|---|---|---|---|
| Define Payload | Create the Pydantic model for the single request payload | `common/types/request_types.py` | ☐ |
| Preprocessing | Implement the service-specific payload preprocessor function | `app/tasks/{service_name}/single/preprocessor.py` | ☐ |
| | Register the new request type and preprocessor in the controller maps | `app/controllers/single_task_dispatcher.py` | ☐ |
| File Validation | Implement the file validation function for your service | `app/tasks/{service_name}/utils/` | ☐ |
| | Register the file validator in the controller maps | `app/controllers/single_task_dispatcher.py` | ☐ |
| Core Logic | Implement the core service logic in the task handler | `app/tasks/{service_name}/single/task_handler.py` | ☐ |
| Handler Registration | Register the task handler in the dispatcher | `app/controllers/single_task_dispatcher.py` | ☐ |
| Endpoint | Create the new API endpoint for the single service | `app/routes/v1/root.py` | ☐ |
| Utilities | (Optional) Create other utility functions | `app/tasks/{service_name}/utils/` | ☐ |