Files
all_ping/scripts/check_ssh_auth_methods.py
2026-07-01 02:52:14 +05:00

152 lines
4.3 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import csv
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
# Matches a per-target header line of the exact form "=== <address>:<port> ===".
# Group 1 is the address text (digits and dots only; it is not validated as
# a well-formed IPv4 address), group 2 is the port digits.
HEADER_RE = re.compile(r"^=== ([0-9.]+):(\d+) ===$")
# Captures the text inside the parentheses of "Permission denied (<methods>)";
# check_target() splits that capture on commas to get the advertised methods.
METHODS_RE = re.compile(r"Permission denied \(([^)]+)\)")
def parse_targets(path: Path) -> list[tuple[str, str]]:
    """Collect the unique ``(address, port)`` pairs named in a header file.

    Every line of *path* is stripped and tested against ``HEADER_RE``; lines
    that do not match are ignored.  Undecodable bytes are replaced rather
    than raising, so a partly corrupted file still yields its valid headers.

    Args:
        path: Text file containing ``=== <address>:<port> ===`` header lines.

    Returns:
        The pairs in order of first appearance, duplicates removed.  Both
        elements are kept as strings exactly as they appear in the file.
    """
    text = path.read_text(encoding="utf-8", errors="replace")
    header_matches = (HEADER_RE.match(raw.strip()) for raw in text.splitlines())
    # dict keys keep insertion order, which gives an order-preserving de-dup.
    unique = dict.fromkeys(
        (found.group(1), found.group(2)) for found in header_matches if found
    )
    return list(unique)
def check_target(ip: str, port: str, username: str, timeout: int) -> dict[str, str]:
    """Probe one SSH endpoint and report the auth methods it advertises.

    Runs the system ``ssh`` client with only the "none" authentication method
    enabled, then classifies the client's diagnostic output.  When the server
    rejects the attempt, ssh prints ``Permission denied (<methods>)`` and the
    comma-separated list is taken as the set of offered methods.

    Args:
        ip: Address of the server, used verbatim in ``user@ip``.
        port: TCP port as a string, passed to ``ssh -p``.
        username: Login name used for the probe.
        timeout: Seconds for ssh's ``ConnectTimeout``; a hard wall-clock cap
            for the whole ssh process is derived from it as well.

    Returns:
        A row with the keys ``ip``, ``port``, ``auth_methods``,
        ``password_offered``, ``kbdinteractive_offered`` and ``note``.  The
        two ``*_offered`` values are ``"yes"``/``"no"`` when a method list
        was obtained and ``"unknown"`` otherwise.

    Raises:
        OSError: If the ``ssh`` executable cannot be started.
    """
    options = (
        # Ignore user/system config and never read or persist host keys, so
        # the probe neither prompts nor depends on local state.
        "StrictHostKeyChecking=no",
        "UserKnownHostsFile=/dev/null",
        "GlobalKnownHostsFile=/dev/null",
        "LogLevel=ERROR",
        # Offer only "none" and switch every real mechanism off.
        "PreferredAuthentications=none",
        "PubkeyAuthentication=no",
        "PasswordAuthentication=no",
        "KbdInteractiveAuthentication=no",
        "GSSAPIAuthentication=no",
        "BatchMode=yes",
        f"ConnectTimeout={timeout}",
    )
    cmd = ["ssh", "-F", "/dev/null"]
    for option in options:
        cmd.extend(("-o", option))
    cmd.extend(("-p", port, f"{username}@{ip}", "true"))

    try:
        result = subprocess.run(
            cmd,
            # Many probes run concurrently; none may read the caller's stdin.
            stdin=subprocess.DEVNULL,
            capture_output=True,
            text=True,
            # Server-supplied bytes are not guaranteed to be valid UTF-8.
            errors="replace",
            # ConnectTimeout does not bound the authentication phase; this
            # cap stops a stalled server from pinning a worker forever.
            timeout=max(timeout, 1) * 2 + 5,
        )
    except subprocess.TimeoutExpired:
        result = None

    message = ""
    if result is not None:
        message = (result.stderr or result.stdout or "").strip()

    methods = ""
    password_offered = "unknown"
    interactive_offered = "unknown"

    match = METHODS_RE.search(message)
    if result is None:
        note = "timeout"
    elif match:
        methods = match.group(1)
        offered = {item.strip() for item in methods.split(",") if item.strip()}
        password_offered = "yes" if "password" in offered else "no"
        interactive_offered = (
            "yes"
            if any(item in offered for item in ("keyboard-interactive", "kbdint"))
            else "no"
        )
        note = "auth methods advertised"
    elif result.returncode == 0:
        # Exit status 0 means the remote "true" command ran, i.e. the server
        # accepted the login although no credentials were offered.
        note = "login succeeded without authentication"
    elif "Connection timed out" in message or "Operation timed out" in message:
        note = "timeout"
    elif "Connection refused" in message:
        note = "refused"
    elif "Permission denied" in message:
        note = "permission denied without explicit methods"
    elif message:
        # ssh diagnostics can span several lines; keep one record per line.
        note = " ".join(message.split())
    else:
        note = "no response"

    return {
        "ip": ip,
        "port": port,
        "auth_methods": methods,
        "password_offered": password_offered,
        "kbdinteractive_offered": interactive_offered,
        "note": note,
    }
def main() -> int:
    """Probe every target from the header file and write a TSV report.

    Command-line options: ``--ssh-keyscan-file`` (required input),
    ``--output`` (required report path), ``--username`` (default
    ``auditcheck``), ``--timeout`` (default 5) and ``--workers`` (default 16).

    Targets are probed concurrently on a thread pool.  A progress line is
    printed after every tenth completed probe and after the last one.  Rows
    are written sorted numerically by the dotted address components and then
    by port, and the report path is printed at the end.

    Returns:
        Always 0; used as the process exit status.
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--ssh-keyscan-file", required=True)
    cli.add_argument("--output", required=True)
    cli.add_argument("--username", default="auditcheck")
    cli.add_argument("--timeout", type=int, default=5)
    cli.add_argument("--workers", type=int, default=16)
    opts = cli.parse_args()

    endpoints = parse_targets(Path(opts.ssh_keyscan_file))
    total = len(endpoints)
    results: list[dict[str, str]] = []
    with ThreadPoolExecutor(max_workers=opts.workers) as executor:
        pending = [
            executor.submit(check_target, *endpoint, opts.username, opts.timeout)
            for endpoint in endpoints
        ]
        completed = 0
        for finished in as_completed(pending):
            results.append(finished.result())
            completed += 1
            if completed == total or completed % 10 == 0:
                print(f"processed {completed}/{total}", flush=True)

    def numeric_order(row: dict[str, str]) -> tuple[tuple[int, ...], int]:
        # Compare address components as integers so 10.0.0.9 precedes 10.0.0.10.
        components = tuple(int(part) for part in row["ip"].split("."))
        return components, int(row["port"])

    ordered = sorted(results, key=numeric_order)

    columns = [
        "ip",
        "port",
        "auth_methods",
        "password_offered",
        "kbdinteractive_offered",
        "note",
    ]
    destination = Path(opts.output)
    with destination.open("w", encoding="utf-8", newline="") as handle:
        report = csv.DictWriter(handle, delimiter="\t", fieldnames=columns)
        report.writeheader()
        report.writerows(ordered)
    print(destination)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())