#!/usr/bin/env python3
"""Build a per-IP network mapping report from local scan artefacts.

Inputs (all given on the command line):

* an IP list, one address per line (``#`` starts a comment line);
* a tab-separated ``ip<TAB>ptr`` file with reverse-DNS names;
* a JSON-lines file of HTTP probe results keyed by their ``input`` field
  (presumably produced by the ``httpx`` prober -- confirm with the producer).

For every IP the script asks the RIPEstat data API for the announcing ASN,
the covering prefix and the AS holder, combines that with the local data,
assigns a coarse owner type and writes ``network_mapping.tsv`` and
``network_mapping.md`` into the output directory.
"""
from __future__ import annotations

import argparse
import csv
import json
import sys
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import urlopen

# Column order of the TSV report; every row dict must carry exactly these keys.
FIELDNAMES = [
    "ip",
    "ptr",
    "asn",
    "prefix",
    "holder",
    "owner_type",
    "http_status",
    "http_server",
    "http_title",
    "http_final_url",
    "cert_subject_cn",
    "cert_issuer_cn",
    "cert_flags",
    "tech",
]

# Substrings searched (case-insensitively) by classify_owner, in priority order.
EDGE_KEYWORDS = ("cloudflare", "akamai", "ddos-guard", "cdn", "waf")
CLOUD_KEYWORDS = (
    "digitalocean",
    "hetzner",
    "amazon",
    "google",
    "microsoft",
    "vultr",
    "linode",
    "selectel",
    "vk cloud",
    "timeweb",
    "firstbyte",
    "vdsina",
    "your-server.de",
)
CORP_KEYWORDS = ("fix.ru", "blackhole.fix.ru", "vpn03.fix.ru", "*.fix.ru")
ISP_KEYWORDS = ("telecom", "isp", "obit", "network", "hosted-by")

# Ports that make an HTTP probe row preferable when choosing the summary row.
PREFERRED_PORTS = ("443", "80")

# Everything a remote lookup may raise without being a programming error:
# HTTP/URL/timeout/socket failures are OSError subclasses, while malformed
# JSON and undecodable bytes are ValueError subclasses.
LOOKUP_ERRORS = (HTTPError, URLError, TimeoutError, OSError, ValueError)


def load_ips(path: Path) -> list[str]:
    """Return the IPs listed in *path*, skipping blank and ``#`` comment lines."""
    ips: list[str] = []
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if line and not line.startswith("#"):
            ips.append(line)
    return ips


def load_ptrs(path: Path) -> dict[str, str]:
    """Load an ``ip<TAB>ptr`` file into a mapping; a missing file yields ``{}``.

    An optional ``ip\\tptr`` header on the first line is ignored, as are
    blank lines and rows whose IP column is empty.
    """
    ptrs: dict[str, str] = {}
    if not path.exists():
        return ptrs
    for idx, raw in enumerate(path.read_text(encoding="utf-8").splitlines()):
        if idx == 0 and raw.startswith("ip\tptr"):
            continue
        ip, _, ptr = raw.partition("\t")
        ip = ip.strip()
        if not ip:
            # Blank line or row without an address: nothing to key on.
            continue
        ptrs[ip] = ptr.strip()
    return ptrs


def load_httpx(path: Path) -> dict[str, list[dict[str, Any]]]:
    """Group JSON-lines probe results by their ``input`` value.

    Lines that are empty, not valid JSON, not JSON objects, or that lack a
    non-empty string ``input`` are skipped.  A missing file yields an empty
    mapping.
    """
    by_ip: dict[str, list[dict[str, Any]]] = defaultdict(list)
    if not path.exists():
        return by_ip
    for raw in path.read_text(encoding="utf-8").splitlines():
        line = raw.strip()
        if not line:
            continue
        try:
            row = json.loads(line)
        except json.JSONDecodeError:
            continue
        if not isinstance(row, dict):
            # Valid JSON but not an object (e.g. a list): no fields to read.
            continue
        ip = row.get("input")
        if not ip or not isinstance(ip, str):
            continue
        by_ip[ip].append(row)
    return by_ip


def fetch_json(url: str, timeout: float = 20.0) -> dict[str, Any]:
    """GET *url* and return the decoded JSON object.

    Raises:
        OSError: on network failure (includes ``HTTPError``/``URLError``
            and timeouts).
        ValueError: if the body is not UTF-8, not JSON, or not a JSON object.
    """
    with urlopen(url, timeout=timeout) as resp:
        payload = json.loads(resp.read().decode("utf-8"))
    if not isinstance(payload, dict):
        raise ValueError(f"unexpected JSON payload from {url}")
    return payload


def _data_section(payload: dict[str, Any]) -> dict[str, Any]:
    """Return the ``data`` object of an API response, or ``{}`` if absent."""
    data = payload.get("data")
    return data if isinstance(data, dict) else {}


