Files
knowledge-base/scripts/kb-audit-npm.py
dttb 1f7d265f16 audit: +kb-audit-npm/creds/dns — расширение karpathy-style
- kb-audit-npm.py: NPM API → сверка с npm-proxy-hosts.md
  детектит новые/удалённые hosts + смену backend/SSL
- kb-audit-creds.py: HEAD/GET-ping всех URL из credentials.md
  с fallback на GET при 501/405, skip embedded-creds URLs
- kb-audit-dns.py: dig @8.8.8.8 и @10.0.0.1 для всех доменов NPM
  детектит NXDOMAIN + split-horizon

Первый прогон нашёл:
- NPM: 2 новых host (router/vpn.dttb.ru), 2 изменения (bitrix24 backend, git SSL)
- Creds: все 12 URL reachable ✓
- DNS: itilegent.ru не резолвится (публичные записи протухли)
2026-04-18 12:35:21 +03:00

182 lines
6.2 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
kb-audit-npm — сверяет NPM API с npm-proxy-hosts.md.
Детектит: новые proxy hosts, удалённые, смена backend IP/порт, SSL-дрейф, enabled-флаг.
Пишет в audit/YYYY-MM-DD-npm-drift.md.
"""
import json
import re
import ssl
import sys
import urllib.request
from datetime import date
from pathlib import Path
NPM_URL = "https://npm.dttb.ru"
NPM_USER = "it5870@yandex.ru"
NPM_PASS = "1qaz!QAZ"
VAULT = Path(__file__).resolve().parent.parent
NPM_FILE = VAULT / "projects/dttb/npm-proxy-hosts.md"
OUT_DIR = VAULT / "audit"
_CTX = ssl.create_default_context()
_CTX.check_hostname = False
_CTX.verify_mode = ssl.CERT_NONE
def npm_api(path: str, token: str = None, method: str = "GET", body: dict = None):
headers = {"Content-Type": "application/json"} if body else {}
if token:
headers["Authorization"] = f"Bearer {token}"
req = urllib.request.Request(
f"{NPM_URL}{path}",
data=json.dumps(body).encode() if body else None,
headers=headers,
method=method,
)
return json.loads(urllib.request.urlopen(req, context=_CTX, timeout=15).read())
def fetch_live_hosts():
token = npm_api("/api/tokens", method="POST", body={"identity": NPM_USER, "secret": NPM_PASS})["token"]
hosts = npm_api("/api/nginx/proxy-hosts", token=token)
result = {}
for h in hosts:
hid = str(h["id"])
result[hid] = {
"domains": ",".join(h.get("domain_names", [])),
"backend": f"{h.get('forward_host','?')}:{h.get('forward_port','?')}",
"scheme": h.get("forward_scheme", "http"),
"ssl": bool(h.get("certificate_id")),
"enabled": bool(h.get("enabled")),
"websockets": bool(h.get("allow_websocket_upgrade")),
}
return result
def parse_kb_hosts():
"""Таблица proxy-hosts. Поля: # | Домен | Backend | SSL | Forced | WSS | HTTP/2 | Назначение."""
text = NPM_FILE.read_text()
result = {}
for m in re.finditer(
r"^\|\s*(\d+)\s*\|\s*`([^`]+)`\s*\|\s*([^\|]+?)\s*\|\s*([^\|]+?)\s*\|",
text,
re.MULTILINE,
):
hid, domain, backend, ssl_col = m.groups()
backend = backend.strip()
# убираем "(HTTPS)" суффикс
backend_clean = re.sub(r"\s*\(HTTPS\)\s*$", "", backend)
result[hid] = {
"domain": domain.strip(),
"backend": backend_clean.strip(),
"ssl": "" in ssl_col,
}
return result
def compare(live: dict, kb: dict):
live_ids = set(live.keys())
kb_ids = set(kb.keys())
added = sorted(live_ids - kb_ids, key=int)
removed = sorted(kb_ids - live_ids, key=int)
changed = []
for hid in sorted(live_ids & kb_ids, key=int):
l, k = live[hid], kb[hid]
diffs = []
# backend сравниваем "host:port"
if l["backend"].lower() != k["backend"].lower():
diffs.append(f"backend: KB=`{k['backend']}` → live=`{l['backend']}`")
# ssl
if l["ssl"] != k["ssl"]:
diffs.append(f"ssl: KB={'' if k['ssl'] else ''} → live={'' if l['ssl'] else ''}")
if diffs:
changed.append((hid, l["domains"], diffs))
return added, removed, changed
def main():
today = date.today().isoformat()
OUT_DIR.mkdir(parents=True, exist_ok=True)
out = OUT_DIR / f"{today}-npm-drift.md"
try:
live = fetch_live_hosts()
except Exception as e:
print(f"NPM API fail: {e}", file=sys.stderr)
sys.exit(1)
if len(live) < 3:
print(f"safety abort: live {len(live)} < 3", file=sys.stderr)
sys.exit(2)
kb = parse_kb_hosts()
added, removed, changed = compare(live, kb)
lines = [
"---",
f"date: {today}",
"type: audit",
"source: kb-audit-npm.py",
"tags: [audit, drift, npm]",
"---",
"",
f"# NPM drift audit — {today}",
"",
f"Сверка [[../projects/dttb/npm-proxy-hosts|npm-proxy-hosts.md]] с NPM API ({NPM_URL}).",
"",
f"- Живых proxy hosts: **{len(live)}**",
f"- В KB: **{len(kb)}**",
f"- Совпадений: {len(set(live) & set(kb))} / новых: {len(added)} / удалённых из NPM: {len(removed)} / с изменениями: {len(changed)}",
"",
]
if added:
lines += ["## ⚠ Новые hosts (в NPM есть, в KB нет)", "",
"| ID | Домены | Backend | SSL | Enabled |",
"|---|---|---|---|---|"]
for hid in added:
h = live[hid]
lines.append(f"| {hid} | `{h['domains']}` | `{h['backend']}` | {'' if h['ssl'] else '-'} | {'on' if h['enabled'] else 'off'} |")
lines.append("")
if removed:
lines += ["## 🗑 Удалены из NPM (но есть в KB)", "",
"| ID | Домен из KB | Backend KB |", "|---|---|---|"]
for hid in removed:
k = kb[hid]
lines.append(f"| {hid} | `{k['domain']}` | `{k['backend']}` |")
lines.append("")
if changed:
lines += ["## 🔄 Изменения (ID совпадает, но что-то сменилось)", ""]
for hid, domains, diffs in changed:
lines.append(f"### #{hid} `{domains}`")
for d in diffs:
lines.append(f"- {d}")
lines.append("")
if not (added or removed or changed):
lines += ["## ✓ NPM полностью совпадает с npm-proxy-hosts.md", ""]
lines += [
"## Полный живой список",
"",
"| ID | Домены | Backend | SSL | Enabled |",
"|---|---|---|---|---|",
]
for hid in sorted(live.keys(), key=int):
h = live[hid]
lines.append(f"| {hid} | `{h['domains']}` | `{h['backend']}` | {'' if h['ssl'] else '-'} | {'on' if h['enabled'] else 'off'} |")
lines += ["", "---", "*Автоматически через `scripts/kb-audit-npm.py`.*"]
out.write_text("\n".join(lines))
print(f"npm drift: {out}")
print(f" added: {len(added)} / removed: {len(removed)} / changed: {len(changed)}")
if __name__ == "__main__":
main()