# utopia-surveillance-tool/camera_stream.py

import cv2
import os
import time
import threading
import numpy as np
import torch
from datetime import datetime
from flask import Flask, Response, render_template, jsonify, send_from_directory, request
from dotenv import load_dotenv
from ultralytics import YOLO
load_dotenv(override=True)

app = Flask(__name__)

# Load-time debug for camera config (safe to leave; only prints on startup)
print(
    "[Surveillance] camera_ip_1=" + str(os.getenv("camera_ip_1")) +
    " camera_ip_2=" + str(os.getenv("camera_ip_2"))
)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USERNAME = os.getenv("username")
PASSWORD = os.getenv("password")

# FIX: skip unset env vars. Previously a missing camera_ip_N produced a camera
# entry with ip=None and an RTSP URL containing the literal "None", which the
# processing loop would then endlessly fail to open.
CAMERA_IPS = [
    ip for ip in (os.getenv(f"camera_ip_{i}") for i in range(1, 3)) if ip
]

CAMERAS = [
    {
        "id": f"cam{i + 1}",
        "name": f"Camera {i + 1}",
        "ip": ip,
        # Credentials are embedded in the URL — never log this field.
        "rtsp_url": f"rtsp://{USERNAME}:{PASSWORD}@{ip}:554/cam/realmonitor?channel=1&subtype=0",
    }
    for i, ip in enumerate(CAMERA_IPS)
]
DEFAULT_CAMERA_ID = CAMERAS[0]["id"] if CAMERAS else "cam1"

PROXIMITY_PX = 200          # max pixel distance to consider two people "together"
GROUP_TIME_THRESHOLD = 20   # seconds before an alert fires
ALERT_COOLDOWN = 60         # seconds between successive alerts
MIN_GROUP_SIZE = 2          # minimum people per tracked group
YOLO_CONF = 0.25            # YOLO confidence threshold
INFERENCE_DEVICE = 0        # CUDA device index
# ---------------------------------------------------------------------------
# Shared state (protected by lock)
# ---------------------------------------------------------------------------
lock = threading.Lock()

# Single shared snapshot, written by the background processing thread and read
# by the Flask routes. Always read/write under `lock`.
state = {
    "frame": None,                  # latest annotated frame, or None before first read
    "people_count": 0,              # persons detected in the latest frame
    "groups": [],                   # JSON-ready summaries of currently tracked groups
    "alert_active": False,          # True while any group exceeds GROUP_TIME_THRESHOLD
    "alerts": [],                   # newest-first alert history (capped at 50)
    "fps": 0,                       # processing-loop FPS (detection rate, not camera FPS)
    "stream_status": "connecting",  # one of: connecting | live | reconnecting | error
    "selected_camera_id": DEFAULT_CAMERA_ID,  # camera the UI asked for
    "active_camera_id": DEFAULT_CAMERA_ID,    # camera the loop actually opened
}

# ---------------------------------------------------------------------------
# YOLO model (downloaded on first run)
# ---------------------------------------------------------------------------
# Fail fast: inference below is pinned to a CUDA device, so a CPU-only
# environment would only fail later and more confusingly.
if not torch.cuda.is_available():
    raise RuntimeError(
        "CUDA GPU is required but not available. Install a CUDA-enabled PyTorch build "
        "and verify NVIDIA drivers."
    )

model = YOLO("person_detector_best.pt")
model.to(f"cuda:{INFERENCE_DEVICE}")

