#!/usr/bin/env python3
"""kb-audit-npm: reconcile the live NPM API with npm-proxy-hosts.md.

Fetches the proxy hosts from the NPM API (presumably Nginx Proxy Manager,
judging by the ``/api/nginx/proxy-hosts`` endpoint) and compares them with the
Markdown table kept in the knowledge base.

Detects:
  * proxy hosts present in NPM but missing from the KB;
  * proxy hosts listed in the KB but no longer present in NPM;
  * backend ``host:port`` changes;
  * SSL drift (certificate attached vs. the KB "SSL" column).

The ``enabled`` flag is shown in the live tables of the report but is not
compared, because the KB table parsed here has no such column.

The report is written to ``audit/YYYY-MM-DD-npm-drift.md``.
"""

import json
import os
import re
import ssl
import sys
import urllib.request
from datetime import date
from pathlib import Path
from typing import Optional

# Connection settings. Environment variables take precedence; the literals are
# kept only as fallbacks so that existing invocations keep working.
# NOTE(review): credentials committed to source control should be rotated and
# then supplied exclusively via NPM_USER / NPM_PASS.
NPM_URL = os.environ.get("NPM_URL", "https://npm.dttb.ru")
NPM_USER = os.environ.get("NPM_USER", "it5870@yandex.ru")
NPM_PASS = os.environ.get("NPM_PASS", "1qaz!QAZ")

VAULT = Path(__file__).resolve().parent.parent
NPM_FILE = VAULT / "projects/dttb/npm-proxy-hosts.md"
OUT_DIR = VAULT / "audit"

# Abort when the API returns fewer hosts than this: a near-empty answer would
# otherwise be reported as "almost everything was removed".
MIN_LIVE_HOSTS = 3

# NOTE(review): certificate and hostname verification are disabled, so the
# login secret is sent over a channel that is not authenticated. Kept as-is to
# preserve behaviour; confirm whether the endpoint really needs this.
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE

# One KB table row: | <id> | `<domain>` | <backend> | <ssl> | ...
# Only the first four columns are captured; the rest of the row is ignored.
_KB_ROW_RE = re.compile(
    r"^\|\s*(\d+)\s*\|\s*`([^`]+)`\s*\|\s*([^\|]+?)\s*\|\s*([^\|]+?)\s*\|",
    re.MULTILINE,
)

# Trailing "(HTTPS)" marker that the KB appends to some backends.
_HTTPS_SUFFIX_RE = re.compile(r"\s*\(HTTPS\)\s*$")


def npm_api(path: str, token: Optional[str] = None, method: str = "GET",
            body: Optional[dict] = None):
    """Call the NPM API and return the decoded JSON response.

    Args:
        path: URL path appended to ``NPM_URL`` (e.g. ``"/api/tokens"``).
        token: Bearer token; when given it is sent in ``Authorization``.
        method: HTTP method.
        body: JSON-serialisable request body, or ``None`` for no body.

    Returns:
        The parsed JSON payload.

    Raises:
        urllib.error.URLError: on connection or HTTP errors.
        ValueError: when the response is not valid JSON.
    """
    # Test against None so that an explicit empty dict is still sent as "{}".
    has_body = body is not None
    headers = {"Content-Type": "application/json"} if has_body else {}
    if token:
        headers["Authorization"] = f"Bearer {token}"
    req = urllib.request.Request(
        f"{NPM_URL}{path}",
        data=json.dumps(body).encode() if has_body else None,
        headers=headers,
        method=method,
    )
    # The context manager closes the connection even if decoding fails.
    with urllib.request.urlopen(req, context=_CTX, timeout=15) as resp:
        return json.loads(resp.read())


def fetch_live_hosts():
    """Log in to NPM and return its proxy hosts keyed by string ID.

    Returns:
        ``{id: {"domains", "backend", "scheme", "ssl", "enabled",
        "websockets"}}`` where ``domains`` is a comma-joined string and
        ``backend`` is ``"host:port"``.
    """
    token = npm_api(
        "/api/tokens",
        method="POST",
        body={"identity": NPM_USER, "secret": NPM_PASS},
    )["token"]
    hosts = npm_api("/api/nginx/proxy-hosts", token=token)
    result = {}
    for h in hosts:
        result[str(h["id"])] = {
            # "or []" also covers an explicit null in the API response.
            "domains": ",".join(h.get("domain_names") or []),
            "backend": f"{h.get('forward_host', '?')}:{h.get('forward_port', '?')}",
            "scheme": h.get("forward_scheme", "http"),
            "ssl": bool(h.get("certificate_id")),
            "enabled": bool(h.get("enabled")),
            "websockets": bool(h.get("allow_websocket_upgrade")),
        }
    return result


def parse_kb_hosts(text: Optional[str] = None):
    """Parse the proxy-hosts Markdown table from the knowledge base.

    Table columns: # | Domain | Backend | SSL | Forced | WSS | HTTP/2 | Purpose.
    Only the first four are read.

    Args:
        text: Markdown to parse. When ``None`` (the default) the contents of
            ``NPM_FILE`` are read as UTF-8.

    Returns:
        ``{id: {"domain", "backend", "ssl"}}`` keyed by string ID; ``ssl`` is
        true when the SSL column contains a check mark.
    """
    if text is None:
        # Explicit encoding: the file contains Cyrillic text and emoji, which
        # the locale default codec may not decode.
        text = NPM_FILE.read_text(encoding="utf-8")
    result = {}
    for m in _KB_ROW_RE.finditer(text):
        hid, domain, backend, ssl_col = m.groups()
        # Normalise "02" -> "2" so that IDs match the str(int) keys of the
        # live data.
        hid = str(int(hid))
        result[hid] = {
            "domain": domain.strip(),
            "backend": _HTTPS_SUFFIX_RE.sub("", backend.strip()).strip(),
            "ssl": "✅" in ssl_col,
        }
    return result


