From a379b626aff9b36219a08008ba4c98526f0345e4 Mon Sep 17 00:00:00 2001 From: dttb Date: Sat, 18 Apr 2026 00:42:49 +0300 Subject: [PATCH] =?UTF-8?q?kb-audit:=20=D1=83=D1=80=D0=BE=D0=B2=D0=B5?= =?UTF-8?q?=D0=BD=D1=8C=203=20=E2=80=94=20auto-apply=20safe=20drift=20fixe?= =?UTF-8?q?s=20(karpathy-style)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - kb_audit_helpers.py — общие функции parse_live/inventory/deleted - kb-audit-apply.py — применяет только structural факт-правки: * new VMID → добавить в "🔴 Остановленные" (только для stopped) * missing VMID → переместить в "🗑️ Удалённые" с датой - Коммитит как kb-audit-bot — фильтруемо в git log - Safety: live<5 хостов → abort - Не трогает описания/IP/назначения — только структурные поля из pct list Cron обновлён: audit → apply → propose (остаток для ручного ревью) --- scripts/README.md | 33 ++++++-- scripts/kb-audit-apply.py | 165 ++++++++++++++++++++++++++++++++++++ scripts/kb-audit.py | 80 +---------------- scripts/kb_audit_helpers.py | 69 +++++++++++++++ 4 files changed, 262 insertions(+), 85 deletions(-) create mode 100755 scripts/kb-audit-apply.py create mode 100644 scripts/kb_audit_helpers.py diff --git a/scripts/README.md b/scripts/README.md index 68713b9..1a7708b 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -22,25 +22,40 @@ python3 scripts/kb-audit.py bash scripts/kb-audit-propose.sh ``` +## kb-audit-apply.py (karpathy-style уровень 3) +Автоматически применяет **безопасные** правки в `proxmox-inventory.md`: +- Новый LXC/VM в Proxmox → добавляется строка в таблицу «Остановленные» (только для stopped — running требует ручной секции с деталями) +- VMID отсутствует в pct/qm list → перемещается в секцию «🗑️ Удалённые» с сегодняшней датой + +**Гарды:** +- Live-list < 5 хостов → abort (Proxmox возможно недоступен, не искалечим inventory) +- Коммит с автором `kb-audit-bot ` — легко фильтровать в git log +- Трогает **только** `proxmox-inventory.md`, никогда описания/назначения + +Запуск: +```bash +python3 scripts/kb-audit-apply.py +``` + ## Еженедельный cron (code-server LXC 132) ```cron -# воскресенье 06:00 — drift audit + Opus предложения -0 6 * * 0 /usr/bin/python3 /root/knowledge-base/scripts/kb-audit.py && /bin/bash /root/knowledge-base/scripts/kb-audit-propose.sh +# воскресенье 06:00 — drift audit → safe auto-apply → Opus предложения оставшегося +0 6 * * 0 /usr/bin/python3 /root/knowledge-base/scripts/kb-audit.py && /usr/bin/python3 /root/knowledge-base/scripts/kb-audit-apply.py; /bin/bash /root/knowledge-base/scripts/kb-audit-propose.sh ``` ## Архитектура ``` pct list / qm list (Proxmox) ↓ -kb-audit.py — фактовый diff +kb-audit.py — фактовый diff → audit/YYYY-MM-DD-drift.md ↓ -audit/YYYY-MM-DD-drift.md (коммитится автоматом kb-autosync.sh) +kb-audit-apply.py — автоматически применяет safe-правки (новые VMID, missing → 🗑️) + ↓ коммитит как kb-audit-bot ↓ -kb-audit-propose.sh — Opus предлагает patch +kb-audit-propose.sh — Opus читает drift + свежий inventory, предлагает что осталось + ↓ → audit/YYYY-MM-DD-proposed.md ↓ -audit/YYYY-MM-DD-proposed.md (коммитится) +ты ревьюишь оставшееся (описания, IP, назначения), применяешь руками ↓ -ты ревьюишь, применяешь руками - ↓ -коммит inventory, sync везде +коммит → sync везде (через kb-autosync + File Provider + Nextcloud) ``` diff --git a/scripts/kb-audit-apply.py b/scripts/kb-audit-apply.py new file mode 100755 index 0000000..e731c97 --- /dev/null +++ b/scripts/kb-audit-apply.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +""" +kb-audit-apply — автоматически применяет ТОЛЬКО безопасные правки inventory. + +Безопасное = структурная правка по фактам, без мнения/описаний: + 1. Новый VMID в live, отсутствует в inventory → добавить строку в + "🟢 Запущенные" или "🔴 Остановленные" (LXC/VM, по типу) + 2. VMID в inventory (вне секции 🗑️), отсутствует в live → переместить в 🗑️ + Удалённые с датой (явно deletion, не stopped — pct list показывает stopped) + +НЕ трогает: описания, назначения, IP. Если проблема сложнее — оставляет drift-отчёту. + +Коммитит как `kb-audit-bot ` чтобы можно было фильтровать. +""" + +import re +import subprocess +import sys +from datetime import date +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent)) +# используем те же helpers +from kb_audit_helpers import parse_live, parse_inventory, find_deleted_section, INVENTORY # type: ignore + +VAULT = Path(__file__).resolve().parent.parent +BOT_NAME = "kb-audit-bot" +BOT_EMAIL = "kb-audit@dttb.ru" + + +def find_section_insert_point(text: str, section_header: str) -> int | None: + """Найти позицию СТРОКИ в конце markdown-таблицы сразу после заголовка секции.""" + m = re.search(re.escape(section_header), text) + if not m: + return None + # после заголовка ищем конец таблицы (пустая строка или следующий заголовок ##) + after = text[m.end():] + rows_block = re.search( + r"\|[^\n]*\|\n\|[-:\s|]+\|\n((?:\|[^\n]*\|\n)+)", + after, + re.MULTILINE, + ) + if not rows_block: + return None + return m.end() + rows_block.end() + + +def insert_row_in_table(text: str, section_header: str, row: str) -> str | None: + """Вставить row в конец таблицы указанной секции.""" + pos = find_section_insert_point(text, section_header) + if pos is None: + return None + return text[:pos] + row + text[pos:] + + +def apply_new_vmid(text: str, vmid: str, info: dict) -> str | None: + """Добавить строку в соответствующую таблицу. + Таблицы inventory (по памяти — см. parse): + - ## 🟢 Запущенные LXC → подзаголовки ### LXC NNN ... + - ## 🔴 Остановленные LXC → таблица | VMID | Имя | Назначение | + - ## 🟢 Запущенные VM — тоже подзаголовки ### VM NNN + - ## 🔴 Остановленные VM → таблица + + Мы можем ТОЛЬКО дописывать в таблицы Остановленных (простая структура). + Running-хосты требуют отдельной секции ### — это семантическая правка, оставляем человеку. + """ + if info["status"] != "stopped": + return None # для running — выдать через drift, пусть человек сам добавит секцию + + if info["type"] == "LXC": + header = "## 🔴 Остановленные LXC" + row = f"| {vmid} | {info['name']} | (новый, уточнить) |\n" + else: + header = "## 🔴 Остановленные VM (QEMU)" + row = f"| {vmid} | {info['name']} | — | — | — | (новый, уточнить) |\n" + + return insert_row_in_table(text, header, row) + + +def move_to_deleted(text: str, vmid: str) -> str | None: + """Переместить упоминание VMID в секцию '🗑️ Удалённые'.""" + today = date.today().isoformat() + # удаляем строки table-row с |VMID| + row_pattern = re.compile(rf"^\s*\|\s*{vmid}\s*\|.*\n", re.MULTILINE) + found_rows = row_pattern.findall(text) + if not found_rows: + # нет явной строки — значит упомянут только в заголовке (### LXC NNN) + # это сложная правка — не трогаем + return None + + new_text = row_pattern.sub("", text) + # добавляем в секцию удалённых + deleted_row = f"| {vmid} | (auto-detected missing) | {today} | — |\n" + result = insert_row_in_table(new_text, "## 🗑️ Удалённые (история)", deleted_row) + return result + + +def safe_apply_all(): + text = INVENTORY.read_text() + original = text + + live = parse_live() + inventory = parse_inventory(INVENTORY) + deleted = find_deleted_section(INVENTORY) + + live_ids = set(live.keys()) + inv_ids = set(inventory.keys()) + + # safety: live не должно быть пустым (если Proxmox недоступен — abort) + if len(live_ids) < 5: + print(f"safety abort: live {len(live_ids)} < 5. Proxmox не отвечает?", file=sys.stderr) + sys.exit(2) + + applied = [] + skipped = [] + + # новые LXC/VM: в live, не в inventory + for vmid in sorted(live_ids - inv_ids, key=int): + info = live[vmid] + result = apply_new_vmid(text, vmid, info) + if result: + text = result + applied.append(f"+ VMID {vmid} ({info['type']} {info['name']}, {info['status']})") + else: + skipped.append(f" VMID {vmid} ({info['type']} {info['name']}, {info['status']}) — требует ручной правки (running → новая секция)") + + # пропавшие: в inventory (не в 🗑️), не в live + for vmid in sorted(inv_ids - live_ids - deleted, key=int): + result = move_to_deleted(text, vmid) + if result: + text = result + applied.append(f"→🗑️ VMID {vmid} (больше нет в pct/qm list)") + else: + skipped.append(f" VMID {vmid} — упомянут только в заголовке, ручная правка") + + if text == original: + print("no changes to apply") + return + + INVENTORY.write_text(text) + + summary = "\n".join(applied) + if skipped: + summary += "\n\nSkipped (too complex for auto-apply):\n" + "\n".join(skipped) + + # commit + subprocess.run( + ["git", "-C", str(VAULT), "add", str(INVENTORY.relative_to(VAULT))], + check=True, + ) + subprocess.run( + [ + "git", "-C", str(VAULT), "-c", f"user.name={BOT_NAME}", "-c", f"user.email={BOT_EMAIL}", + "commit", "-m", f"kb-audit-bot: auto-apply drift {date.today().isoformat()}\n\n{summary}", + ], + check=True, + ) + subprocess.run(["git", "-C", str(VAULT), "push"], check=False) + + print(f"applied {len(applied)} change(s):") + print(summary) + + +if __name__ == "__main__": + safe_apply_all() diff --git a/scripts/kb-audit.py b/scripts/kb-audit.py index ddf9e13..f0701e1 100755 --- a/scripts/kb-audit.py +++ b/scripts/kb-audit.py @@ -7,88 +7,19 @@ kb-audit — детектит drift между живой инфраструкт Запускать из code-server (LXC 132) или любой машины с sshpass + доступом к Proxmox. """ -import re -import subprocess import sys from datetime import date from pathlib import Path -PROXMOX_HOST = "10.0.0.250" -PROXMOX_PASS = "1qaz!QAZ" +sys.path.insert(0, str(Path(__file__).resolve().parent)) +from kb_audit_helpers import parse_live, parse_inventory, find_deleted_section, INVENTORY, VAULT # type: ignore -VAULT = Path(__file__).resolve().parent.parent -INVENTORY = VAULT / "projects/dttb/proxmox-inventory.md" OUT_DIR = VAULT / "audit" -def sh(cmd: str) -> str: - """SSH-выполнение команды на Proxmox.""" - full = [ - "sshpass", "-p", PROXMOX_PASS, - "ssh", "-o", "StrictHostKeyChecking=no", - "-o", "ConnectTimeout=10", - f"root@{PROXMOX_HOST}", - cmd, - ] - r = subprocess.run(full, capture_output=True, text=True, timeout=30) - return r.stdout - - -def parse_live(): - """Парсит pct list / qm list. Возвращает {id: {type, status, name}}.""" - live = {} - for raw_line in sh("pct list").splitlines()[1:]: - parts = raw_line.split(maxsplit=3) - if len(parts) >= 3: - vmid, status = parts[0], parts[1] - name = parts[-1] if len(parts) > 2 else "" - live[vmid] = {"type": "LXC", "status": status, "name": name} - for raw_line in sh("qm list").splitlines()[1:]: - # qm list: VMID NAME STATUS MEM BOOTDISK PID - parts = raw_line.split() - if len(parts) >= 3 and parts[0].isdigit(): - vmid, name, status = parts[0], parts[1], parts[2] - live[vmid] = {"type": "VM", "status": status, "name": name} - return live - - -def parse_inventory(path: Path): - """Парсит inventory-файл. Извлекает упомянутые VMID + связанный контекст. - Ловит два формата: - 1. "LXC 132" / "VM 250" — в заголовках и тексте - 2. Table rows: | 132 | debian | ... | — в таблицах - """ - text = path.read_text() - found = {} - - def add(vmid: str, idx: int): - if vmid not in found: - start = max(0, idx - 20) - end = min(len(text), idx + 80) - found[vmid] = text[start:end].replace("\n", " ").strip() - - for m in re.finditer(r"(?:LXC|VM)\s+(\d{2,4})\b", text): - add(m.group(1), m.start()) - # table rows: строка начинается с `| NNN |` (игнорируем header-row с тире) - for m in re.finditer(r"^\s*\|\s*(\d{2,4})\s*\|", text, re.MULTILINE): - add(m.group(1), m.start()) - return found - - -def find_deleted_section(path: Path) -> set: - """Ищет VMID в секции '## 🗑️ Удалённые' чтобы не флагать их как missing.""" - text = path.read_text() - # блок между '🗑️ Удалённые' и следующим '##' - m = re.search(r"##\s*🗑[^\n]*Удал[^\n]*\n(.*?)(?=\n##|\Z)", text, re.DOTALL) - if not m: - return set() - return set(re.findall(r"\|\s*(\d{2,4})\s*\|", m.group(1))) - - def compare(live: dict, inventory: dict): live_ids = set(live.keys()) inv_ids = set(inventory.keys()) - only_live = sorted(live_ids - inv_ids, key=int) only_inv = sorted(inv_ids - live_ids, key=int) both = sorted(live_ids & inv_ids, key=int) @@ -104,9 +35,6 @@ def main(): inventory = parse_inventory(INVENTORY) deleted = find_deleted_section(INVENTORY) only_live, only_inv_raw, common = compare(live, inventory) - # разделяем "в inventory но не в live" на 2 группы: - # - known-deleted (есть в секции "🗑️ Удалённые") — это ок - # - truly missing (нет и в live, и не в секции deleted) — проблема only_inv = [v for v in only_inv_raw if v not in deleted] known_deleted = [v for v in only_inv_raw if v in deleted] @@ -120,7 +48,7 @@ def main(): "", f"# KB drift audit — {today}", "", - f"Сравнение живого `pct list` / `qm list` с [[../projects/dttb/proxmox-inventory|proxmox-inventory.md]]", + "Сравнение живого `pct list` / `qm list` с [[../projects/dttb/proxmox-inventory|proxmox-inventory.md]]", "", f"- Живых гостей Proxmox: **{len(live)}**", f"- Упомянуто в inventory: **{len(inventory)}**", @@ -164,7 +92,7 @@ def main(): lines += [ "", "---", - f"*Автоматически сгенерировано `scripts/kb-audit.py`. Применять правки — вручную после ревью.*", + "*Автоматически сгенерировано `scripts/kb-audit.py`. Применять правки — вручную после ревью.*", ] out.write_text("\n".join(lines)) diff --git a/scripts/kb_audit_helpers.py b/scripts/kb_audit_helpers.py new file mode 100644 index 0000000..96b80e2 --- /dev/null +++ b/scripts/kb_audit_helpers.py @@ -0,0 +1,69 @@ +""" +Общие функции для kb-audit.py и kb-audit-apply.py. +""" + +import re +import subprocess +from pathlib import Path + +PROXMOX_HOST = "10.0.0.250" +PROXMOX_PASS = "1qaz!QAZ" +VAULT = Path(__file__).resolve().parent.parent +INVENTORY = VAULT / "projects/dttb/proxmox-inventory.md" + + +def sh(cmd: str) -> str: + """SSH-выполнение на Proxmox.""" + full = [ + "sshpass", "-p", PROXMOX_PASS, + "ssh", "-o", "StrictHostKeyChecking=no", + "-o", "ConnectTimeout=10", + f"root@{PROXMOX_HOST}", + cmd, + ] + r = subprocess.run(full, capture_output=True, text=True, timeout=30) + return r.stdout + + +def parse_live(): + """pct list + qm list → {vmid: {type, status, name}}.""" + live = {} + for raw_line in sh("pct list").splitlines()[1:]: + parts = raw_line.split(maxsplit=3) + if len(parts) >= 3: + vmid, status = parts[0], parts[1] + name = parts[-1] if len(parts) > 2 else "" + live[vmid] = {"type": "LXC", "status": status, "name": name} + for raw_line in sh("qm list").splitlines()[1:]: + parts = raw_line.split() + if len(parts) >= 3 and parts[0].isdigit(): + vmid, name, status = parts[0], parts[1], parts[2] + live[vmid] = {"type": "VM", "status": status, "name": name} + return live + + +def parse_inventory(path: Path): + """VMID-упоминания в inventory. Заголовки + table-rows.""" + text = path.read_text() + found = {} + + def add(vmid: str, idx: int): + if vmid not in found: + start = max(0, idx - 20) + end = min(len(text), idx + 80) + found[vmid] = text[start:end].replace("\n", " ").strip() + + for m in re.finditer(r"(?:LXC|VM)\s+(\d{2,4})\b", text): + add(m.group(1), m.start()) + for m in re.finditer(r"^\s*\|\s*(\d{2,4})\s*\|", text, re.MULTILINE): + add(m.group(1), m.start()) + return found + + +def find_deleted_section(path: Path) -> set: + """VMID в секции '## 🗑️ Удалённые'.""" + text = path.read_text() + m = re.search(r"##\s*🗑[^\n]*Удал[^\n]*\n(.*?)(?=\n##|\Z)", text, re.DOTALL) + if not m: + return set() + return set(re.findall(r"\|\s*(\d{2,4})\s*\|", m.group(1)))