Skip to content

PAN-OS API Client Module

The PAN-OS API client module (pan_os_api.py) is responsible for making requests to the Palo Alto Networks XML API and parsing the responses.

PanOSAPIClient Class

class PanOSAPIClient:
    """Client for interacting with the Palo Alto Networks XML API.

    This class provides methods for retrieving data from a Palo Alto Networks
    Next-Generation Firewall (NGFW) or Panorama through its XML API.

    Attributes:
        hostname: The hostname or IP address of the NGFW or Panorama.
        api_key: The API key for authenticating with the NGFW or Panorama.
        client: An httpx AsyncClient for making HTTP requests.
    """

Initialization

def __init__(self, settings: Settings) -> None:
    """Initialize the PanOSAPIClient.

    Args:
        settings: Application settings containing NGFW connection information.
    """
    self.hostname = settings.panos_hostname
    self.api_key = settings.panos_api_key
    self.base_url = f"https://{self.hostname}/api/"
    self.client = httpx.AsyncClient(verify=False)  # In production, use proper cert verification

Async Context Manager Support

The client implements the async context manager protocol, allowing it to be used with the async with statement:

async def __aenter__(self) -> "PanOSAPIClient":
    """Async context manager entry.

    Returns:
        The PanOSAPIClient instance.
    """
    return self

async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: object,
) -> None:
    """Async context manager exit.

    Args:
        exc_type: The exception type, if an exception was raised.
        exc_val: The exception value, if an exception was raised.
        exc_tb: The exception traceback, if an exception was raised.
    """
    await self.client.aclose()

Making Requests

async def _make_request(self, params: dict[str, str]) -> ElementTree.Element:
    """Make a request to the Palo Alto Networks XML API.

    Args:
        params: Dictionary of query parameters to include in the request.

    Returns:
        The XML response as an ElementTree Element.

    Raises:
        httpx.HTTPError: If the HTTP request fails.
        ValueError: If the API returns an error response.
    """
    # Add the API key to the parameters
    params["key"] = self.api_key

    try:
        # Make the request
        response = await self.client.get(self.base_url, params=params)
        response.raise_for_status()

        # Parse the XML response
        root = ElementTree.fromstring(response.text)

        # Check for API errors
        status = root.find(".//status")
        if status is not None and status.text != "success":
            error_msg = root.find(".//msg")
            if error_msg is not None and error_msg.text:
                raise ValueError(f"API error: {error_msg.text}")
            else:
                raise ValueError("Unknown API error")

        return root
    except httpx.HTTPError as e:
        raise httpx.HTTPError(f"HTTP request failed: {str(e)}")

API Methods

get_system_info

async def get_system_info(self) -> dict[str, str]:
    """Get system information from the firewall.

    Returns:
        Dictionary containing system information.
    """
    params = {"type": "op", "cmd": "<show><s><info></info></s></show>"}

    root = await self._make_request(params)
    result = root.find(".//result")

    if result is None:
        raise ValueError("No system information found in response")

    # Extract system information
    system_info = {}
    for child in result:
        if child.text is not None:
            system_info[child.tag] = child.text
        else:
            system_info[child.tag] = ""

    return system_info

This method retrieves system information from the firewall, including hostname, model, serial number, software version, and uptime.

get_address_objects

async def get_address_objects(self) -> list[dict[str, str]]:
    """Get address objects configured on the firewall or Panorama.

    Returns:
        List of dictionaries containing address object information.
    """
    # First, check if this is a Panorama by trying to get device groups
    is_panorama = False
    device_groups = []

    try:
        # Try to get device groups (only works on Panorama)
        dg_params = {
            "type": "config",
            "action": "get",
            "xpath": "/config/devices/entry/device-group",
        }
        dg_root = await self._make_request(dg_params)
        dg_entries = dg_root.findall(".//entry")

        if dg_entries:
            is_panorama = True
            device_groups = [entry.get("name") for entry in dg_entries if entry.get("name")]
            logger.info(f"Detected Panorama with {len(device_groups)} device groups")
    except Exception as e:
        logger.warning(f"Failed to check for device groups, assuming firewall: {str(e)}")

    address_objects = []

    if is_panorama:
        # Process device groups for Panorama
        for dg_name in device_groups:
            if not dg_name:
                continue

            # Get addresses for this device group
            dg_addr_params = {
                "type": "config",
                "action": "get",
                "xpath": f"/config/devices/entry/device-group/entry[@name='{dg_name}']/address",
            }

            dg_addr_root = await self._make_request(dg_addr_params)
            dg_entries = dg_addr_root.findall(".//entry")

            # Process each address object in the device group
            for entry in dg_entries:
                address_obj = {"name": entry.get("name") or "", "location": f"device-group:{dg_name}"}

                # Process the address object types and values
                ip_netmask = entry.find("ip-netmask")
                if ip_netmask is not None and ip_netmask.text is not None:
                    address_obj["type"] = "ip-netmask"
                    address_obj["value"] = ip_netmask.text
                elif (ip_range := entry.find("ip-range")) is not None and ip_range.text is not None:
                    address_obj["type"] = "ip-range"
                    address_obj["value"] = ip_range.text
                elif (fqdn := entry.find("fqdn")) is not None and fqdn.text is not None:
                    address_obj["type"] = "fqdn"
                    address_obj["value"] = fqdn.text
                else:
                    address_obj["type"] = "unknown"
                    address_obj["value"] = ""

                # Get description if available
                description = entry.find("description")
                if description is not None and description.text is not None:
                    address_obj["description"] = description.text

                address_objects.append(address_obj)

    # Always check for vsys address objects (works for both firewall and Panorama)
    vsys_params = {
        "type": "config",
        "action": "get",
        "xpath": "/config/devices/entry/vsys/entry/address",
    }

    vsys_root = await self._make_request(vsys_params)
    vsys_entries = vsys_root.findall(".//entry")

    # Process each address object in vsys
    for entry in vsys_entries:
        address_obj = {"name": entry.get("name") or "", "location": "vsys"}

        # Process the address object types and values
        ip_netmask = entry.find("ip-netmask")
        if ip_netmask is not None and ip_netmask.text is not None:
            address_obj["type"] = "ip-netmask"
            address_obj["value"] = ip_netmask.text
        elif (ip_range := entry.find("ip-range")) is not None and ip_range.text is not None:
            address_obj["type"] = "ip-range"
            address_obj["value"] = ip_range.text
        elif (fqdn := entry.find("fqdn")) is not None and fqdn.text is not None:
            address_obj["type"] = "fqdn"
            address_obj["value"] = fqdn.text
        else:
            address_obj["type"] = "unknown"
            address_obj["value"] = ""

        # Get description if available
        description = entry.find("description")
        if description is not None and description.text is not None:
            address_obj["description"] = description.text

        address_objects.append(address_obj)

    return address_objects