# Group tracking — touched only by the processing thread, so no lock needed.
# _group_trackers: gid -> {centroid, first_seen, last_seen, member_count, alerted}
_group_trackers: dict = {}
_next_group_id = 0
_last_alert_time = 0.0
# ---------------------------------------------------------------------------
# Detection helpers
# ---------------------------------------------------------------------------
def _centroids(boxes):
"""Return list of (cx, cy) from xyxy boxes."""
return [((b[0] + b[2]) / 2, (b[1] + b[3]) / 2) for b in boxes]
def _find_groups(centroids, threshold):
"""BFS clustering — returns list of index-lists with >= MIN_GROUP_SIZE."""
n = len(centroids)
if n < MIN_GROUP_SIZE:
return []
visited = set()
groups = []
for i in range(n):
if i in visited:
continue
cluster = [i]
visited.add(i)
queue = [i]
while queue:
cur = queue.pop(0)
for j in range(n):
if j in visited:
continue
dx = centroids[cur][0] - centroids[j][0]
dy = centroids[cur][1] - centroids[j][1]
if (dx * dx + dy * dy) ** 0.5 < threshold:
cluster.append(j)
visited.add(j)
queue.append(j)
if len(cluster) >= MIN_GROUP_SIZE:
groups.append(cluster)
return groups
def _group_centroid(centroids, indices):
xs = [centroids[i][0] for i in indices]
ys = [centroids[i][1] for i in indices]
return (sum(xs) / len(xs), sum(ys) / len(ys))
def _get_camera_by_id(camera_id):
    """Return the camera dict matching `camera_id`.

    Falls back to the first configured camera on a miss, or None when no
    cameras are configured at all.
    """
    fallback = CAMERAS[0] if CAMERAS else None
    return next((cam for cam in CAMERAS if cam["id"] == camera_id), fallback)
def _reset_tracking():
    """Reset all group-tracking state (used when the active camera changes)."""
    global _group_trackers, _next_group_id, _last_alert_time
    _group_trackers = {}
    _next_group_id = 0
    _last_alert_time = 0.0
