- kb-audit-npm.py: NPM API → сверка с npm-proxy-hosts.md детектит новые/удалённые hosts + смену backend/SSL - kb-audit-creds.py: HEAD/GET-ping всех URL из credentials.md с fallback на GET при 501/405, skip embedded-creds URLs - kb-audit-dns.py: dig @8.8.8.8 и @10.0.0.1 для всех доменов NPM детектит NXDOMAIN + split-horizon Первый прогон нашёл: - NPM: 2 новых host (router/vpn.dttb.ru), 2 изменения (bitrix24 backend, git SSL) - Creds: все 12 URL reachable ✓ - DNS: itilegent.ru не резолвится (публичные записи протухли)
182 lines
6.2 KiB
Python
Executable File
182 lines
6.2 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
kb-audit-npm — сверяет NPM API с npm-proxy-hosts.md.
|
||
Детектит: новые proxy hosts, удалённые, смена backend IP/порт, SSL-дрейф, enabled-флаг.
|
||
Пишет в audit/YYYY-MM-DD-npm-drift.md.
|
||
"""
|
||
|
||
import json
|
||
import re
|
||
import ssl
|
||
import sys
|
||
import urllib.request
|
||
from datetime import date
|
||
from pathlib import Path
|
||
|
||
NPM_URL = "https://npm.dttb.ru"
|
||
NPM_USER = "it5870@yandex.ru"
|
||
NPM_PASS = "1qaz!QAZ"
|
||
|
||
VAULT = Path(__file__).resolve().parent.parent
|
||
NPM_FILE = VAULT / "projects/dttb/npm-proxy-hosts.md"
|
||
OUT_DIR = VAULT / "audit"
|
||
|
||
_CTX = ssl.create_default_context()
|
||
_CTX.check_hostname = False
|
||
_CTX.verify_mode = ssl.CERT_NONE
|
||
|
||
|
||
def npm_api(path: str, token: str = None, method: str = "GET", body: dict = None):
|
||
headers = {"Content-Type": "application/json"} if body else {}
|
||
if token:
|
||
headers["Authorization"] = f"Bearer {token}"
|
||
req = urllib.request.Request(
|
||
f"{NPM_URL}{path}",
|
||
data=json.dumps(body).encode() if body else None,
|
||
headers=headers,
|
||
method=method,
|
||
)
|
||
return json.loads(urllib.request.urlopen(req, context=_CTX, timeout=15).read())
|
||
|
||
|
||
def fetch_live_hosts():
|
||
token = npm_api("/api/tokens", method="POST", body={"identity": NPM_USER, "secret": NPM_PASS})["token"]
|
||
hosts = npm_api("/api/nginx/proxy-hosts", token=token)
|
||
result = {}
|
||
for h in hosts:
|
||
hid = str(h["id"])
|
||
result[hid] = {
|
||
"domains": ",".join(h.get("domain_names", [])),
|
||
"backend": f"{h.get('forward_host','?')}:{h.get('forward_port','?')}",
|
||
"scheme": h.get("forward_scheme", "http"),
|
||
"ssl": bool(h.get("certificate_id")),
|
||
"enabled": bool(h.get("enabled")),
|
||
"websockets": bool(h.get("allow_websocket_upgrade")),
|
||
}
|
||
return result
|
||
|
||
|
||
def parse_kb_hosts():
|
||
"""Таблица proxy-hosts. Поля: # | Домен | Backend | SSL | Forced | WSS | HTTP/2 | Назначение."""
|
||
text = NPM_FILE.read_text()
|
||
result = {}
|
||
for m in re.finditer(
|
||
r"^\|\s*(\d+)\s*\|\s*`([^`]+)`\s*\|\s*([^\|]+?)\s*\|\s*([^\|]+?)\s*\|",
|
||
text,
|
||
re.MULTILINE,
|
||
):
|
||
hid, domain, backend, ssl_col = m.groups()
|
||
backend = backend.strip()
|
||
# убираем "(HTTPS)" суффикс
|
||
backend_clean = re.sub(r"\s*\(HTTPS\)\s*$", "", backend)
|
||
result[hid] = {
|
||
"domain": domain.strip(),
|
||
"backend": backend_clean.strip(),
|
||
"ssl": "✅" in ssl_col,
|
||
}
|
||
return result
|
||
|
||
|
||
def compare(live: dict, kb: dict):
|
||
live_ids = set(live.keys())
|
||
kb_ids = set(kb.keys())
|
||
added = sorted(live_ids - kb_ids, key=int)
|
||
removed = sorted(kb_ids - live_ids, key=int)
|
||
changed = []
|
||
for hid in sorted(live_ids & kb_ids, key=int):
|
||
l, k = live[hid], kb[hid]
|
||
diffs = []
|
||
# backend сравниваем "host:port"
|
||
if l["backend"].lower() != k["backend"].lower():
|
||
diffs.append(f"backend: KB=`{k['backend']}` → live=`{l['backend']}`")
|
||
# ssl
|
||
if l["ssl"] != k["ssl"]:
|
||
diffs.append(f"ssl: KB={'✓' if k['ssl'] else '✗'} → live={'✓' if l['ssl'] else '✗'}")
|
||
if diffs:
|
||
changed.append((hid, l["domains"], diffs))
|
||
return added, removed, changed
|
||
|
||
|
||
def main():
|
||
today = date.today().isoformat()
|
||
OUT_DIR.mkdir(parents=True, exist_ok=True)
|
||
out = OUT_DIR / f"{today}-npm-drift.md"
|
||
|
||
try:
|
||
live = fetch_live_hosts()
|
||
except Exception as e:
|
||
print(f"NPM API fail: {e}", file=sys.stderr)
|
||
sys.exit(1)
|
||
|
||
if len(live) < 3:
|
||
print(f"safety abort: live {len(live)} < 3", file=sys.stderr)
|
||
sys.exit(2)
|
||
|
||
kb = parse_kb_hosts()
|
||
added, removed, changed = compare(live, kb)
|
||
|
||
lines = [
|
||
"---",
|
||
f"date: {today}",
|
||
"type: audit",
|
||
"source: kb-audit-npm.py",
|
||
"tags: [audit, drift, npm]",
|
||
"---",
|
||
"",
|
||
f"# NPM drift audit — {today}",
|
||
"",
|
||
f"Сверка [[../projects/dttb/npm-proxy-hosts|npm-proxy-hosts.md]] с NPM API ({NPM_URL}).",
|
||
"",
|
||
f"- Живых proxy hosts: **{len(live)}**",
|
||
f"- В KB: **{len(kb)}**",
|
||
f"- Совпадений: {len(set(live) & set(kb))} / новых: {len(added)} / удалённых из NPM: {len(removed)} / с изменениями: {len(changed)}",
|
||
"",
|
||
]
|
||
|
||
if added:
|
||
lines += ["## ⚠ Новые hosts (в NPM есть, в KB нет)", "",
|
||
"| ID | Домены | Backend | SSL | Enabled |",
|
||
"|---|---|---|---|---|"]
|
||
for hid in added:
|
||
h = live[hid]
|
||
lines.append(f"| {hid} | `{h['domains']}` | `{h['backend']}` | {'✓' if h['ssl'] else '-'} | {'on' if h['enabled'] else 'off'} |")
|
||
lines.append("")
|
||
|
||
if removed:
|
||
lines += ["## 🗑 Удалены из NPM (но есть в KB)", "",
|
||
"| ID | Домен из KB | Backend KB |", "|---|---|---|"]
|
||
for hid in removed:
|
||
k = kb[hid]
|
||
lines.append(f"| {hid} | `{k['domain']}` | `{k['backend']}` |")
|
||
lines.append("")
|
||
|
||
if changed:
|
||
lines += ["## 🔄 Изменения (ID совпадает, но что-то сменилось)", ""]
|
||
for hid, domains, diffs in changed:
|
||
lines.append(f"### #{hid} `{domains}`")
|
||
for d in diffs:
|
||
lines.append(f"- {d}")
|
||
lines.append("")
|
||
|
||
if not (added or removed or changed):
|
||
lines += ["## ✓ NPM полностью совпадает с npm-proxy-hosts.md", ""]
|
||
|
||
lines += [
|
||
"## Полный живой список",
|
||
"",
|
||
"| ID | Домены | Backend | SSL | Enabled |",
|
||
"|---|---|---|---|---|",
|
||
]
|
||
for hid in sorted(live.keys(), key=int):
|
||
h = live[hid]
|
||
lines.append(f"| {hid} | `{h['domains']}` | `{h['backend']}` | {'✓' if h['ssl'] else '-'} | {'on' if h['enabled'] else 'off'} |")
|
||
lines += ["", "---", "*Автоматически через `scripts/kb-audit-npm.py`.*"]
|
||
|
||
out.write_text("\n".join(lines))
|
||
print(f"npm drift: {out}")
|
||
print(f" added: {len(added)} / removed: {len(removed)} / changed: {len(changed)}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|