import math
import os
import threading
import time
from collections import deque
from datetime import datetime

import cv2
import numpy as np
import torch
from dotenv import load_dotenv
from flask import Flask, Response, jsonify, render_template, request, send_from_directory
from ultralytics import YOLO

load_dotenv(override=True)

app = Flask(__name__)

# Load-time debug for camera config (safe to leave; only prints on startup)
print(
    "[Surveillance] camera_ip_1="
    + str(os.getenv("camera_ip_1"))
    + " camera_ip_2="
    + str(os.getenv("camera_ip_2"))
)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USERNAME = os.getenv("username")
PASSWORD = os.getenv("password")
CAMERA_IPS = [os.getenv(f"camera_ip_{i}") for i in range(1, 3)]

# One entry per *configured* camera. Unset/empty camera_ip_N variables are
# skipped instead of producing an unusable "...@None:554" URL; ids keep the
# env-var numbering, so "cam2" always refers to camera_ip_2.
# NOTE: rtsp_url embeds the credentials -- never log it.
CAMERAS = [
    {
        "id": f"cam{i + 1}",
        "name": f"Camera {i + 1}",
        "ip": ip,
        "rtsp_url": f"rtsp://{USERNAME}:{PASSWORD}@{ip}:554/cam/realmonitor?channel=1&subtype=0",
    }
    for i, ip in enumerate(CAMERA_IPS)
    if ip
]
DEFAULT_CAMERA_ID = CAMERAS[0]["id"] if CAMERAS else "cam1"

PROXIMITY_PX = 200          # max pixel distance to consider two people "together"
GROUP_TIME_THRESHOLD = 20   # seconds before an alert fires
ALERT_COOLDOWN = 60         # seconds between successive alerts
MIN_GROUP_SIZE = 2
YOLO_CONF = 0.25
INFERENCE_DEVICE = 0
STALE_GROUP_SECONDS = 3     # drop a tracked group after this many seconds unseen
MAX_STORED_ALERTS = 50      # size of the in-memory alert history
RECONNECT_DELAY_S = 2       # pause before retrying a failed/closed stream

# ---------------------------------------------------------------------------
# Shared state (protected by lock)
# ---------------------------------------------------------------------------
lock = threading.Lock()
state = {
    "frame": None,
    "people_count": 0,
    "groups": [],
    "alert_active": False,
    "alerts": [],
    "fps": 0,
    "stream_status": "connecting",
    "selected_camera_id": DEFAULT_CAMERA_ID,
    "active_camera_id": DEFAULT_CAMERA_ID,
}

# ---------------------------------------------------------------------------
# YOLO model (downloaded on first run)
# ---------------------------------------------------------------------------
if not torch.cuda.is_available():
    raise RuntimeError(
        "CUDA GPU is required but not available. Install a CUDA-enabled PyTorch build "
        "and verify NVIDIA drivers."
    )

model = YOLO("yolo26m.pt")
model.to(f"cuda:{INFERENCE_DEVICE}")

# Group tracking (only touched by the processing thread)
_group_trackers: dict = {}
_next_group_id = 0
_last_alert_time = 0.0


# ---------------------------------------------------------------------------
# Detection helpers
# ---------------------------------------------------------------------------
def _centroids(boxes):
    """Return list of (cx, cy) from xyxy boxes."""
    return [((b[0] + b[2]) / 2, (b[1] + b[3]) / 2) for b in boxes]


def _find_groups(centroids, threshold):
    """Cluster points whose chained pairwise distance is below *threshold*.

    BFS over the "closer than threshold" graph; returns a list of index-lists,
    keeping only clusters with at least MIN_GROUP_SIZE members.
    """
    n = len(centroids)
    if n < MIN_GROUP_SIZE:
        return []

    visited = set()
    groups = []
    for start in range(n):
        if start in visited:
            continue
        visited.add(start)
        cluster = [start]
        queue = deque([start])  # deque: O(1) pops from the left
        while queue:
            cur = queue.popleft()
            for j in range(n):
                if j in visited:
                    continue
                if math.dist(centroids[cur], centroids[j]) < threshold:
                    visited.add(j)
                    cluster.append(j)
                    queue.append(j)
        if len(cluster) >= MIN_GROUP_SIZE:
            groups.append(cluster)
    return groups


