- kb_audit_helpers.py — общие функции parse_live/inventory/deleted - kb-audit-apply.py — применяет только structural факт-правки: * new VMID → добавить в "🔴 Остановленные" (только для stopped) * missing VMID → переместить в "🗑️ Удалённые" с датой - Коммитит как kb-audit-bot <kb-audit@dttb.ru> — фильтруемо в git log - Safety: live<5 хостов → abort - Не трогает описания/IP/назначения — только структурные поля из pct list Cron обновлён: audit → apply → propose (остаток для ручного ревью)
70 lines
2.2 KiB
Python
70 lines
2.2 KiB
Python
"""
|
|
Общие функции для kb-audit.py и kb-audit-apply.py.
|
|
"""
|
|
|
|
import re
|
|
import subprocess
|
|
from pathlib import Path
|
|
|
|
PROXMOX_HOST = "10.0.0.250"
|
|
PROXMOX_PASS = "1qaz!QAZ"
|
|
VAULT = Path(__file__).resolve().parent.parent
|
|
INVENTORY = VAULT / "projects/dttb/proxmox-inventory.md"
|
|
|
|
|
|
def sh(cmd: str) -> str:
|
|
"""SSH-выполнение на Proxmox."""
|
|
full = [
|
|
"sshpass", "-p", PROXMOX_PASS,
|
|
"ssh", "-o", "StrictHostKeyChecking=no",
|
|
"-o", "ConnectTimeout=10",
|
|
f"root@{PROXMOX_HOST}",
|
|
cmd,
|
|
]
|
|
r = subprocess.run(full, capture_output=True, text=True, timeout=30)
|
|
return r.stdout
|
|
|
|
|
|
def parse_live():
|
|
"""pct list + qm list → {vmid: {type, status, name}}."""
|
|
live = {}
|
|
for raw_line in sh("pct list").splitlines()[1:]:
|
|
parts = raw_line.split(maxsplit=3)
|
|
if len(parts) >= 3:
|
|
vmid, status = parts[0], parts[1]
|
|
name = parts[-1] if len(parts) > 2 else ""
|
|
live[vmid] = {"type": "LXC", "status": status, "name": name}
|
|
for raw_line in sh("qm list").splitlines()[1:]:
|
|
parts = raw_line.split()
|
|
if len(parts) >= 3 and parts[0].isdigit():
|
|
vmid, name, status = parts[0], parts[1], parts[2]
|
|
live[vmid] = {"type": "VM", "status": status, "name": name}
|
|
return live
|
|
|
|
|
|
def parse_inventory(path: Path):
|
|
"""VMID-упоминания в inventory. Заголовки + table-rows."""
|
|
text = path.read_text()
|
|
found = {}
|
|
|
|
def add(vmid: str, idx: int):
|
|
if vmid not in found:
|
|
start = max(0, idx - 20)
|
|
end = min(len(text), idx + 80)
|
|
found[vmid] = text[start:end].replace("\n", " ").strip()
|
|
|
|
for m in re.finditer(r"(?:LXC|VM)\s+(\d{2,4})\b", text):
|
|
add(m.group(1), m.start())
|
|
for m in re.finditer(r"^\s*\|\s*(\d{2,4})\s*\|", text, re.MULTILINE):
|
|
add(m.group(1), m.start())
|
|
return found
|
|
|
|
|
|
def find_deleted_section(path: Path) -> set:
|
|
"""VMID в секции '## 🗑️ Удалённые'."""
|
|
text = path.read_text()
|
|
m = re.search(r"##\s*🗑[^\n]*Удал[^\n]*\n(.*?)(?=\n##|\Z)", text, re.DOTALL)
|
|
if not m:
|
|
return set()
|
|
return set(re.findall(r"\|\s*(\d{2,4})\s*\|", m.group(1)))
|