#!/usr/bin/env python3
"""
Hand-raise detector using YOLOv8 object detection + pose on upper body crops.
Requirements:
pip install ultralytics opencv-python torch
Supports Python 3.13+, GPU acceleration if available.
"""
import os
import sys
import json
import queue
import threading
import time
from datetime import timedelta
from collections import defaultdict
import cv2
import numpy as np
import torch
from ultralytics import YOLO
# ---------------------
# Config
# ---------------------
VIDEO_PATH = "input.mp4"
OUT_PATH = "out.avi"
EVENT_JSON = "events.json"
EVENT_ND = "events.ndjson"
DETECT_MODEL = "yolov8n.pt" # Fast object detection
POSE_MODEL = "yolov8n-pose.pt" # Pose estimation
CONF_THRES = 0.3
UPPER_BODY_RATIO = 0.5 # Use top 50% of detected person
SUSTAIN_FRAMES = 15 # ~0.5 seconds at 30fps; a raise must persist this long to count
FRAME_QUEUE_SIZE = 8
# ---------------------
# Utilities
# ---------------------
def format_time(seconds: float) -> str:
    td = timedelta(seconds=seconds)
    t = str(td)
    if '.' not in t:
        t += '.000'
    else:
        sec, ms = t.split('.')
        t = f"{sec}.{ms[:3].ljust(3, '0')}"
    return t
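# Example: format_time(83.5) -> "0:01:23.500" (str(timedelta) with the
# fractional part trimmed/padded to exactly three digits)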
def is_hand_raised(kpts, conf_th=0.4):
    """
    Check whether either hand is raised, using strict geometric criteria.
    Expects a (17, 3) array of COCO keypoints normalized to [0, 1], with
    columns (x, y, confidence). Relevant indices: 5=left_shoulder,
    6=right_shoulder, 7=left_elbow, 8=right_elbow, 9=left_wrist, 10=right_wrist.
    """
    if kpts is None or kpts.size == 0:
        return False
    try:
        def check_side(s_idx, e_idx, w_idx):
            shoulder = kpts[s_idx]
            elbow = kpts[e_idx]
            wrist = kpts[w_idx]
            # All three points must be visible for a reliable decision
            if shoulder[2] < conf_th or elbow[2] < conf_th or wrist[2] < conf_th:
                return False
            # Wrist must be well above the shoulder (y grows downward):
            # at least 15% of the crop height, not just barely above it
            vertical_threshold = 0.15
            if not (wrist[1] < shoulder[1] - vertical_threshold):
                return False
            # Elbow must sit between shoulder and wrist (arm extended upward);
            # this rejects hands held near the face as "raised"
            if not (shoulder[1] > elbow[1] > wrist[1]):
                return False
            # Wrist and elbow should be roughly aligned horizontally
            # (a raised arm, not one reaching across the body)
            horizontal_distance = abs(wrist[0] - elbow[0])
            if horizontal_distance > 0.2:
                return False
            return True

        left = check_side(5, 7, 9)    # left shoulder, elbow, wrist
        right = check_side(6, 8, 10)  # right shoulder, elbow, wrist
        return left or right
    except (IndexError, TypeError):
        return False
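# Geometry sketch (normalized coords, y grows downward; illustrative values):
# with the right shoulder at (0.50, 0.60), elbow at (0.52, 0.40), and wrist at
# (0.50, 0.20), all at confidence >= 0.4, check_side passes every test:
# 0.20 < 0.60 - 0.15, then 0.60 > 0.40 > 0.20, and |0.50 - 0.52| <= 0.2.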
# ---------------------
# Frame reader
# ---------------------
def frame_reader_worker(video_path, q: queue.Queue, stop_event: threading.Event):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        stop_event.set()
        q.put(None)
        return
    while not stop_event.is_set():
        ret, frame = cap.read()
        if not ret:
            break
        frame = np.ascontiguousarray(frame)
        q.put(frame)
    cap.release()
    q.put(None)
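# The None sentinel pushed above tells the consumer loop in main() that the
# stream is finished (either end of video or a failed open).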
# ---------------------
# Main pipeline
# ---------------------
def main(video_path=VIDEO_PATH):
    if not os.path.exists(video_path):
        raise FileNotFoundError(video_path)
    cap_probe = cv2.VideoCapture(video_path)
    if not cap_probe.isOpened():
        raise RuntimeError("Cannot open video")
    fps = cap_probe.get(cv2.CAP_PROP_FPS) or 30.0
    width = int(cap_probe.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap_probe.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap_probe.get(cv2.CAP_PROP_FRAME_COUNT))
    cap_probe.release()
    print(f"[INFO] Video: {width}x{height} @ {fps:.2f}fps, {total_frames} frames")
    fourcc = cv2.VideoWriter_fourcc(*"XVID")
    out = cv2.VideoWriter(OUT_PATH, fourcc, fps, (width, height))
    if not out.isOpened():
        raise RuntimeError("Failed to create output video file")
    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    print(f"[INFO] Using device: {device}")
    # Load both models
    detect_model = YOLO(DETECT_MODEL)
    pose_model = YOLO(POSE_MODEL)
    detect_model.to(device)
    pose_model.to(device)
    print(f"[INFO] Models loaded: {DETECT_MODEL} + {POSE_MODEL}")
    # Start the frame reader thread
    fq = queue.Queue(maxsize=FRAME_QUEUE_SIZE)
    stop_event = threading.Event()
    reader = threading.Thread(target=frame_reader_worker, args=(video_path, fq, stop_event), daemon=True)
    reader.start()
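    # The bounded queue provides backpressure: when inference falls behind,
    # q.put() in the reader blocks until a frame is consumed here, capping
    # memory use at roughly FRAME_QUEUE_SIZE buffered frames.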
    # Per-person state, keyed by the detection index within a frame.
    # NOTE: this is not a persistent track ID; if detection order changes
    # between frames, state can jump between people.
    raised_state = defaultdict(int)
    hold_counter = defaultdict(int)
    events = []
    frame_idx = 0
    last_progress = -1
    print("[INFO] Processing frames...")
    print("Progress: [" + " " * 50 + "] 0%", end="\r")
    while True:
        frame = fq.get()
        if frame is None:
            break
        orig_frame = frame.copy()
        # Step 1: Detect people with the object-detection model (class 0 = person)
        detect_results = detect_model.predict(frame, conf=CONF_THRES, classes=[0], verbose=False)
        people_detected = 0
        for det_result in detect_results:
            if det_result.boxes is None or len(det_result.boxes) == 0:
                continue
            boxes = det_result.boxes.xyxy.cpu().numpy()
            people_detected = len(boxes)
            # Process each detected person
            for pid, box in enumerate(boxes):
                x1, y1, x2, y2 = map(int, box[:4])
                # Upper-body region: top portion of the person bbox
                person_height = y2 - y1
                upper_body_height = int(person_height * UPPER_BODY_RATIO)
                upper_y2 = y1 + upper_body_height
                # Expand the bbox slightly for context, clamped to the frame
                margin = int(person_height * 0.1)
                crop_x1 = max(0, x1 - margin)
                crop_y1 = max(0, y1 - margin)
                crop_x2 = min(width, x2 + margin)
                crop_y2 = min(height, upper_y2 + margin)
                # Crop the upper-body region
                upper_body_crop = frame[crop_y1:crop_y2, crop_x1:crop_x2]
                if upper_body_crop.size == 0:
                    continue
                # Step 2: Run pose estimation on the cropped upper body
                pose_results = pose_model.predict(upper_body_crop, conf=0.2, verbose=False)
                for pose_result in pose_results:
                    if not hasattr(pose_result, "keypoints") or pose_result.keypoints is None:
                        continue
                    kp_raw = pose_result.keypoints.data
                    kps_data = kp_raw.cpu().numpy() if hasattr(kp_raw, "cpu") else np.asarray(kp_raw)
                    if kps_data.ndim == 3 and kps_data.shape[0] > 0:
                        # keypoints.data is (n, 17, 3) in crop pixel coordinates;
                        # normalize x/y to [0, 1] so the fractional thresholds in
                        # is_hand_raised (and the drawing math below) apply as intended
                        crop_height, crop_width = upper_body_crop.shape[:2]
                        kpts = kps_data[0].copy()  # take the first person in the crop
                        kpts[:, 0] /= crop_width
                        kpts[:, 1] /= crop_height
                        raised = is_hand_raised(kpts)
                        prev = raised_state.get(pid, 0)
                        counter = hold_counter.get(pid, 0)
                        counter = counter + 1 if raised else 0
                        sustained = counter >= SUSTAIN_FRAMES
                        # When a sustained raise ends, push the counter negative so a
                        # re-raise needs 5 extra frames; this damps flickering detections
                        if not raised and prev == 1:
                            counter = -5
                        raised_state[pid] = 1 if sustained else 0
                        hold_counter[pid] = counter
                        timestamp = format_time(frame_idx / fps)
                        bbox = {"x": x1, "y": y1, "w": x2 - x1, "h": y2 - y1}
                        if sustained and prev == 0:
                            events.append({"id": pid, "event": "hand_raise_start", "frame": frame_idx,
                                           "time_seconds": round(frame_idx / fps, 3), "timestamp": timestamp, "bbox": bbox})
                        if not sustained and prev == 1:
                            events.append({"id": pid, "event": "hand_raise_end", "frame": frame_idx,
                                           "time_seconds": round(frame_idx / fps, 3), "timestamp": timestamp, "bbox": bbox})
                        # Draw on the original frame
                        color = (0, 255, 0) if sustained else (255, 0, 0)
                        thickness = 3 if sustained else 2
                        cv2.rectangle(orig_frame, (x1, y1), (x2, y2), color, thickness)
                        # Draw the upper-body crop region
                        cv2.rectangle(orig_frame, (crop_x1, crop_y1), (crop_x2, crop_y2), (255, 255, 0), 1)
                        # Draw the skeleton on the original frame: map normalized
                        # crop coordinates back to frame coordinates
                        for kp in kpts:
                            if kp[2] > 0.3:  # keypoint is visible
                                kp_x = int(kp[0] * crop_width + crop_x1)
                                kp_y = int(kp[1] * crop_height + crop_y1)
                                cv2.circle(orig_frame, (kp_x, kp_y), 4, (0, 255, 255), -1)
                        # Skeleton connections (upper body only)
                        connections = [
                            (5, 6),    # shoulders
                            (5, 7),    # left shoulder to elbow
                            (7, 9),    # left elbow to wrist
                            (6, 8),    # right shoulder to elbow
                            (8, 10),   # right elbow to wrist
                            (5, 11),   # left shoulder to hip
                            (6, 12),   # right shoulder to hip
                            (11, 12),  # hips
                        ]
                        for pt1_idx, pt2_idx in connections:
                            if kpts[pt1_idx][2] > 0.3 and kpts[pt2_idx][2] > 0.3:
                                pt1_x = int(kpts[pt1_idx][0] * crop_width + crop_x1)
                                pt1_y = int(kpts[pt1_idx][1] * crop_height + crop_y1)
                                pt2_x = int(kpts[pt2_idx][0] * crop_width + crop_x1)
                                pt2_y = int(kpts[pt2_idx][1] * crop_height + crop_y1)
                                cv2.line(orig_frame, (pt1_x, pt1_y), (pt2_x, pt2_y), (0, 255, 255), 2)
                        label = f"ID {pid} {'HAND UP' if sustained else 'detected'}"
                        # Show the sustain counter for debugging
                        if counter > 0:
                            label += f" ({counter}/{SUSTAIN_FRAMES})"
                        cv2.putText(orig_frame, label, (max(0, x1), max(20, y1 - 10)),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
                        # Show the visible-keypoint count
                        visible_kpts = int(np.sum(kpts[:, 2] > 0.3))
                        cv2.putText(orig_frame, f"KP: {visible_kpts}/17", (max(0, x1), max(40, y1 + 15)),
                                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
                        break
        # Frame counter overlay
        cv2.putText(orig_frame, f"Frame: {frame_idx} | People: {people_detected}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 0), 2)
        out.write(orig_frame)
        frame_idx += 1
        # Update the progress bar
        if total_frames > 0:
            progress = int((frame_idx / total_frames) * 100)
            if progress != last_progress:
                last_progress = progress
                filled = int((frame_idx / total_frames) * 50)
                bar = "█" * filled + " " * (50 - filled)
                print(f"Progress: [{bar}] {progress}% ({frame_idx}/{total_frames})", end="\r")
    stop_event.set()
    reader.join(timeout=1.0)
    out.release()
    print(f"\n[INFO] Total frames processed: {frame_idx}")
    print(f"[INFO] Total hand raise events: {sum(1 for e in events if 'start' in e['event'])}")
    print("[INFO] Writing event logs...")
    # Save events in both JSON and NDJSON form
    output = {"video_fps": fps, "events": events}
    with open(EVENT_JSON, "w") as f:
        json.dump(output, f, indent=2)
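    # Each NDJSON line below is one event object, e.g. (illustrative values):
    # {"id": 0, "event": "hand_raise_start", "frame": 120, "time_seconds": 4.0,
    #  "timestamp": "0:00:04.000", "bbox": {"x": 212, "y": 96, "w": 88, "h": 240}}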
    with open(EVENT_ND, "w") as f:
        for e in events:
            f.write(json.dumps(e) + "\n")
    print(f"[DONE] {len(events)} events logged in {EVENT_JSON}, video -> {OUT_PATH}")
# ---------------------
# CLI
# ---------------------
if __name__=="__main__":
if len(sys.argv)>1:
VIDEO_PATH = sys.argv[1]
start_t = time.time()
try:
main(VIDEO_PATH)
except Exception as e:
print("[FATAL]", str(e), file=sys.stderr)
sys.exit(1)
finally:
print(f"[TOTAL] elapsed {time.time()-start_t:.2f}s")