Skip to main content

Python Audit and Reporting

Python scripts for structured Docker auditing -- useful for generating reports, tracking trends, and feeding data into dashboards or alerting systems.

Docker SDK Setup

Install the official Docker SDK for Python:

pip install docker

The SDK communicates with the Docker daemon directly, so no shell commands are needed.

Container Health Audit

Generate a JSON report of all containers with their health status:

#!/usr/bin/env python3
"""Docker container health audit report."""
import docker
import json
from datetime import datetime

# Module-level Docker client created with docker.from_env(); audit_containers()
# below reads it as a global.
client = docker.from_env()

def _image_label(container, info):
    """Return a display name for the container's image without raising.

    The first tag of the image is preferred. If the image can no longer be
    looked up, the image reference recorded in the container's inspect data
    is used instead, and "untagged" is the last resort.
    """
    try:
        tags = container.image.tags
    except docker.errors.ImageNotFound:
        # The SDK resolves `container.image` with a separate image lookup,
        # which fails when the image was removed after the container was
        # created. One such container must not abort the whole audit.
        return (info.get("Config") or {}).get("Image") or "untagged"
    return tags[0] if tags else "untagged"


def audit_containers(restart_threshold=5):
    """Audit all containers and return a structured report.

    Args:
        restart_threshold: A container whose restart count is strictly
            greater than this value is flagged with a "high-restarts" issue.

    Returns:
        A dict with the keys "timestamp", "hostname", "containers" (one entry
        per container, running or not), "total" (number of containers) and
        "issues" (number of containers with at least one flagged issue).
    """
    report = {
        "timestamp": datetime.now().isoformat(),
        "hostname": client.info()["Name"],
        "containers": [],
    }

    for container in client.containers.list(all=True):
        info = container.attrs
        state = info["State"]

        # "Health" is missing or empty when no health check is configured.
        health = (state.get("Health") or {}).get("Status", "no-healthcheck")

        entry = {
            "name": container.name,
            "image": _image_label(container, info),
            "status": state["Status"],
            "running": state["Running"],
            "restart_count": info["RestartCount"],
            "oom_killed": state["OOMKilled"],
            "started_at": state.get("StartedAt", ""),
            "health": health,
        }

        # Flag issues
        issues = []
        if entry["health"] == "unhealthy":
            issues.append("unhealthy")
        if entry["oom_killed"]:
            issues.append("oom-killed")
        if entry["restart_count"] > restart_threshold:
            issues.append(f"high-restarts ({entry['restart_count']})")
        entry["issues"] = issues

        report["containers"].append(entry)

    report["total"] = len(report["containers"])
    report["issues"] = sum(1 for c in report["containers"] if c["issues"])
    return report


if __name__ == "__main__":
    # Emit the audit report as pretty-printed JSON on stdout.
    print(json.dumps(audit_containers(), indent=2))

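Example output (abridged; each container entry also includes the running, oom_killed, and started_at fields):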

{
"timestamp": "2024-01-15T10:30:00",
"hostname": "docker-host",
"total": 5,
"issues": 1,
"containers": [
{
"name": "api",
"image": "my-api:1.2.0",
"status": "running",
"health": "healthy",
"restart_count": 0,
"issues": []
}
]
}

Disk Usage Report

Track Docker disk usage over time:

#!/usr/bin/env python3
"""Docker disk usage report."""
import docker
import json
from datetime import datetime

# Module-level Docker client created with docker.from_env(); disk_report()
# below reads it as a global.
client = docker.from_env()

def _human_size(size_bytes):
    """Format a byte count with one decimal place in B, KB, MB, GB or TB.

    Units are 1024-based. Anything of 1024 GB or more is reported in TB.
    """
    for unit in ("B", "KB", "MB", "GB"):
        if size_bytes < 1024:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024
    return f"{size_bytes:.1f} TB"


def disk_report():
    """Generate a disk usage report.

    Returns:
        A dict with a "timestamp" and three sections: "images" (count,
        number without repo tags, total size), "volumes" (count, total size)
        and "containers" (count, total writable-layer size). Sizes are
        human-readable strings such as "1.5 GB".
    """
    df = client.df()

    # Each section may be absent or null in the response; treat both as empty.
    images = df.get("Images") or []
    volumes = df.get("Volumes") or []
    containers = df.get("Containers") or []

    total_image_size = sum(img.get("Size") or 0 for img in images)
    dangling = sum(1 for img in images if not img.get("RepoTags"))

    # Docker reports Size as -1 when a volume's usage is not available.
    # Clamp those sentinels to 0 so they do not shrink the total.
    total_volume_size = sum(
        max((vol.get("UsageData") or {}).get("Size") or 0, 0)
        for vol in volumes
    )

    total_container_rw = sum(c.get("SizeRw") or 0 for c in containers)

    return {
        "timestamp": datetime.now().isoformat(),
        "images": {
            "count": len(images),
            "dangling": dangling,
            "total_size": _human_size(total_image_size),
        },
        "volumes": {
            "count": len(volumes),
            "total_size": _human_size(total_volume_size),
        },
        "containers": {
            "count": len(containers),
            "writable_layer_size": _human_size(total_container_rw),
        },
    }


if __name__ == "__main__":
    # Emit the disk usage report as pretty-printed JSON on stdout.
    print(json.dumps(disk_report(), indent=2))

Image Inventory Report

List all images with size and age for cleanup decisions:

#!/usr/bin/env python3
"""Docker image inventory."""
import docker
from datetime import datetime

# Module-level Docker client created with docker.from_env(); image_inventory()
# below reads it as a global.
client = docker.from_env()

def _parse_created(raw):
    """Parse an image "Created" timestamp into a datetime.

    A trailing "Z" is rewritten as "+00:00", and the fractional seconds are
    normalized to exactly six digits. Docker timestamps can carry nanosecond
    precision, which datetime.fromisoformat() rejects before Python 3.11.
    """
    text = raw.replace("Z", "+00:00")
    head, dot, tail = text.partition(".")
    if dot:
        # Split the run of fraction digits from whatever follows (the offset).
        digit_count = len(tail) - len(tail.lstrip("0123456789"))
        fraction, rest = tail[:digit_count], tail[digit_count:]
        text = f"{head}.{fraction[:6].ljust(6, '0')}{rest}"
    return datetime.fromisoformat(text)


def image_inventory():
    """List all images sorted by size, largest first.

    Returns:
        A list of dicts with the keys "tag", "size_mb", "age_days" and "id".
        An image with several tags yields one entry per tag; an image with
        no tags yields a single "<none>:<none>" entry.
    """
    images = []
    for img in client.images.list():
        tags = img.tags if img.tags else ["<none>:<none>"]
        created = _parse_created(img.attrs["Created"])
        # Take "now" in the timestamp's own timezone so the subtraction never
        # mixes naive and aware datetimes.
        age_days = (datetime.now(created.tzinfo) - created).days
        size_mb = img.attrs["Size"] / (1024 * 1024)

        for tag in tags:
            images.append({
                "tag": tag,
                "size_mb": round(size_mb, 1),
                "age_days": age_days,
                "id": img.short_id,
            })

    # Sort by size descending
    images.sort(key=lambda x: x["size_mb"], reverse=True)
    return images


if __name__ == "__main__":
    # Print the inventory as a fixed-width table. Each row cell matches the
    # width of its header cell: size is 8 + "MB" = 10, age is 7 + "d" = 8.
    inventory = image_inventory()
    print(f"{'IMAGE':<40} {'SIZE':>10} {'AGE':>8} {'ID':<15}")
    print("-" * 75)
    for img in inventory:
        print(f"{img['tag']:<40} {img['size_mb']:>8.1f}MB {img['age_days']:>7}d {img['id']}")

Saving Reports to CSV

Append results to a CSV for trend tracking:

import csv
from datetime import datetime

def append_to_csv(filename, data):
    """Append a row of data to a CSV file, writing a header if it is empty.

    Args:
        filename: Path of the CSV file. It is created if it does not exist.
        data: Mapping of column name to value. Its key order defines the
            column order of the header and of the row.
    """
    with open(filename, "a", newline="") as f:
        # Append mode positions the stream at end-of-file, so offset 0 means
        # the file is new or empty. Checking here avoids a separate open and
        # also gives a header to a file that exists but has no content.
        needs_header = f.tell() == 0
        writer = csv.DictWriter(f, fieldnames=list(data))
        if needs_header:
            writer.writeheader()
        writer.writerow(data)

# Usage with disk report: take one snapshot and append it as a trend row.
report = disk_report()
image_stats = report["images"]
volume_stats = report["volumes"]
trend_row = {
    "date": datetime.now().isoformat(),
    "image_count": image_stats["count"],
    "image_size": image_stats["total_size"],
    "volume_count": volume_stats["count"],
    "volume_size": volume_stats["total_size"],
}
append_to_csv("docker-disk-trend.csv", trend_row)

Key Takeaways

  • Use the Docker SDK for Python (docker package) instead of parsing subprocess output.
  • Structure reports as JSON for easy integration with monitoring systems.
  • Track disk usage over time in CSV to detect growth trends.
  • Schedule scripts with cron (daily disk reports, hourly health checks).

What's Next