#!/usr/bin/env python3 """ kb-audit — детектит drift между живой инфраструктурой и KB-инвентарями. Пишет структурированный отчёт в audit/YYYY-MM-DD-drift.md. Только факты — никаких LLM, галлюцинаций быть не может. Запускать из code-server (LXC 132) или любой машины с sshpass + доступом к Proxmox. """ import re import subprocess import sys from datetime import date from pathlib import Path PROXMOX_HOST = "10.0.0.250" PROXMOX_PASS = "1qaz!QAZ" VAULT = Path(__file__).resolve().parent.parent INVENTORY = VAULT / "projects/dttb/proxmox-inventory.md" OUT_DIR = VAULT / "audit" def sh(cmd: str) -> str: """SSH-выполнение команды на Proxmox.""" full = [ "sshpass", "-p", PROXMOX_PASS, "ssh", "-o", "StrictHostKeyChecking=no", "-o", "ConnectTimeout=10", f"root@{PROXMOX_HOST}", cmd, ] r = subprocess.run(full, capture_output=True, text=True, timeout=30) return r.stdout def parse_live(): """Парсит pct list / qm list. Возвращает {id: {type, status, name}}.""" live = {} for raw_line in sh("pct list").splitlines()[1:]: parts = raw_line.split(maxsplit=3) if len(parts) >= 3: vmid, status = parts[0], parts[1] name = parts[-1] if len(parts) > 2 else "" live[vmid] = {"type": "LXC", "status": status, "name": name} for raw_line in sh("qm list").splitlines()[1:]: # qm list: VMID NAME STATUS MEM BOOTDISK PID parts = raw_line.split() if len(parts) >= 3 and parts[0].isdigit(): vmid, name, status = parts[0], parts[1], parts[2] live[vmid] = {"type": "VM", "status": status, "name": name} return live def parse_inventory(path: Path): """Парсит inventory-файл. Извлекает упомянутые VMID + связанный контекст. Ловит два формата: 1. "LXC 132" / "VM 250" — в заголовках и тексте 2. Table rows: | 132 | debian | ... | — в таблицах """ text = path.read_text() found = {} def add(vmid: str, idx: int): if vmid not in found: start = max(0, idx - 20) end = min(len(text), idx + 80) found[vmid] = text[start:end].replace("\n", " ").strip() for m in re.finditer(r"(?:LXC|VM)\s+(\d{2,4})\b", text): add(m.group(1), m.start()) # table rows: строка начинается с `| NNN |` (игнорируем header-row с тире) for m in re.finditer(r"^\s*\|\s*(\d{2,4})\s*\|", text, re.MULTILINE): add(m.group(1), m.start()) return found def find_deleted_section(path: Path) -> set: """Ищет VMID в секции '## 🗑️ Удалённые' чтобы не флагать их как missing.""" text = path.read_text() # блок между '🗑️ Удалённые' и следующим '##' m = re.search(r"##\s*🗑[^\n]*Удал[^\n]*\n(.*?)(?=\n##|\Z)", text, re.DOTALL) if not m: return set() return set(re.findall(r"\|\s*(\d{2,4})\s*\|", m.group(1))) def compare(live: dict, inventory: dict): live_ids = set(live.keys()) inv_ids = set(inventory.keys()) only_live = sorted(live_ids - inv_ids, key=int) only_inv = sorted(inv_ids - live_ids, key=int) both = sorted(live_ids & inv_ids, key=int) return only_live, only_inv, both def main(): today = date.today().isoformat() OUT_DIR.mkdir(parents=True, exist_ok=True) out = OUT_DIR / f"{today}-drift.md" live = parse_live() inventory = parse_inventory(INVENTORY) deleted = find_deleted_section(INVENTORY) only_live, only_inv_raw, common = compare(live, inventory) # разделяем "в inventory но не в live" на 2 группы: # - known-deleted (есть в секции "🗑️ Удалённые") — это ок # - truly missing (нет и в live, и не в секции deleted) — проблема only_inv = [v for v in only_inv_raw if v not in deleted] known_deleted = [v for v in only_inv_raw if v in deleted] lines = [ "---", f"date: {today}", "type: audit", "source: kb-audit.py", "tags: [audit, drift, infrastructure]", "---", "", f"# KB drift audit — {today}", "", f"Сравнение живого `pct list` / `qm list` с [[../projects/dttb/proxmox-inventory|proxmox-inventory.md]]", "", f"- Живых гостей Proxmox: **{len(live)}**", f"- Упомянуто в inventory: **{len(inventory)}**", f"- В обоих: {len(common)} / только в live: {len(only_live)} / отсутствуют в live: {len(only_inv)}", f"- Известны как удалённые: {len(known_deleted)} (в `## 🗑️ Удалённые`)", "", ] if only_live: lines += ["## ⚠ В Proxmox есть, в inventory НЕТ (надо добавить)", ""] lines += ["| VMID | Type | Status | Name |", "|---|---|---|---|"] for vmid in only_live: x = live[vmid] lines += [f"| {vmid} | {x['type']} | {x['status']} | {x['name']} |"] lines += [""] if only_inv: lines += ["## ❓ В inventory есть, в Proxmox НЕТ (не в секции 🗑️ — проверить вручную)", ""] lines += ["| VMID | Контекст из inventory |", "|---|---|"] for vmid in only_inv: ctx = inventory[vmid][:100] lines += [f"| {vmid} | `...{ctx}...` |"] lines += [""] if known_deleted: lines += [f"## ✓ Удалённые хосты (задокументированы): {', '.join(sorted(known_deleted, key=int))}", ""] if not only_live and not only_inv: lines += ["## ✓ Inventory полностью совпадает с живой инфраструктурой", ""] lines += [ "## Полный живой список", "", "| VMID | Type | Status | Name |", "|---|---|---|---|", ] for vmid in sorted(live.keys(), key=int): x = live[vmid] lines += [f"| {vmid} | {x['type']} | {x['status']} | {x['name']} |"] lines += [ "", "---", f"*Автоматически сгенерировано `scripts/kb-audit.py`. Применять правки — вручную после ревью.*", ] out.write_text("\n".join(lines)) print(f"drift report: {out}") print(f" only_live: {len(only_live)}") print(f" only_inv: {len(only_inv)}") if __name__ == "__main__": main()