Language Server: Service Onboarding and Workflow Guide
This document describes the Language Server's internal task processing workflows and the steps required to integrate a new service (e.g., translation).
Each service must support two pipelines:
- Async (Batch) → Goes through Pub/Sub for background processing
- Sync (Single) → Processes directly in the request cycle without Pub/Sub
A. Language Server Task Processing Workflow
The following flowcharts illustrate the journey of a user request through the Language Server, from initial input to the final result.
1. Async (Batch) Task Workflow
The journey of a batch request through the Language Server:
flowchart LR
A["User Input (task_name, dataset_ids, payload)"] --> B["Payload & Model Validation"]
B --> C["Create DB Entries"]
C --> D["Build Task Message"]
D --> E["Publish to Pub/Sub"]
E --> F["Subscriber chooses Handler based on task_name"]
F --> G["Fetch File Metadata from Dataset"]
G --> H["File preprocessing (if needed)"]
H --> I["Handler Executes Task Logic"]
I --> J["Save Output in task_request_response & Blob Storage"]
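The producer half of this flow (validate, create a DB entry, build the message, publish) can be sketched with in-memory stand-ins. This is only an illustration under stated assumptions: the dict-backed TASKS table, the queue.Queue standing in for Pub/Sub, and the function submit_batch_task are hypothetical and not part of the Language Server.

```python
import queue
import uuid
from typing import Any, Dict, List

# Hypothetical stand-ins: a dict for the task table, a Queue for Pub/Sub.
TASKS: Dict[str, Dict[str, Any]] = {}
PUBSUB: queue.Queue = queue.Queue()


def submit_batch_task(
    task_name: str, dataset_ids: List[str], payload: Dict[str, Any]
) -> str:
    """Validate the request, record it, and publish a task message."""
    # Payload validation (reduced to a single check for brevity).
    if not dataset_ids:
        raise ValueError("dataset_ids must not be empty")

    # Create the DB entry in its initial state.
    task_id = str(uuid.uuid4())
    TASKS[task_id] = {"task_name": task_name, "status": "PENDING"}

    # Build the task message and publish it for the subscriber.
    message = {
        **payload,
        "task_name": task_name,
        "job_id": task_id,
        "dataset_ids": dataset_ids,
    }
    PUBSUB.put(message)
    return task_id


task_id = submit_batch_task("translation", ["ds-1"], {"target_language": "fr"})
message = PUBSUB.get_nowait()  # what a subscriber would receive
```

The request returns as soon as the message is queued; the task stays PENDING until a subscriber picks the message up.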
2. Sync (Single) Task Workflow
The journey of a single request through the Language Server:
flowchart LR
A["User Input (task_name, file, payload)"] --> B["Payload & Model Validation"]
B --> C["File Upload to Cloud Storage"]
C --> D["Create Dataset & File Records"]
D --> E["Create Task Entry"]
E --> F["Thread Pool Executor"]
F --> G["Task-Specific Handler"]
G --> H["Update Task Status & Response"]
H --> I["Return Direct Response to User"]
Key Features:
- ✅ Immediate Processing: Returns results directly without queuing
- 🔄 Thread Pool: Uses thread pool executor to prevent main thread blocking
- 📁 File Management: Automatically uploads files to cloud storage (Azure/GCS)
- 💾 Database Updates: Updates task status and response atomically
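To illustrate the thread pool point, here is a minimal sketch of offloading a blocking handler so the request loop stays responsive. It assumes an asyncio-based server; the names executor, blocking_handler, and handle_request are hypothetical, and the real pool size and wiring may differ.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict

# Hypothetical pool; the real executor's size and lifetime are not specified.
executor = ThreadPoolExecutor(max_workers=4)


def blocking_handler(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Stand-in for a task-specific handler that does blocking work."""
    time.sleep(0.05)  # simulates a blocking provider call
    return {"status": "COMPLETED", "echo": payload["text"]}


async def handle_request(payload: Dict[str, Any]) -> Dict[str, Any]:
    # Run the blocking handler in a worker thread so the event loop stays free.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_handler, payload)


result = asyncio.run(handle_request({"text": "hello"}))
```

The caller still waits for the result, which matches the "direct response" behaviour of the sync pipeline, but other requests are not stalled while the handler runs.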
B. Onboarding a New Service: A Step-by-Step Guide
This section is a code-oriented guide for developers onboarding a new service into the Language Server. It covers both the asynchronous (batch) and synchronous (single) pipelines.
Folder Structure for a New Service
When adding a new service (translation is used as the running example), create the following directory structure:
.
├── tasks
│   └── translation
│       ├── batch
│       │   ├── task_handler.py    # 1. Async task handler
│       │   └── preprocessor.py    # 2. Async payload preprocessor
│       ├── single
│       │   ├── task_handler.py    # 3. Sync task handler
│       │   └── preprocessor.py    # 4. Sync payload preprocessor
│       └── utils                  # 5. Shared utilities
│           └── validate_translation_files.py
File Descriptions:
- app/tasks/{service_name}/batch/task_handler.py: Contains the core business logic for the asynchronous (batch) version of your service. It is triggered by a message from the Pub/Sub queue and is responsible for executing the long-running task, processing the data, and storing the results.
- app/tasks/{service_name}/batch/preprocessor.py: Contains payload preprocessing and validation logic for your async service, run before the task is published to the queue.
- app/tasks/{service_name}/single/task_handler.py: Holds the core business logic for the synchronous (single) version of your service. It is called directly from the API endpoint to process the request in real time and return a response to the user.
- app/tasks/{service_name}/single/preprocessor.py: Contains payload preprocessing and validation logic for your sync service, run before the task is executed by the handler.
- app/tasks/{service_name}/utils/: Holds utility functions or configuration specific to the service (here, translation), such as file validation logic.
I. Onboarding an Async (Batch) Service
Here's how to add a new asynchronous service, like batch_translation.
1. Define the Request Payload
- What: Define the structure of the request payload for your service
- Where: common/types/request_types.py
- How: Create a new Pydantic model that inherits from BaseModel
Example for BatchTranslationRequest:
from typing import List

from pydantic import BaseModel


class BatchTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    dataset_ids: List[str]
    provider: str = "GOOGLE"  # used when the caller omits a provider
    models: List[str]
    source_language: str
    target_language: str
2. Create the Endpoint
- What: Define the API endpoint that will receive the batch task request
- Where: app/routes/v1/root.py
- How: Add a new POST endpoint. This endpoint calls handle_async_task_request to perform initial processing and then publishes a message to the Pub/Sub queue
Example endpoint:
@router.post("/task/batch/translation")
async def create_batch_translation(
    request: Request,
    payload: Dict[str, Any],
    db: Session = Depends(get_db),
) -> Dict[str, Any]:
    # Initial processing for the "translation" task.
    task, payload, dataset_ids = handle_async_task_request(
        request, db, payload, "translation"
    )
    # Build the Pub/Sub message that the subscriber will consume.
    message_data = TranslationMessage(
        job_id=task.id,
        dataset_ids=dataset_ids,
        # ... other fields from payload
    ).model_dump()
    queue.publish_task(message_data)
    return {
        "task_id": task.id,
        "dataset_ids": dataset_ids,
        "status": "PENDING",
    }
3. Implement Payload Preprocessing
- What: Add service-specific validation for the request payload
- Where: app/controllers/async_task_request.py, plus a new preprocessor file at app/tasks/{service_name}/batch/preprocessor.py
- How:
  - Add your new request type to the request_type_map in app/controllers/async_task_request.py
  - Create a preprocessor function in app/tasks/{service_name}/batch/preprocessor.py to handle validation
  - Add your preprocessor to the task_preprocessor_map in app/controllers/async_task_request.py
In app/controllers/async_task_request.py:
# Add to request_type_map
request_type_map = {
    "transcription": BatchTranscriptionRequest,
    "completion": BatchLLMRequest,
    "translation": BatchTranslationRequest,  # Add this
}

# Add to task_preprocessor_map
task_preprocessor_map = {
    "transcription": validate_transcription_payload,
    "completion": preprocess_batch_completion,
    "translation": preprocess_batch_translation,  # Add this
}
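A preprocessor registered under "translation" might look like the following. This is a minimal sketch: the signature (a payload dict in, a cleaned dict out), the SUPPORTED_PROVIDERS allow-list, and the specific checks are assumptions made for illustration, since the actual preprocessor contract is not shown in this guide.

```python
from typing import Any, Dict

# Hypothetical allow-list; the providers actually supported are not listed here.
SUPPORTED_PROVIDERS = {"GOOGLE"}


def preprocess_batch_translation(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Validate and normalize a batch translation payload (illustrative only)."""
    cleaned = dict(payload)  # copy so the caller's dict is not mutated

    provider = str(cleaned.get("provider", "GOOGLE")).upper()
    if provider not in SUPPORTED_PROVIDERS:
        raise ValueError(f"Unsupported provider: {provider}")
    cleaned["provider"] = provider

    # Both language fields are required and normalized to lower case.
    for field in ("source_language", "target_language"):
        value = str(cleaned.get(field, "")).strip().lower()
        if not value:
            raise ValueError(f"{field} is required")
        cleaned[field] = value

    if cleaned["source_language"] == cleaned["target_language"]:
        raise ValueError("source_language and target_language must differ")
    return cleaned


cleaned_payload = preprocess_batch_translation(
    {"provider": "google", "source_language": " EN ", "target_language": "fr"}
)
```

Rejecting bad payloads at this stage keeps invalid work out of the queue, where failures are harder to report back to the caller.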
4. Define the Pub/Sub Message
- What: Define the structure of the message that will be published to the Pub/Sub queue
- Where: common/types/message_types.py
- How: Create a new Pydantic model for your message
Example message:
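A minimal sketch of such a message model follows. Only job_id and dataset_ids appear in the endpoint example in this guide; the task_name default, the language fields, and all field types are assumptions and should be adapted to the real schema.

```python
from typing import List

from pydantic import BaseModel


class TranslationMessage(BaseModel):
    # task_name is assumed here because the subscriber routes on it.
    task_name: str = "translation"
    job_id: str  # assumed to be a string; use the task ID's real type
    dataset_ids: List[str]
    # Hypothetical fields carried over from the request payload.
    source_language: str
    target_language: str


example_message = TranslationMessage(
    job_id="job-1",
    dataset_ids=["ds-1"],
    source_language="en",
    target_language="fr",
).model_dump()
```

Calling model_dump() produces a plain dict, which is the form the endpoint passes to queue.publish_task.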
5. Implement the Task Handler
- What: Write the core business logic for your service. This code will be executed by the Pub/Sub subscriber
- Where: app/tasks/{service_name}/batch/task_handler.py
- How: Create a handler function that takes the message as input, processes the files, and performs the task
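The shape of such a handler can be sketched as below. The real handler's signature and storage calls are not shown in this guide, so the injected fetch_files and translate callables, the "PARTIAL" status, and the result layout are all hypothetical.

```python
from typing import Any, Callable, Dict, List


def handle_batch_translation(
    message: Dict[str, Any],
    fetch_files: Callable[[str], List[str]],
    translate: Callable[[str, str], str],
) -> Dict[str, Any]:
    """Process every file in each dataset named by the message."""
    results: Dict[str, str] = {}
    errors: Dict[str, str] = {}
    for dataset_id in message["dataset_ids"]:
        for file_name in fetch_files(dataset_id):
            try:
                results[file_name] = translate(file_name, message["target_language"])
            except Exception as exc:  # one bad file should not abort the batch
                errors[file_name] = str(exc)
    status = "COMPLETED" if not errors else "PARTIAL"
    return {
        "job_id": message["job_id"],
        "status": status,
        "results": results,
        "errors": errors,
    }


output = handle_batch_translation(
    {"job_id": "j1", "dataset_ids": ["ds-1"], "target_language": "fr"},
    fetch_files=lambda dataset_id: ["a.txt", "b.txt"],
    translate=lambda name, lang: f"{name}.{lang}",
)
```

Collecting per-file errors instead of raising on the first failure lets a long batch finish and report exactly which files need a retry.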
6. Route the Task to the Handler
- What: Map the task_name to your new handler function
- Where: app/handlers/handler_router.py
- How: Add an entry to the MESSAGE_TYPE_HANDLERS dictionary
Example routing:
MESSAGE_TYPE_HANDLERS = {
    "transcription": handle_transcription,
    "completion": handle_batch_completion,
    "translation": handle_batch_translation,  # Add this
}
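How the subscriber might use this dictionary can be sketched as follows. The route_message function and the placeholder handler are hypothetical; the point is only that a missing entry should fail loudly rather than silently drop the message.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Dict[str, Any]], Any]


def handle_batch_translation(message: Dict[str, Any]) -> str:
    """Placeholder handler used only for this illustration."""
    return f"translated {len(message['dataset_ids'])} dataset(s)"


MESSAGE_TYPE_HANDLERS: Dict[str, Handler] = {
    "translation": handle_batch_translation,
}


def route_message(message: Dict[str, Any]) -> Any:
    """Hypothetical dispatcher: pick the handler by task_name and call it."""
    task_name = message.get("task_name")
    handler = MESSAGE_TYPE_HANDLERS.get(task_name)
    if handler is None:
        raise KeyError(f"No handler registered for task_name={task_name!r}")
    return handler(message)


routed = route_message({"task_name": "translation", "dataset_ids": ["ds-1", "ds-2"]})
```

If a new service is published but never added to the dictionary, this kind of explicit check surfaces the omission on the first message.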
II. Onboarding a Sync (Single) Service
Here's how to add a new synchronous service, like single_translation.
1. Define the Request Payload
- What: Define the payload structure for the sync service
- Where: common/types/request_types.py
- How: Create a new Pydantic model
Example payload:
class SingleTranslationRequest(BaseModel):
    project_name: str
    user_email: str
    provider: str = "GOOGLE"
    # ... other fields
2. Create the Endpoint
- What: Define the API endpoint for the synchronous task
- Where: app/routes/v1/root.py
- How: Add a new POST endpoint that calls handle_sync_task_request. The sync flow automatically handles file upload, task creation, and execution via the thread pool executor
3. Implement Payload Preprocessing
- What: Add service-specific validation for the sync request payload
- Where: app/controllers/sync_task_request.py and app/tasks/{service_name}/single/preprocessor.py
- How: Update request_map and payload_validators_map in app/controllers/sync_task_request.py
4. Implement File Validation
- What: Create a file validation function for your service
- Where: app/tasks/{service_name}/utils/validate_{service_name}_files.py
- How: Create a validation function and add it to file_validators_map in app/controllers/sync_task_request.py
Example file validator registration:
file_validators_map = {
    "transcription": validate_transcription_audios,
    "completion": validate_upload_files,
    "translation": validate_translation_files,  # Add this
}
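The validator itself could be sketched as below. The input shape (pairs of file name and size in bytes), the allowed extensions, and the size limit are all assumptions chosen for illustration; the real validator's signature and limits are not documented here.

```python
import os
from typing import Iterable, List, Tuple

# Hypothetical limits; adjust to what the translation provider accepts.
ALLOWED_EXTENSIONS = {".txt", ".docx", ".pdf"}
MAX_FILE_BYTES = 10 * 1024 * 1024


def validate_translation_files(files: Iterable[Tuple[str, int]]) -> List[str]:
    """Return error strings for (file_name, size_in_bytes) pairs; empty means valid."""
    errors: List[str] = []
    for name, size in files:
        ext = os.path.splitext(name)[1].lower()
        if ext not in ALLOWED_EXTENSIONS:
            errors.append(f"{name}: unsupported extension {ext or '(none)'}")
        elif size > MAX_FILE_BYTES:
            errors.append(f"{name}: exceeds {MAX_FILE_BYTES} bytes")
    return errors


file_errors = validate_translation_files(
    [("a.txt", 10), ("b.exe", 10), ("c.pdf", MAX_FILE_BYTES + 1)]
)
```

Returning a list of problems rather than raising on the first one lets the endpoint report every rejected file in a single response.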
5. Implement the Task Handler
- What: Write the core business logic for your sync service
- Where: app/tasks/{service_name}/single/task_handler.py
- How: Create a handler function that takes (db, task_id, payload_dict, dataset_id) and returns the result. The handler runs in a thread pool and automatically updates task status
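A handler with that four-argument signature might be sketched as follows. The "text" payload field, the response layout, and the faked translation are hypothetical; a real handler would read the uploaded file for dataset_id and call the configured provider.

```python
from typing import Any, Dict


def sync_translation_handler(
    db: Any, task_id: str, payload_dict: Dict[str, Any], dataset_id: str
) -> Dict[str, Any]:
    """Illustrative sync handler; db is unused and the translation is faked."""
    text = payload_dict["text"]  # hypothetical field
    target = payload_dict["target_language"]
    translated = f"[{target}] {text}"  # placeholder for a provider call
    return {"task_id": task_id, "dataset_id": dataset_id, "translation": translated}


response = sync_translation_handler(
    None, "t-1", {"text": "hello", "target_language": "fr"}, "ds-1"
)
```

The handler only returns its result; it does not need to write task status itself, since the guide states that status is updated automatically around it.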
6. Register the Handler
- What: Map your task type to the handler function
- Where: app/controllers/sync_task_executor.py
- How: Add your handler to the task_handlers dictionary
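Registration and the surrounding status bookkeeping can be sketched like this. The in-memory TASK_STATUS table, the run_task wrapper, and the "COMPLETED"/"FAILED" status names are assumptions; only the task_handlers dictionary and the handler signature come from this guide.

```python
from typing import Any, Callable, Dict

TASK_STATUS: Dict[str, str] = {}  # hypothetical in-memory task table


def sync_translation_handler(db, task_id, payload_dict, dataset_id):
    return {"translation": payload_dict["text"].upper()}  # placeholder logic


task_handlers: Dict[str, Callable[..., Any]] = {
    "translation": sync_translation_handler,  # Add this
}


def run_task(task_name, db, task_id, payload_dict, dataset_id):
    """Look up the handler, run it, and record the outcome either way."""
    handler = task_handlers[task_name]
    try:
        result = handler(db, task_id, payload_dict, dataset_id)
    except Exception:
        TASK_STATUS[task_id] = "FAILED"
        raise
    TASK_STATUS[task_id] = "COMPLETED"
    return result


outcome = run_task("translation", None, "t-1", {"text": "hi"}, "ds-1")
```

Keeping the status update in one wrapper means individual handlers stay free of bookkeeping and cannot forget to mark a failure.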
C. Maps Reference Guide
When onboarding a new service, you need to update several maps across different files. Here's a complete reference:
Async (Batch) Service Maps
| Map Name | File Location | Purpose | Example Entry |
|---|---|---|---|
| request_type_map | app/controllers/async_task_request.py | Maps task names to request models | "translation": BatchTranslationRequest |
| task_preprocessor_map | app/controllers/async_task_request.py | Maps task names to preprocessor functions | "translation": preprocess_batch_translation |
| MESSAGE_TYPE_HANDLERS | app/handlers/handler_router.py | Maps task names to handler functions | "translation": handle_batch_translation |
Sync (Single) Service Maps
| Map Name | File Location | Purpose | Example Entry |
|---|---|---|---|
| request_map | app/controllers/sync_task_request.py | Maps task names to request models | "translation": SingleTranslationRequest |
| payload_validators_map | app/controllers/sync_task_request.py | Maps task names to payload validators | "translation": validate_translation_payload |
| file_validators_map | app/controllers/sync_task_request.py | Maps task names to file validators | "translation": validate_translation_files |
| task_handlers | app/controllers/sync_task_executor.py | Maps task names to handler functions | "translation": sync_translation_handler |
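Because a service must appear in several maps, a small consistency check can catch a forgotten entry early. The helper below is a hypothetical sketch, not part of the Language Server; the map contents are placeholders, where the real maps hold classes and functions.

```python
from typing import Dict, List, Mapping


def missing_registrations(
    task_name: str, maps: Mapping[str, Mapping[str, object]]
) -> List[str]:
    """Return the names of maps that have no entry for task_name."""
    return [name for name, entries in maps.items() if task_name not in entries]


# Placeholder contents standing in for the four sync maps.
sync_maps: Dict[str, Dict[str, object]] = {
    "request_map": {"translation": object()},
    "payload_validators_map": {"translation": object()},
    "file_validators_map": {},  # entry deliberately left out
    "task_handlers": {"translation": object()},
}

gaps = missing_registrations("translation", sync_maps)
```

A check like this could run in a unit test for each service name, so an incomplete onboarding fails in CI rather than at request time.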
D. New Service Onboarding Checklists
To ensure a smooth onboarding process, please use the following checklists.
1. Async (Batch) Service Onboarding
| Step | Task | File/Location | Done (✓) |
|---|---|---|---|
| Define Payloads | Create the Pydantic model for the batch request payload | common/types/request_types.py | ☐ |
| | Create the Pydantic model for the Pub/Sub message | common/types/message_types.py | ☐ |
| Preprocessing | Implement the service-specific payload preprocessor function | app/tasks/{service_name}/batch/preprocessor.py | ☐ |
| | Register the new request type and preprocessor in the controller maps | app/controllers/async_task_request.py | ☐ |
| Endpoint | Create the new API endpoint for the batch service | app/routes/v1/root.py | ☐ |
| | In the endpoint, call handle_async_task_request and publish the message | app/routes/v1/root.py | ☐ |
| Core Logic | Implement the core service logic in the task handler | app/tasks/{service_name}/batch/task_handler.py | ☐ |
| | Register the new task handler in the handler router | app/handlers/handler_router.py | ☐ |
| Utilities | (Optional) Create file validation or other utility functions | app/tasks/{service_name}/utils/ | ☐ |
| Finalization | In the handler, update the task status and response in the database | app/tasks/{service_name}/batch/task_handler.py | ☐ |
2. Sync (Single) Service Onboarding
| Step | Task | File/Location | Done (✓) |
|---|---|---|---|
| Define Payload | Create the Pydantic model for the single request payload | common/types/request_types.py | ☐ |
| Preprocessing | Implement the service-specific payload preprocessor function | app/tasks/{service_name}/single/preprocessor.py | ☐ |
| | Register the new request type and preprocessor in the controller maps | app/controllers/sync_task_request.py | ☐ |
| File Validation | Implement file validation function for your service | app/tasks/{service_name}/utils/ | ☐ |
| | Register the file validator in the controller maps | app/controllers/sync_task_request.py | ☐ |
| Core Logic | Implement the core service logic in the task handler | app/tasks/{service_name}/single/task_handler.py | ☐ |
| Handler Registration | Register the task handler in the sync executor | app/controllers/sync_task_executor.py | ☐ |
| Endpoint | Create the new API endpoint for the single service | app/routes/v1/root.py | ☐ |
| Utilities | (Optional) Create other utility functions | app/tasks/{service_name}/utils/ | ☐ |