You can programmatically execute and manage batch jobs using the Blaxel API and Python SDK.
Both synchronous and asynchronous usage are supported: use method_name() for synchronous operations and amethod_name() for asynchronous ones.
The code examples in this section assume that your batch job has already been successfully deployed on Blaxel.
Dispatch job execution
Dispatch a job and wait for execution to start:
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest

job = bl_job("my-job")
request = CreateJobExecutionRequest(tasks=[{"duration": 60}])
execution_id = job.create_execution(request)
result = job.wait_for_execution(execution_id, max_wait=180)
Time units in the Python SDK are specified in seconds.
Dispatch job execution with tasks
Dispatch a new job with tasks:
# synchronous
request = CreateJobExecutionRequest(tasks=[{"duration": 30}, {"duration": 60}])
execution_id = job.create_execution(request)

# asynchronous
execution_id = await job.acreate_execution(request)
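Task payloads are plain Python dicts, so they can be built programmatically. A minimal sketch (the duration field mirrors the examples above; which fields a task accepts is defined by your own job code):

```python
# Build one task per work item; each task is a plain dict whose
# fields are interpreted by your deployed job's own task handler.
durations = [30, 60, 90]
tasks = [{"duration": d} for d in durations]

# The list plugs straight into CreateJobExecutionRequest(tasks=tasks).
print(len(tasks))  # → 3
```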
Get execution details
Get full execution details:
# synchronous
execution = job.get_execution(execution_id)
# asynchronous
execution = await job.aget_execution(execution_id)
Get status
The possible job states are:
pending → running → completed
↘ failed
Get only the job status (faster than retrieving full execution details):
# synchronous
status = job.get_execution_status(execution_id)
# asynchronous
status = await job.aget_execution_status(execution_id)
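The state diagram above has exactly two terminal states. When driving get_execution_status in your own loop, an explicit helper keeps the branching clear (illustrative only, not part of the SDK; the state names match the diagram above):

```python
# completed and failed are final; pending and running are transient
TERMINAL_STATES = {"completed", "failed"}

def is_terminal(status: str) -> bool:
    """True once an execution can no longer change state."""
    return status in TERMINAL_STATES
```

With this helper, a hand-rolled polling loop can stop as soon as `is_terminal(job.get_execution_status(execution_id))` returns True.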
List executions
List all executions for a job:
# synchronous
executions = job.list_executions(limit=50)

# asynchronous
executions = await job.alist_executions(limit=50)
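Filtering can then be done client-side. Assuming each returned execution exposes a status field (as execution.status does later on this page), a sketch using simple stand-in objects in place of real results from list_executions:

```python
from types import SimpleNamespace

# Stand-ins for execution objects; the real ones come from job.list_executions()
executions = [
    SimpleNamespace(id="exec-1", status="completed"),
    SimpleNamespace(id="exec-2", status="failed"),
    SimpleNamespace(id="exec-3", status="running"),
]

# Keep only the failed runs, e.g. to retry or inspect them
failed = [e for e in executions if e.status == "failed"]
print([e.id for e in failed])  # → ['exec-2']
```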
Wait for completion
Poll until execution completes:
# synchronous
result = job.wait_for_execution(
    execution_id,
    max_wait=300,  # total timeout in seconds (5 minutes)
    interval=2,    # poll every 2 seconds
)

# asynchronous
result = await job.await_for_execution(execution_id, max_wait=300, interval=2)
When polling:
adjust the interval based on the expected task duration
set the max_wait timeout longer than the expected total task duration
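wait_for_execution handles this loop for you; the sketch below shows the equivalent logic written by hand, with the status getter injected so the timing behavior is easy to see (get_status stands in for job.get_execution_status; this is an illustration, not the SDK's actual implementation):

```python
import time

def poll_until_done(get_status, max_wait=300.0, interval=2.0):
    """Poll get_status() until 'completed' or 'failed', or raise after max_wait seconds."""
    deadline = time.monotonic() + max_wait
    while True:
        status = get_status()
        if status in ("completed", "failed"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"execution still '{status}' after {max_wait}s")
        time.sleep(interval)

# Stubbed status sequence standing in for real API responses
statuses = iter(["pending", "running", "completed"])
print(poll_until_done(lambda: next(statuses), max_wait=5, interval=0))  # → completed
```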
Override environment and memory for a single job execution
Override memory and/or environment variables for a single job execution, without modifying the deployed job configuration:
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import (
    CreateJobExecutionRequest,
)
from blaxel.core.client.models.create_job_execution_request_env import (
    CreateJobExecutionRequestEnv,
)

job = bl_job("my-job")

env = CreateJobExecutionRequestEnv()
env["CUSTOM_ENV"] = "OVERRIDE"
env["LOG_LEVEL"] = "debug"

request = CreateJobExecutionRequest(
    tasks=[{"name": "Combined"}],
    memory=1024,  # MB (must be <= deployed job memory)
    env=env,
)

# synchronous
execution_id = job.create_execution(request)
execution = job.get_execution(execution_id)

# asynchronous
execution_id = await job.acreate_execution(request)
execution = await job.aget_execution(execution_id)

print(execution.status)
Memory overrides are expressed in MB and are downward-only: the value must be <= the memory configured at deployment time. For example, if the job is deployed with 4096 MB, execution overrides must be <= 4096 MB.
env supports multiple environment variables
memory and env can be set independently or together
Overrides apply only to the current execution
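Because memory overrides are downward-only, it can be worth validating the value before dispatching. A small client-side check (illustrative; the platform enforces the limit on its side regardless):

```python
def check_memory_override(requested_mb: int, deployed_mb: int) -> int:
    """Reject overrides above the memory configured at deployment time."""
    if requested_mb > deployed_mb:
        raise ValueError(
            f"override {requested_mb} MB exceeds deployed {deployed_mb} MB"
        )
    return requested_mb

# A job deployed with 4096 MB accepts any override up to 4096 MB
print(check_memory_override(1024, 4096))  # → 1024
```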
Complete example
Here is a complete example using synchronous operations:
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest

job = bl_job("my-job")

# create execution
request = CreateJobExecutionRequest(tasks=[{"name": "John"}, {"name": "Jane"}])
execution_id = job.create_execution(request)

# monitor
status = job.get_execution_status(execution_id)
print(f"Status: {status}")

# wait
try:
    result = job.wait_for_execution(execution_id, max_wait=180, interval=5)
    print(f"Completed: {result.status}")
except Exception as error:
    print(f"Timeout: {error}")
Here is a complete example using asynchronous operations:
import asyncio

from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest

async def main():
    job = bl_job("my-job")
    request = CreateJobExecutionRequest(tasks=[{"name": "John"}, {"name": "Jane"}])
    execution_id = await job.acreate_execution(request)

    status = await job.aget_execution_status(execution_id)
    print(f"Status: {status}")

    result = await job.await_for_execution(execution_id, max_wait=180, interval=5)
    print(f"Completed: {result.status}")

asyncio.run(main())
Deploy a job: learn how to deploy your AI batch jobs on Blaxel as serverless, auto-scalable endpoints.