Language Server: Service Onboarding and Workflow Guide

This document provides a clear overview of the Language Server's internal task processing workflows and outlines the essential steps for integrating new services (e.g., translation).

Each service must support two pipelines:

  • Async (Batch) → Goes through Pub/Sub for background processing
  • Sync (Single) → Processes directly in the request cycle without Pub/Sub

A. Language Server Task Processing Workflow

The following flowcharts illustrate the journey of a user request through the Language Server, from initial input to the final result.

1. Async (Batch) Task Workflow

The journey of a batch request through the Language Server:

flowchart LR
    A["User Input (task_name, dataset_ids, payload)"] --> B["Payload & Model Validation"]
    B --> C["Create DB Entries"]
    C --> D["Build Task Message"]
    D --> E["Publish to Pub/Sub"]
    E --> F["Subscriber chooses Handler based on task_name"]
    F --> G["Fetch File Metadata from Dataset"]
    G --> H["File preprocessing (if needed)"]
    H --> I["Handler Executes Task Logic"]
    I --> J["Save Output in task_request_response & Blob Storage"]
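The async flow above can be sketched end to end with in-memory stand-ins. This is only an illustration of the decoupling between the request side and the subscriber side: the `task_table` dict, the `queue.Queue` topic, the `echo_handler`, and the "COMPLETED" status value are all assumptions made for the sketch, not the Language Server's actual storage, Pub/Sub client, or status names.

```python
import json
import queue
from typing import Any, Callable, Dict, List

# Hypothetical in-memory stand-ins for the task table and the Pub/Sub topic.
task_table: Dict[str, Dict[str, Any]] = {}
topic: "queue.Queue[bytes]" = queue.Queue()


def submit_batch_task(task_id: str, task_name: str, dataset_ids: List[str]) -> Dict[str, Any]:
    """Request side: create the DB entry, build the message, publish, return at once."""
    task_table[task_id] = {"status": "PENDING", "response": None}
    message = {"job_id": task_id, "task_name": task_name, "dataset_ids": dataset_ids}
    topic.put(json.dumps(message).encode("utf-8"))
    return {"task_id": task_id, "status": "PENDING"}


def echo_handler(message: Dict[str, Any]) -> Dict[str, Any]:
    """Placeholder task logic: report how many datasets were received."""
    return {"datasets_processed": len(message["dataset_ids"])}


HANDLERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "translation": echo_handler,
}


def run_subscriber_once() -> None:
    """Subscriber side: pull one message, pick the handler by task_name, save the output."""
    message = json.loads(topic.get_nowait().decode("utf-8"))
    handler = HANDLERS[message["task_name"]]
    task_table[message["job_id"]] = {"status": "COMPLETED", "response": handler(message)}


receipt = submit_batch_task("job-1", "translation", ["ds-1", "ds-2"])
status_before = task_table["job-1"]["status"]  # still PENDING: nothing has run yet
run_subscriber_once()
status_after = task_table["job-1"]["status"]
```

The caller receives its receipt before any task logic runs; the result only appears once the subscriber has processed the message.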

2. Sync (Single) Task Workflow

The journey of a single request through the Language Server:

flowchart LR
    A["User Input (task_name, file, payload)"] --> B["Payload & Model Validation"]
    B --> C["File Upload to Cloud Storage"]
    C --> D["Create Dataset & File Records"]
    D --> E["Create Task Entry"]
    E --> F["Thread Pool Executor"]
    F --> G["Task-Specific Handler"]
    G --> H["Update Task Status & Response"]
    H --> I["Return Direct Response to User"]

Key Features:

  • Immediate Processing: Returns results directly, without queuing
  • Thread Pool: Uses a thread pool executor so the main thread is not blocked
  • File Management: Automatically uploads files to cloud storage (Azure/GCS)
  • Database Updates: Updates task status and response atomically
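One way the thread pool step could keep the request cycle responsive is to hand the blocking handler to an executor from an async endpoint. The sketch below assumes an asyncio-based server; the pool size, `blocking_handler`, and `handle_request` are hypothetical names chosen for illustration, not the project's real executor code.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List

# Hypothetical pool; the real size and lifecycle are not specified in this guide.
executor = ThreadPoolExecutor(max_workers=4)


def blocking_handler(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for a task-specific handler that does blocking work."""
    time.sleep(0.05)  # simulates a slow provider call
    return {"status": "COMPLETED", "echo": payload["text"]}


async def handle_request(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Run the blocking handler in the pool so the event loop stays free."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_handler, payload)


async def main() -> List[Dict[str, Any]]:
    # The two requests overlap instead of running back to back.
    return await asyncio.gather(
        handle_request({"text": "hello"}),
        handle_request({"text": "world"}),
    )


results = asyncio.run(main())
executor.shutdown()
```

`asyncio.gather` returns results in the order the awaitables were passed, so each caller still gets the response for its own request.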

B. Onboarding a New Service: A Step-by-Step Guide

This section provides a detailed, code-oriented guide for developers to onboard a new service into the Language Server. We will cover both asynchronous (batch) and synchronous (single) pipelines.

Folder Structure for a New Service

When adding a new service (translation, in this example), create the following directory structure under app/:

.
├── tasks
│   └── translation
│       ├── batch
│       │   ├── task_handler.py     # 1. Async task handler
│       │   └── preprocessor.py     # 2. Async payload preprocessor
│       ├── single
│       │   ├── task_handler.py     # 3. Sync task handler
│       │   └── preprocessor.py     # 4. Sync payload preprocessor
│       └── utils                   # 5. Shared utilities
│           └── validate_translation_files.py

File Descriptions:

  1. app/tasks/{service_name}/batch/task_handler.py: Contains the core business logic for the asynchronous (batch) version of your service. It is triggered by a message from the Pub/Sub queue and is responsible for executing the long-running task, processing the data, and storing the results.

  2. app/tasks/{service_name}/batch/preprocessor.py: Contains payload preprocessing and validation logic for your async service before the task is published to the queue.

  3. app/tasks/{service_name}/single/task_handler.py: Holds the core business logic for the synchronous (single) version of your service. It's called directly from the API endpoint to process the request in real-time and return a response to the user.

  4. app/tasks/{service_name}/single/preprocessor.py: Contains payload preprocessing and validation logic for your sync service before the task is executed by the handler.

  5. app/tasks/{service_name}/utils/: Holds utility functions and configuration specific to your service, such as the file validation logic in validate_translation_files.py for translation.


I. Onboarding an Async (Batch) Service

Here's how to add a new asynchronous service, like batch_translation.

1. Define the Request Payload

  • What: Define the structure of the request payload for your service
  • Where: common/types/request_types.py
  • How: Create a new Pydantic model that inherits from BaseModel

Example for BatchTranslationRequest:

from typing import List

from pydantic import BaseModel


class BatchTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    dataset_ids: List[str]
    provider: str = "GOOGLE"  # default provider when none is given
    models: List[str]
    source_language: str
    target_language: str
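To see what the model gives you, the following sketch validates a raw payload against it using Pydantic v2 (`model_validate`). The payload values, including the model name, are made up for illustration.

```python
from typing import List

from pydantic import BaseModel, ValidationError


class BatchTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    dataset_ids: List[str]
    provider: str = "GOOGLE"
    models: List[str]
    source_language: str
    target_language: str


raw_payload = {
    "project_name": "demo",
    "user_email": "dev@example.com",
    "dataset_ids": ["ds-1"],
    "models": ["model-a"],  # hypothetical model name
    "source_language": "en",
    "target_language": "fr",
}

# A complete payload validates, and the provider default is filled in.
request = BatchTranslationRequest.model_validate(raw_payload)

# Leaving out a required field raises ValidationError naming that field.
incomplete = {k: v for k, v in raw_payload.items() if k != "target_language"}
try:
    BatchTranslationRequest.model_validate(incomplete)
    missing_fields = []
except ValidationError as exc:
    missing_fields = [err["loc"][0] for err in exc.errors()]
```

Because `provider` has a default, clients may omit it; every other field is required.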

2. Create the Endpoint

  • What: Define the API endpoint that will receive the batch task request
  • Where: app/routes/v1/root.py
  • How: Add a new POST endpoint. This endpoint will call handle_async_task_request to perform initial processing and then publish a message to the Pub/Sub queue

Example endpoint:

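from typing import Any, Dict

# router, Request, Depends, Session, get_db, queue, handle_async_task_request,
# and TranslationMessage are assumed to come from the project's existing modules.


@router.post("/task/batch/translation")
async def create_batch_translation(
    request: Request,
    payload: Dict[str, Any],
    db: Session = Depends(get_db),
) -> Dict[str, Any]:
    # Initial processing: payload validation and DB entries for the task.
    task, payload, dataset_ids = handle_async_task_request(
        request, db, payload, "translation"
    )

    # Build the message the subscriber will receive.
    message_data = TranslationMessage(
        job_id=task.id,
        dataset_ids=dataset_ids,
        # ... other fields from payload
    ).model_dump()

    # Publish to Pub/Sub; the work itself happens in the background.
    queue.publish_task(message_data)

    # Respond immediately with the task reference.
    return {
        "task_id": task.id,
        "dataset_ids": dataset_ids,
        "status": "PENDING",
    }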

3. Implement Payload Preprocessing

  • What: Add service-specific validation for the request payload
  • Where: app/controllers/async_task_request.py, plus a new preprocessor file at app/tasks/{service_name}/batch/preprocessor.py
  • How:
      • Add your new request type to the request_type_map in app/controllers/async_task_request.py
      • Create a preprocessor function in app/tasks/{service_name}/batch/preprocessor.py to handle validation
      • Add your preprocessor to the task_preprocessor_map in app/controllers/async_task_request.py

In app/controllers/async_task_request.py:

# Add to request_type_map
request_type_map = {
    "transcription": BatchTranscriptionRequest,
    "completion": BatchLLMRequest,
    "translation": BatchTranslationRequest, # Add this
}

# Add to task_preprocessor_map
task_preprocessor_map = {
    "transcription": validate_transcription_payload,
    "completion": preprocess_batch_completion,
    "translation": preprocess_batch_translation, # Add this
}
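The preprocessor itself is not shown in this guide, so here is a minimal sketch of what `preprocess_batch_translation` might look like and how a map lookup could invoke it. The function signature (payload dict in, cleaned payload dict out), the `SUPPORTED_LANGUAGES` allow-list, and the `run_preprocessor` helper are assumptions for illustration; check the existing preprocessors for the real contract.

```python
from typing import Any, Callable, Dict

SUPPORTED_LANGUAGES = {"en", "fr", "de", "hi"}  # hypothetical allow-list


def preprocess_batch_translation(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Normalize language codes and reject payloads the service cannot handle."""
    source = str(payload["source_language"]).strip().lower()
    target = str(payload["target_language"]).strip().lower()
    for code in (source, target):
        if code not in SUPPORTED_LANGUAGES:
            raise ValueError(f"Unsupported language: {code}")
    if source == target:
        raise ValueError("source_language and target_language must differ")
    if not payload.get("dataset_ids"):
        raise ValueError("dataset_ids must not be empty")
    return {**payload, "source_language": source, "target_language": target}


task_preprocessor_map: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "translation": preprocess_batch_translation,
}


def run_preprocessor(task_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: look up the preprocessor for task_name and apply it."""
    try:
        preprocessor = task_preprocessor_map[task_name]
    except KeyError:
        raise ValueError(f"Unknown task: {task_name}") from None
    return preprocessor(payload)


cleaned = run_preprocessor(
    "translation",
    {"dataset_ids": ["ds-1"], "source_language": " EN ", "target_language": "fr"},
)
```

Failing here, before anything is published, keeps invalid tasks out of the queue.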

4. Define the Pub/Sub Message

  • What: Define the structure of the message that will be published to the Pub/Sub queue
  • Where: common/types/message_types.py
  • How: Create a new Pydantic model for your message

Example message:

from typing import List

from pydantic import BaseModel


class TranslationMessage(BaseModel):
    job_id: str
    dataset_ids: List[str]
    # ... other fields
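A message model is only useful if it survives the trip through the queue. The sketch below round-trips a message through JSON bytes with Pydantic v2. How the project's queue wrapper actually serializes messages is not shown in this guide, so JSON is an assumption, as are the extra fields (`task_name`, `source_language`, `target_language`).

```python
from typing import List

from pydantic import BaseModel


class TranslationMessage(BaseModel):
    job_id: str
    dataset_ids: List[str]
    # The fields below are assumptions added for illustration only.
    task_name: str = "translation"
    source_language: str
    target_language: str


message = TranslationMessage(
    job_id="job-1",
    dataset_ids=["ds-1", "ds-2"],
    source_language="en",
    target_language="fr",
)

# Publisher side: queue payloads are bytes, so encode the JSON form.
wire_bytes = message.model_dump_json().encode("utf-8")

# Subscriber side: parse and validate in one step.
received = TranslationMessage.model_validate_json(wire_bytes)
```

Validating on the subscriber side means a malformed message fails at the boundary rather than deep inside the handler.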

5. Implement the Task Handler

  • What: Write the core business logic for your service. This code will be executed by the Pub/Sub subscriber
  • Where: app/tasks/{service_name}/batch/task_handler.py
  • How: Create a handler function that takes the message as input, processes the files, and performs the task
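As a rough shape for such a handler, the sketch below loops over the datasets in the message, processes each file, and records per-file outcomes so one bad file does not fail the whole batch. The injected `fetch_files` and `translate` callables, the file metadata keys, and the status strings are hypothetical; the real handler would use the project's dataset and provider clients.

```python
from typing import Any, Callable, Dict, List


def handle_batch_translation(
    message: Dict[str, Any],
    fetch_files: Callable[[str], List[Dict[str, str]]],
    translate: Callable[[str, str, str], str],
) -> Dict[str, Any]:
    """Translate every file in every dataset and collect per-file outcomes."""
    results: List[Dict[str, Any]] = []
    for dataset_id in message["dataset_ids"]:
        for file_meta in fetch_files(dataset_id):
            try:
                output = translate(
                    file_meta["text"],
                    message["source_language"],
                    message["target_language"],
                )
                results.append(
                    {"file_id": file_meta["file_id"], "status": "COMPLETED", "output": output}
                )
            except Exception as exc:  # keep going so one bad file does not sink the batch
                results.append(
                    {"file_id": file_meta["file_id"], "status": "FAILED", "error": str(exc)}
                )
    failed = sum(1 for r in results if r["status"] == "FAILED")
    return {"job_id": message["job_id"], "results": results, "failed": failed}


# Fakes standing in for the dataset lookup and the translation provider.
def fake_fetch_files(dataset_id: str) -> List[Dict[str, str]]:
    return [
        {"file_id": f"{dataset_id}-a", "text": "hello"},
        {"file_id": f"{dataset_id}-b", "text": ""},
    ]


def fake_translate(text: str, source: str, target: str) -> str:
    if not text:
        raise ValueError("empty input")
    return f"[{source}->{target}] {text}"


summary = handle_batch_translation(
    {"job_id": "job-1", "dataset_ids": ["ds-1"], "source_language": "en", "target_language": "fr"},
    fake_fetch_files,
    fake_translate,
)
```

Passing the collaborators in as arguments is a design choice for the sketch: it lets the handler be exercised without Pub/Sub, storage, or a provider account.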

6. Route the Task to the Handler

  • What: Map the task_name to your new handler function
  • Where: app/handlers/handler_router.py
  • How: Add an entry to the MESSAGE_TYPE_HANDLERS dictionary

Example routing:

MESSAGE_TYPE_HANDLERS = {
    "transcription": handle_transcription,
    "completion": handle_batch_completion,
    "translation": handle_batch_translation, # Add this
}
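The subscriber's lookup in this dictionary might look like the sketch below. The `route_message` function and the presence of a `task_name` key in the message are assumptions; the point is that an unregistered task name should fail loudly rather than be silently dropped.

```python
from typing import Any, Callable, Dict


def handle_batch_translation(message: Dict[str, Any]) -> Dict[str, Any]:
    """Placeholder handler used only to demonstrate routing."""
    return {"handled_by": "translation", "job_id": message["job_id"]}


MESSAGE_TYPE_HANDLERS: Dict[str, Callable[[Dict[str, Any]], Dict[str, Any]]] = {
    "translation": handle_batch_translation,
}


def route_message(message: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical router: pick the handler registered for the message's task_name."""
    task_name = message.get("task_name")
    handler = MESSAGE_TYPE_HANDLERS.get(task_name)
    if handler is None:
        raise ValueError(f"No handler registered for task_name={task_name!r}")
    return handler(message)


routed = route_message({"task_name": "translation", "job_id": "job-1"})
```

If a new service's messages are published before its handler is registered here, this is where the failure would surface.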

II. Onboarding a Sync (Single) Service

Here's how to add a new synchronous service, like single_translation.

1. Define the Request Payload

  • What: Define the payload structure for the sync service
  • Where: common/types/request_types.py
  • How: Create a new Pydantic model

Example payload:

class SingleTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    provider: str = "GOOGLE"
    # ... other fields

2. Create the Endpoint

  • What: Define the API endpoint for the synchronous task
  • Where: app/routes/v1/root.py
  • How: Add a new POST endpoint that calls handle_sync_task_request. The sync flow automatically handles file upload, task creation, and execution via the thread pool executor

3. Implement Payload Preprocessing

  • What: Add service-specific validation for the sync request payload
  • Where: app/controllers/sync_task_request.py and app/tasks/{service_name}/single/preprocessor.py
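  • How: Create the validator in app/tasks/{service_name}/single/preprocessor.py, then register the request model in request_map and the validator in payload_validators_map in app/controllers/sync_task_request.py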

4. Implement File Validation

  • What: Create a file validation function for your service
  • Where: app/tasks/{service_name}/utils/validate_{service_name}_files.py
  • How: Create a validation function and add it to file_validators_map in app/controllers/sync_task_request.py

Example registration in file_validators_map:

file_validators_map = {
    "transcription": validate_transcription_audios,
    "completion": validate_upload_files,
    "translation": validate_translation_files, # Add this
}
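The validator's contents are service-specific. As a minimal sketch, a translation validator might check extension, size, and encoding before the file is uploaded. The input shape (a list of filename and bytes pairs), the allowed extensions, and the size limit are all assumptions; the real function should match the signature of the existing validators.

```python
from pathlib import PurePath
from typing import List, Tuple

ALLOWED_EXTENSIONS = {".txt", ".md", ".json"}  # hypothetical allow-list
MAX_FILE_BYTES = 5 * 1024 * 1024  # hypothetical 5 MiB limit


def validate_translation_files(files: List[Tuple[str, bytes]]) -> None:
    """Raise ValueError describing the first file that fails a check."""
    if not files:
        raise ValueError("At least one file is required")
    for filename, content in files:
        suffix = PurePath(filename).suffix.lower()
        if suffix not in ALLOWED_EXTENSIONS:
            raise ValueError(f"{filename}: unsupported file type {suffix or '(none)'}")
        if not content:
            raise ValueError(f"{filename}: file is empty")
        if len(content) > MAX_FILE_BYTES:
            raise ValueError(f"{filename}: exceeds {MAX_FILE_BYTES} bytes")
        try:
            content.decode("utf-8")
        except UnicodeDecodeError:
            raise ValueError(f"{filename}: not valid UTF-8 text") from None


# A well-formed upload passes silently.
validate_translation_files([("notes.txt", "bonjour".encode("utf-8"))])
```

Rejecting files at this stage avoids uploading content to cloud storage that the handler could never process.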

5. Implement the Task Handler

  • What: Write the core business logic for your sync service
  • Where: app/tasks/{service_name}/single/task_handler.py
  • How: Create a handler function that takes (db, task_id, payload_dict, dataset_id) and returns the result. The handler runs in a thread pool and automatically updates task status
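A handler with that signature could be sketched as follows. Here `db` is a plain dict standing in for the database session, and `load_dataset_text` and `translate_text` are hypothetical helpers; the sketch also assumes the executor, not the handler, records task status.

```python
from typing import Any, Dict


def load_dataset_text(db: Dict[str, str], dataset_id: str) -> str:
    """Hypothetical lookup of the uploaded file's text for a dataset."""
    return db[dataset_id]


def translate_text(text: str, source: str, target: str) -> str:
    """Hypothetical provider call; a real handler would call the configured provider."""
    return f"[{source}->{target}] {text}"


def sync_translation_handler(
    db: Dict[str, str], task_id: str, payload_dict: Dict[str, Any], dataset_id: str
) -> Dict[str, Any]:
    """Translate the dataset's text and return a JSON-serializable result."""
    text = load_dataset_text(db, dataset_id)
    if not text.strip():
        raise ValueError("Nothing to translate")
    translated = translate_text(
        text, payload_dict["source_language"], payload_dict["target_language"]
    )
    # Status is not written here; this sketch leaves that to the caller.
    return {"task_id": task_id, "dataset_id": dataset_id, "translated_text": translated}


result = sync_translation_handler(
    {"ds-1": "hello"},
    "task-1",
    {"source_language": "en", "target_language": "fr"},
    "ds-1",
)
```

Raising on bad input, rather than returning an error dict, gives the caller a single place to mark the task as failed.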

6. Register the Handler

  • What: Map your task type to the handler function
  • Where: app/controllers/sync_task_executor.py
  • How: Add your handler to the task_handlers dictionary
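To show how the registration ties into execution, here is a sketch of an executor that looks up the handler, runs it in a thread pool, and records the outcome. The `execute_sync_task` function, the dict used in place of the database, the status strings, and the upper-casing "translation" are all stand-ins for illustration, not the contents of sync_task_executor.py.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict


def sync_translation_handler(
    db: Dict[str, Any], task_id: str, payload_dict: Dict[str, Any], dataset_id: str
) -> Dict[str, Any]:
    """Placeholder handler: upper-casing stands in for a real translation."""
    if not payload_dict.get("text"):
        raise ValueError("Nothing to translate")
    return {"translated_text": payload_dict["text"].upper()}


task_handlers: Dict[str, Callable[..., Dict[str, Any]]] = {
    "translation": sync_translation_handler,  # Add this
}

_executor = ThreadPoolExecutor(max_workers=2)  # hypothetical pool size


def execute_sync_task(
    task_name: str,
    db: Dict[str, Any],
    task_id: str,
    payload_dict: Dict[str, Any],
    dataset_id: str,
) -> Dict[str, Any]:
    """Run the registered handler in the pool and record status and response."""
    handler = task_handlers.get(task_name)
    if handler is None:
        raise ValueError(f"No sync handler registered for {task_name!r}")
    db[task_id] = {"status": "IN_PROGRESS", "response": None}
    future = _executor.submit(handler, db, task_id, payload_dict, dataset_id)
    try:
        response = future.result()
    except Exception as exc:
        db[task_id] = {"status": "FAILED", "response": {"error": str(exc)}}
        raise
    db[task_id] = {"status": "COMPLETED", "response": response}
    return response


db: Dict[str, Any] = {}  # stand-in for the task table
ok = execute_sync_task("translation", db, "task-1", {"text": "hello"}, "ds-1")
try:
    execute_sync_task("translation", db, "task-2", {}, "ds-1")
except ValueError:
    pass  # the failure is recorded in db["task-2"]
_executor.shutdown()
```

Keeping status updates in the executor means individual handlers only need to return a result or raise.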

C. Maps Reference Guide

When onboarding a new service, you need to update several maps across different files. Here's a complete reference:

Async (Batch) Service Maps

| Map Name | File Location | Purpose | Example Entry |
|---|---|---|---|
| request_type_map | app/controllers/async_task_request.py | Maps task names to request models | "translation": BatchTranslationRequest |
| task_preprocessor_map | app/controllers/async_task_request.py | Maps task names to preprocessor functions | "translation": preprocess_batch_translation |
| MESSAGE_TYPE_HANDLERS | app/handlers/handler_router.py | Maps task names to handler functions | "translation": handle_batch_translation |

Sync (Single) Service Maps

| Map Name | File Location | Purpose | Example Entry |
|---|---|---|---|
| request_map | app/controllers/sync_task_request.py | Maps task names to request models | "translation": SingleTranslationRequest |
| payload_validators_map | app/controllers/sync_task_request.py | Maps task names to payload validators | "translation": validate_translation_payload |
| file_validators_map | app/controllers/sync_task_request.py | Maps task names to file validators | "translation": validate_translation_files |
| task_handlers | app/controllers/sync_task_executor.py | Maps task names to handler functions | "translation": sync_translation_handler |

D. New Service Onboarding Checklists

To ensure a smooth onboarding process, please use the following checklists.

1. Async (Batch) Service Onboarding

| Step | Task | File/Location | Done (✓) |
|---|---|---|---|
| Define Payloads | Create the Pydantic model for the batch request payload | common/types/request_types.py | |
| | Create the Pydantic model for the Pub/Sub message | common/types/message_types.py | |
| Preprocessing | Implement the service-specific payload preprocessor function | app/tasks/{service_name}/batch/preprocessor.py | |
| | Register the new request type and preprocessor in the controller maps | app/controllers/async_task_request.py | |
| Endpoint | Create the new API endpoint for the batch service | app/routes/v1/root.py | |
| | In the endpoint, call handle_async_task_request and publish the message | app/routes/v1/root.py | |
| Core Logic | Implement the core service logic in the task handler | app/tasks/{service_name}/batch/task_handler.py | |
| | Register the new task handler in the handler router | app/handlers/handler_router.py | |
| Utilities (Optional) | Create file validation or other utility functions | app/tasks/{service_name}/utils/ | |
| Finalization | In the handler, update the task status and response in the database | app/tasks/{service_name}/batch/task_handler.py | |

2. Sync (Single) Service Onboarding

| Step | Task | File/Location | Done (✓) |
|---|---|---|---|
| Define Payload | Create the Pydantic model for the single request payload | common/types/request_types.py | |
| Preprocessing | Implement the service-specific payload preprocessor function | app/tasks/{service_name}/single/preprocessor.py | |
| | Register the new request type and preprocessor in the controller maps | app/controllers/sync_task_request.py | |
| File Validation | Implement file validation function for your service | app/tasks/{service_name}/utils/ | |
| | Register the file validator in the controller maps | app/controllers/sync_task_request.py | |
| Core Logic | Implement the core service logic in the task handler | app/tasks/{service_name}/single/task_handler.py | |
| Handler Registration | Register the task handler in the sync executor | app/controllers/sync_task_executor.py | |
| Endpoint | Create the new API endpoint for the single service | app/routes/v1/root.py | |
| Utilities (Optional) | Create other utility functions | app/tasks/{service_name}/utils/ | |