def _group_centroid(centroids, indices):
    """Return the mean (x, y) of the centroids selected by *indices*."""
    xs = [centroids[i][0] for i in indices]
    ys = [centroids[i][1] for i in indices]
    return (sum(xs) / len(xs), sum(ys) / len(ys))


def _get_camera_by_id(camera_id):
    """Return the camera dict with *camera_id*.

    Unknown ids fall back to the first configured camera; returns None only
    when no cameras are configured.
    """
    for cam in CAMERAS:
        if cam["id"] == camera_id:
            return cam
    return CAMERAS[0] if CAMERAS else None


def _camera_label(camera):
    """Return a log-safe description of *camera* (name and IP, never credentials)."""
    return f"{camera['name']} ({camera['ip']})"


def _reset_tracking():
    """Forget all tracked groups and the alert cooldown (used on (re)connect)."""
    global _group_trackers, _next_group_id, _last_alert_time
    _group_trackers = {}
    _next_group_id = 0
    _last_alert_time = 0.0


# ---------------------------------------------------------------------------
# Main processing loop (runs in background thread)
# ---------------------------------------------------------------------------
def _open_capture(camera):
    """Open *camera*'s RTSP stream; return the capture, or None if it failed.

    A capture that fails to open is released immediately rather than being
    left for the garbage collector.
    """
    cap = cv2.VideoCapture(camera["rtsp_url"])
    if cap.isOpened():
        return cap
    cap.release()
    return None


def _detect_people(frame):
    """Run YOLO (person class only) on *frame*; return [(x1, y1, x2, y2, conf)]."""
    results = model(
        frame,
        classes=[0],
        verbose=False,
        conf=YOLO_CONF,
        device=INFERENCE_DEVICE,
    )
    person_boxes = []
    for r in results:
        for box in r.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            conf = float(box.conf[0])
            person_boxes.append((float(x1), float(y1), float(x2), float(y2), conf))
    return person_boxes


def _update_group_trackers(centroids, current_groups, now):
    """Match this frame's clusters to tracked groups and prune stale trackers.

    Each cluster is matched to the nearest not-yet-matched tracker within
    2 * PROXIMITY_PX, or starts a new tracker. Returns a list of
    (group_id, member_indices, group_centroid) for the clusters in this frame.
    """
    global _next_group_id
    matched_ids = set()
    frame_group_data = []

    for grp_indices in current_groups:
        gc = _group_centroid(centroids, grp_indices)
        best_id, best_dist = None, float("inf")
        for gid, gdata in _group_trackers.items():
            if gid in matched_ids:
                continue
            dist = math.dist(gc, gdata["centroid"])
            if dist < PROXIMITY_PX * 2 and dist < best_dist:
                best_id, best_dist = gid, dist

        if best_id is None:
            best_id = _next_group_id
            _next_group_id += 1
            _group_trackers[best_id] = {
                "centroid": gc,
                "first_seen": now,
                "last_seen": now,
                "member_count": len(grp_indices),
                "alerted": False,
            }
        else:
            tracker = _group_trackers[best_id]
            tracker["centroid"] = gc
            tracker["last_seen"] = now
            tracker["member_count"] = len(grp_indices)

        # Mark new trackers as matched too, so a second cluster in the same
        # frame cannot hijack (and overwrite) a tracker created a moment ago.
        matched_ids.add(best_id)
        frame_group_data.append((best_id, grp_indices, gc))

    stale = [
        gid for gid, gd in _group_trackers.items()
        if now - gd["last_seen"] > STALE_GROUP_SECONDS
    ]
    for gid in stale:
        del _group_trackers[gid]

    return frame_group_data


def _collect_pending_alerts(now):
    """Mark groups that crossed GROUP_TIME_THRESHOLD as alerted (respecting cooldown).

    Returns a list of (group_id, people_count, duration_seconds) to be saved.
    """
    global _last_alert_time
    pending = []
    for gid, gdata in _group_trackers.items():
        duration = gdata["last_seen"] - gdata["first_seen"]
        if duration < GROUP_TIME_THRESHOLD or gdata["alerted"]:
            continue
        if now - _last_alert_time >= ALERT_COOLDOWN:
            gdata["alerted"] = True
            _last_alert_time = now
            pending.append((gid, gdata["member_count"], duration))
    return pending