def network_info(ip: str) -> tuple[str, str]:
    """Return ``(asn, prefix)`` for *ip* from RIPEstat ``network-info``.

    Either element is ``""`` when the response does not provide it.  When
    several ASNs are listed, the first one is used.
    """
    url = "https://stat.ripe.net/data/network-info/data.json?" + urlencode(
        {"resource": ip}
    )
    data = _data_section(fetch_json(url))
    asns = data.get("asns") or []
    prefix = data.get("prefix") or ""
    first_asn = str(asns[0]) if isinstance(asns, list) and asns else ""
    return (first_asn, str(prefix))


def as_holder(asn: str) -> str:
    """Return the holder name for *asn* from RIPEstat ``as-overview``.

    An empty *asn* returns ``""`` without performing a request.
    """
    if not asn:
        return ""
    url = "https://stat.ripe.net/data/as-overview/data.json?" + urlencode(
        {"resource": asn}
    )
    data = _data_section(fetch_json(url))
    return str(data.get("holder") or "")


def classify_owner(holder: str, ptr: str, techs: list[str], cert_cn: str) -> str:
    """Assign a coarse owner type from case-insensitive keyword matches.

    All inputs are concatenated into one lower-cased string and checked
    against the keyword groups in priority order: ``edge/cdn``,
    ``cloud/vps``, ``corp/perimeter``, ``provider/isp``.  Matching is by
    plain substring.  Returns ``"other"`` when nothing matches.
    """
    blob = " ".join([holder, ptr, " ".join(techs), cert_cn]).lower()
    categories = (
        ("edge/cdn", EDGE_KEYWORDS),
        ("cloud/vps", CLOUD_KEYWORDS),
        ("corp/perimeter", CORP_KEYWORDS),
        ("provider/isp", ISP_KEYWORDS),
    )
    for label, keywords in categories:
        if any(keyword in blob for keyword in keywords):
            return label
    return "other"


def _http_rank(row: dict[str, Any]) -> tuple[int, int, int]:
    """Sort key for probe rows: status 200, then preferred port, then title length."""
    # The port is normalised to text so integer and string ports rank alike.
    port = str(row.get("port") or "")
    return (
        int(str(row.get("status_code", 0)) == "200"),
        int(port in PREFERRED_PORTS),
        len(str(row.get("title") or "")),
    )


def summarize_http(rows: list[dict[str, Any]]) -> tuple[str, str, str, str, list[str]]:
    """Summarise the probe rows of one IP.

    Returns ``(status, server, title, final_url, techs)``.  The first four
    come from the best-ranked row (see ``_http_rank``; ties keep input
    order); ``techs`` is the sorted union of string entries in every row's
    ``tech`` list.  An empty *rows* yields empty values.
    """
    if not rows:
        return ("", "", "", "", [])

    # max() returns the first maximal element, the same row a stable
    # descending sort would place first.
    top = max(rows, key=_http_rank)

    tech_names: set[str] = set()
    for row in rows:
        tech = row.get("tech")
        if isinstance(tech, (list, tuple)):  # tolerate a null/absent "tech"
            tech_names.update(t for t in tech if isinstance(t, str))

    status = str(top.get("status_code") or "")
    server = str(top.get("webserver") or "")
    title = str(top.get("title") or "")
    final_url = str(top.get("final_url") or top.get("location") or "")
    return (status, server, title, final_url, sorted(tech_names))


def summarize_cert(rows: list[dict[str, Any]]) -> tuple[str, str, str]:
    """Return ``(subject_cn, issuer_cn, flags)`` from the first row with TLS data.

    ``flags`` is a comma-separated subset of ``mismatched``, ``wildcard`` and
    ``self-signed``.  Rows whose ``tls`` value is not a dict are skipped;
    without any TLS row all three values are ``""``.
    """
    for row in rows:
        tls = row.get("tls")
        if not isinstance(tls, dict):
            continue
        subject = str(tls.get("subject_cn") or "")
        issuer = str(tls.get("issuer_cn") or "")
        flags: list[str] = []
        if tls.get("mismatched"):
            flags.append("mismatched")
        if tls.get("wildcard_certificate"):
            flags.append("wildcard")
        if tls.get("self_signed"):
            flags.append("self-signed")
        return (subject, issuer, ",".join(flags))
    return ("", "", "")


def _md_cell(value: str) -> str:
    """Make *value* safe for a Markdown table cell.

    Whitespace runs (including newlines) collapse to single spaces and
    ``|`` is backslash-escaped so the value cannot break the row apart.
    """
    return " ".join(str(value).split()).replace("|", "\\|")


