Language Server: Service Onboarding and Workflow Guide

This document provides a clear overview of the Language Server's internal task processing workflows and outlines the essential steps for integrating new services (e.g., translation).

Each service must support two pipelines:

  • Async (Batch) → Goes through Pub/Sub for background processing
  • Sync (Single) → Processes directly in the request cycle without Pub/Sub

A. Language Server Task Processing Workflow

The following flowcharts illustrate the journey of a user request through the Language Server, from initial input to the final result.

1. Async (Batch) Task Workflow

The journey of a batch request through the Language Server:

flowchart LR
    A["User Input (task_name, dataset_ids, payload)"] --> B["Payload & Model Validation"]
    B --> C["Create DB Entries\nstatus = PENDING"]
    C --> D["Build Task Message"]
    D --> E["Publish to Pub/Sub"]
    E --> F["Subscriber chooses Handler based on task_name"]
    F --> G["Fetch File Metadata from Dataset"]
    G --> H["File preprocessing (if needed)"]
    H --> I["Handler submits jobs to provider\nmark_task_in_progress → IN_PROGRESS"]
    I --> J["Background Reconciler\nevery 120 s"]
    J -->|"all jobs terminal"| K["Finalize: upload result\nCOMPLETED / PARTIAL_COMPLETE / FAILED"]
    J -->|"jobs still running"| J

Two-phase design

The Pub/Sub handler only submits jobs to the provider and exits. The Background Reconciler polls provider APIs every 120 seconds and writes the final result. See Batch Task State Machine for the full status lifecycle and provider_state schema.
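The two-phase split can be pictured with a minimal sketch. This is not the Language Server's actual code: `submit_phase`, `poll_provider`, `reconcile_once`, `reconciler_loop`, and the in-memory `TASKS` dict are hypothetical stand-ins. Only the 120-second interval and the status names come from the text above, and PARTIAL_COMPLETE is left out to keep the sketch short.

```python
import asyncio

RECONCILE_INTERVAL_SECONDS = 120  # from the guide: the reconciler runs every 120 s
TERMINAL = {"COMPLETED", "FAILED"}
TASKS = {}  # hypothetical in-memory stand-in for the task table


async def submit_phase(task_id, job_ids):
    """Phase 1 (Pub/Sub handler): record submitted jobs, then exit."""
    TASKS[task_id] = {
        "status": "IN_PROGRESS",
        "jobs": [{"external_job_id": j, "status": "SUBMITTED"} for j in job_ids],
    }


async def poll_provider(job):
    """Hypothetical provider call; in this sketch every job has finished."""
    return "COMPLETED"


async def reconcile_once():
    """Phase 2 (reconciler): poll running jobs, finalize once all are terminal."""
    for task in TASKS.values():
        if task["status"] != "IN_PROGRESS":
            continue
        for job in task["jobs"]:
            if job["status"] not in TERMINAL:
                job["status"] = await poll_provider(job)
        statuses = [job["status"] for job in task["jobs"]]
        if all(s in TERMINAL for s in statuses):
            # Simplified finalization; PARTIAL_COMPLETE is omitted here.
            ok = all(s == "COMPLETED" for s in statuses)
            task["status"] = "COMPLETED" if ok else "FAILED"


async def reconciler_loop():
    """Background loop; defined for illustration and not started here."""
    while True:
        await reconcile_once()
        await asyncio.sleep(RECONCILE_INTERVAL_SECONDS)
```

The point of the split is that `submit_phase` returns as soon as the jobs are recorded, so a slow provider never blocks the Pub/Sub subscriber.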

2. Sync (Single) Task Workflow

The journey of a single request through the Language Server:

flowchart LR
    A["Route\nPOST /task/{type}"] --> B["Acquire TASK_SEMAPHORE\nmax 50 concurrent"]
    B --> C["_validate_payload\nproject access + Pydantic\n+ task preprocessor"]
    C -->|"has files"| D["File Upload Pipeline\n_process_and_upload_files"]
    C -->|"no files"| E["create_task_checkpoint\nstatus = PENDING"]
    D --> E
    E --> F["_execute_task"]
    F -->|"success"| G["mark_task_completed"]
    F -->|"failure"| H["mark_task_failed"]
    G --> I["Return\n{task_id, status, result}"]
    H --> J["HTTP error"]

One-phase design

Single tasks run fully inside the HTTP request — no Pub/Sub, no background polling. See Single Task State Machine for the full status lifecycle, file pipeline, and dispatcher registry.

Key Features:

  • Immediate Processing: Returns results directly without queuing
  • Async Handlers: Handlers are async functions, so the server can keep serving other requests while one task waits on I/O
  • File Management: Automatically uploads files to cloud storage (Azure/GCS)
  • Database Updates: Updates task status and response atomically
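The one-phase flow can be sketched roughly as follows. The names `execute_task`, `run_single_task`, `demo`, and the in-memory `TASKS` dict are hypothetical; only the limit of 50 concurrent tasks and the PENDING / COMPLETED / FAILED transitions are taken from the flowchart. Payload validation and file upload are omitted.

```python
import asyncio

MAX_CONCURRENT_TASKS = 50  # from the guide: TASK_SEMAPHORE allows 50 concurrent tasks
TASKS = {}  # hypothetical in-memory stand-in for the task table


async def execute_task(payload):
    """Hypothetical stand-in for _execute_task; fails when 'text' is missing."""
    if not payload.get("text"):
        raise ValueError("text is required")
    return {"translated": payload["text"].upper()}


async def run_single_task(semaphore, task_id, payload):
    """Checkpoint as PENDING, execute, then mark COMPLETED or FAILED."""
    async with semaphore:  # blocks when the concurrency limit is reached
        TASKS[task_id] = {"status": "PENDING", "result": None}
        try:
            result = await execute_task(payload)
        except Exception as exc:
            TASKS[task_id] = {"status": "FAILED", "result": str(exc)}
            raise  # the route would translate this into an HTTP error
        TASKS[task_id] = {"status": "COMPLETED", "result": result}
        return {"task_id": task_id, "status": "COMPLETED", "result": result}


async def demo():
    # The semaphore is created inside the running event loop.
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    return await run_single_task(semaphore, "t1", {"text": "hello"})
```

Calling `asyncio.run(demo())` returns the same `{task_id, status, result}` shape that the flowchart shows as the final response.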

B. Onboarding a New Service: A Step-by-Step Guide

This section provides a detailed, code-oriented guide for developers to onboard a new service into the Language Server. We will cover both asynchronous (batch) and synchronous (single) pipelines.

Folder Structure for a New Service

When adding a new service, let's say translation, you will need to create the following directory structure:

.
├── tasks
│   └── translation
│       ├── batch
│       │   ├── task_handler.py     # 1. Async task handler
│       │   └── preprocessor.py     # 2. Async payload preprocessor
│       ├── single
│       │   ├── task_handler.py     # 3. Sync task handler
│       │   └── preprocessor.py     # 4. Sync payload preprocessor
│       └── utils                   # 5. Shared utilities
│           └── validate_translation_files.py

File Descriptions:

  1. app/tasks/{service_name}/batch/task_handler.py: Triggered by a Pub/Sub message. Validates inputs, submits jobs to the provider, writes provider_state to the database, and calls mark_task_in_progress. It does not finalize the task — that is done by the background reconciler.

  2. app/tasks/{service_name}/batch/preprocessor.py: Contains payload preprocessing and validation logic for your async service before the task is published to the queue.

  3. app/tasks/{service_name}/single/task_handler.py: Holds the core business logic for the synchronous (single) version of your service. It's called directly from the API endpoint to process the request in real-time and return a response to the user.

  4. app/tasks/{service_name}/single/preprocessor.py: Contains payload preprocessing and validation logic for your sync service before the task is executed by the handler.

  5. app/tasks/{service_name}/utils/: Holds utility functions and configuration specific to your service (here, translation), such as file validation logic.
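If you prefer to generate this layout rather than create it by hand, a small script along these lines would do it. `scaffold_service` is a hypothetical helper, not part of the Language Server, and it creates only the five files listed above as empty placeholders.

```python
from pathlib import Path
from typing import List


def scaffold_service(root: str, service_name: str) -> List[Path]:
    """Create the empty batch/single/utils files for a new service."""
    base = Path(root) / "tasks" / service_name
    files = [
        base / "batch" / "task_handler.py",
        base / "batch" / "preprocessor.py",
        base / "single" / "task_handler.py",
        base / "single" / "preprocessor.py",
        base / "utils" / f"validate_{service_name}_files.py",
    ]
    for path in files:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.touch(exist_ok=True)  # leaves existing files untouched
    return files
```

For example, `scaffold_service("app", "translation")` would create `app/tasks/translation/...` relative to the current directory.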


I. Onboarding an Async (Batch) Service

Here's how to add a new asynchronous service, like batch_translation.

1. Define the Request Payload

  • What: Define the structure of the request payload for your service
  • Where: common/types/request_types.py
  • How: Create a new Pydantic model that inherits from BaseModel

Example for BatchTranslationRequest:

from typing import List

from pydantic import BaseModel


class BatchTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    dataset_ids: List[str]
    provider: str = "GOOGLE"  # default provider when none is given
    models: List[str]
    source_language: str
    target_language: str

2. Create the Endpoint

  • What: Define the API endpoint that will receive the batch task request
  • Where: app/routes/v1/root.py
  • How: Add a new POST endpoint. This endpoint will call handle_async_task_request to perform initial processing and then publish a message to the Pub/Sub queue

Example endpoint:

@router.post("/task/batch/translation")
async def create_batch_translation(
    request: Request,
    payload: Dict[str, Any],
    db: Session = Depends(get_db),
) -> Dict[str, Any]:
    task, payload, dataset_ids = handle_async_task_request(
        request, db, payload, "translation"
    )

    message_data = TranslationMessage(
        job_id=task.id,
        dataset_ids=dataset_ids,
        # ... other fields from payload
    ).model_dump()

    queue.publish_task(message_data)

    return {
        "task_id": task.id,
        "dataset_ids": dataset_ids,
        "status": "PENDING",
    }

3. Implement Payload Preprocessing

  • What: Add service-specific validation for the request payload
  • Where: app/controllers/async_task_request.py, plus a new preprocessor file at app/tasks/{service_name}/batch/preprocessor.py
  • How:
      • Add your new request type to the request_type_map in app/controllers/async_task_request.py
      • Create a preprocessor function in app/tasks/{service_name}/batch/preprocessor.py to handle validation
      • Add your preprocessor to the task_preprocessor_map in app/controllers/async_task_request.py

In app/controllers/async_task_request.py:

# Add to request_type_map
request_type_map = {
    "transcription": BatchTranscriptionRequest,
    "completion": BatchLLMRequest,
    "translation": BatchTranslationRequest, # Add this
}

# Add to task_preprocessor_map
task_preprocessor_map = {
    "transcription": validate_transcription_payload,
    "completion": preprocess_batch_completion,
    "translation": preprocess_batch_translation, # Add this
}
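The preprocessor itself is not shown above, so here is a minimal sketch of what preprocess_batch_translation might look like. The signature (a payload dict in, a normalized dict out, ValueError on bad input) and the specific rules are assumptions for illustration; match whatever the existing preprocessors in task_preprocessor_map actually do.

```python
from typing import Any, Dict


def preprocess_batch_translation(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and normalize a batch translation payload (illustrative rules)."""
    source = str(payload.get("source_language", "")).strip().lower()
    target = str(payload.get("target_language", "")).strip().lower()

    if not source or not target:
        raise ValueError("source_language and target_language are required")
    if source == target:
        raise ValueError("source_language and target_language must differ")
    if not payload.get("dataset_ids"):
        raise ValueError("dataset_ids must contain at least one dataset")

    # Return a copy so the caller's payload is not mutated.
    return {**payload, "source_language": source, "target_language": target}
```

Rejecting bad payloads at this stage keeps invalid tasks from ever reaching the Pub/Sub queue.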

4. Define the Pub/Sub Message

  • What: Define the structure of the message that will be published to the Pub/Sub queue
  • Where: common/types/message_types.py
  • How: Create a new Pydantic model for your message

Example message:

from typing import List

from pydantic import BaseModel


class TranslationMessage(BaseModel):
    job_id: str
    dataset_ids: List[str]
    # ... other fields

5. Implement the Task Handler

  • What: Submit jobs to the provider, persist state, and mark the task IN_PROGRESS. Do not finalize here — the reconciler handles that.
  • Where: app/tasks/{service_name}/batch/task_handler.py
  • How: Create a handler function that validates inputs, calls the provider API, builds a provider_state dict (including "task_type": "<your-type>"), and calls mark_task_in_progress(task_id, provider_state). Return {"status": "IN_PROGRESS"} on success.

Example (end of the handler function):

provider_state = {
    "task_type": "translation",   # required — reconciler uses this to route
    "provider": provider,
    "dataset_ids": dataset_ids,
    "jobs": [{"external_job_id": job_id, "status": "SUBMITTED"}],
}
await mark_task_in_progress(task_id, provider_state)
return {"status": "IN_PROGRESS"}
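Put together, a complete handler could look something like the sketch below. `submit_translation_job`, the in-memory `SAVED_STATE`, and this stand-in version of `mark_task_in_progress` are hypothetical, and submitting one provider job per dataset is an assumption; the real handler would use the project's own helpers and provider client.

```python
import asyncio
from typing import Any, Dict

SAVED_STATE: Dict[str, Dict[str, Any]] = {}  # hypothetical stand-in for the database


async def submit_translation_job(provider: str, dataset_id: str) -> str:
    """Hypothetical provider call that returns an external job id."""
    return f"{provider.lower()}-job-{dataset_id}"


async def mark_task_in_progress(task_id: str, provider_state: Dict[str, Any]) -> None:
    """Stand-in for the real helper: persist state and set IN_PROGRESS."""
    SAVED_STATE[task_id] = {"status": "IN_PROGRESS", "provider_state": provider_state}


async def handle_batch_translation(message: Dict[str, Any]) -> Dict[str, str]:
    """Submit jobs and record provider_state; finalization is left to the reconciler."""
    task_id = message["job_id"]
    provider = message.get("provider", "GOOGLE")
    dataset_ids = message["dataset_ids"]
    if not dataset_ids:
        raise ValueError("dataset_ids must not be empty")

    jobs = []
    for dataset_id in dataset_ids:  # assumption: one provider job per dataset
        job_id = await submit_translation_job(provider, dataset_id)
        jobs.append({"external_job_id": job_id, "status": "SUBMITTED"})

    provider_state = {
        "task_type": "translation",  # required so the reconciler can route the task
        "provider": provider,
        "dataset_ids": dataset_ids,
        "jobs": jobs,
    }
    await mark_task_in_progress(task_id, provider_state)
    return {"status": "IN_PROGRESS"}
```

Note that the handler never waits for a job to finish; it returns as soon as the state is persisted.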

6. Route the Task to the Handler

  • What: Map the task_name to your new handler function
  • Where: app/handlers/handler_router.py
  • How: Add an entry to the MESSAGE_TYPE_HANDLERS dictionary

Example routing:

MESSAGE_TYPE_HANDLERS = {
    "transcription": handle_transcription,
    "completion": handle_batch_completion,
    "translation": handle_batch_translation, # Add this
}
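For orientation, the lookup the subscriber performs on this dictionary might resemble the following sketch. `route_message` is a hypothetical name, the handler body is a placeholder, and carrying `task_name` as a field of the message is an assumption based on the flowchart.

```python
import asyncio
from typing import Any, Dict


async def handle_batch_translation(message: Dict[str, Any]) -> Dict[str, str]:
    """Placeholder handler used only for this sketch."""
    return {"status": "IN_PROGRESS"}


MESSAGE_TYPE_HANDLERS = {
    "translation": handle_batch_translation,
}


async def route_message(message: Dict[str, Any]) -> Dict[str, str]:
    """Pick the handler registered for the message's task_name and run it."""
    task_name = message.get("task_name")
    handler = MESSAGE_TYPE_HANDLERS.get(task_name)
    if handler is None:
        raise ValueError(f"No handler registered for task_name={task_name!r}")
    return await handler(message)
```

A missing entry shows up as an explicit error here, which is the usual symptom of forgetting this registration step.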

7. Add Reconciler Support

  • What: Teach the background reconciler how to poll your service's provider jobs and finalize the task.
  • Where: core/background_tasks/reconciler/
  • How:
      • Implement a poll function (e.g., _poll_translation_job_once(provider, job, provider_state) -> dict) that returns {"status": "COMPLETED"|"FAILED"|"ERROR"|<running-status>, "result": ..., "error": ...}.
      • Either add your task_type to an existing reconciler module, or create a new translation.py module alongside transcription.py and completion.py.
      • Add a dispatch branch in reconcile_in_progress_tasks in core/background_tasks/reconciler/loop.py.

Example dispatch branch:

elif task_type == "translation":
    await reconcile_translation_task(task)

See Batch Task State Machine for the full provider_state schema conventions and finalization rules.
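As a rough illustration of the poll-and-finalize contract, consider the sketch below. `FAKE_PROVIDER_JOBS` simulates the provider, the task is modeled as a plain dict, and treating "some jobs completed, some failed" as PARTIAL_COMPLETE is an assumption; the Batch Task State Machine document is the authority on the real finalization rules.

```python
import asyncio
from typing import Any, Dict

TERMINAL = {"COMPLETED", "FAILED", "ERROR"}
# Hypothetical provider-side job states keyed by external job id.
FAKE_PROVIDER_JOBS = {"job-1": "COMPLETED", "job-2": "RUNNING"}


async def _poll_translation_job_once(provider, job, provider_state) -> Dict[str, Any]:
    """Return the {"status", "result", "error"} shape described above."""
    status = FAKE_PROVIDER_JOBS.get(job["external_job_id"], "ERROR")
    if status == "COMPLETED":
        return {"status": status, "result": {"job": job["external_job_id"]}, "error": None}
    if status in ("FAILED", "ERROR"):
        return {"status": status, "result": None, "error": "provider reported failure"}
    return {"status": status, "result": None, "error": None}  # still running


async def reconcile_translation_task(task: Dict[str, Any]) -> str:
    """Poll non-terminal jobs once; finalize only when every job is terminal."""
    state = task["provider_state"]
    for job in state["jobs"]:
        if job["status"] in TERMINAL:
            continue
        outcome = await _poll_translation_job_once(state["provider"], job, state)
        job["status"] = outcome["status"]

    statuses = [job["status"] for job in state["jobs"]]
    if not all(s in TERMINAL for s in statuses):
        return "IN_PROGRESS"  # leave the task for the next reconciler pass

    completed = sum(s == "COMPLETED" for s in statuses)
    if completed == len(statuses):
        task["status"] = "COMPLETED"
    elif completed > 0:
        task["status"] = "PARTIAL_COMPLETE"  # assumed meaning: mixed outcomes
    else:
        task["status"] = "FAILED"
    return task["status"]
```

Because jobs already in a terminal state are skipped, repeated passes only poll the work that is still outstanding.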


II. Onboarding a Sync (Single) Service

Here's how to add a new synchronous service, like single_translation.

1. Define the Request Payload

  • What: Define the payload structure for the sync service
  • Where: common/types/request_types.py
  • How: Create a new Pydantic model

Example payload:

class SingleTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    provider: str = "GOOGLE"
    # ... other fields

2. Create the Endpoint

  • What: Define the API endpoint for the single task
  • Where: app/routes/v1/root.py
  • How: Add a new POST endpoint that calls dispatch_single_task. The flow automatically handles file upload, task creation, and execution.

3. Implement Payload Preprocessing

  • What: Add service-specific validation for the request payload
  • Where: app/controllers/single_task_dispatcher.py and app/tasks/{service_name}/single/preprocessor.py
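  • How: Create a preprocessor function in app/tasks/{service_name}/single/preprocessor.py, then register your request model in request_map and your preprocessor in payload_validators_map in app/controllers/single_task_dispatcher.py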

4. Implement File Validation

  • What: Create a file validation function for your service
  • Where: app/tasks/{service_name}/utils/validate_{service_name}_files.py
  • How: Create a validation function and add it to file_validators_map in app/controllers/single_task_dispatcher.py

Example file validation:

file_validators_map = {
    "transcription": validate_transcription_audios,
    "completion": validate_upload_files,
    "translation": validate_translation_files, # Add this
}
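A file validator might be shaped like the sketch below. The signature (an iterable of `(filename, size_in_bytes)` pairs returning a list of error strings), the allowed extensions, and the size limit are all assumptions for illustration; follow the signature of the existing validators in file_validators_map.

```python
from pathlib import Path
from typing import Iterable, List, Tuple

ALLOWED_EXTENSIONS = {".txt", ".md", ".json"}  # illustrative, not from the guide
MAX_FILE_BYTES = 5 * 1024 * 1024  # illustrative 5 MiB limit


def validate_translation_files(files: Iterable[Tuple[str, int]]) -> List[str]:
    """Return one error message per rejected file; an empty list means all passed."""
    errors = []
    for name, size in files:
        extension = Path(name).suffix.lower()
        if extension not in ALLOWED_EXTENSIONS:
            errors.append(f"{name}: unsupported file type {extension!r}")
        elif size == 0:
            errors.append(f"{name}: file is empty")
        elif size > MAX_FILE_BYTES:
            errors.append(f"{name}: file exceeds {MAX_FILE_BYTES} bytes")
    return errors
```

Returning all errors at once, rather than raising on the first, lets the caller report every rejected file in a single response.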

5. Implement the Task Handler

  • What: Write the core business logic for your single service
  • Where: app/tasks/{service_name}/single/task_handler.py
  • How: Create an async handler function that takes (db, task_id, payload_dict, dataset_id) and returns the result. The handler does not set the final status itself; the dispatcher marks the task completed or failed based on the outcome
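A minimal sketch of such a handler is shown below. The `(db, task_id, payload_dict, dataset_id)` parameters come from this guide; `call_translation_provider`, the `text` payload field, and the shape of the returned dict are hypothetical.

```python
import asyncio
from typing import Any, Dict, Optional


async def call_translation_provider(text: str, source: str, target: str) -> str:
    """Hypothetical provider call; tags the text instead of translating it."""
    return f"[{source}->{target}] {text}"


async def async_translation_handler(
    db: Any,
    task_id: str,
    payload_dict: Dict[str, Any],
    dataset_id: Optional[str],
) -> Dict[str, Any]:
    """Run one translation and return the result; raise on invalid input."""
    text = payload_dict.get("text")  # 'text' is an assumed payload field
    if not text:
        raise ValueError("payload must include non-empty 'text'")

    translated = await call_translation_provider(
        text, payload_dict["source_language"], payload_dict["target_language"]
    )
    return {"task_id": task_id, "dataset_id": dataset_id, "translated_text": translated}
```

Raising an exception is enough to signal failure in this sketch; the caller is the one that records the failed status.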

6. Register the Handler

  • What: Map your task type to the handler function
  • Where: app/controllers/single_task_dispatcher.py
  • How: Add your handler to the _task_handlers dictionary

C. Maps Reference Guide

When onboarding a new service, you need to update several maps across different files. Here's a complete reference:

Async (Batch) Service Maps

| Map Name | File Location | Purpose | Example Entry |
| --- | --- | --- | --- |
| request_type_map | app/controllers/async_task_request.py | Maps task names to request models | "translation": BatchTranslationRequest |
| task_preprocessor_map | app/controllers/async_task_request.py | Maps task names to preprocessor functions | "translation": preprocess_batch_translation |
| MESSAGE_TYPE_HANDLERS | app/handlers/handler_router.py | Maps task names to handler functions | "translation": handle_batch_translation |

Sync (Single) Service Maps

| Map Name | File Location | Purpose | Example Entry |
| --- | --- | --- | --- |
| request_map | app/controllers/single_task_dispatcher.py | Maps task names to request models | "translation": SingleTranslationRequest |
| payload_validators_map | app/controllers/single_task_dispatcher.py | Maps task names to payload validators | "translation": validate_translation_payload |
| file_validators_map | app/controllers/single_task_dispatcher.py | Maps task names to file validators | "translation": validate_translation_files |
| _task_handlers | app/controllers/single_task_dispatcher.py | Maps task names to handler functions | "translation": async_translation_handler |
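Since a service must appear in every map, a small check such as the one sketched here can catch a forgotten registration early, for instance in a unit test. `missing_registrations` is a hypothetical helper, and the example maps are placeholders rather than the real dictionaries.

```python
from typing import List, Mapping


def missing_registrations(
    service: str, maps: Mapping[str, Mapping[str, object]]
) -> List[str]:
    """Return the names of the maps that have no entry for the service."""
    return sorted(name for name, registry in maps.items() if service not in registry)


# Placeholder registries; in a real test these would be imported from
# app/controllers/single_task_dispatcher.py.
EXAMPLE_SYNC_MAPS = {
    "request_map": {"transcription": object, "translation": object},
    "payload_validators_map": {"transcription": object, "translation": object},
    "file_validators_map": {"transcription": object},
    "_task_handlers": {"transcription": object},
}
```

With these placeholders, `missing_registrations("translation", EXAMPLE_SYNC_MAPS)` reports the two maps that still lack a translation entry.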

D. New Service Onboarding Checklists

To ensure a smooth onboarding process, please use the following checklists.

1. Async (Batch) Service Onboarding

| Step | Task | File/Location | Done (✓) |
| --- | --- | --- | --- |
| Define Payloads | Create the Pydantic model for the batch request payload | common/types/request_types.py | |
| | Create the Pydantic model for the Pub/Sub message | common/types/message_types.py | |
| Preprocessing | Implement the service-specific payload preprocessor function | app/tasks/{service_name}/batch/preprocessor.py | |
| | Register the new request type and preprocessor in the controller maps | app/controllers/async_task_request.py | |
| Endpoint | Create the new API endpoint for the batch service | app/routes/v1/root.py | |
| | In the endpoint, call handle_async_task_request and publish the message | app/routes/v1/root.py | |
| Core Logic | Implement the task handler: submit jobs and call mark_task_in_progress with provider_state | app/tasks/{service_name}/batch/task_handler.py | |
| | Set provider_state["task_type"] to your service name so the reconciler can route it | app/tasks/{service_name}/batch/task_handler.py | |
| | Register the new task handler in the handler router | app/handlers/handler_router.py | |
| Reconciler | Implement a poll function that checks provider job status | core/background_tasks/reconciler/ | |
| | Add a dispatch branch for your task_type in reconcile_in_progress_tasks | core/background_tasks/reconciler/loop.py | |
| Utilities (Optional) | Create file validation or other utility functions | app/tasks/{service_name}/utils/ | |

2. Sync (Single) Service Onboarding

| Step | Task | File/Location | Done (✓) |
| --- | --- | --- | --- |
| Define Payload | Create the Pydantic model for the single request payload | common/types/request_types.py | |
| Preprocessing | Implement the service-specific payload preprocessor function | app/tasks/{service_name}/single/preprocessor.py | |
| | Register the new request type and preprocessor in the controller maps | app/controllers/single_task_dispatcher.py | |
| File Validation | Implement a file validation function for your service | app/tasks/{service_name}/utils/ | |
| | Register the file validator in the controller maps | app/controllers/single_task_dispatcher.py | |
| Core Logic | Implement the core service logic in the task handler | app/tasks/{service_name}/single/task_handler.py | |
| Handler Registration | Register the task handler in the dispatcher | app/controllers/single_task_dispatcher.py | |
| Endpoint | Create the new API endpoint for the single service | app/routes/v1/root.py | |
| Utilities (Optional) | Create other utility functions | app/tasks/{service_name}/utils/ | |