Operations Management

The Strata Cloud Manager SDK covers more than configuration objects: it also supports working with candidate configurations, committing changes, and monitoring the resulting jobs.

Candidate Configurations

In Palo Alto Networks environments, configuration changes are made to a candidate configuration before being committed to the running configuration. The SDK provides methods for managing these candidate configurations.

Pushing Candidate Configurations

Changes take effect only after you push the candidate configuration. The push returns a job ID that you can use to track the commit:

from scm.models.operations.candidate_push import CandidatePushRequestModel

# Create some objects
client.address.create({"name": "server1", "folder": "Shared", "ip_netmask": "192.168.1.100/32"})
client.address.create({"name": "server2", "folder": "Shared", "ip_netmask": "192.168.1.101/32"})

# Commit the changes
response = client.candidate_push({
    "description": "Adding server addresses",
    "admin_name": "admin"
})

# Get the job ID from the response
job_id = response["id"]

Getting Commit Status

After initiating a commit, you can check its status:

import time

# Check the status of the commit job
job_status = client.jobs.status(job_id)
print(f"Job status: {job_status['status']}")

# Poll every 5 seconds until the job reaches a terminal state
while job_status["status"] not in ["COMPLETED", "FAILED"]:
    time.sleep(5)
    job_status = client.jobs.status(job_id)
    print(f"Job status: {job_status['status']}")

if job_status["status"] == "COMPLETED":
    print("Commit completed successfully")
else:
    print(f"Commit failed: {job_status.get('message', 'Unknown error')}")

Job Monitoring

Many operations in Strata Cloud Manager are asynchronous and return a job ID. The SDK provides methods for monitoring these jobs.

Checking Job Status

# Check the status of a job
job_status = client.jobs.status(job_id)
print(f"Job status: {job_status['status']}")
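The examples in this guide compare the status field against the strings COMPLETED and FAILED in several places. A small pair of helpers can keep those checks in one spot. This is a minimal sketch: the helper names are hypothetical and not part of the SDK, and it assumes the status response is a dict with a "status" key, as in the examples here.

```python
# Status strings taken from the examples in this guide; the real API may
# report additional states, so treat this set as an assumption.
TERMINAL_STATES = frozenset({"COMPLETED", "FAILED"})


def is_terminal(job_status: dict) -> bool:
    """Return True once the job has stopped running (hypothetical helper)."""
    return job_status.get("status") in TERMINAL_STATES


def succeeded(job_status: dict) -> bool:
    """Return True only if the job finished successfully (hypothetical helper)."""
    return job_status.get("status") == "COMPLETED"
```

Both helpers use dict.get, so a response without a "status" key is treated as still running rather than raising KeyError.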

Waiting for Jobs to Complete

The following helper polls a job until it completes or fails, and raises TimeoutError if the job is still running after timeout seconds:

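import time

def wait_for_job_completion(client, job_id, interval=5, timeout=600):
    """Poll the job every `interval` seconds until it finishes or `timeout` elapses."""
    # Use a monotonic clock so system clock adjustments cannot distort the timeout
    start_time = time.monotonic()

    while time.monotonic() - start_time < timeout: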
        job_status = client.jobs.status(job_id)
        if job_status["status"] in ["COMPLETED", "FAILED"]:
            return job_status

        print(f"Job status: {job_status['status']}")
        time.sleep(interval)

    raise TimeoutError(f"Job did not complete within {timeout} seconds")

# Usage
job_result = wait_for_job_completion(client, job_id)
if job_result["status"] == "COMPLETED":
    print("Job completed successfully")
else:
    print(f"Job failed: {job_result.get('message', 'Unknown error')}")
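Polling at a fixed interval is simple, but long-running jobs can generate many status requests. One common alternative is exponential backoff, where the delay grows after each check up to a cap. The sketch below is an illustration under assumptions, not an SDK feature: wait_with_backoff and its parameters are hypothetical names, and the status lookup is passed in as a callable so the function does not depend on any particular client method.

```python
import time
from typing import Callable


def wait_with_backoff(
    fetch_status: Callable[[str], dict],
    job_id: str,
    initial: float = 1.0,
    factor: float = 2.0,
    max_interval: float = 30.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Poll fetch_status(job_id) with growing delays until the job finishes.

    Hypothetical helper: assumes the status dict has a "status" key whose
    terminal values are "COMPLETED" and "FAILED", as in this guide.
    """
    deadline = time.monotonic() + timeout
    delay = initial
    while True:
        job_status = fetch_status(job_id)
        if job_status["status"] in ("COMPLETED", "FAILED"):
            return job_status

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(
                f"Job {job_id} did not complete within {timeout} seconds"
            )

        # Never sleep past the deadline, then grow the delay up to the cap.
        sleep(min(delay, remaining))
        delay = min(delay * factor, max_interval)
```

With the status method used in this guide, a call would look like wait_with_backoff(client.jobs.status, job_id). The sleep parameter exists so the delay can be replaced in tests.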

Error Handling

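Operations can fail for reasons such as invalid parameters, insufficient permissions, or an unexpected response format. The SDK raises specific exception types so that each case can be handled separately: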

from scm.exceptions import ScmError, ApiError, BadResponseError

try:
    # Attempt to push a candidate configuration
    response = client.candidate_push({
        "description": "Adding server addresses",
        "admin_name": "admin"
    })
    job_id = response["id"]

    # Wait for job completion
    job_result = wait_for_job_completion(client, job_id)
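    if job_result["status"] != "COMPLETED":
        # RuntimeError is more specific than a bare Exception and is still
        # caught by the final handler below
        raise RuntimeError(f"Job failed: {job_result.get('message', 'Unknown error')}")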

except ApiError as e:
    print(f"API Error: {e}")
    # Handle API errors (e.g., invalid parameters, insufficient permissions)

except BadResponseError as e:
    print(f"Response Error: {e}")
    # Handle unexpected response format

except ScmError as e:
    print(f"SDK Error: {e}")
    # Handle general SDK errors

except Exception as e:
    print(f"General Error: {e}")
    # Handle other errors
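The example above signals a failed job with a generic exception, which ends up in the same handler as any other unexpected error. A dedicated exception type lets callers tell a failed job apart from everything else. The following is a minimal sketch: JobFailedError and ensure_completed are hypothetical names and are not provided by the SDK, and the job status is assumed to be a dict with "status" and optional "message" keys, as in the examples in this guide.

```python
class JobFailedError(Exception):
    """Raised when a job ends without completing (hypothetical, not part of the SDK)."""

    def __init__(self, job_id, job_status):
        self.job_id = job_id
        self.job_status = job_status
        message = job_status.get("message", "Unknown error")
        super().__init__(f"Job {job_id} failed: {message}")


def ensure_completed(job_id, job_status):
    """Return job_status if the job completed, otherwise raise JobFailedError."""
    if job_status.get("status") != "COMPLETED":
        raise JobFailedError(job_id, job_status)
    return job_status
```

A caller can then add an except JobFailedError clause ahead of the broader handlers and inspect the job_id and job_status attributes to decide whether to retry or report the failure.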

Next Steps

  • Check out Advanced Topics for pagination, filtering, and error handling
  • Explore the API Reference for detailed information on all available methods