from __future__ import annotations import re import socket import ssl import subprocess import xml.etree.ElementTree as ET from dataclasses import dataclass, field from typing import Any from .config import ( DOCKER_HOST_IP, DOCKER_SOCKET, ENABLE_DOCKER_INSIGHTS, ENABLE_OS_DETECTION, PORT_PROBE_TIMEOUT_SECONDS, SCAN_TOP_PORTS, ) try: import docker except Exception: # pragma: no cover - optional dependency/runtime docker = None HTTP_PORTS = {80, 81, 443, 8000, 8080, 8081, 8443, 8888} @dataclass class PortResult: port: int protocol: str state: str service: str | None = None product: str | None = None version: str | None = None extra_info: str | None = None banner: str | None = None headers: dict[str, str] = field(default_factory=dict) @dataclass class HostResult: ip: str hostname: str | None = None mac: str | None = None vendor: str | None = None os_name: str | None = None ports: list[PortResult] = field(default_factory=list) def run_nmap(args: list[str]) -> ET.Element: proc = subprocess.run( ["nmap", *args, "-oX", "-"], capture_output=True, text=True, check=False, ) if proc.returncode != 0: raise RuntimeError(f"nmap failed: {proc.stderr.strip() or proc.stdout.strip()}") return ET.fromstring(proc.stdout) def _run_cmd(args: list[str]) -> str: try: proc = subprocess.run(args, capture_output=True, text=True, check=False) except OSError: return "" if proc.returncode != 0: return "" return proc.stdout def _ip_neigh_table() -> dict[str, dict[str, str | None]]: output = _run_cmd(["ip", "neigh", "show"]) if not output: return {} table: dict[str, dict[str, str | None]] = {} for line in output.splitlines(): parts = line.strip().split() if len(parts) < 5: continue ip = parts[0] mac = None if "lladdr" in parts: idx = parts.index("lladdr") if idx + 1 < len(parts): mac = parts[idx + 1] if mac and re.fullmatch(r"[0-9a-fA-F:]{17}", mac): table[ip] = {"mac": mac.upper(), "vendor": None} return table def _arp_table() -> dict[str, dict[str, str | None]]: output = _run_cmd(["arp", "-an"]) if not output: return {} table: dict[str, dict[str, str | None]] = {} for line in output.splitlines(): match = re.search(r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]{17})", line) if not match: continue ip, mac = match.group(1), match.group(2).upper() table[ip] = {"mac": mac, "vendor": None} return table def arp_neighbors() -> dict[str, dict[str, str | None]]: table = _ip_neigh_table() if table: return table return _arp_table() def discover_hosts(subnet: str) -> list[dict[str, Any]]: root = run_nmap(["-sn", "-n", subnet]) hosts: list[dict[str, Any]] = [] arp_cache = arp_neighbors() for host in root.findall("host"): status = host.find("status") if status is None or status.attrib.get("state") != "up": continue ip = None mac = None vendor = None for addr in host.findall("address"): addr_type = addr.attrib.get("addrtype") if addr_type == "ipv4": ip = addr.attrib.get("addr") elif addr_type == "mac": mac = addr.attrib.get("addr") vendor = addr.attrib.get("vendor") hostname = None names = host.find("hostnames") if names is not None: first = names.find("hostname") if first is not None: hostname = first.attrib.get("name") if not ip: continue if not mac and ip in arp_cache: mac = arp_cache[ip]["mac"] vendor = vendor or arp_cache[ip]["vendor"] hosts.append({"ip": ip, "hostname": hostname, "mac": mac, "vendor": vendor}) return hosts def parse_detailed_host(xml_root: ET.Element) -> HostResult | None: host = xml_root.find("host") if host is None: return None status = host.find("status") if status is None or status.attrib.get("state") != "up": return None ip = None mac = None vendor = None for addr in host.findall("address"): addr_type = addr.attrib.get("addrtype") if addr_type == "ipv4": ip = addr.attrib.get("addr") elif addr_type == "mac": mac = addr.attrib.get("addr") vendor = addr.attrib.get("vendor") if not ip: return None hostname = None names = host.find("hostnames") if names is not None: first = names.find("hostname") if first is not None: hostname = first.attrib.get("name") os_name = None os_match = host.find("os/osmatch") if os_match is not None: os_name = os_match.attrib.get("name") result = HostResult(ip=ip, hostname=hostname, mac=mac, vendor=vendor, os_name=os_name) for port in host.findall("ports/port"): protocol = port.attrib.get("protocol", "tcp") portid = int(port.attrib.get("portid", "0")) state_node = port.find("state") state = state_node.attrib.get("state", "unknown") if state_node is not None else "unknown" if state != "open": continue service_node = port.find("service") service = service_node.attrib.get("name") if service_node is not None else None product = service_node.attrib.get("product") if service_node is not None else None version = service_node.attrib.get("version") if service_node is not None else None extrainfo = service_node.attrib.get("extrainfo") if service_node is not None else None p = PortResult( port=portid, protocol=protocol, state=state, service=service, product=product, version=version, extra_info=extrainfo, ) if protocol == "tcp": banner, headers = probe_port(ip, portid) p.banner = banner p.headers = headers result.ports.append(p) return result def probe_port(ip: str, port: int, timeout: float = PORT_PROBE_TIMEOUT_SECONDS) -> tuple[str | None, dict[str, str]]: if port not in HTTP_PORTS: return None, {} try: if port in {443, 8443}: context = ssl.create_default_context() with socket.create_connection((ip, port), timeout=timeout) as sock: with context.wrap_socket(sock, server_hostname=ip) as tls_sock: return http_probe(tls_sock, ip) with socket.create_connection((ip, port), timeout=timeout) as sock: return http_probe(sock, ip) except Exception: return None, {} def http_probe(sock: socket.socket, host: str) -> tuple[str | None, dict[str, str]]: request = ( f"HEAD / HTTP/1.1\r\n" f"Host: {host}\r\n" "User-Agent: NetTrak/1.0\r\n" "Connection: close\r\n\r\n" ) sock.sendall(request.encode("ascii", errors="ignore")) data = sock.recv(4096) text = data.decode("utf-8", errors="ignore") lines = [line.strip() for line in text.splitlines() if line.strip()] headers: dict[str, str] = {} status = lines[0] if lines else None for line in lines[1:]: if ":" not in line: continue key, value = line.split(":", 1) headers[key.strip()] = value.strip() return status, headers def scan_host(ip: str, seed_host: dict[str, Any] | None = None) -> HostResult: base_args = [ "-Pn", "-n", "--open", "-sV", "--version-light", "--top-ports", str(max(SCAN_TOP_PORTS, 1)), "-T4", "--max-retries", "1", "--host-timeout", "45s", ip, ] result: HostResult | None = None if ENABLE_OS_DETECTION: try: root = run_nmap([*base_args, "-O", "--osscan-guess"]) result = parse_detailed_host(root) except Exception: result = None if result is None: root = run_nmap(base_args) result = parse_detailed_host(root) if result is None: result = HostResult(ip=ip) if seed_host: if not result.hostname and seed_host.get("hostname"): result.hostname = seed_host["hostname"] if not result.mac and seed_host.get("mac"): result.mac = seed_host["mac"] if not result.vendor and seed_host.get("vendor"): result.vendor = seed_host["vendor"] if not result.mac: cached = arp_neighbors().get(ip) if cached: result.mac = cached.get("mac") result.vendor = result.vendor or cached.get("vendor") return result def _local_ipv4s() -> set[str]: local_ips: set[str] = {"127.0.0.1"} try: addrs = socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET) for addr in addrs: local_ips.add(addr[4][0]) except Exception: pass output = _run_cmd(["ip", "-4", "addr", "show"]) for match in re.findall(r"inet\s+(\d+\.\d+\.\d+\.\d+)/\d+", output): local_ips.add(match) return local_ips def discover_docker_ports(target_ips: set[str]) -> dict[str, list[PortResult]]: if not ENABLE_DOCKER_INSIGHTS or docker is None or not target_ips: return {} local_ips = _local_ipv4s() if DOCKER_HOST_IP: local_ips.add(DOCKER_HOST_IP) try: client = docker.DockerClient(base_url=DOCKER_SOCKET) containers = client.containers.list() except Exception: return {} ports_by_ip: dict[str, list[PortResult]] = {} seen: set[tuple[str, int, str]] = set() for container in containers: try: attrs = container.attrs except Exception: continue port_map = attrs.get("NetworkSettings", {}).get("Ports", {}) or {} image = attrs.get("Config", {}).get("Image") name = (attrs.get("Name") or "").lstrip("/") or container.name for container_port_proto, bindings in port_map.items(): if not bindings: continue container_port, protocol = container_port_proto.split("/", 1) for binding in bindings: host_port_raw = binding.get("HostPort") host_ip = binding.get("HostIp") or "" if not host_port_raw: continue try: host_port = int(host_port_raw) except ValueError: continue candidate_ips: set[str] = set() if host_ip and host_ip not in {"0.0.0.0", "::"}: candidate_ips.add(host_ip) else: explicit = (DOCKER_HOST_IP or "").strip() if explicit: candidate_ips.add(explicit) else: candidate_ips |= (local_ips & target_ips) for ip in candidate_ips: if ip not in target_ips: continue key = (ip, host_port, protocol) if key in seen: continue seen.add(key) ports_by_ip.setdefault(ip, []).append( PortResult( port=host_port, protocol=protocol, state="open", service="docker-published", product=image, extra_info=f"container={name}, internal={container_port_proto}", ) ) return ports_by_ip def merge_docker_ports(host: HostResult, docker_ports: list[PortResult]) -> HostResult: if not docker_ports: return host by_key = {(p.port, p.protocol): p for p in host.ports} for dp in docker_ports: key = (dp.port, dp.protocol) existing = by_key.get(key) if existing: note = dp.extra_info or "docker-published" existing.extra_info = f"{existing.extra_info}; {note}" if existing.extra_info else note if not existing.product: existing.product = dp.product if not existing.service: existing.service = dp.service continue host.ports.append(dp) by_key[key] = dp return host