- kb_audit_helpers.py — общие функции parse_live/inventory/deleted - kb-audit-apply.py — применяет только structural факт-правки: * new VMID → добавить в "🔴 Остановленные" (только для stopped) * missing VMID → переместить в "🗑️ Удалённые" с датой - Коммитит как kb-audit-bot <kb-audit@dttb.ru> — фильтруемо в git log - Safety: live<5 хостов → abort - Не трогает описания/IP/назначения — только структурные поля из pct list Cron обновлён: audit → apply → propose (остаток для ручного ревью)
166 lines
6.9 KiB
Python
Executable File
166 lines
6.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
kb-audit-apply — автоматически применяет ТОЛЬКО безопасные правки inventory.
|
||
|
||
Безопасное = структурная правка по фактам, без мнения/описаний:
|
||
1. Новый VMID в live, отсутствует в inventory → добавить строку в
|
||
"🟢 Запущенные" или "🔴 Остановленные" (LXC/VM, по типу)
|
||
2. VMID в inventory (вне секции 🗑️), отсутствует в live → переместить в 🗑️
|
||
Удалённые с датой (явно deletion, не stopped — pct list показывает stopped)
|
||
|
||
НЕ трогает: описания, назначения, IP. Если проблема сложнее — оставляет drift-отчёту.
|
||
|
||
Коммитит как `kb-audit-bot <kb-audit@dttb.ru>` чтобы можно было фильтровать.
|
||
"""
|
||
|
||
import re
|
||
import subprocess
|
||
import sys
|
||
from datetime import date
|
||
from pathlib import Path
|
||
|
||
sys.path.insert(0, str(Path(__file__).resolve().parent))
|
||
# используем те же helpers
|
||
from kb_audit_helpers import parse_live, parse_inventory, find_deleted_section, INVENTORY # type: ignore
|
||
|
||
VAULT = Path(__file__).resolve().parent.parent
|
||
BOT_NAME = "kb-audit-bot"
|
||
BOT_EMAIL = "kb-audit@dttb.ru"
|
||
|
||
|
||
def find_section_insert_point(text: str, section_header: str) -> int | None:
|
||
"""Найти позицию СТРОКИ в конце markdown-таблицы сразу после заголовка секции."""
|
||
m = re.search(re.escape(section_header), text)
|
||
if not m:
|
||
return None
|
||
# после заголовка ищем конец таблицы (пустая строка или следующий заголовок ##)
|
||
after = text[m.end():]
|
||
rows_block = re.search(
|
||
r"\|[^\n]*\|\n\|[-:\s|]+\|\n((?:\|[^\n]*\|\n)+)",
|
||
after,
|
||
re.MULTILINE,
|
||
)
|
||
if not rows_block:
|
||
return None
|
||
return m.end() + rows_block.end()
|
||
|
||
|
||
def insert_row_in_table(text: str, section_header: str, row: str) -> str | None:
|
||
"""Вставить row в конец таблицы указанной секции."""
|
||
pos = find_section_insert_point(text, section_header)
|
||
if pos is None:
|
||
return None
|
||
return text[:pos] + row + text[pos:]
|
||
|
||
|
||
def apply_new_vmid(text: str, vmid: str, info: dict) -> str | None:
|
||
"""Добавить строку в соответствующую таблицу.
|
||
Таблицы inventory (по памяти — см. parse):
|
||
- ## 🟢 Запущенные LXC → подзаголовки ### LXC NNN ...
|
||
- ## 🔴 Остановленные LXC → таблица | VMID | Имя | Назначение |
|
||
- ## 🟢 Запущенные VM — тоже подзаголовки ### VM NNN
|
||
- ## 🔴 Остановленные VM → таблица
|
||
|
||
Мы можем ТОЛЬКО дописывать в таблицы Остановленных (простая структура).
|
||
Running-хосты требуют отдельной секции ### — это семантическая правка, оставляем человеку.
|
||
"""
|
||
if info["status"] != "stopped":
|
||
return None # для running — выдать через drift, пусть человек сам добавит секцию
|
||
|
||
if info["type"] == "LXC":
|
||
header = "## 🔴 Остановленные LXC"
|
||
row = f"| {vmid} | {info['name']} | (новый, уточнить) |\n"
|
||
else:
|
||
header = "## 🔴 Остановленные VM (QEMU)"
|
||
row = f"| {vmid} | {info['name']} | — | — | — | (новый, уточнить) |\n"
|
||
|
||
return insert_row_in_table(text, header, row)
|
||
|
||
|
||
def move_to_deleted(text: str, vmid: str) -> str | None:
|
||
"""Переместить упоминание VMID в секцию '🗑️ Удалённые'."""
|
||
today = date.today().isoformat()
|
||
# удаляем строки table-row с |VMID|
|
||
row_pattern = re.compile(rf"^\s*\|\s*{vmid}\s*\|.*\n", re.MULTILINE)
|
||
found_rows = row_pattern.findall(text)
|
||
if not found_rows:
|
||
# нет явной строки — значит упомянут только в заголовке (### LXC NNN)
|
||
# это сложная правка — не трогаем
|
||
return None
|
||
|
||
new_text = row_pattern.sub("", text)
|
||
# добавляем в секцию удалённых
|
||
deleted_row = f"| {vmid} | (auto-detected missing) | {today} | — |\n"
|
||
result = insert_row_in_table(new_text, "## 🗑️ Удалённые (история)", deleted_row)
|
||
return result
|
||
|
||
|
||
def safe_apply_all():
|
||
text = INVENTORY.read_text()
|
||
original = text
|
||
|
||
live = parse_live()
|
||
inventory = parse_inventory(INVENTORY)
|
||
deleted = find_deleted_section(INVENTORY)
|
||
|
||
live_ids = set(live.keys())
|
||
inv_ids = set(inventory.keys())
|
||
|
||
# safety: live не должно быть пустым (если Proxmox недоступен — abort)
|
||
if len(live_ids) < 5:
|
||
print(f"safety abort: live {len(live_ids)} < 5. Proxmox не отвечает?", file=sys.stderr)
|
||
sys.exit(2)
|
||
|
||
applied = []
|
||
skipped = []
|
||
|
||
# новые LXC/VM: в live, не в inventory
|
||
for vmid in sorted(live_ids - inv_ids, key=int):
|
||
info = live[vmid]
|
||
result = apply_new_vmid(text, vmid, info)
|
||
if result:
|
||
text = result
|
||
applied.append(f"+ VMID {vmid} ({info['type']} {info['name']}, {info['status']})")
|
||
else:
|
||
skipped.append(f" VMID {vmid} ({info['type']} {info['name']}, {info['status']}) — требует ручной правки (running → новая секция)")
|
||
|
||
# пропавшие: в inventory (не в 🗑️), не в live
|
||
for vmid in sorted(inv_ids - live_ids - deleted, key=int):
|
||
result = move_to_deleted(text, vmid)
|
||
if result:
|
||
text = result
|
||
applied.append(f"→🗑️ VMID {vmid} (больше нет в pct/qm list)")
|
||
else:
|
||
skipped.append(f" VMID {vmid} — упомянут только в заголовке, ручная правка")
|
||
|
||
if text == original:
|
||
print("no changes to apply")
|
||
return
|
||
|
||
INVENTORY.write_text(text)
|
||
|
||
summary = "\n".join(applied)
|
||
if skipped:
|
||
summary += "\n\nSkipped (too complex for auto-apply):\n" + "\n".join(skipped)
|
||
|
||
# commit
|
||
subprocess.run(
|
||
["git", "-C", str(VAULT), "add", str(INVENTORY.relative_to(VAULT))],
|
||
check=True,
|
||
)
|
||
subprocess.run(
|
||
[
|
||
"git", "-C", str(VAULT), "-c", f"user.name={BOT_NAME}", "-c", f"user.email={BOT_EMAIL}",
|
||
"commit", "-m", f"kb-audit-bot: auto-apply drift {date.today().isoformat()}\n\n{summary}",
|
||
],
|
||
check=True,
|
||
)
|
||
subprocess.run(["git", "-C", str(VAULT), "push"], check=False)
|
||
|
||
print(f"applied {len(applied)} change(s):")
|
||
print(summary)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
safe_apply_all()
|