def _render_markdown(
    rows: list[dict[str, str]],
    owner_counts: Counter[str],
    cert_counts: Counter[str],
) -> str:
    """Render the Markdown summary report as a single string."""
    with_http = sum(1 for row in rows if row["http_status"])
    with_cert = sum(1 for row in rows if row["cert_subject_cn"])

    parts = [
        "# Network Mapping Summary\n\n",
        f"- Total IPs: {len(rows)}\n",
        f"- IPs with HTTP/HTTPS responses: {with_http}\n",
        f"- IPs with TLS subject CN captured: {with_cert}\n\n",
        "## Owner Type Counts\n\n",
    ]
    parts.extend(f"- {key}: {value}\n" for key, value in owner_counts.most_common())

    parts.append("\n## Frequent Certificate Subjects\n\n")
    parts.extend(
        f"- {key}: {value}\n" for key, value in cert_counts.most_common(10) if key
    )

    parts.append("\n## Priority Review Candidates\n\n")
    parts.append("| IP | Owner Type | Holder | HTTP | TLS Subject | PTR |\n")
    parts.append("| --- | --- | --- | --- | --- | --- |\n")
    for row in rows:
        # Only IPs with at least one locally observed signal are listed.
        if not (row["http_status"] or row["cert_subject_cn"] or row["ptr"]):
            continue
        http = " ".join(
            x for x in [row["http_status"], row["http_server"], row["http_title"]] if x
        ).strip()
        cells = [
            row["ip"],
            row["owner_type"],
            row["holder"],
            http,
            row["cert_subject_cn"],
            row["ptr"],
        ]
        parts.append("| " + " | ".join(_md_cell(cell) for cell in cells) + " |\n")
    return "".join(parts)


def write_outputs(
    out_dir: Path,
    rows: list[dict[str, str]],
    owner_counts: Counter[str],
    cert_counts: Counter[str],
) -> None:
    """Write ``network_mapping.tsv`` and ``network_mapping.md`` into *out_dir*.

    *out_dir* is created if it does not exist.  Each row must contain exactly
    the keys in ``FIELDNAMES``.  Values placed in the Markdown table are
    escaped so that ``|`` or line breaks cannot corrupt the table.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    tsv_path = out_dir / "network_mapping.tsv"
    md_path = out_dir / "network_mapping.md"

    # newline="" lets the csv module control line endings itself.
    with tsv_path.open("w", encoding="utf-8", newline="") as fh:
        writer = csv.DictWriter(fh, delimiter="\t", fieldnames=FIELDNAMES)
        writer.writeheader()
        writer.writerows(rows)

    with md_path.open("w", encoding="utf-8") as fh:
        fh.write(_render_markdown(rows, owner_counts, cert_counts))


def main(argv: list[str] | None = None) -> int:
    """Command-line entry point; returns the process exit status.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--ip-file", required=True)
    parser.add_argument("--ptr-file", required=True)
    parser.add_argument("--httpx-file", required=True)
    parser.add_argument("--out-dir", required=True)
    args = parser.parse_args(argv)

    ip_file = Path(args.ip_file)
    ptr_file = Path(args.ptr_file)
    httpx_file = Path(args.httpx_file)
    out_dir = Path(args.out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    ips = load_ips(ip_file)
    ptrs = load_ptrs(ptr_file)
    httpx = load_httpx(httpx_file)

    holder_cache: dict[str, str] = {}
    rows: list[dict[str, str]] = []
    owner_counts: Counter[str] = Counter()
    cert_counts: Counter[str] = Counter()

    total = len(ips)
    for idx, ip in enumerate(ips, start=1):
        try:
            asn, prefix = network_info(ip)
        except LOOKUP_ERRORS as exc:
            print(f"warning: network-info failed for {ip}: {exc}", file=sys.stderr)
            asn, prefix = "", ""

        holder = ""
        if asn:
            # Membership test (not truthiness) so an ASN whose holder is
            # legitimately empty is still served from the cache.
            if asn in holder_cache:
                holder = holder_cache[asn]
            else:
                try:
                    holder = as_holder(asn)
                except LOOKUP_ERRORS as exc:
                    # Failures are not cached: a later IP may retry the ASN.
                    print(
                        f"warning: as-overview failed for AS{asn}: {exc}",
                        file=sys.stderr,
                    )
                else:
                    holder_cache[asn] = holder

        probes = httpx.get(ip, [])
        ptr = ptrs.get(ip, "")
        http_status, http_server, http_title, http_final_url, techs = summarize_http(
            probes
        )
        cert_subject, cert_issuer, cert_flags = summarize_cert(probes)
        owner_type = classify_owner(holder, ptr, techs, cert_subject)

        owner_counts[owner_type] += 1
        if cert_subject:
            cert_counts[cert_subject] += 1

        rows.append(
            {
                "ip": ip,
                "ptr": ptr,
                "asn": asn,
                "prefix": prefix,
                "holder": holder,
                "owner_type": owner_type,
                "http_status": http_status,
                "http_server": http_server,
                "http_title": http_title,
                "http_final_url": http_final_url,
                "cert_subject_cn": cert_subject,
                "cert_issuer_cn": cert_issuer,
                "cert_flags": cert_flags,
                "tech": ",".join(techs),
            }
        )

        if idx % 20 == 0 or idx == total:
            print(f"processed {idx}/{total}", file=sys.stderr)
        # Short pause between IPs to keep the request rate to the API modest.
        time.sleep(0.05)

    write_outputs(out_dir, rows, owner_counts, cert_counts)
    print(out_dir / "network_mapping.tsv")
    print(out_dir / "network_mapping.md")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())