#!/usr/bin/env python3 """ kb-audit — детектит drift между живой инфраструктурой и KB-инвентарями. Пишет структурированный отчёт в audit/YYYY-MM-DD-drift.md. Только факты — никаких LLM, галлюцинаций быть не может. Запускать из code-server (LXC 132) или любой машины с sshpass + доступом к Proxmox. """ import re import subprocess import sys from datetime import date from pathlib import Path PROXMOX_HOST = "10.0.0.250" PROXMOX_PASS = "1qaz!QAZ" VAULT = Path(__file__).resolve().parent.parent INVENTORY = VAULT / "projects/dttb/proxmox-inventory.md" OUT_DIR = VAULT / "audit" def sh(cmd: str) -> str: """SSH-выполнение команды на Proxmox.""" full = [ "sshpass", "-p", PROXMOX_PASS, "ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", f"root@{PROXMOX_HOST}", cmd, ] r = subprocess.run(full, capture_output=True, text=True, timeout=30) return r.stdout def parse_live(): """Парсит pct list / qm list. Возвращает {id: {type, status, name}}.""" live = {} for raw_line in sh("pct list").splitlines()[1:]: parts = raw_line.split(maxsplit=3) if len(parts) >= 3: vmid, status = parts[0], parts[1] name = parts[-1] if len(parts) > 2 else "" live[vmid] = {"type": "LXC", "status": status, "name": name} for raw_line in sh("qm list").splitlines()[1:]: # qm list: VMID NAME STATUS MEM BOOTDISK PID parts = raw_line.split() if len(parts) >= 3 and parts[0].isdigit(): vmid, name, status = parts[0], parts[1], parts[2] live[vmid] = {"type": "VM", "status": status, "name": name} return live def parse_inventory(path: Path): """Парсит inventory-файл. Извлекает упомянутые VMID + связанный контекст.""" text = path.read_text() found = {} # Ищем все упоминания "LXC NNN" или "VM NNN" в заголовках и таблицах for m in re.finditer(r"(?:LXC|VM)\s+(\d{2,4})\b", text): vmid = m.group(1) if vmid not in found: # контекст: 80 символов вокруг start = max(0, m.start() - 20) end = min(len(text), m.end() + 60) found[vmid] = text[start:end].replace("\n", " ").strip() return found def compare(live: dict, inventory: dict): live_ids = set(live.keys()) inv_ids = set(inventory.keys()) only_live = sorted(live_ids - inv_ids, key=int) only_inv = sorted(inv_ids - live_ids, key=int) both = sorted(live_ids & inv_ids, key=int) return only_live, only_inv, both def main(): today = date.today().isoformat() OUT_DIR.mkdir(parents=True, exist_ok=True) out = OUT_DIR / f"{today}-drift.md" live = parse_live() inventory = parse_inventory(INVENTORY) only_live, only_inv, common = compare(live, inventory) lines = [ "---", f"date: {today}", "type: audit", "source: kb-audit.py", "tags: [audit, drift, infrastructure]", "---", "", f"# KB drift audit — {today}", "", f"Сравнение живого `pct list` / `qm list` с [[../projects/dttb/proxmox-inventory|proxmox-inventory.md]]", "", f"- Живых гостей Proxmox: **{len(live)}**", f"- Упомянуто в inventory: **{len(inventory)}**", f"- В обоих: {len(common)} / только в live: {len(only_live)} / только в inventory: {len(only_inv)}", "", ] if only_live: lines += ["## ⚠ В Proxmox есть, в inventory НЕТ (надо добавить)", ""] lines += ["| VMID | Type | Status | Name |", "|---|---|---|---|"] for vmid in only_live: x = live[vmid] lines += [f"| {vmid} | {x['type']} | {x['status']} | {x['name']} |"] lines += [""] if only_inv: lines += ["## 🗑 В inventory есть, в Proxmox НЕТ (удалён? переименован?)", ""] lines += ["| VMID | Контекст из inventory |", "|---|---|"] for vmid in only_inv: ctx = inventory[vmid][:100] lines += [f"| {vmid} | `...{ctx}...` |"] lines += [""] if not only_live and not only_inv: lines += ["## ✓ Inventory полностью совпадает с живой инфраструктурой", ""] lines += [ "## Полный живой список", "", "| VMID | Type | Status | Name |", "|---|---|---|---|", ] for vmid in sorted(live.keys(), key=int): x = live[vmid] lines += [f"| {vmid} | {x['type']} | {x['status']} | {x['name']} |"] lines += [ "", "---", f"*Автоматически сгенерировано `scripts/kb-audit.py`. Применять правки — вручную после ревью.*", ] out.write_text("\n".join(lines)) print(f"drift report: {out}") print(f" only_live: {len(only_live)}") print(f" only_inv: {len(only_inv)}") if __name__ == "__main__": main()