This method retrieves address objects configured on the firewall or Panorama, including their names, types, values, descriptions, and locations (device group or vsys).

get_security_zones

async def get_security_zones(self) -> list[dict[str, str]]:
    """Get security zones configured on the firewall.

    Returns:
        List of dictionaries containing security zone information.
    """
    params = {"type": "config", "action": "get", "xpath": "/config/devices/entry/vsys/entry/zone"}

    root = await self._make_request(params)
    entries = root.findall(".//entry")

    zones = []
    for entry in entries:
        zone = {"name": entry.get("name") or ""}

        # Get zone type
        if entry.find("network/layer3") is not None:
            zone["type"] = "layer3"
            interfaces = entry.findall(".//member")
            zone["interfaces"] = ",".join([interface.text for interface in interfaces if interface.text])
        elif entry.find("network/layer2") is not None:
            zone["type"] = "layer2"
            interfaces = entry.findall(".//member")
            zone["interfaces"] = ",".join([interface.text for interface in interfaces if interface.text])
        elif entry.find("network/virtual-wire") is not None:
            zone["type"] = "virtual-wire"
            interfaces = entry.findall(".//member")
            zone["interfaces"] = ",".join([interface.text for interface in interfaces if interface.text])
        elif entry.find("network/tap") is not None:
            zone["type"] = "tap"
            interfaces = entry.findall(".//member")
            zone["interfaces"] = ",".join([interface.text for interface in interfaces if interface.text])
        elif entry.find("network/external") is not None:
            zone["type"] = "external"
            zone["interfaces"] = ""
        else:
            zone["type"] = "unknown"
            zone["interfaces"] = ""

        zones.append(zone)

    return zones

This method retrieves security zones configured on the firewall, including their names, types, and interfaces.

get_security_policies

async def get_security_policies(self) -> list[dict[str, str | list[str]]]:
    """Get security policies configured on the firewall.

    Returns:
        List of dictionaries containing security policy information.
    """
    params = {"type": "config", "action": "get", "xpath": "/config/devices/entry/vsys/entry/rulebase/security/rules"}

    root = await self._make_request(params)
    entries = root.findall(".//entry")

    policies = []
    for entry in entries:
        policy = {"name": entry.get("name") or ""}

        # Source information
        source_zones = entry.findall(".//from/member")
        policy["source_zones"] = ",".join([zone.text for zone in source_zones if zone.text])

        source_addresses = entry.findall(".//source/member")
        policy["source_addresses"] = ",".join([addr.text for addr in source_addresses if addr.text])

        # Destination information
        dest_zones = entry.findall(".//to/member")
        policy["destination_zones"] = ",".join([zone.text for zone in dest_zones if zone.text])

        dest_addresses = entry.findall(".//destination/member")
        policy["destination_addresses"] = ",".join([addr.text for addr in dest_addresses if addr.text])

        # Application and service information
        applications = entry.findall(".//application/member")
        policy["applications"] = ",".join([app.text for app in applications if app.text])

        services = entry.findall(".//service/member")
        policy["services"] = ",".join([svc.text for svc in services if svc.text])

        # Action
        action = entry.find("action")
        if action is not None and action.text is not None:
            policy["action"] = action.text
        else:
            policy["action"] = ""

        # Description
        description = entry.find("description")
        if description is not None and description.text is not None:
            policy["description"] = description.text
        else:
            policy["description"] = ""

        policies.append(policy)

    return policies

This method retrieves security policies configured on the firewall, including their names, sources, destinations, applications, and actions.

XML Parsing

The module uses Python's standard xml.etree.ElementTree module to parse XML responses from the Palo Alto Networks API. Each method includes specific parsing logic to extract the relevant data from the XML structure and convert it to a more usable Python data structure (dictionaries and lists).