Add safe external inventory scripts

This commit is contained in:
emilsarafutdinov
2026-07-01 02:52:14 +05:00
commit 36c5f0bbd1
7 changed files with 1129 additions and 0 deletions
+463
View File
@@ -0,0 +1,463 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import re
from collections import Counter, defaultdict
from pathlib import Path
from openpyxl import Workbook
from openpyxl.styles import Alignment, Font, PatternFill
from openpyxl.utils import get_column_letter
RED_FILL = PatternFill(fill_type="solid", fgColor="FDE2E1")
RED_FONT = Font(color="9C0006", bold=True)
HEADER_FILL = PatternFill(fill_type="solid", fgColor="D9EAF7")
HEADER_FONT = Font(bold=True)
WRAP_ALIGN = Alignment(vertical="top", wrap_text=True)
OWNER_LABELS = {
"cloud/vps": "Облако / VPS",
"other": "Прочее",
"provider/isp": "Провайдер / ISP",
"corp/perimeter": "Корпоративный периметр",
"edge/cdn": "Edge / CDN / WAF",
}
REASON_LABELS = {
"Public app/auth surface": "Публичная веб- или auth-поверхность",
"Default page exposed": "Снаружи видна дефолтная страница",
"Internet-facing platform/service": "Публично доступный сервис или платформа",
"Default reverse proxy certificate": "Дефолтный сертификат reverse proxy",
"Self-signed TLS certificate": "Самоподписанный TLS-сертификат",
"TLS cert does not match IP/SNI": "TLS-сертификат не соответствует IP или SNI",
"Corporate wildcard cert on raw IP": "Корпоративный wildcard-сертификат на прямом IP",
"Likely corporate perimeter/origin": "Похоже на корпоративный периметр или origin",
"Looks like CDN/WAF or edge": "Похоже на CDN/WAF или edge-слой",
"SSH exposed": "Снаружи доступен SSH",
"SSH on nonstandard port": "SSH на нестандартном порту",
"Older SSH banner": "Старый баннер SSH",
"SSH offers password authentication": "SSH предлагает парольную аутентификацию",
"Named business asset/domain": "Связан с именованным бизнес-активом или доменом",
}
PRIORITY_LABELS = {
"HIGH": "Высокий",
"MEDIUM": "Средний",
"LOW": "Низкий",
}
MANUAL_LABELS = {
"YES": "Да",
"NO": "Нет",
}
def read_network_mapping(path: Path) -> list[dict[str, str]]:
with path.open(encoding="utf-8") as fh:
return list(csv.DictReader(fh, delimiter="\t"))
def read_ssh(path: Path) -> dict[str, dict[str, list[str]]]:
by_ip: dict[str, dict[str, list[str]]] = defaultdict(lambda: {"ports": [], "banners": []})
if not path.exists():
return by_ip
current_ip = ""
current_port = ""
seen_banner = False
for raw in path.read_text(encoding="utf-8", errors="replace").splitlines():
line = raw.strip()
match = re.match(r"^=== ([0-9.]+):(\d+) ===$", line)
if match:
current_ip = match.group(1)
current_port = match.group(2)
seen_banner = False
if current_port not in by_ip[current_ip]["ports"]:
by_ip[current_ip]["ports"].append(current_port)
continue
if current_ip and "SSH-2.0-" in line and not seen_banner:
banner = line.lstrip("# ").strip()
by_ip[current_ip]["banners"].append(banner)
seen_banner = True
return by_ip
def read_ssh_auth(path: Path) -> dict[str, dict[str, list[str] | str]]:
by_ip: dict[str, dict[str, list[str] | str]] = defaultdict(
lambda: {"methods": [], "password_ports": [], "notes": []}
)
if not path.exists():
return by_ip
with path.open(encoding="utf-8") as fh:
for row in csv.DictReader(fh, delimiter="\t"):
ip = row.get("ip", "")
port = row.get("port", "")
methods = row.get("auth_methods", "")
password_offered = row.get("password_offered", "")
note = row.get("note", "")
if not ip:
continue
if methods:
entry = f"{port}:{methods}"
if entry not in by_ip[ip]["methods"]:
by_ip[ip]["methods"].append(entry)
if password_offered == "yes" and port:
if port not in by_ip[ip]["password_ports"]:
by_ip[ip]["password_ports"].append(port)
if note:
note_entry = f"{port}:{note}"
if note_entry not in by_ip[ip]["notes"]:
by_ip[ip]["notes"].append(note_entry)
return by_ip
def has_old_ssh(banners: list[str]) -> bool:
for banner in banners:
if any(x in banner for x in ("OpenSSH_7.2", "OpenSSH_7.6", "OpenSSH_8.2")):
return True
return False
def compute_review(
row: dict[str, str],
ssh_info: dict[str, list[str]],
ssh_auth: dict[str, list[str] | str],
) -> tuple[str, str]:
reasons: list[str] = []
blob = " ".join(
[
row.get("http_server", ""),
row.get("http_title", ""),
row.get("http_final_url", ""),
row.get("tech", ""),
row.get("cert_subject_cn", ""),
row.get("cert_flags", ""),
row.get("ptr", ""),
]
).lower()
http_status = row.get("http_status", "")
owner_type = row.get("owner_type", "")
cert_flags = row.get("cert_flags", "")
cert_subject = row.get("cert_subject_cn", "")
ssh_ports = ssh_info.get("ports", [])
ssh_banners = ssh_info.get("banners", [])
password_ports = ssh_auth.get("password_ports", [])
if http_status in {"200", "401", "500", "502"} and any(
token in blob
for token in (
"login",
"nextcloud",
"opensearch",
"jitsi",
"mail-in-a-box",
"default page",
"welcome to nginx",
"grafana",
"pwm",
"401 unauthorized",
"forbidden",
)
):
reasons.append("Public app/auth surface")
if "default page" in blob or "welcome to nginx" in blob or "default site" in blob:
reasons.append("Default page exposed")
if any(token in blob for token in ("mail-in-a-box", "opensearch", "nextcloud", "jitsi")):
reasons.append("Internet-facing platform/service")
if "træfik default cert" in blob or "traefik default cert" in blob:
reasons.append("Default reverse proxy certificate")
if "self-signed" in cert_flags:
reasons.append("Self-signed TLS certificate")
if "mismatched" in cert_flags:
reasons.append("TLS cert does not match IP/SNI")
if "wildcard" in cert_flags and "fix.ru" in cert_subject:
reasons.append("Corporate wildcard cert on raw IP")
if owner_type == "corp/perimeter":
reasons.append("Likely corporate perimeter/origin")
if owner_type == "edge/cdn":
reasons.append("Looks like CDN/WAF or edge")
if ssh_ports:
reasons.append("SSH exposed")
if any(port != "22" for port in ssh_ports):
reasons.append("SSH on nonstandard port")
if has_old_ssh(ssh_banners):
reasons.append("Older SSH banner")
if password_ports:
reasons.append("SSH offers password authentication")
if any(token in blob for token in ("seoconference.ru", "vikupai.ru", "jecp.ru", "iteh.ru", "cbn.ru", "fix.ru")):
reasons.append("Named business asset/domain")
unique_reasons = []
seen = set()
for reason in reasons:
if reason not in seen:
seen.add(reason)
unique_reasons.append(reason)
if not unique_reasons:
return ("NO", "")
return ("YES", "; ".join(unique_reasons))
def compute_priority(
row: dict[str, str],
manual_check: str,
reasons: str,
ssh_info: dict[str, list[str]],
ssh_auth: dict[str, list[str] | str],
) -> str:
if manual_check == "NO":
return "LOW"
blob = " ".join(
[
row.get("http_server", ""),
row.get("http_title", ""),
row.get("http_final_url", ""),
row.get("cert_subject_cn", ""),
row.get("cert_flags", ""),
reasons,
]
).lower()
if any(
token in blob
for token in (
"public app/auth surface",
"default page exposed",
"opensearch",
"nextcloud",
"jitsi",
"mail-in-a-box",
"self-signed",
"corporate wildcard cert on raw ip",
"older ssh banner",
"ssh offers password authentication",
"iis",
)
):
return "HIGH"
if ssh_info.get("ports") or row.get("http_status") or row.get("cert_subject_cn"):
return "MEDIUM"
return "LOW"
def autosize(ws) -> None:
widths: dict[int, int] = {}
for row in ws.iter_rows():
for cell in row:
value = "" if cell.value is None else str(cell.value)
widths[cell.column] = max(widths.get(cell.column, 0), min(len(value), 80))
for column_idx, width in widths.items():
ws.column_dimensions[get_column_letter(column_idx)].width = max(12, min(width + 2, 60))
def translate_owner(owner_type: str) -> str:
return OWNER_LABELS.get(owner_type, owner_type)
def translate_reasons(reasons: str) -> str:
if not reasons:
return ""
parts = [part.strip() for part in reasons.split(";") if part.strip()]
translated = [REASON_LABELS.get(part, part) for part in parts]
return "; ".join(translated)
def translate_priority(priority: str) -> str:
return PRIORITY_LABELS.get(priority, priority)
def translate_manual(manual: str) -> str:
return MANUAL_LABELS.get(manual, manual)
def build_workbook(
rows: list[dict[str, str]],
ssh: dict[str, dict[str, list[str]]],
ssh_auth: dict[str, dict[str, list[str] | str]],
out_path: Path,
) -> None:
wb = Workbook()
ws = wb.active
ws.title = "Активы"
headers = [
"IP",
"Проверить вручную",
"Приоритет",
"Причины ручной проверки",
"Тип владельца",
"Владелец / ASN Holder",
"ASN",
"Префикс",
"PTR",
"HTTP статус",
"HTTP сервер",
"HTTP заголовок / title",
"Итоговый URL / redirect",
"TLS subject CN",
"TLS issuer CN",
"TLS признаки",
"Технологии",
"SSH порты",
"SSH баннеры",
"SSH методы аутентификации",
"SSH пароль предложен",
"SSH примечания",
]
ws.append(headers)
for cell in ws[1]:
cell.fill = HEADER_FILL
cell.font = HEADER_FONT
cell.alignment = WRAP_ALIGN
holder_counter: Counter[str] = Counter()
owner_counter: Counter[str] = Counter()
cert_counter: Counter[str] = Counter()
manual_counter: Counter[str] = Counter()
for row in rows:
ip = row["ip"]
ssh_info = ssh.get(ip, {"ports": [], "banners": []})
ssh_auth_info = ssh_auth.get(ip, {"methods": [], "password_ports": [], "notes": []})
manual_check, review_reasons = compute_review(row, ssh_info, ssh_auth_info)
priority = compute_priority(row, manual_check, review_reasons, ssh_info, ssh_auth_info)
holder_counter[row["holder"]] += 1
owner_counter[row["owner_type"]] += 1
if row["cert_subject_cn"]:
cert_counter[row["cert_subject_cn"]] += 1
manual_counter[priority] += 1
out_row = [
ip,
translate_manual(manual_check),
translate_priority(priority),
translate_reasons(review_reasons),
translate_owner(row["owner_type"]),
row["holder"],
row["asn"],
row["prefix"],
row["ptr"],
row["http_status"],
row["http_server"],
row["http_title"],
row["http_final_url"],
row["cert_subject_cn"],
row["cert_issuer_cn"],
row["cert_flags"],
row["tech"],
", ".join(sorted(ssh_info["ports"], key=lambda x: int(x))),
"\n".join(ssh_info["banners"]),
"\n".join(ssh_auth_info.get("methods", [])),
"Да" if ssh_auth_info.get("password_ports") else "Нет",
"\n".join(ssh_auth_info.get("notes", [])),
]
ws.append(out_row)
current_row = ws.max_row
for cell in ws[current_row]:
cell.alignment = WRAP_ALIGN
if manual_check == "YES":
for cell in ws[current_row]:
cell.fill = RED_FILL
ws.cell(current_row, 2).font = RED_FONT
ws.cell(current_row, 3).font = RED_FONT
ws.cell(current_row, 4).font = RED_FONT
ws.freeze_panes = "A2"
ws.auto_filter.ref = ws.dimensions
autosize(ws)
summary = wb.create_sheet("Сводка")
summary["A1"] = "Итоговая сводка по внешней экспозиции"
summary["A1"].font = Font(bold=True, size=14)
summary["A3"] = "Всего IP"
summary["B3"] = len(rows)
summary["A4"] = "Требуют ручной проверки"
summary["B4"] = sum(1 for row_idx in range(2, ws.max_row + 1) if ws.cell(row_idx, 2).value == "Да")
summary["A5"] = "Приоритет высокий"
summary["B5"] = manual_counter["HIGH"]
summary["A6"] = "Приоритет средний"
summary["B6"] = manual_counter["MEDIUM"]
summary["A7"] = "Приоритет низкий"
summary["B7"] = manual_counter["LOW"]
summary["A9"] = "Легенда"
summary["A10"] = "Красные строки"
summary["B10"] = "Нужно проверять вручную в первую очередь"
summary["A10"].fill = RED_FILL
summary["B10"].fill = RED_FILL
summary["A12"] = "Количество по типам владельцев"
summary["A12"].font = HEADER_FONT
row_idx = 13
for owner_type, count in owner_counter.most_common():
summary.cell(row_idx, 1, translate_owner(owner_type))
summary.cell(row_idx, 2, count)
row_idx += 1
row_idx += 1
summary.cell(row_idx, 1, "Основные владельцы / провайдеры").font = HEADER_FONT
row_idx += 1
for holder, count in holder_counter.most_common(15):
if not holder:
continue
summary.cell(row_idx, 1, holder)
summary.cell(row_idx, 2, count)
row_idx += 1
row_idx += 1
summary.cell(row_idx, 1, "Частые TLS subject CN").font = HEADER_FONT
row_idx += 1
for subject, count in cert_counter.most_common(15):
summary.cell(row_idx, 1, subject)
summary.cell(row_idx, 2, count)
row_idx += 1
autosize(summary)
wb.save(out_path)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--mapping-tsv", required=True)
parser.add_argument("--ssh-file", required=True)
parser.add_argument("--ssh-auth-file", required=False, default="")
parser.add_argument("--output", required=True)
args = parser.parse_args()
rows = read_network_mapping(Path(args.mapping_tsv))
ssh = read_ssh(Path(args.ssh_file))
ssh_auth = read_ssh_auth(Path(args.ssh_auth_file)) if args.ssh_auth_file else {}
build_workbook(rows, ssh, ssh_auth, Path(args.output))
print(args.output)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+303
View File
@@ -0,0 +1,303 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import json
import sys
import time
from collections import Counter, defaultdict
from pathlib import Path
from typing import Any
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import urlopen
def load_ips(path: Path) -> list[str]:
ips: list[str] = []
for raw in path.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line or line.startswith("#"):
continue
ips.append(line)
return ips
def load_ptrs(path: Path) -> dict[str, str]:
ptrs: dict[str, str] = {}
if not path.exists():
return ptrs
for idx, raw in enumerate(path.read_text(encoding="utf-8").splitlines()):
if idx == 0 and raw.startswith("ip\tptr"):
continue
line = raw.rstrip("\n")
if not line:
continue
ip, _, ptr = line.partition("\t")
ptrs[ip.strip()] = ptr.strip()
return ptrs
def load_httpx(path: Path) -> dict[str, list[dict[str, Any]]]:
by_ip: dict[str, list[dict[str, Any]]] = defaultdict(list)
if not path.exists():
return by_ip
for raw in path.read_text(encoding="utf-8").splitlines():
line = raw.strip()
if not line:
continue
try:
row = json.loads(line)
except json.JSONDecodeError:
continue
ip = row.get("input")
if not ip:
continue
by_ip[ip].append(row)
return by_ip
def fetch_json(url: str, timeout: float = 20.0) -> dict[str, Any]:
with urlopen(url, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
def network_info(ip: str) -> tuple[str, str]:
url = "https://stat.ripe.net/data/network-info/data.json?" + urlencode(
{"resource": ip}
)
data = fetch_json(url).get("data", {})
asns = data.get("asns") or []
prefix = data.get("prefix") or ""
return (asns[0] if asns else "", prefix)
def as_holder(asn: str) -> str:
if not asn:
return ""
url = "https://stat.ripe.net/data/as-overview/data.json?" + urlencode(
{"resource": asn}
)
data = fetch_json(url).get("data", {})
return data.get("holder") or ""
def classify_owner(holder: str, ptr: str, techs: list[str], cert_cn: str) -> str:
blob = " ".join([holder, ptr, " ".join(techs), cert_cn]).lower()
if any(x in blob for x in ("cloudflare", "akamai", "ddos-guard", "cdn", "waf")):
return "edge/cdn"
if any(
x in blob
for x in (
"digitalocean",
"hetzner",
"amazon",
"google",
"microsoft",
"vultr",
"linode",
"selectel",
"vk cloud",
"timeweb",
"firstbyte",
"vdsina",
"your-server.de",
)
):
return "cloud/vps"
if any(x in blob for x in ("fix.ru", "blackhole.fix.ru", "vpn03.fix.ru", "*.fix.ru")):
return "corp/perimeter"
if any(x in blob for x in ("telecom", "isp", "obit", "network", "hosted-by")):
return "provider/isp"
return "other"
def summarize_http(rows: list[dict[str, Any]]) -> tuple[str, str, str, str, list[str]]:
if not rows:
return ("", "", "", "", [])
rows = sorted(
rows,
key=lambda r: (
int(str(r.get("status_code", 0)) == "200"),
int(str(r.get("port", "0")).isdigit() and r.get("port") in ("443", "80")),
len(str(r.get("title") or "")),
),
reverse=True,
)
top = rows[0]
server = str(top.get("webserver") or "")
status = str(top.get("status_code") or "")
title = str(top.get("title") or "")
final_url = str(top.get("final_url") or top.get("location") or "")
techs = sorted({t for row in rows for t in row.get("tech", []) if isinstance(t, str)})
return (status, server, title, final_url, techs)
def summarize_cert(rows: list[dict[str, Any]]) -> tuple[str, str, str]:
for row in rows:
tls = row.get("tls")
if not isinstance(tls, dict):
continue
subject = str(tls.get("subject_cn") or "")
issuer = str(tls.get("issuer_cn") or "")
flags: list[str] = []
if tls.get("mismatched"):
flags.append("mismatched")
if tls.get("wildcard_certificate"):
flags.append("wildcard")
if tls.get("self_signed"):
flags.append("self-signed")
return (subject, issuer, ",".join(flags))
return ("", "", "")
def write_outputs(
out_dir: Path,
rows: list[dict[str, str]],
owner_counts: Counter[str],
cert_counts: Counter[str],
) -> None:
tsv_path = out_dir / "network_mapping.tsv"
md_path = out_dir / "network_mapping.md"
with tsv_path.open("w", encoding="utf-8", newline="") as fh:
writer = csv.DictWriter(
fh,
delimiter="\t",
fieldnames=[
"ip",
"ptr",
"asn",
"prefix",
"holder",
"owner_type",
"http_status",
"http_server",
"http_title",
"http_final_url",
"cert_subject_cn",
"cert_issuer_cn",
"cert_flags",
"tech",
],
)
writer.writeheader()
writer.writerows(rows)
with md_path.open("w", encoding="utf-8") as fh:
fh.write("# Network Mapping Summary\n\n")
fh.write(f"- Total IPs: {len(rows)}\n")
fh.write(f"- IPs with HTTP/HTTPS responses: {sum(1 for row in rows if row['http_status'])}\n")
fh.write(f"- IPs with TLS subject CN captured: {sum(1 for row in rows if row['cert_subject_cn'])}\n\n")
fh.write("## Owner Type Counts\n\n")
for key, value in owner_counts.most_common():
fh.write(f"- {key}: {value}\n")
fh.write("\n## Frequent Certificate Subjects\n\n")
for key, value in cert_counts.most_common(10):
if not key:
continue
fh.write(f"- {key}: {value}\n")
fh.write("\n## Priority Review Candidates\n\n")
fh.write("| IP | Owner Type | Holder | HTTP | TLS Subject | PTR |\n")
fh.write("| --- | --- | --- | --- | --- | --- |\n")
for row in rows:
if row["http_status"] or row["cert_subject_cn"] or row["ptr"]:
http = " ".join(
x
for x in [row["http_status"], row["http_server"], row["http_title"]]
if x
).strip()
fh.write(
f"| {row['ip']} | {row['owner_type']} | {row['holder']} | "
f"{http} | {row['cert_subject_cn']} | {row['ptr']} |\n"
)
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--ip-file", required=True)
parser.add_argument("--ptr-file", required=True)
parser.add_argument("--httpx-file", required=True)
parser.add_argument("--out-dir", required=True)
args = parser.parse_args()
ip_file = Path(args.ip_file)
ptr_file = Path(args.ptr_file)
httpx_file = Path(args.httpx_file)
out_dir = Path(args.out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
ips = load_ips(ip_file)
ptrs = load_ptrs(ptr_file)
httpx = load_httpx(httpx_file)
holder_cache: dict[str, str] = {}
rows: list[dict[str, str]] = []
owner_counts: Counter[str] = Counter()
cert_counts: Counter[str] = Counter()
for idx, ip in enumerate(ips, start=1):
try:
asn, prefix = network_info(ip)
except (HTTPError, URLError, TimeoutError) as exc:
print(f"warning: network-info failed for {ip}: {exc}", file=sys.stderr)
asn, prefix = "", ""
holder = holder_cache.get(asn, "")
if asn and not holder:
try:
holder = as_holder(asn)
except (HTTPError, URLError, TimeoutError) as exc:
print(f"warning: as-overview failed for AS{asn}: {exc}", file=sys.stderr)
holder = ""
holder_cache[asn] = holder
http_status, http_server, http_title, http_final_url, techs = summarize_http(
httpx.get(ip, [])
)
cert_subject, cert_issuer, cert_flags = summarize_cert(httpx.get(ip, []))
owner_type = classify_owner(holder, ptrs.get(ip, ""), techs, cert_subject)
owner_counts[owner_type] += 1
if cert_subject:
cert_counts[cert_subject] += 1
rows.append(
{
"ip": ip,
"ptr": ptrs.get(ip, ""),
"asn": asn,
"prefix": prefix,
"holder": holder,
"owner_type": owner_type,
"http_status": http_status,
"http_server": http_server,
"http_title": http_title,
"http_final_url": http_final_url,
"cert_subject_cn": cert_subject,
"cert_issuer_cn": cert_issuer,
"cert_flags": cert_flags,
"tech": ",".join(techs),
}
)
if idx % 20 == 0 or idx == len(ips):
print(f"processed {idx}/{len(ips)}", file=sys.stderr)
time.sleep(0.05)
write_outputs(out_dir, rows, owner_counts, cert_counts)
print(out_dir / "network_mapping.tsv")
print(out_dir / "network_mapping.md")
return 0
if __name__ == "__main__":
raise SystemExit(main())
+151
View File
@@ -0,0 +1,151 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
HEADER_RE = re.compile(r"^=== ([0-9.]+):(\d+) ===$")
METHODS_RE = re.compile(r"Permission denied \(([^)]+)\)")
def parse_targets(path: Path) -> list[tuple[str, str]]:
targets: list[tuple[str, str]] = []
seen: set[tuple[str, str]] = set()
for raw in path.read_text(encoding="utf-8", errors="replace").splitlines():
line = raw.strip()
match = HEADER_RE.match(line)
if not match:
continue
target = (match.group(1), match.group(2))
if target not in seen:
seen.add(target)
targets.append(target)
return targets
def check_target(ip: str, port: str, username: str, timeout: int) -> dict[str, str]:
cmd = [
"ssh",
"-F",
"/dev/null",
"-o",
"StrictHostKeyChecking=no",
"-o",
"UserKnownHostsFile=/dev/null",
"-o",
"GlobalKnownHostsFile=/dev/null",
"-o",
"LogLevel=ERROR",
"-o",
"PreferredAuthentications=none",
"-o",
"PubkeyAuthentication=no",
"-o",
"PasswordAuthentication=no",
"-o",
"KbdInteractiveAuthentication=no",
"-o",
"GSSAPIAuthentication=no",
"-o",
"BatchMode=yes",
"-o",
f"ConnectTimeout={timeout}",
"-p",
port,
f"{username}@{ip}",
"true",
]
result = subprocess.run(cmd, capture_output=True, text=True)
message = (result.stderr or result.stdout or "").strip()
methods = ""
password_offered = "unknown"
interactive_offered = "unknown"
note = ""
match = METHODS_RE.search(message)
if match:
methods = match.group(1)
offered = {item.strip() for item in methods.split(",") if item.strip()}
password_offered = "yes" if "password" in offered else "no"
interactive_offered = (
"yes"
if any(item in offered for item in ("keyboard-interactive", "kbdint"))
else "no"
)
note = "auth methods advertised"
elif "Connection timed out" in message or "Operation timed out" in message:
note = "timeout"
elif "Connection refused" in message:
note = "refused"
elif "Permission denied" in message:
note = "permission denied without explicit methods"
elif message:
note = message
else:
note = "no response"
return {
"ip": ip,
"port": port,
"auth_methods": methods,
"password_offered": password_offered,
"kbdinteractive_offered": interactive_offered,
"note": note,
}
def main() -> int:
parser = argparse.ArgumentParser()
parser.add_argument("--ssh-keyscan-file", required=True)
parser.add_argument("--output", required=True)
parser.add_argument("--username", default="auditcheck")
parser.add_argument("--timeout", type=int, default=5)
parser.add_argument("--workers", type=int, default=16)
args = parser.parse_args()
targets = parse_targets(Path(args.ssh_keyscan_file))
rows: list[dict[str, str]] = []
with ThreadPoolExecutor(max_workers=args.workers) as pool:
futures = [
pool.submit(check_target, ip, port, args.username, args.timeout)
for ip, port in targets
]
for idx, future in enumerate(as_completed(futures), start=1):
rows.append(future.result())
if idx % 10 == 0 or idx == len(futures):
print(f"processed {idx}/{len(futures)}", flush=True)
rows.sort(key=lambda row: (tuple(int(x) for x in row["ip"].split(".")), int(row["port"])))
out_path = Path(args.output)
with out_path.open("w", encoding="utf-8", newline="") as fh:
writer = csv.DictWriter(
fh,
delimiter="\t",
fieldnames=[
"ip",
"port",
"auth_methods",
"password_offered",
"kbdinteractive_offered",
"note",
],
)
writer.writeheader()
writer.writerows(rows)
print(out_path)
return 0
if __name__ == "__main__":
raise SystemExit(main())
+151
View File
@@ -0,0 +1,151 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
INPUT_FILE="${1:-$ROOT_DIR/public_ipv4_from_master.txt}"
STAMP="$(date +%Y%m%d-%H%M%S)"
OUTPUT_DIR="${2:-$ROOT_DIR/reports/inventory-$STAMP}"
TARGETS_FILE="$OUTPUT_DIR/targets.txt"
RAW_BASENAME="$OUTPUT_DIR/nmap_inventory"
SUMMARY_CSV="$OUTPUT_DIR/summary.csv"
SUMMARY_MD="$OUTPUT_DIR/summary.md"
mkdir -p "$OUTPUT_DIR"
if ! command -v nmap >/dev/null 2>&1; then
echo "nmap is not installed. Install it first, then rerun this script." >&2
echo "Expected input: $INPUT_FILE" >&2
echo "Planned output directory: $OUTPUT_DIR" >&2
exit 1
fi
if [[ ! -f "$INPUT_FILE" ]]; then
echo "Input file not found: $INPUT_FILE" >&2
exit 1
fi
python3 - "$INPUT_FILE" "$TARGETS_FILE" <<'PY'
import ipaddress
import sys
src, dst = sys.argv[1], sys.argv[2]
valid = []
seen = set()
with open(src, "r", encoding="utf-8") as fh:
for raw in fh:
line = raw.strip()
if not line or line.startswith("#"):
continue
try:
ip = ipaddress.IPv4Address(line)
except ipaddress.AddressValueError:
print(f"Skipping invalid IPv4: {line}", file=sys.stderr)
continue
text = str(ip)
if text not in seen:
seen.add(text)
valid.append(text)
with open(dst, "w", encoding="utf-8") as fh:
for ip in valid:
fh.write(f"{ip}\n")
print(len(valid))
PY
TARGET_COUNT="$(wc -l < "$TARGETS_FILE" | tr -d ' ')"
if [[ "$TARGET_COUNT" -eq 0 ]]; then
echo "No valid IPv4 targets found in $INPUT_FILE" >&2
exit 1
fi
echo "Validated $TARGET_COUNT targets"
echo "Running a conservative external inventory scan"
nmap \
-Pn \
-n \
-T3 \
--top-ports 20 \
-sV \
--version-light \
--open \
--max-retries 2 \
--host-timeout 2m \
-iL "$TARGETS_FILE" \
-oA "$RAW_BASENAME"
python3 - "$RAW_BASENAME.gnmap" "$SUMMARY_CSV" "$SUMMARY_MD" "$TARGET_COUNT" <<'PY'
import csv
import sys
from collections import defaultdict
gnmap_path, csv_path, md_path, total_targets = sys.argv[1:5]
rows = []
ports_by_ip = defaultdict(list)
seen_hosts = set()
with open(gnmap_path, "r", encoding="utf-8", errors="replace") as fh:
for raw in fh:
line = raw.strip()
if not line.startswith("Host: "):
continue
if "Ports: " not in line:
continue
host = line.split()[1]
seen_hosts.add(host)
ports_blob = line.split("Ports: ", 1)[1]
for item in ports_blob.split(", "):
parts = item.split("/")
if len(parts) < 7:
continue
port, state, proto, _, service, product, extra = parts[:7]
if state != "open":
continue
product_info = " ".join(x for x in (product, extra) if x).strip()
rows.append(
{
"ip": host,
"port": port,
"protocol": proto,
"state": state,
"service": service or "unknown",
"product": product_info or "-",
}
)
ports_by_ip[host].append(f"{port}/{proto} {service or 'unknown'}".strip())
with open(csv_path, "w", newline="", encoding="utf-8") as fh:
writer = csv.DictWriter(
fh,
fieldnames=["ip", "port", "protocol", "state", "service", "product"],
)
writer.writeheader()
writer.writerows(rows)
sorted_hosts = sorted(ports_by_ip.items(), key=lambda item: (-len(item[1]), item[0]))
with open(md_path, "w", encoding="utf-8") as fh:
fh.write("# External Inventory Summary\n\n")
fh.write(f"- Total validated targets: {total_targets}\n")
fh.write(f"- Hosts with at least one open top port: {len(sorted_hosts)}\n")
fh.write(f"- CSV details: `{csv_path}`\n")
fh.write(f"- Raw Nmap files: `{gnmap_path[:-6]}` (`.nmap`, `.gnmap`, `.xml`)\n\n")
fh.write("## Prioritized review queue\n\n")
fh.write("| IP | Open ports found | Services |\n")
fh.write("| --- | ---: | --- |\n")
for ip, entries in sorted_hosts:
fh.write(f"| {ip} | {len(entries)} | {', '.join(entries)} |\n")
print(f"Wrote {len(rows)} rows to {csv_path}")
print(f"Wrote Markdown summary to {md_path}")
PY
echo
echo "Inventory complete"
echo "Output directory: $OUTPUT_DIR"
echo "Summary CSV: $SUMMARY_CSV"
echo "Summary report: $SUMMARY_MD"