def _draw_overlays(frame, person_boxes, frame_group_data):
    """Return a copy of *frame* annotated with person boxes and group circles."""
    display = frame.copy()
    for x1, y1, x2, y2, conf in person_boxes:
        cv2.rectangle(display, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
        cv2.putText(
            display, f"{conf:.0%}", (int(x1), int(y1) - 6),
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2,
        )

    radius = int(PROXIMITY_PX * 0.6)
    for gid, grp_indices, gc in frame_group_data:
        gdata = _group_trackers.get(gid)
        if gdata is None:
            continue
        duration = gdata["last_seen"] - gdata["first_seen"]
        is_alert = duration >= GROUP_TIME_THRESHOLD
        color = (0, 0, 255) if is_alert else (0, 165, 255)
        cx, cy = int(gc[0]), int(gc[1])
        cv2.circle(display, (cx, cy), radius, color, 2)
        label = f"Group: {len(grp_indices)} | {duration:.0f}s"
        cv2.putText(
            display, label, (cx - 70, cy - radius - 10),
            cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2,
        )
        if is_alert:
            cv2.putText(
                display, "ALERT", (cx - 35, cy + radius + 25),
                cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 3,
            )
    return display


def _save_alerts(display, pending_alerts):
    """Write one annotated snapshot per pending alert and prepend it to state["alerts"]."""
    os.makedirs("alerts", exist_ok=True)
    stamp = datetime.now()  # one timestamp so file name and "time" field agree
    ts = stamp.strftime("%Y%m%d_%H%M%S")
    new_alerts = []
    for gid, people, duration in pending_alerts:
        # Include group id to avoid collisions when multiple groups alert in one second.
        alert_path = f"alerts/alert_{ts}_gid{gid}.jpg"
        if not cv2.imwrite(alert_path, display):
            print(f"[WARN] Failed to write alert image: {alert_path}")
        new_alerts.append({
            "time": stamp.strftime("%Y-%m-%d %H:%M:%S"),
            "people": people,
            "duration": round(duration, 1),
            "image": alert_path,
        })
    with lock:
        for alert_info in new_alerts:
            state["alerts"].insert(0, alert_info)
        state["alerts"] = state["alerts"][:MAX_STORED_ALERTS]


def _process_stream():
    """Background worker: read the selected camera, detect groups, publish state.

    Runs forever. Follows state["selected_camera_id"], (re)opening the RTSP
    capture on camera switch or failure, and after each processed frame
    updates the shared ``state`` under ``lock``.
    """
    cap = None
    active_camera_id = None
    prev_time = time.time()

    while True:
        with lock:
            selected_camera_id = state["selected_camera_id"]
        selected_camera = _get_camera_by_id(selected_camera_id)
        if not selected_camera:
            with lock:
                state["stream_status"] = "error"
            time.sleep(RECONNECT_DELAY_S)
            continue

        # (Re)open on first run, after a failed open, or on camera switch.
        if cap is None or active_camera_id != selected_camera["id"]:
            if cap is not None:
                cap.release()
            cap = _open_capture(selected_camera)
            active_camera_id = selected_camera["id"]
            _reset_tracking()
            with lock:
                state["active_camera_id"] = active_camera_id
            if cap is None:
                with lock:
                    state["stream_status"] = "error"
                # Log name/IP only: rtsp_url contains the credentials.
                print(f"[ERROR] Cannot open RTSP stream: {_camera_label(selected_camera)}")
                time.sleep(RECONNECT_DELAY_S)
                continue
            with lock:
                state["stream_status"] = "live"

        ret, frame = cap.read()
        if not ret:
            with lock:
                state["stream_status"] = "reconnecting"
            cap.release()
            time.sleep(RECONNECT_DELAY_S)
            cap = _open_capture(selected_camera)
            with lock:
                state["stream_status"] = "error" if cap is None else "live"
            continue

        now = time.time()
        fps = 1.0 / max(now - prev_time, 1e-6)
        prev_time = now

        person_boxes = _detect_people(frame)
        centroids = _centroids([b[:4] for b in person_boxes])
        current_groups = _find_groups(centroids, PROXIMITY_PX)
        frame_group_data = _update_group_trackers(centroids, current_groups, now)

        # Mark alerts first; snapshots are saved after overlays are drawn.
        pending_alerts = _collect_pending_alerts(now)
        display = _draw_overlays(frame, person_boxes, frame_group_data)
        if pending_alerts:
            _save_alerts(display, pending_alerts)

        groups_json = []
        for gid, gi, _gc in frame_group_data:
            gdata = _group_trackers.get(gid)
            if gdata:
                groups_json.append({
                    "id": gid,
                    "count": len(gi),
                    "duration": round(gdata["last_seen"] - gdata["first_seen"], 1),
                })

        with lock:
            state["frame"] = display
            state["people_count"] = len(person_boxes)
            state["groups"] = groups_json
            state["alert_active"] = bool(pending_alerts) or any(
                (gd["last_seen"] - gd["first_seen"]) >= GROUP_TIME_THRESHOLD
                for gd in _group_trackers.values()
            )
            state["fps"] = round(fps, 1)
            # Re-assert "live": a camera re-select may have reset it to "connecting"
            # without forcing a reconnect.
            state["stream_status"] = "live"


# ---------------------------------------------------------------------------
# MJPEG generator
# ---------------------------------------------------------------------------
def _generate_frames():
    """Yield multipart MJPEG chunks of the latest annotated frame (~30 Hz).

    JPEG encoding is redone only when the processing thread has published a
    new frame object; otherwise the previously encoded chunk is resent.
    """
    last_frame = None
    last_chunk = None
    while True:
        with lock:
            frame = state["frame"]
        if frame is not None:
            if frame is not last_frame:
                ok, buf = cv2.imencode(".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 80])
                last_frame = frame
                last_chunk = (
                    b"--frame\r\n"
                    b"Content-Type: image/jpeg\r\n\r\n"
                    + buf.tobytes()
                    + b"\r\n"
                ) if ok else None
            if last_chunk is not None:
                yield last_chunk
        time.sleep(0.033)


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/")
def dashboard():
    """Serve the dashboard page."""
    return render_template("dashboard.html")


@app.route("/video_feed")
def video_feed():
    """Stream the annotated video as multipart MJPEG."""
    return Response(
        _generate_frames(),
        mimetype="multipart/x-mixed-replace; boundary=frame",
    )
@app.route("/api/status")
def api_status():
    """Return a JSON snapshot of detection state and stream health."""
    with lock:
        active_camera = _get_camera_by_id(state["active_camera_id"])
        snapshot = {
            "people_count": state["people_count"],
            "groups": state["groups"],
            "alert_active": state["alert_active"],
            "alerts": state["alerts"][:20],
            "fps": state["fps"],
            "stream_status": state["stream_status"],
            "selected_camera_id": state["selected_camera_id"],
            "active_camera_name": active_camera["name"] if active_camera else "Unknown",
        }
    # Serialize outside the lock so the processing thread is not blocked.
    return jsonify(snapshot)


@app.route("/api/cameras")
def api_cameras():
    """List configured cameras (id, name, ip) -- never the credentialed URL."""
    return jsonify({
        "cameras": [{"id": c["id"], "name": c["name"], "ip": c["ip"]} for c in CAMERAS]
    })


@app.route("/api/camera/select", methods=["POST"])
def api_camera_select():
    """Switch the processing thread to the camera named by JSON ``camera_id``.

    Responds 400 for a missing or unknown id. Exact-match lookup is used on
    purpose: ``_get_camera_by_id`` falls back to the first camera, which would
    silently accept any bogus id.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    camera_id = data.get("camera_id")
    camera = next((c for c in CAMERAS if c["id"] == camera_id), None)
    if camera is None:
        return jsonify({"ok": False, "error": "Invalid camera id"}), 400
    with lock:
        state["selected_camera_id"] = camera["id"]
        # Clear per-camera readings until the new stream produces a frame.
        state["frame"] = None
        state["groups"] = []
        state["people_count"] = 0
        state["alert_active"] = False
        state["fps"] = 0
        state["stream_status"] = "connecting"
    return jsonify({"ok": True})


@app.route("/alerts/<path:filename>")
def serve_alert_image(filename):
    """Serve a saved alert snapshot from the ``alerts`` directory.

    ``send_from_directory`` rejects paths that escape the directory.
    """
    return send_from_directory("alerts", filename)


# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    threading.Thread(target=_process_stream, daemon=True).start()
    print("\n Surveillance Dashboard → http://localhost:5000\n")
    app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)