first commit

2025-11-07 23:30:56 -06:00
commit 8059a72988
5 changed files with 496611 additions and 0 deletions

7
.gitignore vendored Normal file

@@ -0,0 +1,7 @@
venv/**
venv2/**
*.mp4
*.jpg
*.avi
mediapipe/**
*.pt

1
.python-version Normal file

@@ -0,0 +1 @@
handraise311

460849
events.json Normal file

File diff suppressed because it is too large

35468
events.ndjson Normal file

File diff suppressed because it is too large

286
main.py Normal file

@@ -0,0 +1,286 @@
#!/usr/bin/env python3
"""
Hand-raise detector using YOLOv8-pose (no MediaPipe).
Requirements:
pip install ultralytics opencv-python torch
Supports Python 3.13+; uses GPU acceleration when available.
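Usage:
    python main.py [video_path]   # defaults to input.mp4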
"""
import os
import sys
import json
import queue
import threading
import time
from datetime import timedelta
from collections import defaultdict
import cv2
import numpy as np
import torch
from ultralytics import YOLO
# TODO: investigate why the run produced 35468 events (likely the per-frame IDs; see tracking sketch below)
# --------------------
# Config
# --------------------
VIDEO_PATH = "input.mp4"
OUT_PATH = "out.avi"
EVENT_JSON = "events.json"
EVENT_ND = "events.ndjson"
MODEL_NAME = "yolov8m-pose.pt"
IMG_SIZE = 1280 # Larger image size for better detection
CONF_THRES = 0.05 # Very low threshold
IOU = 0.45
KP_CONF_THRES = 0.1 # Very low keypoint threshold
SUSTAIN_FRAMES = 3
DRAW_SKELETON = False
TRACKER = True
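# NOTE: TRACKER is currently unused; model.predict() runs per frame (see tracking sketch below).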
FRAME_QUEUE_SIZE = 8
# ---------------------
# Utilities
# ---------------------
def format_time(seconds: float) -> str:
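    # e.g. format_time(83.5) -> "0:01:23.500"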
td = timedelta(seconds=seconds)
t = str(td)
if '.' not in t:
t += '.000'
else:
sec, ms = t.split('.')
t = f"{sec}.{ms[:3].ljust(3,'0')}"
return t
def get_bbox_from_keypoints(kpts, width, height, conf_th=KP_CONF_THRES):
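    # kpts: (17, 3) array of normalized (x, y, conf) keypoints in [0, 1]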
if kpts is None or kpts.size == 0:
return None
vis = kpts[:, 2] >= conf_th
if not vis.any():
return None
xs = (kpts[vis, 0] * width).astype(int)
ys = (kpts[vis, 1] * height).astype(int)
x1, y1, x2, y2 = int(xs.min()), int(ys.min()), int(xs.max()), int(ys.max())
pad_x = max(4, int(0.05 * (x2 - x1 + 1)))
pad_y = max(4, int(0.05 * (y2 - y1 + 1)))
return {"x": max(0, x1 - pad_x),
"y": max(0, y1 - pad_y),
"w": (x2 - x1 + 2*pad_x),
"h": (y2 - y1 + 2*pad_y)}
def is_hand_raised(kpts, conf_th=KP_CONF_THRES):
if kpts is None or kpts.size == 0:
return False
try:
def check(s_idx, e_idx, w_idx):
s, e, w = kpts[s_idx], kpts[e_idx], kpts[w_idx]
# Only require wrist to be visible for partial bodies
if w[2] < conf_th:
return False
# If shoulder visible, check if wrist is above it
if s[2] >= conf_th:
return w[1] < s[1]
# If elbow visible, check if wrist is above it
if e[2] >= conf_th:
return w[1] < e[1]
# If only wrist visible, check if it's in upper portion of frame
return w[1] < 0.4
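        # COCO keypoint order: 5/7/9 = left shoulder/elbow/wrist, 6/8/10 = right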
left = check(5,7,9)
right = check(6,8,10)
return left or right
    except Exception:
        return False
# ---------------------
# Frame reader
# ---------------------
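# Decodes frames on a background thread; a None put on the queue signals end-of-stream.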
def frame_reader_worker(video_path, q: queue.Queue, stop_event: threading.Event):
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
stop_event.set()
q.put(None)
return
while not stop_event.is_set():
ret, frame = cap.read()
if not ret:
break
frame = np.ascontiguousarray(frame)
q.put(frame)
cap.release()
q.put(None)
# ---------------------
# Main pipeline
# ---------------------
def main(video_path=VIDEO_PATH):
if not os.path.exists(video_path):
raise FileNotFoundError(video_path)
cap_probe = cv2.VideoCapture(video_path)
if not cap_probe.isOpened():
raise RuntimeError("Cannot open video")
fps = cap_probe.get(cv2.CAP_PROP_FPS) or 30.0
width = int(cap_probe.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap_probe.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap_probe.get(cv2.CAP_PROP_FRAME_COUNT))
cap_probe.release()
print(f"[INFO] Video: {width}x{height} @ {fps:.2f}fps, {total_frames} frames")
fourcc = cv2.VideoWriter_fourcc(*"XVID") # XVID codec for AVI
out = cv2.VideoWriter(OUT_PATH, fourcc, fps, (width, height))
if not out.isOpened():
raise RuntimeError("Failed to create output video file")
device = "cuda:0" if torch.cuda.is_available() else "cpu"
print(f"[INFO] Using device: {device}")
model = YOLO(MODEL_NAME)
model.to(device)
print(f"[INFO] Model loaded: {MODEL_NAME}")
# Start frame reader thread
fq = queue.Queue(maxsize=FRAME_QUEUE_SIZE)
stop_event = threading.Event()
reader = threading.Thread(target=frame_reader_worker, args=(video_path,fq,stop_event), daemon=True)
reader.start()
raised_state = defaultdict(int)
hold_counter = defaultdict(int)
events = []
frame_idx = 0
last_progress = -1
saved_test_frame = False
print("[INFO] Processing frames...")
print("Progress: [" + " " * 50 + "] 0%", end="\r")
while True:
frame = fq.get()
if frame is None:
break
orig_frame = frame.copy()
# Save a test frame to verify video is readable
if not saved_test_frame and frame_idx == 10:
cv2.imwrite("test_frame.jpg", frame)
print(f"\n[DEBUG] Saved test frame: {frame.shape}, dtype: {frame.dtype}")
saved_test_frame = True
# Try detection with original BGR frame (YOLO can handle BGR)
results = model.predict(frame, imgsz=IMG_SIZE, conf=CONF_THRES, iou=IOU, verbose=False, classes=[0])
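        # Sketch (untested assumption): stable per-person IDs via the built-in tracker.
        # The enumeration index used as `pid` below changes whenever detection order
        # changes, which can inflate start/end event counts; with tracking enabled:
        #   results = model.track(frame, imgsz=IMG_SIZE, conf=CONF_THRES, iou=IOU,
        #                         persist=True, verbose=False, classes=[0])
        #   track_ids = results[0].boxes.id  # may be None on frames with no tracks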
keypoints_list = []
ids = []
# Debug: Check what YOLO detected
detections_found = False
for r in results:
# Check if ANY detections exist
if hasattr(r, 'boxes') and r.boxes is not None and len(r.boxes) > 0:
print(f"\n[DEBUG] Frame {frame_idx}: Found {len(r.boxes)} boxes", end="")
if hasattr(r, "keypoints") and r.keypoints is not None:
                try:
                    # keypoints.data holds pixel coords; the helpers above expect
                    # normalized [0, 1] coords, so rebuild (x, y, conf) from xyn + conf
                    xyn = r.keypoints.xyn
                    xyn = xyn.cpu().numpy() if hasattr(xyn, "cpu") else np.asarray(xyn)
                    kconf = r.keypoints.conf
                    if kconf is None:
                        kconf = np.ones(xyn.shape[:2], dtype=np.float32)
                    else:
                        kconf = kconf.cpu().numpy() if hasattr(kconf, "cpu") else np.asarray(kconf)
                    kps = np.concatenate([xyn, kconf[..., None]], axis=-1)
                    print(f" | Keypoints shape: {kps.shape}", end="")
                    if kps.ndim == 3 and kps.shape[0] > 0:
                        detections_found = True
                        for i in range(kps.shape[0]):
                            keypoints_list.append(kps[i].astype(np.float32))
                            ids.append(i)  # per-frame detection index, not a persistent track ID
                            print(f" | Person {i} keypoints: {np.sum(kps[i][:, 2] > KP_CONF_THRES)}/17", end="")
                except Exception as e:
                    print(f" | Error: {e}", end="")
continue
# Debug logging every 30 frames
if frame_idx % 30 == 0 and frame_idx > 0:
print(f"\n[DEBUG] Frame {frame_idx}: Detected {len(keypoints_list)} people", end="")
for local_idx, kpts in enumerate(keypoints_list):
pid = ids[local_idx]
raised = is_hand_raised(kpts)
bbox = get_bbox_from_keypoints(kpts, width, height)
            prev = raised_state.get(pid, 0)
            counter = hold_counter.get(pid, 0)
            counter = counter + 1 if raised else 0
            sustained = counter >= SUSTAIN_FRAMES
            raised_state[pid] = 1 if sustained else 0
            hold_counter[pid] = counter
            timestamp = format_time(frame_idx / fps)
            if sustained and prev == 0:
                events.append({"id": pid, "event": "hand_raise_start", "frame": frame_idx,
                               "time_seconds": round(frame_idx / fps, 3),
                               "timestamp": timestamp, "bbox": bbox})
            if not sustained and prev == 1:
                events.append({"id": pid, "event": "hand_raise_end", "frame": frame_idx,
                               "time_seconds": round(frame_idx / fps, 3),
                               "timestamp": timestamp, "bbox": bbox})
# Always draw bounding box for detected people
if bbox is not None:
x,y,w,h = bbox["x"],bbox["y"],bbox["w"],bbox["h"]
color = (0,255,0) if sustained else (255,0,0) # Green if hand up, Blue if not
thickness = 3 if sustained else 2
cv2.rectangle(orig_frame,(x,y),(x+w,y+h),color,thickness)
label = f"ID {pid} {'HAND UP' if sustained else 'detected'}"
cv2.putText(orig_frame,label,(max(0,x),max(20,y-10)),cv2.FONT_HERSHEY_SIMPLEX,0.7,color,2)
if DRAW_SKELETON:
try:
orig_frame = results[0].plot()
            except Exception:
                pass
# Always draw a frame counter for debugging
cv2.putText(orig_frame, f"Frame: {frame_idx} | People: {len(keypoints_list)}",
(10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 0), 2)
out.write(orig_frame)
frame_idx += 1
if total_frames > 0:
progress = int((frame_idx / total_frames) * 100)
if progress != last_progress:
last_progress = progress
filled = int((frame_idx / total_frames) * 50)
bar = "" * filled + " " * (50 - filled)
print(f"Progress: [{bar}] {progress}% ({frame_idx}/{total_frames})", end="\r")
stop_event.set()
reader.join(timeout=1.0)
out.release()
print(f"\n[INFO] Total frames processed: {frame_idx}")
print(f"[INFO] Total people detections: {sum(1 for e in events if 'start' in e['event'])}")
print("[INFO] Writing event logs...")
# save events
output = {"video_fps": fps, "events": events}
with open(EVENT_JSON,"w") as f:
json.dump(output,f,indent=2)
with open(EVENT_ND,"w") as f:
for e in events:
f.write(json.dumps(e)+"\n")
print(f"[DONE] {len(events)} events logged in {EVENT_JSON}, video -> {OUT_PATH}")
# ---------------------
# CLI
# ---------------------
if __name__ == "__main__":
    if len(sys.argv) > 1:
        VIDEO_PATH = sys.argv[1]
start_t = time.time()
try:
main(VIDEO_PATH)
except Exception as e:
print("[FATAL]",str(e),file=sys.stderr)
sys.exit(1)
finally:
print(f"[TOTAL] elapsed {time.time()-start_t:.2f}s")