#!/usr/bin/env python3 from __future__ import annotations import argparse import csv import re from collections import Counter, defaultdict from pathlib import Path from openpyxl import Workbook from openpyxl.styles import Alignment, Font, PatternFill from openpyxl.utils import get_column_letter RED_FILL = PatternFill(fill_type="solid", fgColor="FDE2E1") RED_FONT = Font(color="9C0006", bold=True) HEADER_FILL = PatternFill(fill_type="solid", fgColor="D9EAF7") HEADER_FONT = Font(bold=True) WRAP_ALIGN = Alignment(vertical="top", wrap_text=True) OWNER_LABELS = { "cloud/vps": "Облако / VPS", "other": "Прочее", "provider/isp": "Провайдер / ISP", "corp/perimeter": "Корпоративный периметр", "edge/cdn": "Edge / CDN / WAF", } REASON_LABELS = { "Public app/auth surface": "Публичная веб- или auth-поверхность", "Default page exposed": "Снаружи видна дефолтная страница", "Internet-facing platform/service": "Публично доступный сервис или платформа", "Default reverse proxy certificate": "Дефолтный сертификат reverse proxy", "Self-signed TLS certificate": "Самоподписанный TLS-сертификат", "TLS cert does not match IP/SNI": "TLS-сертификат не соответствует IP или SNI", "Corporate wildcard cert on raw IP": "Корпоративный wildcard-сертификат на прямом IP", "Likely corporate perimeter/origin": "Похоже на корпоративный периметр или origin", "Looks like CDN/WAF or edge": "Похоже на CDN/WAF или edge-слой", "SSH exposed": "Снаружи доступен SSH", "SSH on nonstandard port": "SSH на нестандартном порту", "Older SSH banner": "Старый баннер SSH", "SSH offers password authentication": "SSH предлагает парольную аутентификацию", "Named business asset/domain": "Связан с именованным бизнес-активом или доменом", } PRIORITY_LABELS = { "HIGH": "Высокий", "MEDIUM": "Средний", "LOW": "Низкий", } MANUAL_LABELS = { "YES": "Да", "NO": "Нет", } def read_network_mapping(path: Path) -> list[dict[str, str]]: with path.open(encoding="utf-8") as fh: return list(csv.DictReader(fh, delimiter="\t")) def read_ssh(path: Path) -> dict[str, dict[str, list[str]]]: by_ip: dict[str, dict[str, list[str]]] = defaultdict(lambda: {"ports": [], "banners": []}) if not path.exists(): return by_ip current_ip = "" current_port = "" seen_banner = False for raw in path.read_text(encoding="utf-8", errors="replace").splitlines(): line = raw.strip() match = re.match(r"^=== ([0-9.]+):(\d+) ===$", line) if match: current_ip = match.group(1) current_port = match.group(2) seen_banner = False if current_port not in by_ip[current_ip]["ports"]: by_ip[current_ip]["ports"].append(current_port) continue if current_ip and "SSH-2.0-" in line and not seen_banner: banner = line.lstrip("# ").strip() by_ip[current_ip]["banners"].append(banner) seen_banner = True return by_ip def read_ssh_auth(path: Path) -> dict[str, dict[str, list[str] | str]]: by_ip: dict[str, dict[str, list[str] | str]] = defaultdict( lambda: {"methods": [], "password_ports": [], "notes": []} ) if not path.exists(): return by_ip with path.open(encoding="utf-8") as fh: for row in csv.DictReader(fh, delimiter="\t"): ip = row.get("ip", "") port = row.get("port", "") methods = row.get("auth_methods", "") password_offered = row.get("password_offered", "") note = row.get("note", "") if not ip: continue if methods: entry = f"{port}:{methods}" if entry not in by_ip[ip]["methods"]: by_ip[ip]["methods"].append(entry) if password_offered == "yes" and port: if port not in by_ip[ip]["password_ports"]: by_ip[ip]["password_ports"].append(port) if note: note_entry = f"{port}:{note}" if note_entry not in by_ip[ip]["notes"]: by_ip[ip]["notes"].append(note_entry) return by_ip def has_old_ssh(banners: list[str]) -> bool: for banner in banners: if any(x in banner for x in ("OpenSSH_7.2", "OpenSSH_7.6", "OpenSSH_8.2")): return True return False def compute_review( row: dict[str, str], ssh_info: dict[str, list[str]], ssh_auth: dict[str, list[str] | str], ) -> tuple[str, str]: reasons: list[str] = [] blob = " ".join( [ row.get("http_server", ""), row.get("http_title", ""), row.get("http_final_url", ""), row.get("tech", ""), row.get("cert_subject_cn", ""), row.get("cert_flags", ""), row.get("ptr", ""), ] ).lower() http_status = row.get("http_status", "") owner_type = row.get("owner_type", "") cert_flags = row.get("cert_flags", "") cert_subject = row.get("cert_subject_cn", "") ssh_ports = ssh_info.get("ports", []) ssh_banners = ssh_info.get("banners", []) password_ports = ssh_auth.get("password_ports", []) if http_status in {"200", "401", "500", "502"} and any( token in blob for token in ( "login", "nextcloud", "opensearch", "jitsi", "mail-in-a-box", "default page", "welcome to nginx", "grafana", "pwm", "401 unauthorized", "forbidden", ) ): reasons.append("Public app/auth surface") if "default page" in blob or "welcome to nginx" in blob or "default site" in blob: reasons.append("Default page exposed") if any(token in blob for token in ("mail-in-a-box", "opensearch", "nextcloud", "jitsi")): reasons.append("Internet-facing platform/service") if "træfik default cert" in blob or "traefik default cert" in blob: reasons.append("Default reverse proxy certificate") if "self-signed" in cert_flags: reasons.append("Self-signed TLS certificate") if "mismatched" in cert_flags: reasons.append("TLS cert does not match IP/SNI") if "wildcard" in cert_flags and "fix.ru" in cert_subject: reasons.append("Corporate wildcard cert on raw IP") if owner_type == "corp/perimeter": reasons.append("Likely corporate perimeter/origin") if owner_type == "edge/cdn": reasons.append("Looks like CDN/WAF or edge") if ssh_ports: reasons.append("SSH exposed") if any(port != "22" for port in ssh_ports): reasons.append("SSH on nonstandard port") if has_old_ssh(ssh_banners): reasons.append("Older SSH banner") if password_ports: reasons.append("SSH offers password authentication") if any(token in blob for token in ("seoconference.ru", "vikupai.ru", "jecp.ru", "iteh.ru", "cbn.ru", "fix.ru")): reasons.append("Named business asset/domain") unique_reasons = [] seen = set() for reason in reasons: if reason not in seen: seen.add(reason) unique_reasons.append(reason) if not unique_reasons: return ("NO", "") return ("YES", "; ".join(unique_reasons)) def compute_priority( row: dict[str, str], manual_check: str, reasons: str, ssh_info: dict[str, list[str]], ssh_auth: dict[str, list[str] | str], ) -> str: if manual_check == "NO": return "LOW" blob = " ".join( [ row.get("http_server", ""), row.get("http_title", ""), row.get("http_final_url", ""), row.get("cert_subject_cn", ""), row.get("cert_flags", ""), reasons, ] ).lower() if any( token in blob for token in ( "public app/auth surface", "default page exposed", "opensearch", "nextcloud", "jitsi", "mail-in-a-box", "self-signed", "corporate wildcard cert on raw ip", "older ssh banner", "ssh offers password authentication", "iis", ) ): return "HIGH" if ssh_info.get("ports") or row.get("http_status") or row.get("cert_subject_cn"): return "MEDIUM" return "LOW" def autosize(ws) -> None: widths: dict[int, int] = {} for row in ws.iter_rows(): for cell in row: value = "" if cell.value is None else str(cell.value) widths[cell.column] = max(widths.get(cell.column, 0), min(len(value), 80)) for column_idx, width in widths.items(): ws.column_dimensions[get_column_letter(column_idx)].width = max(12, min(width + 2, 60)) def translate_owner(owner_type: str) -> str: return OWNER_LABELS.get(owner_type, owner_type) def translate_reasons(reasons: str) -> str: if not reasons: return "" parts = [part.strip() for part in reasons.split(";") if part.strip()] translated = [REASON_LABELS.get(part, part) for part in parts] return "; ".join(translated) def translate_priority(priority: str) -> str: return PRIORITY_LABELS.get(priority, priority) def translate_manual(manual: str) -> str: return MANUAL_LABELS.get(manual, manual) def build_workbook( rows: list[dict[str, str]], ssh: dict[str, dict[str, list[str]]], ssh_auth: dict[str, dict[str, list[str] | str]], out_path: Path, ) -> None: wb = Workbook() ws = wb.active ws.title = "Активы" headers = [ "IP", "Проверить вручную", "Приоритет", "Причины ручной проверки", "Тип владельца", "Владелец / ASN Holder", "ASN", "Префикс", "PTR", "HTTP статус", "HTTP сервер", "HTTP заголовок / title", "Итоговый URL / redirect", "TLS subject CN", "TLS issuer CN", "TLS признаки", "Технологии", "SSH порты", "SSH баннеры", "SSH методы аутентификации", "SSH пароль предложен", "SSH примечания", ] ws.append(headers) for cell in ws[1]: cell.fill = HEADER_FILL cell.font = HEADER_FONT cell.alignment = WRAP_ALIGN holder_counter: Counter[str] = Counter() owner_counter: Counter[str] = Counter() cert_counter: Counter[str] = Counter() manual_counter: Counter[str] = Counter() for row in rows: ip = row["ip"] ssh_info = ssh.get(ip, {"ports": [], "banners": []}) ssh_auth_info = ssh_auth.get(ip, {"methods": [], "password_ports": [], "notes": []}) manual_check, review_reasons = compute_review(row, ssh_info, ssh_auth_info) priority = compute_priority(row, manual_check, review_reasons, ssh_info, ssh_auth_info) holder_counter[row["holder"]] += 1 owner_counter[row["owner_type"]] += 1 if row["cert_subject_cn"]: cert_counter[row["cert_subject_cn"]] += 1 manual_counter[priority] += 1 out_row = [ ip, translate_manual(manual_check), translate_priority(priority), translate_reasons(review_reasons), translate_owner(row["owner_type"]), row["holder"], row["asn"], row["prefix"], row["ptr"], row["http_status"], row["http_server"], row["http_title"], row["http_final_url"], row["cert_subject_cn"], row["cert_issuer_cn"], row["cert_flags"], row["tech"], ", ".join(sorted(ssh_info["ports"], key=lambda x: int(x))), "\n".join(ssh_info["banners"]), "\n".join(ssh_auth_info.get("methods", [])), "Да" if ssh_auth_info.get("password_ports") else "Нет", "\n".join(ssh_auth_info.get("notes", [])), ] ws.append(out_row) current_row = ws.max_row for cell in ws[current_row]: cell.alignment = WRAP_ALIGN if manual_check == "YES": for cell in ws[current_row]: cell.fill = RED_FILL ws.cell(current_row, 2).font = RED_FONT ws.cell(current_row, 3).font = RED_FONT ws.cell(current_row, 4).font = RED_FONT ws.freeze_panes = "A2" ws.auto_filter.ref = ws.dimensions autosize(ws) summary = wb.create_sheet("Сводка") summary["A1"] = "Итоговая сводка по внешней экспозиции" summary["A1"].font = Font(bold=True, size=14) summary["A3"] = "Всего IP" summary["B3"] = len(rows) summary["A4"] = "Требуют ручной проверки" summary["B4"] = sum(1 for row_idx in range(2, ws.max_row + 1) if ws.cell(row_idx, 2).value == "Да") summary["A5"] = "Приоритет высокий" summary["B5"] = manual_counter["HIGH"] summary["A6"] = "Приоритет средний" summary["B6"] = manual_counter["MEDIUM"] summary["A7"] = "Приоритет низкий" summary["B7"] = manual_counter["LOW"] summary["A9"] = "Легенда" summary["A10"] = "Красные строки" summary["B10"] = "Нужно проверять вручную в первую очередь" summary["A10"].fill = RED_FILL summary["B10"].fill = RED_FILL summary["A12"] = "Количество по типам владельцев" summary["A12"].font = HEADER_FONT row_idx = 13 for owner_type, count in owner_counter.most_common(): summary.cell(row_idx, 1, translate_owner(owner_type)) summary.cell(row_idx, 2, count) row_idx += 1 row_idx += 1 summary.cell(row_idx, 1, "Основные владельцы / провайдеры").font = HEADER_FONT row_idx += 1 for holder, count in holder_counter.most_common(15): if not holder: continue summary.cell(row_idx, 1, holder) summary.cell(row_idx, 2, count) row_idx += 1 row_idx += 1 summary.cell(row_idx, 1, "Частые TLS subject CN").font = HEADER_FONT row_idx += 1 for subject, count in cert_counter.most_common(15): summary.cell(row_idx, 1, subject) summary.cell(row_idx, 2, count) row_idx += 1 autosize(summary) wb.save(out_path) def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--mapping-tsv", required=True) parser.add_argument("--ssh-file", required=True) parser.add_argument("--ssh-auth-file", required=False, default="") parser.add_argument("--output", required=True) args = parser.parse_args() rows = read_network_mapping(Path(args.mapping_tsv)) ssh = read_ssh(Path(args.ssh_file)) ssh_auth = read_ssh_auth(Path(args.ssh_auth_file)) if args.ssh_auth_file else {} build_workbook(rows, ssh, ssh_auth, Path(args.output)) print(args.output) return 0 if __name__ == "__main__": raise SystemExit(main())