Files
NetTrak/app/scanner.py
Keith Solomon 9fb58a9677 Initial commit
2026-03-08 15:06:50 -05:00

211 lines
6.4 KiB
Python

from __future__ import annotations
import socket
import ssl
import subprocess
import xml.etree.ElementTree as ET
from dataclasses import dataclass, field
from typing import Any
HTTP_PORTS = {80, 81, 443, 8000, 8080, 8081, 8443, 8888}
@dataclass
class PortResult:
port: int
protocol: str
state: str
service: str | None = None
product: str | None = None
version: str | None = None
extra_info: str | None = None
banner: str | None = None
headers: dict[str, str] = field(default_factory=dict)
@dataclass
class HostResult:
ip: str
hostname: str | None = None
mac: str | None = None
vendor: str | None = None
os_name: str | None = None
ports: list[PortResult] = field(default_factory=list)
def run_nmap(args: list[str]) -> ET.Element:
proc = subprocess.run(
["nmap", *args, "-oX", "-"],
capture_output=True,
text=True,
check=False,
)
if proc.returncode != 0:
raise RuntimeError(f"nmap failed: {proc.stderr.strip() or proc.stdout.strip()}")
return ET.fromstring(proc.stdout)
def discover_hosts(subnet: str) -> list[dict[str, Any]]:
root = run_nmap(["-sn", subnet])
hosts: list[dict[str, Any]] = []
for host in root.findall("host"):
status = host.find("status")
if status is None or status.attrib.get("state") != "up":
continue
ip = None
mac = None
vendor = None
for addr in host.findall("address"):
addr_type = addr.attrib.get("addrtype")
if addr_type == "ipv4":
ip = addr.attrib.get("addr")
elif addr_type == "mac":
mac = addr.attrib.get("addr")
vendor = addr.attrib.get("vendor")
hostname = None
names = host.find("hostnames")
if names is not None:
first = names.find("hostname")
if first is not None:
hostname = first.attrib.get("name")
if ip:
hosts.append({"ip": ip, "hostname": hostname, "mac": mac, "vendor": vendor})
return hosts
def parse_detailed_host(xml_root: ET.Element) -> HostResult | None:
host = xml_root.find("host")
if host is None:
return None
status = host.find("status")
if status is None or status.attrib.get("state") != "up":
return None
ip = None
mac = None
vendor = None
for addr in host.findall("address"):
addr_type = addr.attrib.get("addrtype")
if addr_type == "ipv4":
ip = addr.attrib.get("addr")
elif addr_type == "mac":
mac = addr.attrib.get("addr")
vendor = addr.attrib.get("vendor")
if not ip:
return None
hostname = None
names = host.find("hostnames")
if names is not None:
first = names.find("hostname")
if first is not None:
hostname = first.attrib.get("name")
os_name = None
os_match = host.find("os/osmatch")
if os_match is not None:
os_name = os_match.attrib.get("name")
result = HostResult(ip=ip, hostname=hostname, mac=mac, vendor=vendor, os_name=os_name)
for port in host.findall("ports/port"):
protocol = port.attrib.get("protocol", "tcp")
portid = int(port.attrib.get("portid", "0"))
state_node = port.find("state")
state = state_node.attrib.get("state", "unknown") if state_node is not None else "unknown"
if state != "open":
continue
service_node = port.find("service")
service = service_node.attrib.get("name") if service_node is not None else None
product = service_node.attrib.get("product") if service_node is not None else None
version = service_node.attrib.get("version") if service_node is not None else None
extrainfo = service_node.attrib.get("extrainfo") if service_node is not None else None
p = PortResult(
port=portid,
protocol=protocol,
state=state,
service=service,
product=product,
version=version,
extra_info=extrainfo,
)
if protocol == "tcp":
banner, headers = probe_port(ip, portid)
p.banner = banner
p.headers = headers
result.ports.append(p)
return result
def probe_port(ip: str, port: int, timeout: float = 1.5) -> tuple[str | None, dict[str, str]]:
if port not in HTTP_PORTS:
return grab_banner(ip, port, timeout), {}
try:
if port in {443, 8443}:
context = ssl.create_default_context()
with socket.create_connection((ip, port), timeout=timeout) as sock:
with context.wrap_socket(sock, server_hostname=ip) as tls_sock:
return http_probe(tls_sock, ip)
with socket.create_connection((ip, port), timeout=timeout) as sock:
return http_probe(sock, ip)
except Exception:
return grab_banner(ip, port, timeout), {}
def http_probe(sock: socket.socket, host: str) -> tuple[str | None, dict[str, str]]:
request = (
f"HEAD / HTTP/1.1\r\n"
f"Host: {host}\r\n"
"User-Agent: NetTrak/1.0\r\n"
"Connection: close\r\n\r\n"
)
sock.sendall(request.encode("ascii", errors="ignore"))
data = sock.recv(4096)
text = data.decode("utf-8", errors="ignore")
lines = [line.strip() for line in text.splitlines() if line.strip()]
headers: dict[str, str] = {}
status = lines[0] if lines else None
for line in lines[1:]:
if ":" not in line:
continue
key, value = line.split(":", 1)
headers[key.strip()] = value.strip()
return status, headers
def grab_banner(ip: str, port: int, timeout: float) -> str | None:
try:
with socket.create_connection((ip, port), timeout=timeout) as sock:
sock.settimeout(timeout)
data = sock.recv(1024)
if not data:
return None
return data.decode("utf-8", errors="ignore").strip()[:300]
except Exception:
return None
def scan_host(ip: str) -> HostResult | None:
# `-O` may fail without extra privileges, so we gracefully retry without it.
base_args = ["-Pn", "-sV", "--top-ports", "200", ip]
try:
root = run_nmap([*base_args, "-O", "--osscan-guess"])
return parse_detailed_host(root)
except Exception:
root = run_nmap(base_args)
return parse_detailed_host(root)