You can programmatically execute and manage batch jobs using the Blaxel API and Python SDK.
Both synchronous and asynchronous usage is supported: use method_name for synchronous operations and amethod_name() for asynchronous operations.
The code examples in this section assume that your batch job has already been successfully deployed on Blaxel.
Dispatch job execution
Dispatch a job and wait for execution to start:
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest
job = bl_job( "my-job" )
request = CreateJobExecutionRequest( tasks = [{ "duration" : 60 }])
execution_id = job.create_execution(request)
result = job.wait_for_execution(execution_id, max_wait = 180 )
Time units in the Python SDK are specified in seconds.
Dispatch job execution with tasks
Dispatch a new job with tasks:
# synchronous
request = CreateJobExecutionRequest( tasks = [{ "duration" : 30 }, { "duration" : 60 }])
execution_id = job.create_execution(request)
# asynchronous
execution_id = await job.acreate_execution(request)
Get execution details
Get full execution details:
# synchronous
execution = job.get_execution(execution_id)
# asynchronous
execution = await job.aget_execution(execution_id)
Get status
The possible job states are:
pending → running → completed
↘ failed
Get only the job status (faster than retrieving full execution details):
# synchronous
status = job.get_execution_status(execution_id)
# asynchronous
status = await job.aget_execution_status(execution_id)
List executions
List all executions for a job:
# synchronous
executions = job.list_executions( limit = 50 )
# asynchronous
executions = await job.alist_executions( limit = 50 )
Wait for completion
Poll until execution completes:
# synchronous
result = job.wait_for_execution(
execution_id,
max_wait = 300 , # 5 minutes (seconds)
interval = 2 # Poll every 2 seconds (seconds)
)
# asynchronous
result = await job.await_for_execution(execution_id, max_wait = 300 , interval = 2 )
When polling:
adjust the interval based on the expected task duration
set the max_wait timeout longer than the expected total task duration
Complete example
Here is a complete example using synchronous operations:
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest
job = bl_job( "my-job" )
# create execution
request = CreateJobExecutionRequest( tasks = [{ "name" : "John" }, { "name" : "Jane" }])
execution_id = job.create_execution(request)
# monitor
status = job.get_execution_status(execution_id)
print ( f "Status: { status } " )
# wait
try :
result = job.wait_for_execution(execution_id, max_wait = 180 , interval = 5 )
print ( f "Completed: { result.status } " )
except Exception as error:
print ( f "Timeout: { error } " )
Here is a complete example using asynchronous operations:
import asyncio
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest
async def main ():
job = bl_job( "my-job" )
request = CreateJobExecutionRequest( tasks = [{ "name" : "John" }, { "name" : "Jane" }])
execution_id = await job.acreate_execution(request)
status = await job.aget_execution_status(execution_id)
print ( f "Status: { status } " )
result = await job.await_for_execution(execution_id, max_wait = 180 , interval = 5 )
print ( f "Completed: { result.status } " )
asyncio.run(main())
Deploy a job Learn how to deploy your AI batch jobs on Blaxel as a serverless auto-scalable endpoint.