Files
all_ping/scripts/build_final_excel.py
2026-07-01 02:52:14 +05:00

464 lines
15 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import re
from collections import Counter, defaultdict
from pathlib import Path
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, PatternFill
from openpyxl.utils import get_column_letter
RED_FILL = PatternFill(fill_type="solid", fgColor="FDE2E1")
RED_FONT = Font(color="9C0006", bold=True)
HEADER_FILL = PatternFill(fill_type="solid", fgColor="D9EAF7")
HEADER_FONT = Font(bold=True)
WRAP_ALIGN = Alignment(vertical="top", wrap_text=True)
OWNER_LABELS = {
"cloud/vps": "Облако / VPS",
"other": "Прочее",
"provider/isp": "Провайдер / ISP",
"corp/perimeter": "Корпоративный периметр",
"edge/cdn": "Edge / CDN / WAF",
}
REASON_LABELS = {
"Public app/auth surface": "Публичная веб- или auth-поверхность",
"Default page exposed": "Снаружи видна дефолтная страница",
"Internet-facing platform/service": "Публично доступный сервис или платформа",
"Default reverse proxy certificate": "Дефолтный сертификат reverse proxy",
"Self-signed TLS certificate": "Самоподписанный TLS-сертификат",
"TLS cert does not match IP/SNI": "TLS-сертификат не соответствует IP или SNI",
"Corporate wildcard cert on raw IP": "Корпоративный wildcard-сертификат на прямом IP",
"Likely corporate perimeter/origin": "Похоже на корпоративный периметр или origin",
"Looks like CDN/WAF or edge": "Похоже на CDN/WAF или edge-слой",
"SSH exposed": "Снаружи доступен SSH",
"SSH on nonstandard port": "SSH на нестандартном порту",
"Older SSH banner": "Старый баннер SSH",
"SSH offers password authentication": "SSH предлагает парольную аутентификацию",
"Named business asset/domain": "Связан с именованным бизнес-активом или доменом",
}
PRIORITY_LABELS = {
"HIGH": "Высокий",
"MEDIUM": "Средний",
"LOW": "Низкий",
}
MANUAL_LABELS = {
"YES": "Да",
"NO": "Нет",
}
def read_network_mapping(path: Path) -> list[dict[str, str]]:
with path.open(encoding="utf-8") as fh:
return list(csv.DictReader(fh, delimiter="\t"))
def read_ssh(path: Path) -> dict[str, dict[str, list[str]]]:
by_ip: dict[str, dict[str, list[str]]] = defaultdict(lambda: {"ports": [], "banners": []})
if not path.exists():
return by_ip
current_ip = ""
current_port = ""
seen_banner = False
for raw in path.read_text(encoding="utf-8", errors="replace").splitlines():
line = raw.strip()
match = re.match(r"^=== ([0-9.]+):(\d+) ===$", line)
if match:
current_ip = match.group(1)
current_port = match.group(2)
seen_banner = False
if current_port not in by_ip[current_ip]["ports"]:
by_ip[current_ip]["ports"].append(current_port)
continue
if current_ip and "SSH-2.0-" in line and not seen_banner:
banner = line.lstrip("# ").strip()
by_ip[current_ip]["banners"].append(banner)
seen_banner = True
return by_ip
def read_ssh_auth(path: Path) -> dict[str, dict[str, list[str] | str]]:
by_ip: dict[str, dict[str, list[str] | str]] = defaultdict(
lambda: {"methods": [], "password_ports": [], "notes": []}
)
if not path.exists():
return by_ip
with path.open(encoding="utf-8") as fh:
for row in csv.DictReader(fh, delimiter="\t"):
ip = row.get("ip", "")
port = row.get("port", "")
methods = row.get("auth_methods", "")
password_offered = row.get("password_offered", "")
note = row.get("note", "")
if not ip:
continue
if methods:
entry = f"{port}:{methods}"
if entry not in by_ip[ip]["methods"]:
by_ip[ip]["methods"].append(entry)
if password_offered == "yes" and port:
if port not in by_ip[ip]["password_ports"]:
by_ip[ip]["password_ports"].append(port)
if note:
note_entry = f"{port}:{note}"
if note_entry not in by_ip[ip]["notes"]:
by_ip[ip]["notes"].append(note_entry)
return by_ip
def has_old_ssh(banners: list[str]) -> bool:
for banner in banners:
if any(x in banner for x in ("OpenSSH_7.2", "OpenSSH_7.6", "OpenSSH_8.2")):
return True
return False
def compute_review(
row: dict[str, str],
ssh_info: dict[str, list[str]],
ssh_auth: dict[str, list[str] | str],
) -> tuple[str, str]:
reasons: list[str] = []
blob = " ".join(
[
row.get("http_server", ""),
row.get("http_title", ""),
row.get("http_final_url", ""),
row.get("tech", ""),
row.get("cert_subject_cn", ""),
row.get("cert_flags", ""),
row.get("ptr", ""),
]
).lower()
http_status = row.get("http_status", "")
owner_type = row.get("owner_type", "")
cert_flags = row.get("cert_flags", "")
cert_subject = row.get("cert_subject_cn", "")
ssh_ports = ssh_info.get("ports", [])
ssh_banners = ssh_info.get("banners", [])
password_ports = ssh_auth.get("password_ports", [])
if http_status in {"200", "401", "500", "502"} and any(
token in blob
for token in (
"login",
"nextcloud",
"opensearch",
"jitsi",
"mail-in-a-box",
"default page",
"welcome to nginx",
"grafana",
"pwm",
"401 unauthorized",
"forbidden",
)
):
reasons.append("Public app/auth surface")
if "default page" in blob or "welcome to nginx" in blob or "default site" in blob:
reasons.append("Default page exposed")
if any(token in blob for token in ("mail-in-a-box", "opensearch", "nextcloud", "jitsi")):
reasons.append("Internet-facing platform/service")
if "træfik default cert" in blob or "traefik default cert" in blob:
reasons.append("Default reverse proxy certificate")
if "self-signed" in cert_flags:
reasons.append("Self-signed TLS certificate")
if "mismatched" in cert_flags:
reasons.append("TLS cert does not match IP/SNI")
if "wildcard" in cert_flags and "fix.ru" in cert_subject:
reasons.append("Corporate wildcard cert on raw IP")
if owner_type == "corp/perimeter":
reasons.append("Likely corporate perimeter/origin")
if owner_type == "edge/cdn":
reasons.append("Looks like CDN/WAF or edge")
if ssh_ports:
reasons.append("SSH exposed")
if any(port != "22" for port in ssh_ports):
reasons.append("SSH on nonstandard port")
if has_old_ssh(ssh_banners):
reasons.append("Older SSH banner")
if password_ports:
reasons.append("SSH offers password authentication")
if any(token in blob for token in ("seoconference.ru", "vikupai.ru", "jecp.ru", "iteh.ru", "cbn.ru", "fix.ru")):
reasons.append("Named business asset/domain")
unique_reasons = []
seen = set()
for reason in reasons:
if reason not in seen:
seen.add(reason)
unique_reasons.append(reason)
if not unique_reasons:
return ("NO", "")
return ("YES", "; ".join(unique_reasons))
def compute_priority(
row: dict[str, str],
manual_check: str,
reasons: str,
ssh_info: dict[str, list[str]],
ssh_auth: dict[str, list[str] | str],
) -> str:
if manual_check == "NO":
return "LOW"
blob = " ".join(
[
row.get("http_server", ""),
row.get("http_title", ""),
row.get("http_final_url", ""),
row.get("cert_subject_cn", ""),
row.get("cert_flags", ""),
reasons,
]
).lower()
if any(
token in blob
for token in (
"public app/auth surface",
"default page exposed",
"opensearch",
"nextcloud",
"jitsi",
"mail-in-a-box",
"self-signed",
"corporate wildcard cert on raw ip",
"older ssh banner",
"ssh offers password authentication",
"iis",
)
):
return "HIGH"
if ssh_info.get("ports") or row.get("http_status") or row.get("cert_subject_cn"):
return "MEDIUM"
return "LOW"
def autosize(ws) -> None:
widths: dict[int, int] = {}
for row in ws.iter_rows():
for cell in row:
value = "" if cell.value is None else str(cell.value)
widths[cell.column] = max(widths.get(cell.column, 0), min(len(value), 80))
for column_idx, width in widths.items():
ws.column_dimensions[get_column_letter(column_idx)].width = max(12, min(width + 2, 60))
def translate_owner(owner_type: str) -> str:
return OWNER_LABELS.get(owner_type, owner_type)
def translate_reasons(reasons: str) -> str:
if not reasons:
return ""
parts = [part.strip() for part in reasons.split(";") if part.strip()]
translated = [REASON_LABELS.get(part, part) for part in parts]
return "; ".join(translated)
def translate_priority(priority: str) -> str:
return PRIORITY_LABELS.get(priority, priority)
def translate_manual(manual: str) -> str:
return MANUAL_LABELS.get(manual, manual)
def build_workbook(
rows: list[dict[str, str]],
ssh: dict[str, dict[str, list[str]]],
ssh_auth: dict[str, dict[str, list[str] | str]],
out_path: Path,
) -> None:
wb = Workbook()
ws = wb.active
ws.title = "Активы"
headers = [
"IP",
"Проверить вручную",
"Приоритет",
"Причины ручной проверки",
"Тип владельца",
"Владелец / ASN Holder",
"ASN",
"Префикс",
"PTR",
"HTTP статус",
"HTTP сервер",
"HTTP заголовок / title",
"Итоговый URL / redirect",
"TLS subject CN",
"TLS issuer CN",
"TLS признаки",
"Технологии",
"SSH порты",
"SSH баннеры",
"SSH методы аутентификации",
"SSH пароль предложен",
"SSH примечания",
]
ws.append(headers)
for cell in ws[1]:
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = WRAP_ALIGN
holder_counter: Counter[str] = Counter()
owner_counter: Counter[str] = Counter()
cert_counter: Counter[str] = Counter()
manual_counter: Counter[str] = Counter()
for row in rows:
ip = row["ip"]
ssh_info = ssh.get(ip, {"ports": [], "banners": []})
ssh_auth_info = ssh_auth.get(ip, {"methods": [], "password_ports": [], "notes": []})
manual_check, review_reasons = compute_review(row, ssh_info, ssh_auth_info)
priority = compute_priority(row, manual_check, review_reasons, ssh_info, ssh_auth_info)
holder_counter[row["holder"]] += 1
owner_counter[row["owner_type"]] += 1
if row["cert_subject_cn"]:
cert_counter[row["cert_subject_cn"]] += 1
manual_counter[priority] += 1
out_row = [
ip,
translate_manual(manual_check),
translate_priority(priority),
translate_reasons(review_reasons),
translate_owner(row["owner_type"]),
row["holder"],
row["asn"],
row["prefix"],
row["ptr"],
row["http_status"],
row["http_server"],
row["http_title"],
row["http_final_url"],
row["cert_subject_cn"],
row["cert_issuer_cn"],
row["cert_flags"],
row["tech"],
", ".join(sorted(ssh_info["ports"], key=lambda x: int(x))),
"\n".join(ssh_info["banners"]),
"\n".join(ssh_auth_info.get("methods", [])),
"Да" if ssh_auth_info.get("password_ports") else "Нет",
"\n".join(ssh_auth_info.get("notes", [])),
]
ws.append(out_row)
current_row = ws.max_row
for cell in ws[current_row]:
cell.alignment = WRAP_ALIGN
if manual_check == "YES":
for cell in ws[current_row]:
cell.fill = RED_FILL
ws.cell(current_row, 2).font = RED_FONT
ws.cell(current_row, 3).font = RED_FONT
ws.cell(current_row, 4).font = RED_FONT
ws.freeze_panes = "A2"
ws.auto_filter.ref = ws.dimensions
autosize(ws)
summary = wb.create_sheet("Сводка")
summary["A1"] = "Итоговая сводка по внешней экспозиции"
summary["A1"].font = Font(bold=True, size=14)
summary["A3"] = "Всего IP"
summary["B3"] = len(rows)
summary["A4"] = "Требуют ручной проверки"
summary["B4"] = sum(1 for row_idx in range(2, ws.max_row + 1) if ws.cell(row_idx, 2).value == "Да")
summary["A5"] = "Приоритет высокий"
summary["B5"] = manual_counter["HIGH"]
summary["A6"] = "Приоритет средний"
summary["B6"] = manual_counter["MEDIUM"]
summary["A7"] = "Приоритет низкий"
summary["B7"] = manual_counter["LOW"]
summary["A9"] = "Легенда"
summary["A10"] = "Красные строки"
summary["B10"] = "Нужно проверять вручную в первую очередь"
summary["A10"].fill = RED_FILL
summary["B10"].fill = RED_FILL
summary["A12"] = "Количество по типам владельцев"
summary["A12"].font = HEADER_FONT
row_idx = 13
for owner_type, count in owner_counter.most_common():
summary.cell(row_idx, 1, translate_owner(owner_type))
summary.cell(row_idx, 2, count)
row_idx += 1
row_idx += 1
summary.cell(row_idx, 1, "Основные владельцы / провайдеры").font = HEADER_FONT
row_idx += 1
for holder, count in holder_counter.most_common(15):
if not holder:
continue
summary.cell(row_idx, 1, holder)
summary.cell(row_idx, 2, count)
row_idx += 1
row_idx += 1
summary.cell(row_idx, 1, "Частые TLS subject CN").font = HEADER_FONT
row_idx += 1
for subject, count in cert_counter.most_common(15):
summary.cell(row_idx, 1, subject)
summary.cell(row_idx, 2, count)
row_idx += 1
autosize(summary)
wb.save(out_path)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--mapping-tsv", required=True)
parser.add_argument("--ssh-file", required=True)
parser.add_argument("--ssh-auth-file", required=False, default="")
parser.add_argument("--output", required=True)
args = parser.parse_args()
rows = read_network_mapping(Path(args.mapping_tsv))
ssh = read_ssh(Path(args.ssh_file))
ssh_auth = read_ssh_auth(Path(args.ssh_auth_file)) if args.ssh_auth_file else {}
build_workbook(rows, ssh, ssh_auth, Path(args.output))
print(args.output)
return 0
if __name__ == "__main__":
raise SystemExit(main())