# Files: NetTrak/app/scanner.py
# 2026-03-08 19:39:11 -05:00
# 422 lines, 12 KiB, Python

from __future__ import annotations
import re
import socket
import ssl
import subprocess
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import Any
from .config import (
DOCKER_HOST_IP,
DOCKER_SOCKET,
ENABLE_DOCKER_INSIGHTS,
ENABLE_OS_DETECTION,
PORT_PROBE_TIMEOUT_SECONDS,
SCAN_PORT_SPEC,
SCAN_TOP_PORTS,
)
# The Docker SDK is optional: when it cannot be imported, `docker` is set to
# None and discover_docker_ports() returns an empty result.
try:
    import docker
except Exception:  # pragma: no cover - optional dependency/runtime
    docker = None

# Ports on which probe_port() attempts an HTTP(S) HEAD request; every other
# port is skipped without opening a connection.
HTTP_PORTS = {80, 81, 443, 8000, 8080, 8081, 8443, 8888}
@dataclass
class PortResult:
    """A single port recorded for a host, plus any service details gathered."""

    port: int  # port number
    protocol: str  # protocol name as reported by nmap or Docker (e.g. "tcp")
    state: str  # port state; code in this module only records "open" ports
    service: str | None = None  # nmap service name, or "docker-published"
    product: str | None = None  # nmap product string, or the container image
    version: str | None = None  # nmap version string
    extra_info: str | None = None  # nmap extrainfo and/or a Docker container note
    banner: str | None = None  # first response line returned by probe_port()
    headers: dict[str, str] = field(default_factory=dict)  # response headers from probe_port()
@dataclass
class HostResult:
    """Everything learned about one host during a detailed scan."""

    ip: str  # IPv4 address of the host
    hostname: str | None = None  # first hostname reported by nmap, if any
    mac: str | None = None  # MAC address from nmap or the neighbor/ARP cache
    vendor: str | None = None  # MAC vendor string from nmap, if any
    os_name: str | None = None  # name of the first nmap OS match, if any
    ports: list[PortResult] = field(default_factory=list)  # open ports found on the host
def run_nmap(args: list[str]) -> ET.Element:
    """Run nmap with *args* and return the root element of its XML report.

    ``-oX -`` is appended to the command line so the XML report is written to
    stdout, which is then parsed.

    Raises:
        RuntimeError: if nmap exits with a non-zero status. The message
            carries nmap's stderr, or its stdout when stderr is empty.
    """
    command = ["nmap"]
    command.extend(args)
    command += ["-oX", "-"]
    completed = subprocess.run(command, capture_output=True, text=True, check=False)
    if completed.returncode == 0:
        return ET.fromstring(completed.stdout)
    detail = completed.stderr.strip() or completed.stdout.strip()
    raise RuntimeError(f"nmap failed: {detail}")
def _run_cmd(args: list[str]) -> str:
    """Run a command and return its stdout, or "" if it could not run or failed.

    Both an OSError while launching the process and a non-zero exit status
    are reported as an empty string.
    """
    try:
        completed = subprocess.run(args, capture_output=True, text=True, check=False)
    except OSError:
        return ""
    return completed.stdout if completed.returncode == 0 else ""
def _ip_neigh_table() -> dict[str, dict[str, str | None]]:
    """Map IP address -> {"mac", "vendor"} from the output of ``ip neigh show``.

    Only lines with at least five fields and an ``lladdr`` token followed by
    a 17-character hex/colon address are kept. MACs are upper-cased and the
    vendor is always None. Returns {} when the command produces no output.
    """
    neighbors: dict[str, dict[str, str | None]] = {}
    for raw_line in _run_cmd(["ip", "neigh", "show"]).splitlines():
        tokens = raw_line.strip().split()
        if len(tokens) < 5 or "lladdr" not in tokens:
            continue
        lladdr_pos = tokens.index("lladdr")
        if lladdr_pos + 1 >= len(tokens):
            # "lladdr" was the last token: no address follows it.
            continue
        candidate = tokens[lladdr_pos + 1]
        if re.fullmatch(r"[0-9a-fA-F:]{17}", candidate):
            # The first field of the line is the neighbor's IP address.
            neighbors[tokens[0]] = {"mac": candidate.upper(), "vendor": None}
    return neighbors
