#!/usr/bin/env python3
"""
Hand-raise detector using YOLOv8: object detection to find people, then pose
estimation on upper-body crops.

Requirements:
    pip install ultralytics opencv-python torch

Supports Python 3.13+; uses GPU acceleration when available.
"""
import os
import sys
import json
import queue
import threading
import time
from datetime import timedelta
from collections import defaultdict

import cv2
import numpy as np
import torch
from ultralytics import YOLO

# ---------------------
# Config
# ---------------------
VIDEO_PATH = "input.mp4"
OUT_PATH = "out.avi"
EVENT_JSON = "events.json"
EVENT_ND = "events.ndjson"

DETECT_MODEL = "yolov8n.pt"        # fast object detection
POSE_MODEL = "yolov8n-pose.pt"     # pose estimation
CONF_THRES = 0.3
UPPER_BODY_RATIO = 0.5   # use the top 50% of each detected person box
SUSTAIN_FRAMES = 15      # ~0.5 s at 30 fps; brief raises are ignored
FRAME_QUEUE_SIZE = 8
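# NOTE: SUSTAIN_FRAMES is a frame count, not a duration; at any fps other than
# 30 the effective hold time changes (15 frames ~= 0.25 s at 60 fps). Deriving
# it from the probed fps, e.g. int(round(0.5 * fps)), would be one option.
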
# ---------------------
# Utilities
# ---------------------
def format_time(seconds: float) -> str:
    """Format seconds as H:MM:SS.mmm with millisecond precision."""
    td = timedelta(seconds=seconds)
    t = str(td)
    if '.' not in t:
        t += '.000'
    else:
        sec, ms = t.split('.')
        t = f"{sec}.{ms[:3].ljust(3, '0')}"
    return t
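
# Example: format_time(3.5) -> "0:00:03.500", format_time(2) -> "0:00:02.000"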

def is_hand_raised(kpts, conf_th=0.4):
    """
    Check whether either hand is raised, using strict geometric criteria.

    Expects kpts as a (17, 3) array of (x, y, confidence) with x and y
    normalized to the crop ([0, 1]); y grows downward, so "above" means
    a smaller y. COCO keypoint indices: 5=left_shoulder, 6=right_shoulder,
    7=left_elbow, 8=right_elbow, 9=left_wrist, 10=right_wrist.
    """
    if kpts is None or kpts.size == 0:
        return False
    try:
        def check_side(s_idx, e_idx, w_idx):
            shoulder = kpts[s_idx]
            elbow = kpts[e_idx]
            wrist = kpts[w_idx]

            # All three points must be visible for reliable detection
            if shoulder[2] < conf_th or elbow[2] < conf_th or wrist[2] < conf_th:
                return False

            # Wrist must be significantly above the shoulder (not just barely):
            # at least 15% of the crop height
            vertical_threshold = 0.15
            if not (wrist[1] < shoulder[1] - vertical_threshold):
                return False

            # Elbow should sit between shoulder and wrist (arm extended upward);
            # this prevents counting a hand held near the face as "raised"
            if not (shoulder[1] > elbow[1] > wrist[1]):
                return False

            # Wrist and elbow should be roughly aligned horizontally
            # (not reaching across the body)
            horizontal_distance = abs(wrist[0] - elbow[0])
            if horizontal_distance > 0.2:
                return False

            return True

        left = check_side(5, 7, 9)    # left shoulder, elbow, wrist
        right = check_side(6, 8, 10)  # right shoulder, elbow, wrist
        return left or right
    except (IndexError, TypeError):
        return False
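
# A tiny synthetic sanity check of the geometry above (illustrative only: the
# coordinates below are made-up normalized values, and this helper is not
# called anywhere in the pipeline).
def _selftest_is_hand_raised():
    kp = np.zeros((17, 3))
    kp[5] = (0.40, 0.60, 1.0)   # left shoulder (x, y, conf); y grows downward
    kp[7] = (0.42, 0.45, 1.0)   # left elbow, above the shoulder
    kp[9] = (0.43, 0.30, 1.0)   # left wrist, well above the elbow
    assert is_hand_raised(kp)
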
# ---------------------
# Frame reader
# ---------------------
def frame_reader_worker(video_path, q: queue.Queue, stop_event: threading.Event):
    """Decode frames on a background thread; a None sentinel marks end of stream."""
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        stop_event.set()
        q.put(None)
        return
    while not stop_event.is_set():
        ret, frame = cap.read()
        if not ret:
            break
        frame = np.ascontiguousarray(frame)
        # put() with a timeout so the thread can exit if the consumer stops
        # while the queue is full (a plain put() could block forever)
        while not stop_event.is_set():
            try:
                q.put(frame, timeout=0.5)
                break
            except queue.Full:
                continue
    cap.release()
    q.put(None)
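
# The bounded queue (FRAME_QUEUE_SIZE) applies backpressure: decoding stalls
# when inference falls behind, so memory use stays flat instead of buffering
# the whole video.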

# ---------------------
# Main pipeline
# ---------------------
def main(video_path=VIDEO_PATH):
    if not os.path.exists(video_path):
        raise FileNotFoundError(video_path)

    # Probe stream properties up front; decoding happens on the reader thread
    cap_probe = cv2.VideoCapture(video_path)
    if not cap_probe.isOpened():
        raise RuntimeError("Cannot open video")
    fps = cap_probe.get(cv2.CAP_PROP_FPS) or 30.0  # fall back to 30 if unreported
    width = int(cap_probe.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap_probe.get(cv2.CAP_PROP_FRAME_HEIGHT))
    total_frames = int(cap_probe.get(cv2.CAP_PROP_FRAME_COUNT))
    cap_probe.release()

    print(f"[INFO] Video: {width}x{height} @ {fps:.2f}fps, {total_frames} frames")

    fourcc = cv2.VideoWriter_fourcc(*"XVID")
    out = cv2.VideoWriter(OUT_PATH, fourcc, fps, (width, height))
    if not out.isOpened():
        raise RuntimeError("Failed to create output video file")

    device = "cuda:0" if torch.cuda.is_available() else "cpu"
    print(f"[INFO] Using device: {device}")

    # Load both models
    detect_model = YOLO(DETECT_MODEL)
    pose_model = YOLO(POSE_MODEL)
    detect_model.to(device)
    pose_model.to(device)
    print(f"[INFO] Models loaded: {DETECT_MODEL} + {POSE_MODEL}")

    # Start frame reader thread
    fq = queue.Queue(maxsize=FRAME_QUEUE_SIZE)
    stop_event = threading.Event()
    reader = threading.Thread(target=frame_reader_worker, args=(video_path, fq, stop_event), daemon=True)
    reader.start()

    # Per-person state, keyed by the per-frame detection index
    raised_state = defaultdict(int)   # 1 while a sustained raise is active
    hold_counter = defaultdict(int)   # consecutive raised frames (negative = cooldown)
    events = []
    frame_idx = 0
    last_progress = -1

    print("[INFO] Processing frames...")
    print("Progress: [" + " " * 50 + "] 0%", end="\r")

    while True:
        frame = fq.get()
        if frame is None:
            break

        orig_frame = frame.copy()

        # Step 1: Detect people using object detection
        detect_results = detect_model.predict(frame, conf=CONF_THRES, classes=[0], verbose=False)

        people_detected = 0

        for det_result in detect_results:
            if det_result.boxes is None or len(det_result.boxes) == 0:
                continue

            boxes = det_result.boxes.xyxy.cpu().numpy()
            people_detected = len(boxes)
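
            # NOTE: pid is the detection index within this frame, not a
            # persistent track ID; if detection order changes between frames,
            # per-person state can jump between people. A tracker (e.g.
            # model.track) would give stable identities.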
            # Process each detected person
            for pid, box in enumerate(boxes):
                x1, y1, x2, y2 = map(int, box[:4])

                # Calculate upper body region (top portion of bbox)
                person_height = y2 - y1
                upper_body_height = int(person_height * UPPER_BODY_RATIO)
                upper_y2 = y1 + upper_body_height

                # Expand bbox slightly for better context
                margin = int(person_height * 0.1)
                crop_x1 = max(0, x1 - margin)
                crop_y1 = max(0, y1 - margin)
                crop_x2 = min(width, x2 + margin)
                crop_y2 = min(height, upper_y2 + margin)

                # Crop upper body region
                upper_body_crop = frame[crop_y1:crop_y2, crop_x1:crop_x2]
                if upper_body_crop.size == 0:
                    continue

                # Step 2: Run pose estimation on cropped upper body
                pose_results = pose_model.predict(upper_body_crop, conf=0.2, verbose=False)

                crop_height, crop_width = upper_body_crop.shape[:2]

                for pose_result in pose_results:
                    if not hasattr(pose_result, "keypoints") or pose_result.keypoints is None:
                        continue

                    kd = pose_result.keypoints.data
                    kps_data = kd.cpu().numpy() if hasattr(kd, "cpu") else np.asarray(kd)

                    if kps_data.ndim == 3 and kps_data.shape[0] > 0:
                        kpts = kps_data[0].copy()  # first person in the crop, shape (17, 3)
                        # Ultralytics keypoints.data holds crop-pixel coordinates;
                        # normalize x and y to [0, 1] so is_hand_raised's thresholds
                        # and the drawing math below work in crop-relative units
                        kpts[:, 0] /= crop_width
                        kpts[:, 1] /= crop_height

                        raised = is_hand_raised(kpts)
                        prev = raised_state[pid]
                        counter = hold_counter[pid]
                        counter = counter + 1 if raised else 0
                        sustained = counter >= SUSTAIN_FRAMES

                        # Cooldown: after a raise ends, restart the counter from -5
                        # so flickering detections cannot immediately retrigger
                        if not raised and prev == 1:
                            counter = -5

                        raised_state[pid] = 1 if sustained else 0
                        hold_counter[pid] = counter
                        timestamp = format_time(frame_idx / fps)

                        bbox = {"x": x1, "y": y1, "w": x2 - x1, "h": y2 - y1}

                        if sustained and prev == 0:
                            events.append({"id": pid, "event": "hand_raise_start", "frame": frame_idx,
                                           "time_seconds": round(frame_idx / fps, 3), "timestamp": timestamp, "bbox": bbox})
                        if not sustained and prev == 1:
                            events.append({"id": pid, "event": "hand_raise_end", "frame": frame_idx,
                                           "time_seconds": round(frame_idx / fps, 3), "timestamp": timestamp, "bbox": bbox})
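
                        # Worked example at 30 fps: 15 consecutive raised frames
                        # trip "sustained" (~0.5 s); after a raise ends the counter
                        # restarts from -5, so an immediate re-raise needs 20
                        # raised frames (~0.67 s) before it registers again.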

                        # Draw on original frame
                        color = (0, 255, 0) if sustained else (255, 0, 0)
                        thickness = 3 if sustained else 2
                        cv2.rectangle(orig_frame, (x1, y1), (x2, y2), color, thickness)

                        # Draw upper body region
                        cv2.rectangle(orig_frame, (crop_x1, crop_y1), (crop_x2, crop_y2), (255, 255, 0), 1)

                        # Draw the skeleton on the original frame: map normalized
                        # crop coordinates back to frame pixels
                        for kp in kpts:
                            if kp[2] > 0.3:  # keypoint is visible
                                kp_x = int(kp[0] * crop_width + crop_x1)
                                kp_y = int(kp[1] * crop_height + crop_y1)
                                cv2.circle(orig_frame, (kp_x, kp_y), 4, (0, 255, 255), -1)

                        # Draw skeleton connections (upper body only)
                        connections = [
                            (5, 6),    # shoulders
                            (5, 7),    # left shoulder to elbow
                            (7, 9),    # left elbow to wrist
                            (6, 8),    # right shoulder to elbow
                            (8, 10),   # right elbow to wrist
                            (5, 11),   # left shoulder to hip
                            (6, 12),   # right shoulder to hip
                            (11, 12),  # hips
                        ]

                        for pt1_idx, pt2_idx in connections:
                            if kpts[pt1_idx][2] > 0.3 and kpts[pt2_idx][2] > 0.3:
                                pt1_x = int(kpts[pt1_idx][0] * crop_width + crop_x1)
                                pt1_y = int(kpts[pt1_idx][1] * crop_height + crop_y1)
                                pt2_x = int(kpts[pt2_idx][0] * crop_width + crop_x1)
                                pt2_y = int(kpts[pt2_idx][1] * crop_height + crop_y1)
                                cv2.line(orig_frame, (pt1_x, pt1_y), (pt2_x, pt2_y), (0, 255, 255), 2)
label = f"ID {pid} {'HAND UP' if sustained else 'detected'}"
|
|
# Show counter for debugging
|
|
if counter > 0:
|
|
label += f" ({counter}/{SUSTAIN_FRAMES})"
|
|
cv2.putText(orig_frame, label, (max(0, x1), max(20, y1-10)),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2)
|
|
|
|
# Show visible keypoints count
|
|
visible_kpts = np.sum(kpts[:, 2] > 0.3)
|
|
cv2.putText(orig_frame, f"KP: {visible_kpts}/17", (max(0, x1), max(40, y1+15)),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
|
|
break
|
|
|
|

        # Frame counter
        cv2.putText(orig_frame, f"Frame: {frame_idx} | People: {people_detected}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 0), 2)

        out.write(orig_frame)
        frame_idx += 1

        # Update progress bar
        if total_frames > 0:
            progress = int((frame_idx / total_frames) * 100)
            if progress != last_progress:
                last_progress = progress
                filled = int((frame_idx / total_frames) * 50)
                bar = "█" * filled + " " * (50 - filled)
                print(f"Progress: [{bar}] {progress}% ({frame_idx}/{total_frames})", end="\r")

    stop_event.set()
    reader.join(timeout=1.0)
    out.release()

    print(f"\n[INFO] Total frames processed: {frame_idx}")
    print(f"[INFO] Total hand raise events: {sum(1 for e in events if 'start' in e['event'])}")
    print("[INFO] Writing event logs...")

    # Save events as one JSON document and as newline-delimited JSON
    output = {"video_fps": fps, "events": events}
    with open(EVENT_JSON, "w") as f:
        json.dump(output, f, indent=2)
    with open(EVENT_ND, "w") as f:
        for e in events:
            f.write(json.dumps(e) + "\n")

    print(f"[DONE] {len(events)} events logged in {EVENT_JSON}, video -> {OUT_PATH}")
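
# Example NDJSON line (values illustrative):
# {"id": 0, "event": "hand_raise_start", "frame": 451, "time_seconds": 15.033,
#  "timestamp": "0:00:15.033", "bbox": {"x": 120, "y": 80, "w": 140, "h": 360}}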

# ---------------------
# CLI
# ---------------------
if __name__ == "__main__":
    if len(sys.argv) > 1:
        VIDEO_PATH = sys.argv[1]
    start_t = time.time()
    try:
        main(VIDEO_PATH)
    except Exception as e:
        print("[FATAL]", str(e), file=sys.stderr)
        sys.exit(1)
    finally:
        print(f"[TOTAL] elapsed {time.time() - start_t:.2f}s")