From 36c5f0bbd16963c261c7cb6fedceae6d70b4617a Mon Sep 17 00:00:00 2001 From: emilsarafutdinov Date: Wed, 1 Jul 2026 02:52:14 +0500 Subject: [PATCH] Add safe external inventory scripts --- .gitignore | 7 + README.md | 53 ++++ reports/.gitkeep | 1 + scripts/build_final_excel.py | 463 +++++++++++++++++++++++++++++ scripts/build_network_mapping.py | 303 +++++++++++++++++++ scripts/check_ssh_auth_methods.py | 151 ++++++++++ scripts/safe_external_inventory.sh | 151 ++++++++++ 7 files changed, 1129 insertions(+) create mode 100644 .gitignore create mode 100644 README.md create mode 100644 reports/.gitkeep create mode 100644 scripts/build_final_excel.py create mode 100644 scripts/build_network_mapping.py create mode 100644 scripts/check_ssh_auth_methods.py create mode 100755 scripts/safe_external_inventory.sh diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..23e4d87 --- /dev/null +++ b/.gitignore @@ -0,0 +1,7 @@ +.DS_Store +__pycache__/ +*.py[cod] +.venv*/ +reports/* +!reports/.gitkeep + diff --git a/README.md b/README.md new file mode 100644 index 0000000..26df64d --- /dev/null +++ b/README.md @@ -0,0 +1,53 @@ +# Safe External Inventory + +This repository contains a list of public IPv4 addresses in [public_ipv4_from_master.txt](public_ipv4_from_master.txt) and a helper script for a conservative external inventory pass. + +## What the script does + +The script: + +- validates and deduplicates IPv4 addresses from the input file +- runs a moderate `nmap` inventory against the top 20 TCP ports +- performs light service detection +- writes raw `nmap` outputs plus CSV and Markdown summaries + +It intentionally does not run aggressive vulnerability scripts, OS fingerprinting, or full port sweeps. + +## Usage + +Install `nmap`, then run: + +```bash +chmod +x scripts/safe_external_inventory.sh +scripts/safe_external_inventory.sh +``` + +Optional arguments: + +```bash +scripts/safe_external_inventory.sh \ + public_ipv4_from_master.txt \ + reports/custom-run +``` + +## Output + +Each run creates a timestamped folder under [reports](reports) with: + +- `targets.txt` - cleaned list of validated IPs +- `nmap_inventory.nmap` - normal `nmap` output +- `nmap_inventory.gnmap` - grepable output +- `nmap_inventory.xml` - XML output for tooling +- `summary.csv` - one row per open port +- `summary.md` - quick review queue sorted by host exposure + +## Interpreting the results + +Prioritize hosts that expose: + +- multiple services to the internet +- remote administration ports such as SSH, RDP, WinRM, or control panels +- outdated or unexpected product banners +- plaintext services such as Telnet, FTP, or HTTP admin endpoints + +Use the inventory as a review queue for patching, access reduction, ownership checks, and deeper manual assessment within your approved process. diff --git a/reports/.gitkeep b/reports/.gitkeep new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/reports/.gitkeep @@ -0,0 +1 @@ + diff --git a/scripts/build_final_excel.py b/scripts/build_final_excel.py new file mode 100644 index 0000000..9352b20 --- /dev/null +++ b/scripts/build_final_excel.py @@ -0,0 +1,463 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import csv +import re +from collections import Counter, defaultdict +from pathlib import Path + +from openpyxl import Workbook +from openpyxl.styles import Alignment, Font, PatternFill +from openpyxl.utils import get_column_letter + + +RED_FILL = PatternFill(fill_type="solid", fgColor="FDE2E1") +RED_FONT = Font(color="9C0006", bold=True) +HEADER_FILL = PatternFill(fill_type="solid", fgColor="D9EAF7") +HEADER_FONT = Font(bold=True) +WRAP_ALIGN = Alignment(vertical="top", wrap_text=True) + +OWNER_LABELS = { + "cloud/vps": "Облако / VPS", + "other": "Прочее", + "provider/isp": "Провайдер / ISP", + "corp/perimeter": "Корпоративный периметр", + "edge/cdn": "Edge / CDN / WAF", +} + +REASON_LABELS = { + "Public app/auth surface": "Публичная веб- или auth-поверхность", + "Default page exposed": "Снаружи видна дефолтная страница", + "Internet-facing platform/service": "Публично доступный сервис или платформа", + "Default reverse proxy certificate": "Дефолтный сертификат reverse proxy", + "Self-signed TLS certificate": "Самоподписанный TLS-сертификат", + "TLS cert does not match IP/SNI": "TLS-сертификат не соответствует IP или SNI", + "Corporate wildcard cert on raw IP": "Корпоративный wildcard-сертификат на прямом IP", + "Likely corporate perimeter/origin": "Похоже на корпоративный периметр или origin", + "Looks like CDN/WAF or edge": "Похоже на CDN/WAF или edge-слой", + "SSH exposed": "Снаружи доступен SSH", + "SSH on nonstandard port": "SSH на нестандартном порту", + "Older SSH banner": "Старый баннер SSH", + "SSH offers password authentication": "SSH предлагает парольную аутентификацию", + "Named business asset/domain": "Связан с именованным бизнес-активом или доменом", +} + +PRIORITY_LABELS = { + "HIGH": "Высокий", + "MEDIUM": "Средний", + "LOW": "Низкий", +} + +MANUAL_LABELS = { + "YES": "Да", + "NO": "Нет", +} + + +def read_network_mapping(path: Path) -> list[dict[str, str]]: + with path.open(encoding="utf-8") as fh: + return list(csv.DictReader(fh, delimiter="\t")) + + +def read_ssh(path: Path) -> dict[str, dict[str, list[str]]]: + by_ip: dict[str, dict[str, list[str]]] = defaultdict(lambda: {"ports": [], "banners": []}) + if not path.exists(): + return by_ip + + current_ip = "" + current_port = "" + seen_banner = False + + for raw in path.read_text(encoding="utf-8", errors="replace").splitlines(): + line = raw.strip() + match = re.match(r"^=== ([0-9.]+):(\d+) ===$", line) + if match: + current_ip = match.group(1) + current_port = match.group(2) + seen_banner = False + if current_port not in by_ip[current_ip]["ports"]: + by_ip[current_ip]["ports"].append(current_port) + continue + + if current_ip and "SSH-2.0-" in line and not seen_banner: + banner = line.lstrip("# ").strip() + by_ip[current_ip]["banners"].append(banner) + seen_banner = True + + return by_ip + + +def read_ssh_auth(path: Path) -> dict[str, dict[str, list[str] | str]]: + by_ip: dict[str, dict[str, list[str] | str]] = defaultdict( + lambda: {"methods": [], "password_ports": [], "notes": []} + ) + if not path.exists(): + return by_ip + + with path.open(encoding="utf-8") as fh: + for row in csv.DictReader(fh, delimiter="\t"): + ip = row.get("ip", "") + port = row.get("port", "") + methods = row.get("auth_methods", "") + password_offered = row.get("password_offered", "") + note = row.get("note", "") + if not ip: + continue + if methods: + entry = f"{port}:{methods}" + if entry not in by_ip[ip]["methods"]: + by_ip[ip]["methods"].append(entry) + if password_offered == "yes" and port: + if port not in by_ip[ip]["password_ports"]: + by_ip[ip]["password_ports"].append(port) + if note: + note_entry = f"{port}:{note}" + if note_entry not in by_ip[ip]["notes"]: + by_ip[ip]["notes"].append(note_entry) + return by_ip + + +def has_old_ssh(banners: list[str]) -> bool: + for banner in banners: + if any(x in banner for x in ("OpenSSH_7.2", "OpenSSH_7.6", "OpenSSH_8.2")): + return True + return False + + +def compute_review( + row: dict[str, str], + ssh_info: dict[str, list[str]], + ssh_auth: dict[str, list[str] | str], +) -> tuple[str, str]: + reasons: list[str] = [] + + blob = " ".join( + [ + row.get("http_server", ""), + row.get("http_title", ""), + row.get("http_final_url", ""), + row.get("tech", ""), + row.get("cert_subject_cn", ""), + row.get("cert_flags", ""), + row.get("ptr", ""), + ] + ).lower() + + http_status = row.get("http_status", "") + owner_type = row.get("owner_type", "") + cert_flags = row.get("cert_flags", "") + cert_subject = row.get("cert_subject_cn", "") + ssh_ports = ssh_info.get("ports", []) + ssh_banners = ssh_info.get("banners", []) + password_ports = ssh_auth.get("password_ports", []) + + if http_status in {"200", "401", "500", "502"} and any( + token in blob + for token in ( + "login", + "nextcloud", + "opensearch", + "jitsi", + "mail-in-a-box", + "default page", + "welcome to nginx", + "grafana", + "pwm", + "401 unauthorized", + "forbidden", + ) + ): + reasons.append("Public app/auth surface") + + if "default page" in blob or "welcome to nginx" in blob or "default site" in blob: + reasons.append("Default page exposed") + + if any(token in blob for token in ("mail-in-a-box", "opensearch", "nextcloud", "jitsi")): + reasons.append("Internet-facing platform/service") + + if "træfik default cert" in blob or "traefik default cert" in blob: + reasons.append("Default reverse proxy certificate") + + if "self-signed" in cert_flags: + reasons.append("Self-signed TLS certificate") + + if "mismatched" in cert_flags: + reasons.append("TLS cert does not match IP/SNI") + + if "wildcard" in cert_flags and "fix.ru" in cert_subject: + reasons.append("Corporate wildcard cert on raw IP") + + if owner_type == "corp/perimeter": + reasons.append("Likely corporate perimeter/origin") + + if owner_type == "edge/cdn": + reasons.append("Looks like CDN/WAF or edge") + + if ssh_ports: + reasons.append("SSH exposed") + if any(port != "22" for port in ssh_ports): + reasons.append("SSH on nonstandard port") + if has_old_ssh(ssh_banners): + reasons.append("Older SSH banner") + if password_ports: + reasons.append("SSH offers password authentication") + + if any(token in blob for token in ("seoconference.ru", "vikupai.ru", "jecp.ru", "iteh.ru", "cbn.ru", "fix.ru")): + reasons.append("Named business asset/domain") + + unique_reasons = [] + seen = set() + for reason in reasons: + if reason not in seen: + seen.add(reason) + unique_reasons.append(reason) + + if not unique_reasons: + return ("NO", "") + return ("YES", "; ".join(unique_reasons)) + + +def compute_priority( + row: dict[str, str], + manual_check: str, + reasons: str, + ssh_info: dict[str, list[str]], + ssh_auth: dict[str, list[str] | str], +) -> str: + if manual_check == "NO": + return "LOW" + + blob = " ".join( + [ + row.get("http_server", ""), + row.get("http_title", ""), + row.get("http_final_url", ""), + row.get("cert_subject_cn", ""), + row.get("cert_flags", ""), + reasons, + ] + ).lower() + + if any( + token in blob + for token in ( + "public app/auth surface", + "default page exposed", + "opensearch", + "nextcloud", + "jitsi", + "mail-in-a-box", + "self-signed", + "corporate wildcard cert on raw ip", + "older ssh banner", + "ssh offers password authentication", + "iis", + ) + ): + return "HIGH" + + if ssh_info.get("ports") or row.get("http_status") or row.get("cert_subject_cn"): + return "MEDIUM" + + return "LOW" + + +def autosize(ws) -> None: + widths: dict[int, int] = {} + for row in ws.iter_rows(): + for cell in row: + value = "" if cell.value is None else str(cell.value) + widths[cell.column] = max(widths.get(cell.column, 0), min(len(value), 80)) + for column_idx, width in widths.items(): + ws.column_dimensions[get_column_letter(column_idx)].width = max(12, min(width + 2, 60)) + + +def translate_owner(owner_type: str) -> str: + return OWNER_LABELS.get(owner_type, owner_type) + + +def translate_reasons(reasons: str) -> str: + if not reasons: + return "" + parts = [part.strip() for part in reasons.split(";") if part.strip()] + translated = [REASON_LABELS.get(part, part) for part in parts] + return "; ".join(translated) + + +def translate_priority(priority: str) -> str: + return PRIORITY_LABELS.get(priority, priority) + + +def translate_manual(manual: str) -> str: + return MANUAL_LABELS.get(manual, manual) + + +def build_workbook( + rows: list[dict[str, str]], + ssh: dict[str, dict[str, list[str]]], + ssh_auth: dict[str, dict[str, list[str] | str]], + out_path: Path, +) -> None: + wb = Workbook() + ws = wb.active + ws.title = "Активы" + + headers = [ + "IP", + "Проверить вручную", + "Приоритет", + "Причины ручной проверки", + "Тип владельца", + "Владелец / ASN Holder", + "ASN", + "Префикс", + "PTR", + "HTTP статус", + "HTTP сервер", + "HTTP заголовок / title", + "Итоговый URL / redirect", + "TLS subject CN", + "TLS issuer CN", + "TLS признаки", + "Технологии", + "SSH порты", + "SSH баннеры", + "SSH методы аутентификации", + "SSH пароль предложен", + "SSH примечания", + ] + ws.append(headers) + + for cell in ws[1]: + cell.fill = HEADER_FILL + cell.font = HEADER_FONT + cell.alignment = WRAP_ALIGN + + holder_counter: Counter[str] = Counter() + owner_counter: Counter[str] = Counter() + cert_counter: Counter[str] = Counter() + manual_counter: Counter[str] = Counter() + + for row in rows: + ip = row["ip"] + ssh_info = ssh.get(ip, {"ports": [], "banners": []}) + ssh_auth_info = ssh_auth.get(ip, {"methods": [], "password_ports": [], "notes": []}) + manual_check, review_reasons = compute_review(row, ssh_info, ssh_auth_info) + priority = compute_priority(row, manual_check, review_reasons, ssh_info, ssh_auth_info) + + holder_counter[row["holder"]] += 1 + owner_counter[row["owner_type"]] += 1 + if row["cert_subject_cn"]: + cert_counter[row["cert_subject_cn"]] += 1 + manual_counter[priority] += 1 + + out_row = [ + ip, + translate_manual(manual_check), + translate_priority(priority), + translate_reasons(review_reasons), + translate_owner(row["owner_type"]), + row["holder"], + row["asn"], + row["prefix"], + row["ptr"], + row["http_status"], + row["http_server"], + row["http_title"], + row["http_final_url"], + row["cert_subject_cn"], + row["cert_issuer_cn"], + row["cert_flags"], + row["tech"], + ", ".join(sorted(ssh_info["ports"], key=lambda x: int(x))), + "\n".join(ssh_info["banners"]), + "\n".join(ssh_auth_info.get("methods", [])), + "Да" if ssh_auth_info.get("password_ports") else "Нет", + "\n".join(ssh_auth_info.get("notes", [])), + ] + ws.append(out_row) + + current_row = ws.max_row + for cell in ws[current_row]: + cell.alignment = WRAP_ALIGN + + if manual_check == "YES": + for cell in ws[current_row]: + cell.fill = RED_FILL + ws.cell(current_row, 2).font = RED_FONT + ws.cell(current_row, 3).font = RED_FONT + ws.cell(current_row, 4).font = RED_FONT + + ws.freeze_panes = "A2" + ws.auto_filter.ref = ws.dimensions + autosize(ws) + + summary = wb.create_sheet("Сводка") + summary["A1"] = "Итоговая сводка по внешней экспозиции" + summary["A1"].font = Font(bold=True, size=14) + summary["A3"] = "Всего IP" + summary["B3"] = len(rows) + summary["A4"] = "Требуют ручной проверки" + summary["B4"] = sum(1 for row_idx in range(2, ws.max_row + 1) if ws.cell(row_idx, 2).value == "Да") + summary["A5"] = "Приоритет высокий" + summary["B5"] = manual_counter["HIGH"] + summary["A6"] = "Приоритет средний" + summary["B6"] = manual_counter["MEDIUM"] + summary["A7"] = "Приоритет низкий" + summary["B7"] = manual_counter["LOW"] + summary["A9"] = "Легенда" + summary["A10"] = "Красные строки" + summary["B10"] = "Нужно проверять вручную в первую очередь" + summary["A10"].fill = RED_FILL + summary["B10"].fill = RED_FILL + + summary["A12"] = "Количество по типам владельцев" + summary["A12"].font = HEADER_FONT + row_idx = 13 + for owner_type, count in owner_counter.most_common(): + summary.cell(row_idx, 1, translate_owner(owner_type)) + summary.cell(row_idx, 2, count) + row_idx += 1 + + row_idx += 1 + summary.cell(row_idx, 1, "Основные владельцы / провайдеры").font = HEADER_FONT + row_idx += 1 + for holder, count in holder_counter.most_common(15): + if not holder: + continue + summary.cell(row_idx, 1, holder) + summary.cell(row_idx, 2, count) + row_idx += 1 + + row_idx += 1 + summary.cell(row_idx, 1, "Частые TLS subject CN").font = HEADER_FONT + row_idx += 1 + for subject, count in cert_counter.most_common(15): + summary.cell(row_idx, 1, subject) + summary.cell(row_idx, 2, count) + row_idx += 1 + + autosize(summary) + wb.save(out_path) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--mapping-tsv", required=True) + parser.add_argument("--ssh-file", required=True) + parser.add_argument("--ssh-auth-file", required=False, default="") + parser.add_argument("--output", required=True) + args = parser.parse_args() + + rows = read_network_mapping(Path(args.mapping_tsv)) + ssh = read_ssh(Path(args.ssh_file)) + ssh_auth = read_ssh_auth(Path(args.ssh_auth_file)) if args.ssh_auth_file else {} + build_workbook(rows, ssh, ssh_auth, Path(args.output)) + print(args.output) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/build_network_mapping.py b/scripts/build_network_mapping.py new file mode 100644 index 0000000..050d5a0 --- /dev/null +++ b/scripts/build_network_mapping.py @@ -0,0 +1,303 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import csv +import json +import sys +import time +from collections import Counter, defaultdict +from pathlib import Path +from typing import Any +from urllib.error import HTTPError, URLError +from urllib.parse import urlencode +from urllib.request import urlopen + + +def load_ips(path: Path) -> list[str]: + ips: list[str] = [] + for raw in path.read_text(encoding="utf-8").splitlines(): + line = raw.strip() + if not line or line.startswith("#"): + continue + ips.append(line) + return ips + + +def load_ptrs(path: Path) -> dict[str, str]: + ptrs: dict[str, str] = {} + if not path.exists(): + return ptrs + + for idx, raw in enumerate(path.read_text(encoding="utf-8").splitlines()): + if idx == 0 and raw.startswith("ip\tptr"): + continue + line = raw.rstrip("\n") + if not line: + continue + ip, _, ptr = line.partition("\t") + ptrs[ip.strip()] = ptr.strip() + return ptrs + + +def load_httpx(path: Path) -> dict[str, list[dict[str, Any]]]: + by_ip: dict[str, list[dict[str, Any]]] = defaultdict(list) + if not path.exists(): + return by_ip + + for raw in path.read_text(encoding="utf-8").splitlines(): + line = raw.strip() + if not line: + continue + try: + row = json.loads(line) + except json.JSONDecodeError: + continue + ip = row.get("input") + if not ip: + continue + by_ip[ip].append(row) + return by_ip + + +def fetch_json(url: str, timeout: float = 20.0) -> dict[str, Any]: + with urlopen(url, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + + +def network_info(ip: str) -> tuple[str, str]: + url = "https://stat.ripe.net/data/network-info/data.json?" + urlencode( + {"resource": ip} + ) + data = fetch_json(url).get("data", {}) + asns = data.get("asns") or [] + prefix = data.get("prefix") or "" + return (asns[0] if asns else "", prefix) + + +def as_holder(asn: str) -> str: + if not asn: + return "" + url = "https://stat.ripe.net/data/as-overview/data.json?" + urlencode( + {"resource": asn} + ) + data = fetch_json(url).get("data", {}) + return data.get("holder") or "" + + +def classify_owner(holder: str, ptr: str, techs: list[str], cert_cn: str) -> str: + blob = " ".join([holder, ptr, " ".join(techs), cert_cn]).lower() + if any(x in blob for x in ("cloudflare", "akamai", "ddos-guard", "cdn", "waf")): + return "edge/cdn" + if any( + x in blob + for x in ( + "digitalocean", + "hetzner", + "amazon", + "google", + "microsoft", + "vultr", + "linode", + "selectel", + "vk cloud", + "timeweb", + "firstbyte", + "vdsina", + "your-server.de", + ) + ): + return "cloud/vps" + if any(x in blob for x in ("fix.ru", "blackhole.fix.ru", "vpn03.fix.ru", "*.fix.ru")): + return "corp/perimeter" + if any(x in blob for x in ("telecom", "isp", "obit", "network", "hosted-by")): + return "provider/isp" + return "other" + + +def summarize_http(rows: list[dict[str, Any]]) -> tuple[str, str, str, str, list[str]]: + if not rows: + return ("", "", "", "", []) + + rows = sorted( + rows, + key=lambda r: ( + int(str(r.get("status_code", 0)) == "200"), + int(str(r.get("port", "0")).isdigit() and r.get("port") in ("443", "80")), + len(str(r.get("title") or "")), + ), + reverse=True, + ) + top = rows[0] + server = str(top.get("webserver") or "") + status = str(top.get("status_code") or "") + title = str(top.get("title") or "") + final_url = str(top.get("final_url") or top.get("location") or "") + techs = sorted({t for row in rows for t in row.get("tech", []) if isinstance(t, str)}) + return (status, server, title, final_url, techs) + + +def summarize_cert(rows: list[dict[str, Any]]) -> tuple[str, str, str]: + for row in rows: + tls = row.get("tls") + if not isinstance(tls, dict): + continue + subject = str(tls.get("subject_cn") or "") + issuer = str(tls.get("issuer_cn") or "") + flags: list[str] = [] + if tls.get("mismatched"): + flags.append("mismatched") + if tls.get("wildcard_certificate"): + flags.append("wildcard") + if tls.get("self_signed"): + flags.append("self-signed") + return (subject, issuer, ",".join(flags)) + return ("", "", "") + + +def write_outputs( + out_dir: Path, + rows: list[dict[str, str]], + owner_counts: Counter[str], + cert_counts: Counter[str], +) -> None: + tsv_path = out_dir / "network_mapping.tsv" + md_path = out_dir / "network_mapping.md" + + with tsv_path.open("w", encoding="utf-8", newline="") as fh: + writer = csv.DictWriter( + fh, + delimiter="\t", + fieldnames=[ + "ip", + "ptr", + "asn", + "prefix", + "holder", + "owner_type", + "http_status", + "http_server", + "http_title", + "http_final_url", + "cert_subject_cn", + "cert_issuer_cn", + "cert_flags", + "tech", + ], + ) + writer.writeheader() + writer.writerows(rows) + + with md_path.open("w", encoding="utf-8") as fh: + fh.write("# Network Mapping Summary\n\n") + fh.write(f"- Total IPs: {len(rows)}\n") + fh.write(f"- IPs with HTTP/HTTPS responses: {sum(1 for row in rows if row['http_status'])}\n") + fh.write(f"- IPs with TLS subject CN captured: {sum(1 for row in rows if row['cert_subject_cn'])}\n\n") + + fh.write("## Owner Type Counts\n\n") + for key, value in owner_counts.most_common(): + fh.write(f"- {key}: {value}\n") + + fh.write("\n## Frequent Certificate Subjects\n\n") + for key, value in cert_counts.most_common(10): + if not key: + continue + fh.write(f"- {key}: {value}\n") + + fh.write("\n## Priority Review Candidates\n\n") + fh.write("| IP | Owner Type | Holder | HTTP | TLS Subject | PTR |\n") + fh.write("| --- | --- | --- | --- | --- | --- |\n") + for row in rows: + if row["http_status"] or row["cert_subject_cn"] or row["ptr"]: + http = " ".join( + x + for x in [row["http_status"], row["http_server"], row["http_title"]] + if x + ).strip() + fh.write( + f"| {row['ip']} | {row['owner_type']} | {row['holder']} | " + f"{http} | {row['cert_subject_cn']} | {row['ptr']} |\n" + ) + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--ip-file", required=True) + parser.add_argument("--ptr-file", required=True) + parser.add_argument("--httpx-file", required=True) + parser.add_argument("--out-dir", required=True) + args = parser.parse_args() + + ip_file = Path(args.ip_file) + ptr_file = Path(args.ptr_file) + httpx_file = Path(args.httpx_file) + out_dir = Path(args.out_dir) + out_dir.mkdir(parents=True, exist_ok=True) + + ips = load_ips(ip_file) + ptrs = load_ptrs(ptr_file) + httpx = load_httpx(httpx_file) + + holder_cache: dict[str, str] = {} + rows: list[dict[str, str]] = [] + owner_counts: Counter[str] = Counter() + cert_counts: Counter[str] = Counter() + + for idx, ip in enumerate(ips, start=1): + try: + asn, prefix = network_info(ip) + except (HTTPError, URLError, TimeoutError) as exc: + print(f"warning: network-info failed for {ip}: {exc}", file=sys.stderr) + asn, prefix = "", "" + + holder = holder_cache.get(asn, "") + if asn and not holder: + try: + holder = as_holder(asn) + except (HTTPError, URLError, TimeoutError) as exc: + print(f"warning: as-overview failed for AS{asn}: {exc}", file=sys.stderr) + holder = "" + holder_cache[asn] = holder + + http_status, http_server, http_title, http_final_url, techs = summarize_http( + httpx.get(ip, []) + ) + cert_subject, cert_issuer, cert_flags = summarize_cert(httpx.get(ip, [])) + owner_type = classify_owner(holder, ptrs.get(ip, ""), techs, cert_subject) + + owner_counts[owner_type] += 1 + if cert_subject: + cert_counts[cert_subject] += 1 + + rows.append( + { + "ip": ip, + "ptr": ptrs.get(ip, ""), + "asn": asn, + "prefix": prefix, + "holder": holder, + "owner_type": owner_type, + "http_status": http_status, + "http_server": http_server, + "http_title": http_title, + "http_final_url": http_final_url, + "cert_subject_cn": cert_subject, + "cert_issuer_cn": cert_issuer, + "cert_flags": cert_flags, + "tech": ",".join(techs), + } + ) + + if idx % 20 == 0 or idx == len(ips): + print(f"processed {idx}/{len(ips)}", file=sys.stderr) + time.sleep(0.05) + + write_outputs(out_dir, rows, owner_counts, cert_counts) + print(out_dir / "network_mapping.tsv") + print(out_dir / "network_mapping.md") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/check_ssh_auth_methods.py b/scripts/check_ssh_auth_methods.py new file mode 100644 index 0000000..3910b01 --- /dev/null +++ b/scripts/check_ssh_auth_methods.py @@ -0,0 +1,151 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import csv +import re +import subprocess +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path + + +HEADER_RE = re.compile(r"^=== ([0-9.]+):(\d+) ===$") +METHODS_RE = re.compile(r"Permission denied \(([^)]+)\)") + + +def parse_targets(path: Path) -> list[tuple[str, str]]: + targets: list[tuple[str, str]] = [] + seen: set[tuple[str, str]] = set() + + for raw in path.read_text(encoding="utf-8", errors="replace").splitlines(): + line = raw.strip() + match = HEADER_RE.match(line) + if not match: + continue + target = (match.group(1), match.group(2)) + if target not in seen: + seen.add(target) + targets.append(target) + return targets + + +def check_target(ip: str, port: str, username: str, timeout: int) -> dict[str, str]: + cmd = [ + "ssh", + "-F", + "/dev/null", + "-o", + "StrictHostKeyChecking=no", + "-o", + "UserKnownHostsFile=/dev/null", + "-o", + "GlobalKnownHostsFile=/dev/null", + "-o", + "LogLevel=ERROR", + "-o", + "PreferredAuthentications=none", + "-o", + "PubkeyAuthentication=no", + "-o", + "PasswordAuthentication=no", + "-o", + "KbdInteractiveAuthentication=no", + "-o", + "GSSAPIAuthentication=no", + "-o", + "BatchMode=yes", + "-o", + f"ConnectTimeout={timeout}", + "-p", + port, + f"{username}@{ip}", + "true", + ] + + result = subprocess.run(cmd, capture_output=True, text=True) + message = (result.stderr or result.stdout or "").strip() + methods = "" + password_offered = "unknown" + interactive_offered = "unknown" + note = "" + + match = METHODS_RE.search(message) + if match: + methods = match.group(1) + offered = {item.strip() for item in methods.split(",") if item.strip()} + password_offered = "yes" if "password" in offered else "no" + interactive_offered = ( + "yes" + if any(item in offered for item in ("keyboard-interactive", "kbdint")) + else "no" + ) + note = "auth methods advertised" + elif "Connection timed out" in message or "Operation timed out" in message: + note = "timeout" + elif "Connection refused" in message: + note = "refused" + elif "Permission denied" in message: + note = "permission denied without explicit methods" + elif message: + note = message + else: + note = "no response" + + return { + "ip": ip, + "port": port, + "auth_methods": methods, + "password_offered": password_offered, + "kbdinteractive_offered": interactive_offered, + "note": note, + } + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--ssh-keyscan-file", required=True) + parser.add_argument("--output", required=True) + parser.add_argument("--username", default="auditcheck") + parser.add_argument("--timeout", type=int, default=5) + parser.add_argument("--workers", type=int, default=16) + args = parser.parse_args() + + targets = parse_targets(Path(args.ssh_keyscan_file)) + rows: list[dict[str, str]] = [] + + with ThreadPoolExecutor(max_workers=args.workers) as pool: + futures = [ + pool.submit(check_target, ip, port, args.username, args.timeout) + for ip, port in targets + ] + for idx, future in enumerate(as_completed(futures), start=1): + rows.append(future.result()) + if idx % 10 == 0 or idx == len(futures): + print(f"processed {idx}/{len(futures)}", flush=True) + + rows.sort(key=lambda row: (tuple(int(x) for x in row["ip"].split(".")), int(row["port"]))) + + out_path = Path(args.output) + with out_path.open("w", encoding="utf-8", newline="") as fh: + writer = csv.DictWriter( + fh, + delimiter="\t", + fieldnames=[ + "ip", + "port", + "auth_methods", + "password_offered", + "kbdinteractive_offered", + "note", + ], + ) + writer.writeheader() + writer.writerows(rows) + + print(out_path) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/safe_external_inventory.sh b/scripts/safe_external_inventory.sh new file mode 100755 index 0000000..a415641 --- /dev/null +++ b/scripts/safe_external_inventory.sh @@ -0,0 +1,151 @@ +#!/usr/bin/env bash + +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +INPUT_FILE="${1:-$ROOT_DIR/public_ipv4_from_master.txt}" +STAMP="$(date +%Y%m%d-%H%M%S)" +OUTPUT_DIR="${2:-$ROOT_DIR/reports/inventory-$STAMP}" +TARGETS_FILE="$OUTPUT_DIR/targets.txt" +RAW_BASENAME="$OUTPUT_DIR/nmap_inventory" +SUMMARY_CSV="$OUTPUT_DIR/summary.csv" +SUMMARY_MD="$OUTPUT_DIR/summary.md" + +mkdir -p "$OUTPUT_DIR" + +if ! command -v nmap >/dev/null 2>&1; then + echo "nmap is not installed. Install it first, then rerun this script." >&2 + echo "Expected input: $INPUT_FILE" >&2 + echo "Planned output directory: $OUTPUT_DIR" >&2 + exit 1 +fi + +if [[ ! -f "$INPUT_FILE" ]]; then + echo "Input file not found: $INPUT_FILE" >&2 + exit 1 +fi + +python3 - "$INPUT_FILE" "$TARGETS_FILE" <<'PY' +import ipaddress +import sys + +src, dst = sys.argv[1], sys.argv[2] +valid = [] +seen = set() + +with open(src, "r", encoding="utf-8") as fh: + for raw in fh: + line = raw.strip() + if not line or line.startswith("#"): + continue + try: + ip = ipaddress.IPv4Address(line) + except ipaddress.AddressValueError: + print(f"Skipping invalid IPv4: {line}", file=sys.stderr) + continue + text = str(ip) + if text not in seen: + seen.add(text) + valid.append(text) + +with open(dst, "w", encoding="utf-8") as fh: + for ip in valid: + fh.write(f"{ip}\n") + +print(len(valid)) +PY + +TARGET_COUNT="$(wc -l < "$TARGETS_FILE" | tr -d ' ')" + +if [[ "$TARGET_COUNT" -eq 0 ]]; then + echo "No valid IPv4 targets found in $INPUT_FILE" >&2 + exit 1 +fi + +echo "Validated $TARGET_COUNT targets" +echo "Running a conservative external inventory scan" + +nmap \ + -Pn \ + -n \ + -T3 \ + --top-ports 20 \ + -sV \ + --version-light \ + --open \ + --max-retries 2 \ + --host-timeout 2m \ + -iL "$TARGETS_FILE" \ + -oA "$RAW_BASENAME" + +python3 - "$RAW_BASENAME.gnmap" "$SUMMARY_CSV" "$SUMMARY_MD" "$TARGET_COUNT" <<'PY' +import csv +import sys +from collections import defaultdict + +gnmap_path, csv_path, md_path, total_targets = sys.argv[1:5] +rows = [] +ports_by_ip = defaultdict(list) +seen_hosts = set() + +with open(gnmap_path, "r", encoding="utf-8", errors="replace") as fh: + for raw in fh: + line = raw.strip() + if not line.startswith("Host: "): + continue + if "Ports: " not in line: + continue + host = line.split()[1] + seen_hosts.add(host) + ports_blob = line.split("Ports: ", 1)[1] + for item in ports_blob.split(", "): + parts = item.split("/") + if len(parts) < 7: + continue + port, state, proto, _, service, product, extra = parts[:7] + if state != "open": + continue + product_info = " ".join(x for x in (product, extra) if x).strip() + rows.append( + { + "ip": host, + "port": port, + "protocol": proto, + "state": state, + "service": service or "unknown", + "product": product_info or "-", + } + ) + ports_by_ip[host].append(f"{port}/{proto} {service or 'unknown'}".strip()) + +with open(csv_path, "w", newline="", encoding="utf-8") as fh: + writer = csv.DictWriter( + fh, + fieldnames=["ip", "port", "protocol", "state", "service", "product"], + ) + writer.writeheader() + writer.writerows(rows) + +sorted_hosts = sorted(ports_by_ip.items(), key=lambda item: (-len(item[1]), item[0])) + +with open(md_path, "w", encoding="utf-8") as fh: + fh.write("# External Inventory Summary\n\n") + fh.write(f"- Total validated targets: {total_targets}\n") + fh.write(f"- Hosts with at least one open top port: {len(sorted_hosts)}\n") + fh.write(f"- CSV details: `{csv_path}`\n") + fh.write(f"- Raw Nmap files: `{gnmap_path[:-6]}` (`.nmap`, `.gnmap`, `.xml`)\n\n") + fh.write("## Prioritized review queue\n\n") + fh.write("| IP | Open ports found | Services |\n") + fh.write("| --- | ---: | --- |\n") + for ip, entries in sorted_hosts: + fh.write(f"| {ip} | {len(entries)} | {', '.join(entries)} |\n") + +print(f"Wrote {len(rows)} rows to {csv_path}") +print(f"Wrote Markdown summary to {md_path}") +PY + +echo +echo "Inventory complete" +echo "Output directory: $OUTPUT_DIR" +echo "Summary CSV: $SUMMARY_CSV" +echo "Summary report: $SUMMARY_MD"