def _arp_table() -> dict[str, dict[str, str | None]]:
    """Map IPv4 address -> {"mac", "vendor"} from the output of ``arp -an``.

    Lines are matched against ``(<ipv4>) at <17-char address>``; anything
    else is ignored. MACs are upper-cased and the vendor is always None.
    Returns {} when the command produces no output.
    """
    entry_pattern = re.compile(r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]{17})")
    entries: dict[str, dict[str, str | None]] = {}
    for raw_line in _run_cmd(["arp", "-an"]).splitlines():
        found = entry_pattern.search(raw_line)
        if found is not None:
            address, hardware = found.groups()
            entries[address] = {"mac": hardware.upper(), "vendor": None}
    return entries
def arp_neighbors() -> dict[str, dict[str, str | None]]:
    """Return the neighbor table, preferring ``ip neigh`` over ``arp -an``.

    ``arp -an`` is consulted only when ``ip neigh show`` yields no entries.
    """
    return _ip_neigh_table() or _arp_table()
def discover_hosts(subnet: str) -> list[dict[str, Any]]:
    """Ping-scan *subnet* with nmap and list the hosts that are up.

    Each entry is a dict with the keys "ip", "hostname", "mac" and "vendor".
    Hosts without an IPv4 address are skipped. When nmap reports no MAC, the
    neighbor/ARP cache is used to fill in "mac" (and "vendor" if still unset).

    Raises:
        RuntimeError: propagated from run_nmap() when nmap fails.
    """
    report = run_nmap(["-sn", "-n", subnet])
    # The neighbor cache is read once, after the ping scan has run.
    neighbor_cache = arp_neighbors()

    found: list[dict[str, Any]] = []
    for node in report.findall("host"):
        status_node = node.find("status")
        if status_node is None or status_node.attrib.get("state") != "up":
            continue

        address = hw_addr = maker = None
        for addr_node in node.findall("address"):
            kind = addr_node.attrib.get("addrtype")
            if kind == "ipv4":
                address = addr_node.attrib.get("addr")
            elif kind == "mac":
                hw_addr = addr_node.attrib.get("addr")
                maker = addr_node.attrib.get("vendor")
        if not address:
            continue

        # Only the first <hostname> under <hostnames> is used.
        names_node = node.find("hostnames")
        first_name = names_node.find("hostname") if names_node is not None else None
        hostname = first_name.attrib.get("name") if first_name is not None else None

        if not hw_addr and address in neighbor_cache:
            cached = neighbor_cache[address]
            hw_addr = cached["mac"]
            maker = maker or cached["vendor"]

        found.append({"ip": address, "hostname": hostname, "mac": hw_addr, "vendor": maker})
    return found