def compare(live: dict, kb: dict):
    """Diff live NPM hosts against the KB entries.

    Args:
        live: mapping as returned by ``fetch_live_hosts``.
        kb: mapping as returned by ``parse_kb_hosts``.

    Returns:
        ``(added, removed, changed)``: ``added`` and ``removed`` are lists of
        IDs sorted numerically; ``changed`` is a list of
        ``(id, live_domains, [difference, ...])`` tuples for IDs present on
        both sides whose backend or SSL state differs.
    """
    live_ids = set(live)
    kb_ids = set(kb)
    added = sorted(live_ids - kb_ids, key=int)
    removed = sorted(kb_ids - live_ids, key=int)
    changed = []
    for hid in sorted(live_ids & kb_ids, key=int):
        live_host, kb_host = live[hid], kb[hid]
        diffs = []
        # Backends are "host:port" strings; host names are case-insensitive.
        if live_host["backend"].lower() != kb_host["backend"].lower():
            diffs.append(
                f"backend: KB=`{kb_host['backend']}` → live=`{live_host['backend']}`"
            )
        if live_host["ssl"] != kb_host["ssl"]:
            diffs.append(
                f"ssl: KB={'✓' if kb_host['ssl'] else '✗'}"
                f" → live={'✓' if live_host['ssl'] else '✗'}"
            )
        if diffs:
            changed.append((hid, live_host["domains"], diffs))
    return added, removed, changed


def _live_row(hid: str, host: dict) -> str:
    """Format one live host as a Markdown table row."""
    ssl_mark = "✓" if host["ssl"] else "-"
    state = "on" if host["enabled"] else "off"
    return f"| {hid} | `{host['domains']}` | `{host['backend']}` | {ssl_mark} | {state} |"


def _render_report(today: str, live: dict, kb: dict,
                   added: list, removed: list, changed: list) -> str:
    """Build the Markdown drift report and return it as a single string."""
    lines = [
        "---",
        f"date: {today}",
        "type: audit",
        "source: kb-audit-npm.py",
        "tags: [audit, drift, npm]",
        "---",
        "",
        f"# NPM drift audit — {today}",
        "",
        f"Сверка [[../projects/dttb/npm-proxy-hosts|npm-proxy-hosts.md]] с NPM API ({NPM_URL}).",
        "",
        f"- Живых proxy hosts: **{len(live)}**",
        f"- В KB: **{len(kb)}**",
        f"- Совпадений: {len(set(live) & set(kb))} / новых: {len(added)}"
        f" / удалённых из NPM: {len(removed)} / с изменениями: {len(changed)}",
        "",
    ]
    if added:
        lines += [
            "## ⚠ Новые hosts (в NPM есть, в KB нет)",
            "",
            "| ID | Домены | Backend | SSL | Enabled |",
            "|---|---|---|---|---|",
        ]
        lines += [_live_row(hid, live[hid]) for hid in added]
        lines.append("")
    if removed:
        lines += [
            "## 🗑 Удалены из NPM (но есть в KB)",
            "",
            "| ID | Домен из KB | Backend KB |",
            "|---|---|---|",
        ]
        for hid in removed:
            k = kb[hid]
            lines.append(f"| {hid} | `{k['domain']}` | `{k['backend']}` |")
        lines.append("")
    if changed:
        lines += ["## 🔄 Изменения (ID совпадает, но что-то сменилось)", ""]
        for hid, domains, diffs in changed:
            lines.append(f"### #{hid} `{domains}`")
            lines += [f"- {d}" for d in diffs]
            lines.append("")
    if not (added or removed or changed):
        lines += ["## ✓ NPM полностью совпадает с npm-proxy-hosts.md", ""]
    lines += [
        "## Полный живой список",
        "",
        "| ID | Домены | Backend | SSL | Enabled |",
        "|---|---|---|---|---|",
    ]
    lines += [_live_row(hid, live[hid]) for hid in sorted(live, key=int)]
    lines += ["", "---", "*Автоматически через `scripts/kb-audit-npm.py`.*"]
    return "\n".join(lines)


def main():
    """Run the audit and write ``audit/<today>-npm-drift.md``.

    Exit codes: 1 when the NPM API call fails, 2 when the API returns
    suspiciously few hosts, 3 when the KB file cannot be read.
    """
    today = date.today().isoformat()
    out = OUT_DIR / f"{today}-npm-drift.md"

    # Top-level boundary: any failure while talking to the API ends the run.
    try:
        live = fetch_live_hosts()
    except Exception as e:
        print(f"NPM API fail: {e}", file=sys.stderr)
        sys.exit(1)
    if len(live) < MIN_LIVE_HOSTS:
        print(f"safety abort: live {len(live)} < {MIN_LIVE_HOSTS}", file=sys.stderr)
        sys.exit(2)

    try:
        kb = parse_kb_hosts()
    except (OSError, UnicodeDecodeError) as e:
        print(f"KB read fail: {e}", file=sys.stderr)
        sys.exit(3)

    added, removed, changed = compare(live, kb)
    report = _render_report(today, live, kb, added, removed, changed)

    # Create the directory only once there is a report to store.
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    # Explicit encoding: the report contains Cyrillic text and emoji.
    out.write_text(report, encoding="utf-8")
    print(f"npm drift: {out}")
    print(f"  added: {len(added)} / removed: {len(removed)} / changed: {len(changed)}")


if __name__ == "__main__":
    main()