kb-audit: уровень 3 — auto-apply safe drift fixes (karpathy-style)
- kb_audit_helpers.py — общие функции parse_live/inventory/deleted - kb-audit-apply.py — применяет только structural факт-правки: * new VMID → добавить в "🔴 Остановленные" (только для stopped) * missing VMID → переместить в "🗑️ Удалённые" с датой - Коммитит как kb-audit-bot <kb-audit@dttb.ru> — фильтруемо в git log - Safety: live<5 хостов → abort - Не трогает описания/IP/назначения — только структурные поля из pct list Cron обновлён: audit → apply → propose (остаток для ручного ревью)
This commit is contained in:
@@ -7,88 +7,19 @@ kb-audit — детектит drift между живой инфраструкт
|
||||
Запускать из code-server (LXC 132) или любой машины с sshpass + доступом к Proxmox.
|
||||
"""
|
||||
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from datetime import date
|
||||
from pathlib import Path
|
||||
|
||||
PROXMOX_HOST = "10.0.0.250"
|
||||
PROXMOX_PASS = "1qaz!QAZ"
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||||
from kb_audit_helpers import parse_live, parse_inventory, find_deleted_section, INVENTORY, VAULT # type: ignore
|
||||
|
||||
VAULT = Path(__file__).resolve().parent.parent
|
||||
INVENTORY = VAULT / "projects/dttb/proxmox-inventory.md"
|
||||
OUT_DIR = VAULT / "audit"
|
||||
|
||||
|
||||
def sh(cmd: str) -> str:
|
||||
"""SSH-выполнение команды на Proxmox."""
|
||||
full = [
|
||||
"sshpass", "-p", PROXMOX_PASS,
|
||||
"ssh", "-o", "StrictHostKeyChecking=no",
|
||||
"-o", "ConnectTimeout=10",
|
||||
f"root@{PROXMOX_HOST}",
|
||||
cmd,
|
||||
]
|
||||
r = subprocess.run(full, capture_output=True, text=True, timeout=30)
|
||||
return r.stdout
|
||||
|
||||
|
||||
def parse_live():
|
||||
"""Парсит pct list / qm list. Возвращает {id: {type, status, name}}."""
|
||||
live = {}
|
||||
for raw_line in sh("pct list").splitlines()[1:]:
|
||||
parts = raw_line.split(maxsplit=3)
|
||||
if len(parts) >= 3:
|
||||
vmid, status = parts[0], parts[1]
|
||||
name = parts[-1] if len(parts) > 2 else ""
|
||||
live[vmid] = {"type": "LXC", "status": status, "name": name}
|
||||
for raw_line in sh("qm list").splitlines()[1:]:
|
||||
# qm list: VMID NAME STATUS MEM BOOTDISK PID
|
||||
parts = raw_line.split()
|
||||
if len(parts) >= 3 and parts[0].isdigit():
|
||||
vmid, name, status = parts[0], parts[1], parts[2]
|
||||
live[vmid] = {"type": "VM", "status": status, "name": name}
|
||||
return live
|
||||
|
||||
|
||||
def parse_inventory(path: Path):
|
||||
"""Парсит inventory-файл. Извлекает упомянутые VMID + связанный контекст.
|
||||
Ловит два формата:
|
||||
1. "LXC 132" / "VM 250" — в заголовках и тексте
|
||||
2. Table rows: | 132 | debian | ... | — в таблицах
|
||||
"""
|
||||
text = path.read_text()
|
||||
found = {}
|
||||
|
||||
def add(vmid: str, idx: int):
|
||||
if vmid not in found:
|
||||
start = max(0, idx - 20)
|
||||
end = min(len(text), idx + 80)
|
||||
found[vmid] = text[start:end].replace("\n", " ").strip()
|
||||
|
||||
for m in re.finditer(r"(?:LXC|VM)\s+(\d{2,4})\b", text):
|
||||
add(m.group(1), m.start())
|
||||
# table rows: строка начинается с `| NNN |` (игнорируем header-row с тире)
|
||||
for m in re.finditer(r"^\s*\|\s*(\d{2,4})\s*\|", text, re.MULTILINE):
|
||||
add(m.group(1), m.start())
|
||||
return found
|
||||
|
||||
|
||||
def find_deleted_section(path: Path) -> set:
|
||||
"""Ищет VMID в секции '## 🗑️ Удалённые' чтобы не флагать их как missing."""
|
||||
text = path.read_text()
|
||||
# блок между '🗑️ Удалённые' и следующим '##'
|
||||
m = re.search(r"##\s*🗑[^\n]*Удал[^\n]*\n(.*?)(?=\n##|\Z)", text, re.DOTALL)
|
||||
if not m:
|
||||
return set()
|
||||
return set(re.findall(r"\|\s*(\d{2,4})\s*\|", m.group(1)))
|
||||
|
||||
|
||||
def compare(live: dict, inventory: dict):
|
||||
live_ids = set(live.keys())
|
||||
inv_ids = set(inventory.keys())
|
||||
|
||||
only_live = sorted(live_ids - inv_ids, key=int)
|
||||
only_inv = sorted(inv_ids - live_ids, key=int)
|
||||
both = sorted(live_ids & inv_ids, key=int)
|
||||
@@ -104,9 +35,6 @@ def main():
|
||||
inventory = parse_inventory(INVENTORY)
|
||||
deleted = find_deleted_section(INVENTORY)
|
||||
only_live, only_inv_raw, common = compare(live, inventory)
|
||||
# разделяем "в inventory но не в live" на 2 группы:
|
||||
# - known-deleted (есть в секции "🗑️ Удалённые") — это ок
|
||||
# - truly missing (нет и в live, и не в секции deleted) — проблема
|
||||
only_inv = [v for v in only_inv_raw if v not in deleted]
|
||||
known_deleted = [v for v in only_inv_raw if v in deleted]
|
||||
|
||||
@@ -120,7 +48,7 @@ def main():
|
||||
"",
|
||||
f"# KB drift audit — {today}",
|
||||
"",
|
||||
f"Сравнение живого `pct list` / `qm list` с [[../projects/dttb/proxmox-inventory|proxmox-inventory.md]]",
|
||||
"Сравнение живого `pct list` / `qm list` с [[../projects/dttb/proxmox-inventory|proxmox-inventory.md]]",
|
||||
"",
|
||||
f"- Живых гостей Proxmox: **{len(live)}**",
|
||||
f"- Упомянуто в inventory: **{len(inventory)}**",
|
||||
@@ -164,7 +92,7 @@ def main():
|
||||
lines += [
|
||||
"",
|
||||
"---",
|
||||
f"*Автоматически сгенерировано `scripts/kb-audit.py`. Применять правки — вручную после ревью.*",
|
||||
"*Автоматически сгенерировано `scripts/kb-audit.py`. Применять правки — вручную после ревью.*",
|
||||
]
|
||||
|
||||
out.write_text("\n".join(lines))
|
||||
|
||||
Reference in New Issue
Block a user