def parse_detailed_host(xml_root: ET.Element) -> HostResult | None:
    """Build a HostResult from the first <host> element of an nmap report.

    Returns None when the report has no host, the host is not "up", or it
    has no IPv4 address. Only ports whose state is "open" are recorded; each
    open TCP port is additionally passed to probe_port() to collect a banner
    and headers.
    """
    node = xml_root.find("host")
    if node is None:
        return None
    status_node = node.find("status")
    if status_node is None or status_node.attrib.get("state") != "up":
        return None

    address = hw_addr = maker = None
    for addr_node in node.findall("address"):
        kind = addr_node.attrib.get("addrtype")
        if kind == "ipv4":
            address = addr_node.attrib.get("addr")
        elif kind == "mac":
            hw_addr = addr_node.attrib.get("addr")
            maker = addr_node.attrib.get("vendor")
    if not address:
        return None

    # Only the first <hostname> under <hostnames> is used.
    names_node = node.find("hostnames")
    first_name = names_node.find("hostname") if names_node is not None else None
    hostname = first_name.attrib.get("name") if first_name is not None else None

    # Only the first OS match is used.
    os_node = node.find("os/osmatch")
    os_name = os_node.attrib.get("name") if os_node is not None else None

    parsed = HostResult(ip=address, hostname=hostname, mac=hw_addr, vendor=maker, os_name=os_name)

    for port_node in node.findall("ports/port"):
        proto = port_node.attrib.get("protocol", "tcp")
        number = int(port_node.attrib.get("portid", "0"))
        state_node = port_node.find("state")
        state = "unknown" if state_node is None else state_node.attrib.get("state", "unknown")
        if state != "open":
            continue

        svc_node = port_node.find("service")
        svc = {} if svc_node is None else svc_node.attrib
        entry = PortResult(
            port=number,
            protocol=proto,
            state=state,
            service=svc.get("name"),
            product=svc.get("product"),
            version=svc.get("version"),
            extra_info=svc.get("extrainfo"),
        )
        if proto == "tcp":
            entry.banner, entry.headers = probe_port(address, number)
        parsed.ports.append(entry)
    return parsed
def probe_port(ip: str, port: int, timeout: float = PORT_PROBE_TIMEOUT_SECONDS) -> tuple[str | None, dict[str, str]]:
    """Best-effort HTTP(S) banner grab against ``ip:port``.

    Ports outside HTTP_PORTS are not contacted. Ports 443 and 8443 are
    wrapped in TLS before the request is sent; all other ports are probed in
    plain text.

    Args:
        ip: Address to connect to; also sent as the HTTP ``Host`` header.
        port: TCP port to probe.
        timeout: Connection timeout in seconds.

    Returns:
        ``(status_line, headers)`` as produced by http_probe(), or
        ``(None, {})`` when the port is not probed or anything goes wrong.
    """
    if port not in HTTP_PORTS:
        return None, {}
    try:
        with socket.create_connection((ip, port), timeout=timeout) as sock:
            if port not in {443, 8443}:
                return http_probe(sock, ip)
            context = ssl.create_default_context()
            # The probe connects by bare IP and only reads response headers,
            # so certificate validation is turned off: with it enabled, a
            # self-signed or name-mismatched certificate aborts the handshake
            # and the probe reports nothing. check_hostname must be cleared
            # before verify_mode can be set to CERT_NONE.
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
            with context.wrap_socket(sock, server_hostname=ip) as tls_sock:
                return http_probe(tls_sock, ip)
    except Exception:
        # Deliberately broad: a failed probe must never fail the host scan.
        return None, {}
def http_probe(sock: socket.socket, host: str) -> tuple[str | None, dict[str, str]]:
    """Send ``HEAD /`` over *sock* and return the status line and headers.

    The response is read until the blank line that ends the header block,
    until the peer closes the connection, or until a size cap is reached, so
    a header block delivered in several segments is not truncated. Only the
    header section is parsed; any body bytes that follow are ignored.

    Args:
        sock: Connected socket (plain or TLS-wrapped) to talk over.
        host: Value sent in the ``Host`` header.

    Returns:
        ``(status_line, headers)``. ``status_line`` is the first non-blank
        response line, or None when nothing was received. When a header name
        repeats, the last value wins.

    Raises:
        OSError: if sending fails, or receiving fails before any data has
            arrived.
    """
    request = (
        f"HEAD / HTTP/1.1\r\n"
        f"Host: {host}\r\n"
        "User-Agent: NetTrak/1.0\r\n"
        "Connection: close\r\n\r\n"
    )
    sock.sendall(request.encode("ascii", errors="ignore"))

    max_bytes = 16384  # upper bound on how much of a response is buffered
    raw = b""
    while len(raw) < max_bytes and b"\r\n\r\n" not in raw and b"\n\n" not in raw:
        try:
            chunk = sock.recv(4096)
        except OSError:
            if not raw:
                raise
            # A timeout or reset after some data arrived: keep what we have.
            break
        if not chunk:
            break
        raw += chunk

    text = raw.decode("utf-8", errors="ignore").lstrip()
    # Keep only the header section so body lines are never parsed as headers.
    head = re.split(r"\r?\n\r?\n", text, maxsplit=1)[0]
    lines = [line.strip() for line in head.splitlines() if line.strip()]
    if not lines:
        return None, {}

    headers: dict[str, str] = {}
    for line in lines[1:]:
        key, sep, value = line.partition(":")
        if sep:
            headers[key.strip()] = value.strip()
    return lines[0], headers
