# utopia-surveillance-tool/camera_stream.py
#
# Multi-camera RTSP surveillance dashboard: per-camera YOLO person
# detection, proximity-based group tracking with alerting, and a Flask
# MJPEG grid view.
import os
import threading
import time
from datetime import datetime
from urllib.parse import quote

import cv2
import numpy as np
import torch
from dotenv import load_dotenv
from flask import Flask, Response, render_template, jsonify, send_from_directory, request
from ultralytics import YOLO
load_dotenv(override=True)

app = Flask(__name__)

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
USERNAME = os.getenv("username")
PASSWORD = os.getenv("password")

MAX_CAMERAS = 10  # scan camera_ip_1 .. camera_ip_<MAX_CAMERAS> in the env

# Dynamically find all camera_ip_N variables in environment
CAMERA_IPS = []
for i in range(1, MAX_CAMERAS + 1):
    ip = os.getenv(f"camera_ip_{i}")
    if ip and ip.strip():
        CAMERA_IPS.append(ip.strip())


def _rtsp_url(ip):
    """Build the RTSP URL for one camera.

    Credentials are percent-encoded so passwords containing '@', ':' or '/'
    do not corrupt the URL (the previous f-string embedded them raw).
    """
    user = quote(USERNAME or "", safe="")
    pwd = quote(PASSWORD or "", safe="")
    return f"rtsp://{user}:{pwd}@{ip}:554/cam/realmonitor?channel=1&subtype=0"


CAMERAS = [
    {
        "id": f"cam{i + 1}",
        "name": f"Camera {i + 1}",
        "ip": ip,
        "rtsp_url": _rtsp_url(ip),
    }
    for i, ip in enumerate(CAMERA_IPS)
]

PROXIMITY_PX = 200        # max pixel distance to consider two people "together"
GROUP_TIME_THRESHOLD = 20  # seconds before an alert fires
ALERT_COOLDOWN = 60        # seconds between successive alerts
MIN_GROUP_SIZE = 2         # minimum people for a cluster to count as a group
YOLO_CONF = 0.25           # detection confidence threshold
INFERENCE_DEVICE = 0       # CUDA device index when a GPU is available
# ---------------------------------------------------------------------------
# Shared state
# ---------------------------------------------------------------------------
lock = threading.Lock()

# All cross-thread data lives here; every read/write is guarded by `lock`.
# Per-camera entries start in the "connecting" state with no frame yet.
state = {
    "cameras": {
        cam["id"]: {
            "frame": None,
            "people_count": 0,
            "groups": [],
            "alert_active": False,
            "fps": 0,
            "status": "connecting",
            "name": cam["name"],
        }
        for cam in CAMERAS
    },
    "grid_frame": None,
    "alerts": [],
    "total_people_count": 0,
    "alert_active": False,
}
# ---------------------------------------------------------------------------
# YOLO model
# ---------------------------------------------------------------------------
# Prefer the configured CUDA device; fall back to CPU with a loud warning.
if torch.cuda.is_available():
    device = f"cuda:{INFERENCE_DEVICE}"
else:
    print("[WARNING] CUDA not detected. Using CPU (this will be slow!).")
    device = "cpu"

