You can programmatically execute and manage batch jobs using the Blaxel API and Python SDK. Both synchronous and asynchronous usage are supported: use method_name() for synchronous operations and amethod_name() for asynchronous operations (for example, create_execution() and acreate_execution()).
The code examples in this section assume that your batch job has already been successfully deployed on Blaxel.

Dispatch job execution

Dispatch a job execution and wait for it to complete:
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest

job = bl_job("my-job")
request = CreateJobExecutionRequest(tasks=[{"duration": 60}])
execution_id = job.create_execution(request)
result = job.wait_for_execution(execution_id, max_wait=180)
Time units in the Python SDK are specified in seconds.

Dispatch job execution with tasks

Dispatch a new job execution with multiple tasks:
# synchronous
request = CreateJobExecutionRequest(tasks=[{"duration": 30}, {"duration": 60}])
execution_id = job.create_execution(request)

# asynchronous
execution_id = await job.acreate_execution(request)

Get execution details

Get full execution details:
# synchronous
execution = job.get_execution(execution_id)

# asynchronous
execution = await job.aget_execution(execution_id)

Get status

The possible job states are:
pending → running → completed
                 ↘ failed
Get only the job status (faster than retrieving full execution details):
# synchronous
status = job.get_execution_status(execution_id)

# asynchronous
status = await job.aget_execution_status(execution_id)
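
When writing your own polling or reporting logic, it can help to separate terminal states from in-progress ones. The following is a minimal sketch based on the state diagram above. It assumes the status is returned as a plain lowercase string; the helper name is_terminal and the two constants are hypothetical and not part of the SDK.

```python
# States taken from the diagram above.
# Assumption: statuses are plain lowercase strings.
TERMINAL_STATES = {"completed", "failed"}
ACTIVE_STATES = {"pending", "running"}


def is_terminal(status: str) -> bool:
    """Return True once an execution can no longer change state."""
    return status in TERMINAL_STATES


# Example: walk through the happy path of the diagram.
for state in ("pending", "running", "completed"):
    print(state, "-> terminal" if is_terminal(state) else "-> still in progress")
```

A check like this could be applied to the value returned by get_execution_status() to decide whether to keep polling.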

List executions

List all executions for a job:
# synchronous
executions = job.list_executions(limit=50)

# asynchronous
executions = await job.alist_executions(limit=50)
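
Once you have a list of executions, a quick summary by status is often useful. The sketch below assumes each returned item exposes a status attribute, as the execution objects elsewhere on this page do; that shape is an assumption, so the example uses local stand-in objects rather than real SDK results. The helper name count_by_status is hypothetical.

```python
from collections import Counter
from types import SimpleNamespace


def count_by_status(executions) -> dict:
    """Count executions per status (assumes each item has a .status attribute)."""
    return dict(Counter(execution.status for execution in executions))


# Stand-in data for illustration only; real items come from job.list_executions().
sample = [
    SimpleNamespace(status="completed"),
    SimpleNamespace(status="failed"),
    SimpleNamespace(status="completed"),
]
summary = count_by_status(sample)
print(summary)
```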

Wait for completion

Poll until execution completes:
# synchronous
result = job.wait_for_execution(
    execution_id,
    max_wait=300,  # 5 minutes (seconds)
    interval=2     # Poll every 2 seconds (seconds)
)

# asynchronous
result = await job.await_for_execution(execution_id, max_wait=300, interval=2)
When polling:
  • adjust the interval based on the expected task duration
  • set the max_wait timeout longer than the expected total task duration

Override environment and memory for a single job execution

Override memory and/or environment variables for a single job execution, without modifying the deployed job configuration:
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import (
    CreateJobExecutionRequest,
)
from blaxel.core.client.models.create_job_execution_request_env import (
    CreateJobExecutionRequestEnv,
)

job = bl_job("my-job")

env = CreateJobExecutionRequestEnv()
env["CUSTOM_ENV"] = "OVERRIDE"
env["LOG_LEVEL"] = "debug"

request = CreateJobExecutionRequest(
    tasks=[{"name": "Combined"}],
    memory=1024,  # MB (must be <= deployed job memory)
    env=env,
)

# synchronous
execution_id = job.create_execution(request)
execution = job.get_execution(execution_id)

# asynchronous
execution_id = await job.acreate_execution(request)
execution = await job.aget_execution(execution_id)

print(execution.status)
  • Memory overrides are expressed in MB and are downward-only: the value must be <= the memory configured at deployment time. For example, if the job is deployed with 4096 MB, execution overrides must be <= 4096 MB.
  • env supports multiple environment variables
  • memory and env can be set independently or together
  • Overrides apply only to the current execution
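
Because memory overrides are downward-only, you may want to validate the value on the client side before dispatching. The sketch below is one way to do that. The helper name check_memory_override is hypothetical, and the deployed memory value must be supplied by you; this example does not look it up from Blaxel.

```python
def check_memory_override(requested_mb: int, deployed_mb: int) -> int:
    """Return requested_mb if it is a valid downward-only override, else raise."""
    if requested_mb <= 0:
        raise ValueError("memory override must be a positive number of MB")
    if requested_mb > deployed_mb:
        raise ValueError(
            f"memory override {requested_mb} MB exceeds deployed memory {deployed_mb} MB"
        )
    return requested_mb


# A job deployed with 4096 MB accepts any override up to 4096 MB.
memory = check_memory_override(1024, deployed_mb=4096)
print(memory)
```

The validated value could then be passed as the memory argument of CreateJobExecutionRequest.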

Complete example

Here is a complete example using synchronous operations:
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest

job = bl_job("my-job")

# create execution
request = CreateJobExecutionRequest(tasks=[{"name": "John"}, {"name": "Jane"}])
execution_id = job.create_execution(request)

# monitor
status = job.get_execution_status(execution_id)
print(f"Status: {status}")

# wait
try:
    result = job.wait_for_execution(execution_id, max_wait=180, interval=5)
    print(f"Completed: {result.status}")
except Exception as error:
    # The wait can fail for reasons other than a timeout, so report the error as-is
    print(f"Wait failed: {error}")
Here is a complete example using asynchronous operations:
import asyncio
from blaxel.core.jobs import bl_job
from blaxel.core.client.models.create_job_execution_request import CreateJobExecutionRequest

async def main():
    job = bl_job("my-job")

    request = CreateJobExecutionRequest(tasks=[{"name": "John"}, {"name": "Jane"}])
    execution_id = await job.acreate_execution(request)

    status = await job.aget_execution_status(execution_id)
    print(f"Status: {status}")

    result = await job.await_for_execution(execution_id, max_wait=180, interval=5)
    print(f"Completed: {result.status}")

asyncio.run(main())
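
The asynchronous methods also make it possible to dispatch several executions and wait for them concurrently. The sketch below shows the pattern with asyncio.gather. To keep it runnable without a deployed job, it uses FakeJob, a local stand-in that only mimics the acreate_execution() and await_for_execution() calls shown above; FakeJob and run_batch are hypothetical names and the fake return values are not real SDK output.

```python
import asyncio
from types import SimpleNamespace


class FakeJob:
    """Local stand-in for a job object; NOT part of the Blaxel SDK."""

    def __init__(self):
        self._count = 0

    async def acreate_execution(self, request):
        self._count += 1
        return f"exec-{self._count}"

    async def await_for_execution(self, execution_id, max_wait=180, interval=5):
        await asyncio.sleep(0)  # pretend to poll
        return SimpleNamespace(id=execution_id, status="completed")


async def run_batch(job, requests, max_wait=180, interval=5):
    """Dispatch one execution per request and wait for all of them concurrently."""

    async def run_one(request):
        execution_id = await job.acreate_execution(request)
        return await job.await_for_execution(
            execution_id, max_wait=max_wait, interval=interval
        )

    # gather returns results in the same order as the requests.
    return await asyncio.gather(*(run_one(request) for request in requests))


requests = [{"tasks": [{"name": "John"}]}, {"tasks": [{"name": "Jane"}]}]
results = asyncio.run(run_batch(FakeJob(), requests))
print([(result.id, result.status) for result in results])
```

With a real job, you would pass the object returned by bl_job("my-job") and CreateJobExecutionRequest instances instead of the stand-ins.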

Deploy a job

Learn how to deploy your AI batch jobs on Blaxel as a serverless auto-scalable endpoint.