def scan_host(ip: str, seed_host: dict[str, Any] | None = None) -> HostResult:
    """Run a detailed nmap service scan against *ip* and return the result.

    With ENABLE_OS_DETECTION set, a scan with OS detection is tried first; if
    it raises or yields no host, the scan is repeated without it. If the host
    still cannot be parsed, an otherwise empty HostResult for *ip* is used.
    Missing hostname/mac/vendor values are then filled from *seed_host*, and a
    still-missing MAC is looked up in the neighbor/ARP cache.

    Raises:
        RuntimeError: propagated from run_nmap() when the scan without OS
            detection fails.
    """
    if SCAN_PORT_SPEC:
        port_args = ["-p", SCAN_PORT_SPEC]
    else:
        port_args = ["--top-ports", str(max(SCAN_TOP_PORTS, 1))]
    nmap_args = [
        "-Pn",
        "-n",
        "--open",
        "-sV",
        "--version-light",
        *port_args,
        "-T4",
        "--max-retries",
        "1",
        "--host-timeout",
        "45s",
        ip,
    ]

    scanned: HostResult | None = None
    if ENABLE_OS_DETECTION:
        try:
            scanned = parse_detailed_host(run_nmap([*nmap_args, "-O", "--osscan-guess"]))
        except Exception:
            # Fall through to the plain scan below.
            scanned = None
    if scanned is None:
        scanned = parse_detailed_host(run_nmap(nmap_args))
    if scanned is None:
        scanned = HostResult(ip=ip)

    if seed_host:
        # Seed values only fill gaps; scan results are never overwritten.
        for attr in ("hostname", "mac", "vendor"):
            seeded = seed_host.get(attr)
            if seeded and not getattr(scanned, attr):
                setattr(scanned, attr, seeded)

    if not scanned.mac:
        neighbor = arp_neighbors().get(ip)
        if neighbor:
            scanned.mac = neighbor.get("mac")
            scanned.vendor = scanned.vendor or neighbor.get("vendor")
    return scanned
def _local_ipv4s() -> set[str]:
    """Collect IPv4 addresses that belong to this machine.

    Combines 127.0.0.1, the addresses the local hostname resolves to, and
    every ``inet`` address printed by ``ip -4 addr show``.
    """
    addresses: set[str] = {"127.0.0.1"}
    try:
        resolved = socket.getaddrinfo(socket.gethostname(), None, socket.AF_INET)
        # Each entry's sockaddr is its fifth item; the address comes first in it.
        addresses.update(info[4][0] for info in resolved)
    except Exception:
        # Hostname resolution is optional; the interface listing still applies.
        pass
    iface_dump = _run_cmd(["ip", "-4", "addr", "show"])
    addresses.update(re.findall(r"inet\s+(\d+\.\d+\.\d+\.\d+)/\d+", iface_dump))
    return addresses