model = YOLO("person_detector_best.pt")
model.to(device)
# ---------------------------------------------------------------------------
# Detection Helpers
# ---------------------------------------------------------------------------
def _centroids(boxes):
return [((b[0] + b[2]) / 2, (b[1] + b[3]) / 2) for b in boxes]
def _find_groups(centroids, threshold):
n = len(centroids)
if n < MIN_GROUP_SIZE: return []
visited = set()
groups = []
for i in range(n):
if i in visited: continue
cluster, queue = [i], [i]
visited.add(i)
while queue:
cur = queue.pop(0)
for j in range(n):
if j in visited: continue
dx, dy = centroids[cur][0] - centroids[j][0], centroids[cur][1] - centroids[j][1]
if (dx*dx + dy*dy)**0.5 < threshold:
cluster.append(j); visited.add(j); queue.append(j)
if len(cluster) >= MIN_GROUP_SIZE: groups.append(cluster)
return groups
def _group_centroid(centroids, indices):
xs = [centroids[i][0] for i in indices]
ys = [centroids[i][1] for i in indices]
return (sum(xs) / len(xs), sum(ys) / len(ys))
# ---------------------------------------------------------------------------
# Stream Processing (One per camera)
# ---------------------------------------------------------------------------
def _process_stream(camera):
    """Per-camera worker loop: capture RTSP frames, detect people with YOLO,
    cluster detections into proximity groups, track group lifetimes, fire
    alerts for long-lived groups, and publish an annotated frame into the
    shared ``state``.

    Runs forever; intended to be launched as a daemon thread.
    """
    cam_id = camera["id"]
    rtsp_url = camera["rtsp_url"]
    group_trackers = {}   # gid -> {centroid, first_seen, last_seen, member_count, alerted}
    next_group_id = 0
    last_alert_time = 0.0  # per-camera cooldown timestamp
    cap = None
    prev_time = time.time()
    print(f"[Surveillance] Starting thread for {camera['name']}")
    while True:
        # --- (Re)connect --------------------------------------------------
        if cap is None:
            cap = cv2.VideoCapture(rtsp_url)
            if not cap.isOpened():
                with lock:
                    state["cameras"][cam_id]["status"] = "error"
                time.sleep(5)
                cap = None
                continue
            with lock:
                state["cameras"][cam_id]["status"] = "live"
        ret, frame = cap.read()
        if not ret:
            # Fix: funnel reconnects through the single connect path above
            # so the status returns to "live" once the stream recovers (the
            # previous inline reconnect left it stuck on "reconnecting").
            with lock:
                state["cameras"][cam_id]["status"] = "reconnecting"
            cap.release()
            cap = None
            time.sleep(2)
            continue
        now = time.time()
        fps = 1.0 / max(now - prev_time, 1e-6)  # includes inference time of previous loop
        prev_time = now
        # --- Detection (class 0 = person) ---------------------------------
        results = model(frame, classes=[0], verbose=False, conf=YOLO_CONF, device=device)
        person_boxes = []
        for r in results:
            for box in r.boxes:
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                person_boxes.append((float(x1), float(y1), float(x2), float(y2), float(box.conf[0])))
        centroids = _centroids([(b[0], b[1], b[2], b[3]) for b in person_boxes])
        current_groups = _find_groups(centroids, PROXIMITY_PX)
        # --- Match current groups to tracked groups -----------------------
        # Greedy nearest-tracker matching within 2x the proximity radius;
        # unmatched groups start a new tracker.
        matched_ids = set()
        frame_group_data = []
        for grp_indices in current_groups:
            gc = _group_centroid(centroids, grp_indices)
            best_id, best_dist = None, float("inf")
            for gid, gdata in group_trackers.items():
                if gid in matched_ids:
                    continue
                dist = ((gc[0] - gdata["centroid"][0]) ** 2 + (gc[1] - gdata["centroid"][1]) ** 2) ** 0.5
                if dist < PROXIMITY_PX * 2 and dist < best_dist:
                    best_dist, best_id = dist, gid
            if best_id is not None:
                group_trackers[best_id].update({"centroid": gc, "last_seen": now, "member_count": len(grp_indices)})
                matched_ids.add(best_id)
                frame_group_data.append((best_id, grp_indices, gc))
            else:
                gid = next_group_id
                next_group_id += 1
                group_trackers[gid] = {"centroid": gc, "first_seen": now, "last_seen": now, "member_count": len(grp_indices), "alerted": False}
                frame_group_data.append((gid, grp_indices, gc))
        # Drop trackers that have not been seen for 3 seconds.
        stale = [gid for gid, gd in group_trackers.items() if now - gd["last_seen"] > 3]
        for gid in stale:
            del group_trackers[gid]
        # --- Alerting -----------------------------------------------------
        alert_this_frame = False
        for gid, gdata in group_trackers.items():
            if (gdata["last_seen"] - gdata["first_seen"]) >= GROUP_TIME_THRESHOLD and not gdata["alerted"]:
                if now - last_alert_time >= ALERT_COOLDOWN:
                    gdata["alerted"] = True
                    alert_this_frame = True
                    last_alert_time = now
                    os.makedirs("alerts", exist_ok=True)
                    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
                    path = f"alerts/alert_{ts}_{cam_id}_gid{gid}.jpg"
                    cv2.imwrite(path, frame)  # save the raw frame, no overlays
                    with lock:
                        state["alerts"].insert(0, {"time": datetime.now().strftime("%H:%M:%S"), "camera": camera["name"], "people": gdata["member_count"], "duration": round(gdata["last_seen"] - gdata["first_seen"], 1), "image": path})
                        state["alerts"] = state["alerts"][:50]  # keep the 50 newest
        # --- Overlays -----------------------------------------------------
        display = frame.copy()
        cv2.putText(display, camera["name"], (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        for x1, y1, x2, y2, conf in person_boxes:
            cv2.rectangle(display, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
        for gid, grp_indices, gc in frame_group_data:
            cv2.circle(display, (int(gc[0]), int(gc[1])), 50, (0, 165, 255), 2)
        # --- Publish to shared state --------------------------------------
        with lock:
            state["cameras"][cam_id].update({
                "frame": display,
                "people_count": len(person_boxes),
                "groups": [{"id": gid, "count": len(gi), "duration": round(group_trackers[gid]["last_seen"] - group_trackers[gid]["first_seen"], 1)} for gid, gi, gc in frame_group_data],
                "alert_active": alert_this_frame,
                "fps": round(fps, 1),
            })
# ---------------------------------------------------------------------------
# Grid View Generator
# ---------------------------------------------------------------------------
def _compose_grid(frames):
    """Tile equally-sized frames into a grid, two per row.

    An odd trailing frame is padded with a black tile so rows stack.
    Produces the same layout as before for 1-4 frames, but no longer
    silently drops cameras beyond the fourth (the config supports 10).
    """
    n = len(frames)
    if n == 1:
        return frames[0]
    if n == 2:
        return np.hstack(frames)
    rows = []
    for i in range(0, n, 2):
        pair = list(frames[i:i + 2])
        if len(pair) == 1:
            pair.append(np.zeros_like(frames[0]))
        rows.append(np.hstack(pair))
    return np.vstack(rows)


def _update_grid_frame():
    """Background loop: compose all live camera frames into a single grid
    image and refresh the aggregate people/alert counters (~25 Hz)."""
    while True:
        # Grab frame references under the lock, but resize outside it:
        # capture threads replace the frame object rather than mutating it,
        # so the references stay valid and the lock is held briefly.
        with lock:
            raw = [cam["frame"] for cam in state["cameras"].values() if cam["frame"] is not None]
        if not raw:
            time.sleep(0.1)
            continue
        frames = [cv2.resize(f, (640, 480)) for f in raw]
        grid = _compose_grid(frames)
        with lock:
            state["grid_frame"] = grid
            state["total_people_count"] = sum(c["people_count"] for c in state["cameras"].values())
            state["alert_active"] = any(c["alert_active"] for c in state["cameras"].values())
        time.sleep(0.04)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.route("/")
def dashboard():
    """Serve the main dashboard page."""
    return render_template("dashboard.html")
@app.route("/video_feed")
def video_feed():
    """Stream the composite grid as multipart MJPEG (~25 fps)."""
    boundary = b"--frame\r\nContent-Type: image/jpeg\r\n\r\n"

    def gen():
        while True:
            # Copy the reference under the lock; encode outside it.
            with lock:
                snapshot = state["grid_frame"]
            if snapshot is not None:
                ok, buf = cv2.imencode(".jpg", snapshot, [cv2.IMWRITE_JPEG_QUALITY, 80])
                yield boundary + buf.tobytes() + b"\r\n"
            time.sleep(0.04)

    return Response(gen(), mimetype="multipart/x-mixed-replace; boundary=frame")
@app.route("/api/status")
def api_status():
    """Return a JSON snapshot of camera metadata (frames excluded),
    aggregate counters, and the 20 most recent alerts."""
    with lock:
        cams_info = {
            cid: {key: val for key, val in cam.items() if key != "frame"}
            for cid, cam in state["cameras"].items()
        }
        return jsonify({
            "total_people_count": state["total_people_count"],
            "cameras": cams_info,
            "alert_active": state["alert_active"],
            "alerts": state["alerts"][:20],
        })
@app.route("/api/cameras")
def api_cameras():
    """List configured cameras (id, name, ip) — credentials are not exposed."""
    cams = []
    for cam in CAMERAS:
        cams.append({"id": cam["id"], "name": cam["name"], "ip": cam["ip"]})
    return jsonify({"cameras": cams})
@app.route("/alerts/<path:filename>")
def serve_alert_image(filename):
    """Serve a saved alert snapshot from the local alerts/ directory."""
    return send_from_directory("alerts", filename)
if __name__ == "__main__":
    # One capture/detection daemon thread per camera, plus the grid composer.
    for cam in CAMERAS:
        worker = threading.Thread(target=_process_stream, args=(cam,), daemon=True)
        worker.start()
    threading.Thread(target=_update_grid_frame, daemon=True).start()
    print("\n Grid View Dashboard → http://localhost:5000\n")
    app.run(host="0.0.0.0", port=5000, debug=False, threaded=True)