"""SQLite persistence helpers: schema definition and connection management."""

import sqlite3
from contextlib import closing, contextmanager
from pathlib import Path
from typing import Iterator

from .config import DB_PATH

# Full database schema. Every statement is idempotent (IF NOT EXISTS), so the
# script can safely be executed on every start-up.
# NOTE(review): the FOREIGN KEY clauses below are only enforced by SQLite when
# `PRAGMA foreign_keys=ON` is issued on each connection; nothing in this module
# does that — confirm whether enforcement is intended.
SCHEMA = """
PRAGMA journal_mode=WAL;

CREATE TABLE IF NOT EXISTS scans (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    subnet TEXT NOT NULL,
    started_at TEXT NOT NULL,
    completed_at TEXT,
    status TEXT NOT NULL,
    host_count INTEGER DEFAULT 0,
    notes TEXT
);

CREATE TABLE IF NOT EXISTS devices (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    ip TEXT NOT NULL UNIQUE,
    hostname TEXT,
    mac TEXT,
    vendor TEXT,
    os_name TEXT,
    first_seen TEXT NOT NULL,
    last_seen TEXT NOT NULL,
    is_active INTEGER NOT NULL DEFAULT 1,
    last_scan_id INTEGER,
    FOREIGN KEY(last_scan_id) REFERENCES scans(id)
);

CREATE TABLE IF NOT EXISTS ports (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    device_id INTEGER NOT NULL,
    port INTEGER NOT NULL,
    protocol TEXT NOT NULL,
    state TEXT NOT NULL,
    service TEXT,
    product TEXT,
    version TEXT,
    extra_info TEXT,
    banner TEXT,
    headers_json TEXT,
    first_seen TEXT NOT NULL,
    last_seen TEXT NOT NULL,
    is_open INTEGER NOT NULL DEFAULT 1,
    UNIQUE(device_id, port, protocol),
    FOREIGN KEY(device_id) REFERENCES devices(id)
);

CREATE INDEX IF NOT EXISTS idx_devices_active ON devices(is_active);
CREATE INDEX IF NOT EXISTS idx_ports_device ON ports(device_id);
"""


def init_db() -> None:
    """Create the database file's parent directory and apply ``SCHEMA``.

    Safe to call repeatedly: the directory is created with ``exist_ok=True``
    and every schema statement uses ``IF NOT EXISTS``.
    """
    db_file = Path(DB_PATH)
    db_file.parent.mkdir(parents=True, exist_ok=True)
    # ``with conn:`` only commits or rolls back; it does not close the
    # connection. ``closing`` guarantees the handle is released as well.
    with closing(sqlite3.connect(DB_PATH)) as conn:
        with conn:
            conn.executescript(SCHEMA)


@contextmanager
def get_conn() -> Iterator[sqlite3.Connection]:
    """Yield a connection to ``DB_PATH`` whose rows are ``sqlite3.Row`` objects.

    The transaction is committed when the ``with`` body finishes without
    raising. If the body raises, the commit is skipped and the exception
    propagates. The connection is closed in both cases.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    finally:
        conn.close()