def _collect_published_ports(
    containers: list[Any],
    target_ips: set[str],
    fallback_ips: set[str],
) -> dict[str, list[PortResult]]:
    """Turn the containers' host port bindings into PortResult lists per IP."""
    ports_by_ip: dict[str, list[PortResult]] = {}
    seen: set[tuple[str, int, str]] = set()
    for container in containers:
        try:
            attrs = container.attrs
        except Exception:
            continue
        port_map = (attrs.get("NetworkSettings") or {}).get("Ports") or {}
        image = (attrs.get("Config") or {}).get("Image")
        name = (attrs.get("Name") or "").lstrip("/") or container.name
        for container_port_proto, bindings in port_map.items():
            if not bindings:
                # Exposed but not published to the host.
                continue
            # Keys look like "80/tcp"; a key without a protocol is treated as tcp.
            protocol = container_port_proto.partition("/")[2] or "tcp"
            for binding in bindings:
                host_port_raw = binding.get("HostPort")
                if not host_port_raw:
                    continue
                try:
                    host_port = int(host_port_raw)
                except ValueError:
                    continue
                host_ip = binding.get("HostIp") or ""
                if host_ip and host_ip not in {"0.0.0.0", "::"}:
                    candidate_ips = {host_ip}
                else:
                    # Wildcard binding: attribute it to the Docker host itself.
                    candidate_ips = fallback_ips
                for ip in candidate_ips:
                    if ip not in target_ips:
                        continue
                    key = (ip, host_port, protocol)
                    if key in seen:
                        continue
                    seen.add(key)
                    ports_by_ip.setdefault(ip, []).append(
                        PortResult(
                            port=host_port,
                            protocol=protocol,
                            state="open",
                            service="docker-published",
                            product=image,
                            extra_info=f"container={name}, internal={container_port_proto}",
                        )
                    )
    return ports_by_ip


def discover_docker_ports(target_ips: set[str]) -> dict[str, list[PortResult]]:
    """List ports published by local Docker containers, grouped by host IP.

    Returns {} when Docker insights are disabled, the Docker SDK is not
    importable, *target_ips* is empty, or the Docker daemon cannot be
    queried. Only IPs contained in *target_ips* appear in the result.

    A binding to a specific host IP is attributed to that IP. A wildcard
    binding (``0.0.0.0``, ``::`` or empty) is attributed to DOCKER_HOST_IP
    when that is set, otherwise to every local IPv4 address that is also a
    target.

    Args:
        target_ips: IP addresses the caller is interested in.

    Returns:
        Mapping of IP -> PortResult list, with at most one entry per
        (ip, host port, protocol).
    """
    if not ENABLE_DOCKER_INSIGHTS or docker is None or not target_ips:
        return {}

    explicit = (DOCKER_HOST_IP or "").strip()
    # Local interfaces are only inspected when no explicit host IP is
    # configured, and the wildcard fallback set is computed once up front.
    fallback_ips = {explicit} if explicit else (_local_ipv4s() & target_ips)

    try:
        client = docker.DockerClient(base_url=DOCKER_SOCKET)
    except Exception:
        return {}
    try:
        try:
            containers = client.containers.list()
        except Exception:
            return {}
        return _collect_published_ports(containers, target_ips, fallback_ips)
    finally:
        # Release the client's connections; cleanup is best-effort and must
        # not mask the result.
        try:
            client.close()
        except Exception:
            pass
def merge_docker_ports(host: HostResult, docker_ports: list[PortResult]) -> HostResult:
    """Fold Docker-published ports into *host* in place and return it.

    Ports are matched on (port, protocol). For a port the host already has,
    the Docker note is appended to ``extra_info`` and ``product``/``service``
    are filled in only when empty. Unmatched Docker ports are appended to
    ``host.ports``.
    """
    if not docker_ports:
        return host

    known = {}
    for current in host.ports:
        known[(current.port, current.protocol)] = current

    for published in docker_ports:
        ident = (published.port, published.protocol)
        match = known.get(ident)
        if not match:
            host.ports.append(published)
            known[ident] = published
            continue

        note = published.extra_info or "docker-published"
        if match.extra_info:
            match.extra_info = f"{match.extra_info}; {note}"
        else:
            match.extra_info = note
        if not match.product:
            match.product = published.product
        if not match.service:
            match.service = published.service
    return host