# ---------------------------------------------------------------------------
# Main processing loop (runs in background thread)
# ---------------------------------------------------------------------------
def _process_stream():
    """Background worker: the whole detection pipeline.

    Forever: open/maintain the RTSP stream for the selected camera, run YOLO
    person detection on each frame, cluster people into proximity groups,
    track group lifetimes, fire alerts for long-lived groups (saving an
    annotated snapshot per alerting group), and publish the annotated frame
    plus stats into the shared `state` dict.

    Intended to run once as a daemon thread; never returns.
    """
    global _next_group_id, _last_alert_time
    cap = None
    active_camera_id = None
    prev_time = time.time()

    while True:
        # --- Camera selection (may be changed from the web UI at any time) ---
        with lock:
            selected_camera_id = state["selected_camera_id"]
        selected_camera = _get_camera_by_id(selected_camera_id)
        if not selected_camera:
            with lock:
                state["stream_status"] = "error"
            time.sleep(2)
            continue

        # (Re)open the capture on first run or after a camera switch.
        if cap is None or active_camera_id != selected_camera["id"]:
            if cap is not None:
                cap.release()
            cap = cv2.VideoCapture(selected_camera["rtsp_url"])
            active_camera_id = selected_camera["id"]
            _reset_tracking()
            with lock:
                state["active_camera_id"] = active_camera_id

        if not cap.isOpened():
            with lock:
                state["stream_status"] = "error"
            # SECURITY FIX: the RTSP URL embeds the camera credentials —
            # never write it to the logs. Log only the camera name and IP.
            print(
                f"[ERROR] Cannot open RTSP stream for "
                f"{selected_camera['name']} ({selected_camera['ip']})"
            )
            time.sleep(2)
            cap = None
            continue

        with lock:
            state["stream_status"] = "live"

        ret, frame = cap.read()
        if not ret:
            # Stream dropped: pause briefly, then attempt one reconnect.
            with lock:
                state["stream_status"] = "reconnecting"
            cap.release()
            time.sleep(2)
            cap = cv2.VideoCapture(selected_camera["rtsp_url"])
            if not cap.isOpened():
                with lock:
                    state["stream_status"] = "error"
                cap = None
            else:
                with lock:
                    state["stream_status"] = "live"
            continue

        now = time.time()
        fps = 1.0 / max(now - prev_time, 1e-6)  # guard against zero division
        prev_time = now

        # --- YOLO inference (person = class 0) ---
        results = model(
            frame,
            classes=[0],
            verbose=False,
            conf=YOLO_CONF,
            device=INFERENCE_DEVICE,
        )
        person_boxes = []
        for r in results:
            for box in r.boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                conf = float(box.conf[0])
                person_boxes.append((float(x1), float(y1), float(x2), float(y2), conf))

        centroids = _centroids([(b[0], b[1], b[2], b[3]) for b in person_boxes])
        current_groups = _find_groups(centroids, PROXIMITY_PX)

        # --- Match current groups to tracked groups (greedy nearest-centroid) ---
        matched_ids: set = set()
        frame_group_data = []  # (gid, member_indices, group_centroid) per visible group
        for grp_indices in current_groups:
            gc = _group_centroid(centroids, grp_indices)
            best_id, best_dist = None, float("inf")
            for gid, gdata in _group_trackers.items():
                if gid in matched_ids:
                    continue
                dx = gc[0] - gdata["centroid"][0]
                dy = gc[1] - gdata["centroid"][1]
                dist = (dx * dx + dy * dy) ** 0.5
                # 2x proximity radius is the max a group may drift between frames
                # and still be considered the same group.
                if dist < PROXIMITY_PX * 2 and dist < best_dist:
                    best_dist = dist
                    best_id = gid
            if best_id is not None:
                _group_trackers[best_id]["centroid"] = gc
                _group_trackers[best_id]["last_seen"] = now
                _group_trackers[best_id]["member_count"] = len(grp_indices)
                matched_ids.add(best_id)
                frame_group_data.append((best_id, grp_indices, gc))
            else:
                gid = _next_group_id
                _next_group_id += 1
                _group_trackers[gid] = {
                    "centroid": gc,
                    "first_seen": now,
                    "last_seen": now,
                    "member_count": len(grp_indices),
                    "alerted": False,
                }
                frame_group_data.append((gid, grp_indices, gc))

        # Remove stale groups (not seen for > 3 s)
        stale = [gid for gid, gd in _group_trackers.items() if now - gd["last_seen"] > 3]
        for gid in stale:
            del _group_trackers[gid]

        # --- Alert logic (mark + metadata; save after drawing overlays) ---
        alert_this_frame = False
        pending_alerts = []  # list of (gid, people_count, duration_seconds)
        for gid, gdata in _group_trackers.items():
            duration = gdata["last_seen"] - gdata["first_seen"]
            if duration >= GROUP_TIME_THRESHOLD and not gdata["alerted"]:
                if now - _last_alert_time >= ALERT_COOLDOWN:
                    gdata["alerted"] = True
                    alert_this_frame = True
                    _last_alert_time = now
                    pending_alerts.append((gid, gdata["member_count"], duration))

        # --- Draw overlays ---
        display = frame.copy()
        pending_gid_set = {gid for gid, _, _ in pending_alerts}
        alert_person_indices = set()
        for gid, grp_indices, _gc in frame_group_data:
            if gid in pending_gid_set:
                alert_person_indices.update(grp_indices)

        # Live overlay: mark any person that belongs to any alerting group.
        if pending_alerts:
            max_people = max(people for _gid, people, _dur in pending_alerts)
            max_dur = max(dur for _gid, _people, dur in pending_alerts)
            cv2.putText(
                display,
                f"ALERT: {len(pending_alerts)} group(s) | {max_people} people | {int(max_dur)}s",
                (12, 32),
                cv2.FONT_HERSHEY_SIMPLEX,
                0.8,
                (0, 0, 255),
                3,
            )

        # Person boxes: red for members of an alerting group, green otherwise.
        for idx, (x1, y1, x2, y2, conf) in enumerate(person_boxes):
            is_alert_person = idx in alert_person_indices
            box_color = (0, 0, 255) if is_alert_person else (0, 255, 0)
            cv2.rectangle(display, (int(x1), int(y1)), (int(x2), int(y2)), box_color, 2)
            cv2.putText(
                display, f"{conf:.0%}",
                (int(x1), int(y1) - 6),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, box_color, 2,
            )

        # Group circles + duration labels (red once past the alert threshold).
        for gid, grp_indices, gc in frame_group_data:
            gdata = _group_trackers.get(gid)
            if gdata is None:
                continue
            duration = gdata["last_seen"] - gdata["first_seen"]
            radius = int(PROXIMITY_PX * 0.6)
            is_alert = duration >= GROUP_TIME_THRESHOLD
            color = (0, 0, 255) if is_alert else (0, 165, 255)
            cv2.circle(display, (int(gc[0]), int(gc[1])), radius, color, 2)
            label = f"Group: {len(grp_indices)} | {duration:.0f}s"
            cv2.putText(
                display, label,
                (int(gc[0]) - 70, int(gc[1]) - radius - 10),
                cv2.FONT_HERSHEY_SIMPLEX, 0.55, color, 2,
            )
            if is_alert:
                cv2.putText(
                    display, "ALERT",
                    (int(gc[0]) - 35, int(gc[1]) + radius + 25),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 3,
                )

        # --- Save annotated alerts (separate per group) ---
        if pending_alerts:
            os.makedirs("alerts", exist_ok=True)
            ts = datetime.now().strftime("%Y%m%d_%H%M%S")
            frame_group_by_id = {gid: (grp_indices, gc) for gid, grp_indices, gc in frame_group_data}
            for gid, people, duration in pending_alerts:
                grp_indices, gc = frame_group_by_id.get(gid, (None, None))
                if grp_indices is None:
                    continue
                # Annotate a fresh copy so each saved image highlights only
                # the group that triggered this particular alert.
                alert_display = frame.copy()
                alert_person_set = set(grp_indices)
                # Header annotation for this specific alert group.
                cv2.putText(
                    alert_display,
                    f"ALERT GROUP {gid} | {people} people | {int(duration)}s",
                    (12, 32),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.9,
                    (0, 0, 255),
                    3,
                )
                # Draw only the person boxes relevant to this group.
                for idx, (x1, y1, x2, y2, conf) in enumerate(person_boxes):
                    is_alert_person = idx in alert_person_set
                    box_color = (0, 0, 255) if is_alert_person else (0, 255, 0)
                    cv2.rectangle(alert_display, (int(x1), int(y1)), (int(x2), int(y2)), box_color, 2)
                    cv2.putText(
                        alert_display,
                        f"{conf:.0%}",
                        (int(x1), int(y1) - 6),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.5,
                        box_color,
                        2,
                    )
                # Draw only the circle/label for this group.
                radius = int(PROXIMITY_PX * 0.6)
                cv2.circle(alert_display, (int(gc[0]), int(gc[1])), radius, (0, 0, 255), 2)
                cv2.putText(
                    alert_display,
                    f"Group: {len(grp_indices)} | {duration:.0f}s",
                    (int(gc[0]) - 70, int(gc[1]) - radius - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.55,
                    (0, 0, 255),
                    2,
                )
                cv2.putText(
                    alert_display,
                    "ALERT",
                    (int(gc[0]) - 35, int(gc[1]) + radius + 25),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.8,
                    (0, 0, 255),
                    3,
                )
                # Include group id to avoid collisions when multiple groups alert in one second.
                alert_path = f"alerts/alert_{ts}_gid{gid}.jpg"
                cv2.imwrite(alert_path, alert_display)
                alert_info = {
                    "time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "people": people,
                    "duration": round(duration, 1),
                    "image": alert_path,
                }
                with lock:
                    state["alerts"].insert(0, alert_info)
                    state["alerts"] = state["alerts"][:50]  # cap history

        # --- Update shared state ---
        groups_json = []
        for gid, gi, gc in frame_group_data:
            gdata = _group_trackers.get(gid)
            if gdata:
                groups_json.append({
                    "id": gid,
                    "count": len(gi),
                    "duration": round(gdata["last_seen"] - gdata["first_seen"], 1),
                })
        with lock:
            state["frame"] = display
            state["people_count"] = len(person_boxes)
            state["groups"] = groups_json
            # Active while this frame fired an alert OR any tracked group is
            # already past the threshold (covers the alert-cooldown window).
            state["alert_active"] = alert_this_frame or any(
                (gd["last_seen"] - gd["first_seen"]) >= GROUP_TIME_THRESHOLD
                for gd in _group_trackers.values()
            )
            state["fps"] = round(fps, 1)
# ---------------------------------------------------------------------------
# MJPEG generator
# ---------------------------------------------------------------------------
def _generate_frames():
    """Yield the latest annotated frame as MJPEG multipart chunks (~30 fps)."""
    while True:
        with lock:
            latest = state["frame"]
        if latest is not None:
            encoded_ok, jpeg = cv2.imencode(".jpg", latest, [cv2.IMWRITE_JPEG_QUALITY, 80])
            if encoded_ok:
                payload = jpeg.tobytes()
                yield b"--frame\r\nContent-Type: image/jpeg\r\n\r\n" + payload + b"\r\n"
        time.sleep(0.033)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/")
def dashboard():
    """Serve the main dashboard page (templates/dashboard.html)."""
    return render_template("dashboard.html")
@app.route("/video_feed")
def video_feed():
    """MJPEG stream endpoint consumed by the dashboard's <img> tag."""
    return Response(
        _generate_frames(),
        mimetype="multipart/x-mixed-replace; boundary=frame",
    )
@app.route("/api/status")
def api_status():
    """JSON snapshot of the current detection state for the dashboard poller."""
    with lock:
        active_camera = _get_camera_by_id(state["active_camera_id"])
        camera_name = active_camera["name"] if active_camera else "Unknown"
        snapshot = {
            "people_count": state["people_count"],
            "groups": state["groups"],
            "alert_active": state["alert_active"],
            "alerts": state["alerts"][:20],
            "fps": state["fps"],
            "stream_status": state["stream_status"],
            "selected_camera_id": state["selected_camera_id"],
            "active_camera_name": camera_name,
        }
    return jsonify(snapshot)
@app.route("/api/cameras")
def api_cameras():
    """List configured cameras (id/name/ip only — RTSP URLs stay server-side)."""
    cameras = []
    for cam in CAMERAS:
        cameras.append({"id": cam["id"], "name": cam["name"], "ip": cam["ip"]})
    return jsonify({"cameras": cameras})
@app.route("/api/camera/select", methods=["POST"])
def api_camera_select():
    """Switch the active camera.

    Expects JSON {"camera_id": "<id>"}. Returns {"ok": True} on success,
    or HTTP 400 with an error message for an unknown id.
    """
    data = request.get_json(silent=True) or {}
    camera_id = data.get("camera_id")
    # BUG FIX: _get_camera_by_id() falls back to CAMERAS[0] on a miss, which
    # made the invalid-id branch unreachable — an unknown camera_id silently
    # selected camera 1. Match strictly here so bad requests really get a 400.
    camera = next((c for c in CAMERAS if c["id"] == camera_id), None)
    if camera is None:
        return jsonify({"ok": False, "error": "Invalid camera id"}), 400
    with lock:
        state["selected_camera_id"] = camera["id"]
        # Reset the per-camera view; alert history is intentionally kept.
        state["frame"] = None
        state["groups"] = []
        state["people_count"] = 0
        state["alert_active"] = False
        state["fps"] = 0
        state["stream_status"] = "connecting"
    return jsonify({"ok": True})
@app.route("/alerts/<path:filename>")
def serve_alert_image(filename):
    """Serve a saved alert snapshot from the alerts/ directory.

    send_from_directory validates the path, so traversal outside alerts/
    is rejected.
    """
    return send_from_directory("alerts", filename)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    # Daemon thread: the processing loop dies with the Flask process.
    threading.Thread(target=_process_stream, daemon=True).start()
    print(f"\n Surveillance Dashboard → http://localhost:5000\n")
    # threaded=True so the MJPEG stream and the JSON APIs can be served
    # concurrently; debug=False avoids the reloader double-starting the worker.
    app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)