#!/usr/bin/env python3 from __future__ import annotations import argparse import csv import re import subprocess from concurrent.futures import ThreadPoolExecutor, as_completed from pathlib import Path HEADER_RE = re.compile(r"^=== ([0-9.]+):(\d+) ===$") METHODS_RE = re.compile(r"Permission denied \(([^)]+)\)") def parse_targets(path: Path) -> list[tuple[str, str]]: targets: list[tuple[str, str]] = [] seen: set[tuple[str, str]] = set() for raw in path.read_text(encoding="utf-8", errors="replace").splitlines(): line = raw.strip() match = HEADER_RE.match(line) if not match: continue target = (match.group(1), match.group(2)) if target not in seen: seen.add(target) targets.append(target) return targets def check_target(ip: str, port: str, username: str, timeout: int) -> dict[str, str]: cmd = [ "ssh", "-F", "/dev/null", "-o", "StrictHostKeyChecking=no", "-o", "UserKnownHostsFile=/dev/null", "-o", "GlobalKnownHostsFile=/dev/null", "-o", "LogLevel=ERROR", "-o", "PreferredAuthentications=none", "-o", "PubkeyAuthentication=no", "-o", "PasswordAuthentication=no", "-o", "KbdInteractiveAuthentication=no", "-o", "GSSAPIAuthentication=no", "-o", "BatchMode=yes", "-o", f"ConnectTimeout={timeout}", "-p", port, f"{username}@{ip}", "true", ] result = subprocess.run(cmd, capture_output=True, text=True) message = (result.stderr or result.stdout or "").strip() methods = "" password_offered = "unknown" interactive_offered = "unknown" note = "" match = METHODS_RE.search(message) if match: methods = match.group(1) offered = {item.strip() for item in methods.split(",") if item.strip()} password_offered = "yes" if "password" in offered else "no" interactive_offered = ( "yes" if any(item in offered for item in ("keyboard-interactive", "kbdint")) else "no" ) note = "auth methods advertised" elif "Connection timed out" in message or "Operation timed out" in message: note = "timeout" elif "Connection refused" in message: note = "refused" elif "Permission denied" in message: note = "permission denied without explicit methods" elif message: note = message else: note = "no response" return { "ip": ip, "port": port, "auth_methods": methods, "password_offered": password_offered, "kbdinteractive_offered": interactive_offered, "note": note, } def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--ssh-keyscan-file", required=True) parser.add_argument("--output", required=True) parser.add_argument("--username", default="auditcheck") parser.add_argument("--timeout", type=int, default=5) parser.add_argument("--workers", type=int, default=16) args = parser.parse_args() targets = parse_targets(Path(args.ssh_keyscan_file)) rows: list[dict[str, str]] = [] with ThreadPoolExecutor(max_workers=args.workers) as pool: futures = [ pool.submit(check_target, ip, port, args.username, args.timeout) for ip, port in targets ] for idx, future in enumerate(as_completed(futures), start=1): rows.append(future.result()) if idx % 10 == 0 or idx == len(futures): print(f"processed {idx}/{len(futures)}", flush=True) rows.sort(key=lambda row: (tuple(int(x) for x in row["ip"].split(".")), int(row["port"]))) out_path = Path(args.output) with out_path.open("w", encoding="utf-8", newline="") as fh: writer = csv.DictWriter( fh, delimiter="\t", fieldnames=[ "ip", "port", "auth_methods", "password_offered", "kbdinteractive_offered", "note", ], ) writer.writeheader() writer.writerows(rows) print(out_path) return 0 if __name__ == "__main__": raise SystemExit(main())