From 2930ff2f2f43dc4e21a3b2f3dc2089fd780961a2 Mon Sep 17 00:00:00 2001 From: Stupdi Go Date: Sat, 10 Jan 2026 21:41:25 -0600 Subject: [PATCH] Initial Commit --- .DS_Store | Bin 0 -> 6148 bytes .gitignore | 2 + .idea/.gitignore | 8 + .idea/ASLtranslator.iml | 10 + .../inspectionProfiles/profiles_settings.xml | 6 + .idea/misc.xml | 7 + .idea/modules.xml | 8 + .idea/vcs.xml | 6 + test.py | 443 ++++++++++++++++ training.py | 500 ++++++++++++++++++ 10 files changed, 990 insertions(+) create mode 100644 .DS_Store create mode 100644 .gitignore create mode 100644 .idea/.gitignore create mode 100644 .idea/ASLtranslator.iml create mode 100644 .idea/inspectionProfiles/profiles_settings.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/vcs.xml create mode 100644 test.py create mode 100644 training.py diff --git a/.DS_Store b/.DS_Store new file mode 100644 index 0000000000000000000000000000000000000000..861d7da96a7a5ec5a6403368515bd90b1666a7d4 GIT binary patch literal 6148 zcmZQzU|@7AO)+F(5MW?n;9!8zj35RBCIAV8Fop~hRD=~|9@vZ&hD3%EhD3&9hE%BB zV5dNC=LWkthasK;+5IJ+Ir+&+Ir&M@;NfRrVBr4`1q=)fICSNt8wMxm=N5oG&V28d z2X2+Q`7SO{^Es3*eh*pHdE617>J$=G7i1t@!BQRoa?B_m4S~@R7!85Z5Eu;sNC+@O zC=PB2JxY#-z-S1Jh5$SSK;?r1q;1dO0HqrsG)Rhpk%0l+1z==gV1a331os0NKyo0h zAR43 + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..485f773 --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..7034c41 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/test.py b/test.py new file mode 100644 index 0000000..b759c50 --- /dev/null +++ b/test.py @@ -0,0 +1,443 @@ +import mediapipe as mp +import cv2 +import numpy as np +import torch +import torch.nn as nn +import torch.nn.functional as F +import math + + +# Positional Encoding +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=100): + super(PositionalEncoding, self).__init__() + + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + + pe = pe.unsqueeze(0) + self.register_buffer('pe', pe) + + def forward(self, x): + return x + self.pe[:, :x.size(1), :] + + +# Model architecture +class TransformerCNN_ASL(nn.Module): + def __init__(self, input_dim=77, num_classes=250, d_model=512, nhead=8, num_layers=6, dim_feedforward=2048): + super(TransformerCNN_ASL, self).__init__() + + self.input_dim = input_dim + self.d_model = d_model + + # Input projection + self.input_projection = nn.Linear(input_dim, d_model) + self.input_norm = nn.LayerNorm(d_model) + + # Positional encoding + self.pos_encoder = PositionalEncoding(d_model, max_len=100) + + # Transformer Encoder with Self-Attention + encoder_layer = 
nn.TransformerEncoderLayer( + d_model=d_model, + nhead=nhead, + dim_feedforward=dim_feedforward, + dropout=0.1, + activation='gelu', + batch_first=True, + norm_first=True + ) + self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) + + # CNN Blocks for pattern detection + self.conv1 = nn.Conv1d(d_model, 1024, kernel_size=3, padding=1) + self.bn1 = nn.BatchNorm1d(1024) + self.pool1 = nn.MaxPool1d(2) + self.dropout1 = nn.Dropout(0.3) + + self.conv2 = nn.Conv1d(1024, 2048, kernel_size=3, padding=1) + self.bn2 = nn.BatchNorm1d(2048) + self.pool2 = nn.MaxPool1d(2) + self.dropout2 = nn.Dropout(0.3) + + self.conv3 = nn.Conv1d(2048, 4096, kernel_size=3, padding=1) + self.bn3 = nn.BatchNorm1d(4096) + self.pool3 = nn.AdaptiveMaxPool1d(1) # Global pooling + self.dropout3 = nn.Dropout(0.4) + + # Fully connected layers + self.fc1 = nn.Linear(4096, 4096) + self.bn_fc1 = nn.BatchNorm1d(4096) + self.dropout_fc1 = nn.Dropout(0.5) + + self.fc2 = nn.Linear(4096, 2048) + self.bn_fc2 = nn.BatchNorm1d(2048) + self.dropout_fc2 = nn.Dropout(0.4) + + self.fc3 = nn.Linear(2048, 1024) + self.bn_fc3 = nn.BatchNorm1d(1024) + self.dropout_fc3 = nn.Dropout(0.3) + + self.fc4 = nn.Linear(1024, num_classes) + + def forward(self, x): + batch_size = x.size(0) + + # Project to d_model + x = self.input_projection(x) + x = self.input_norm(x) + x = x.unsqueeze(1) + + # Add positional encoding + x = self.pos_encoder(x) + + # Transformer encoder with self-attention + x = self.transformer_encoder(x) + + # Reshape for CNN + x = x.permute(0, 2, 1) + + # CNN pattern detection + x = F.gelu(self.bn1(self.conv1(x))) + x = self.pool1(x) + x = self.dropout1(x) + + x = F.gelu(self.bn2(self.conv2(x))) + x = self.pool2(x) + x = self.dropout2(x) + + x = F.gelu(self.bn3(self.conv3(x))) + x = self.pool3(x) + x = self.dropout3(x) + + # Flatten + x = x.view(batch_size, -1) + + # Fully connected layers + x = F.gelu(self.bn_fc1(self.fc1(x))) + x = self.dropout_fc1(x) + + x = F.gelu(self.bn_fc2(self.fc2(x))) + x = self.dropout_fc2(x) + + x = F.gelu(self.bn_fc3(self.fc3(x))) + x = self.dropout_fc3(x) + + x = self.fc4(x) + + return x + + +# Load the trained model +print("Loading model...") +checkpoint = torch.load('asl_kaggle_transformer.pth', map_location='cpu') +label_encoder = checkpoint['label_encoder'] +num_classes = checkpoint['num_classes'] +input_dim = checkpoint['input_dim'] +config = checkpoint['model_config'] + +model = TransformerCNN_ASL( + input_dim=input_dim, + num_classes=num_classes, + d_model=config['d_model'], + nhead=config['nhead'], + num_layers=config['num_layers'], + dim_feedforward=config['dim_feedforward'] +) +model.load_state_dict(checkpoint['model_state_dict']) +model.eval() + +total_params = sum(p.numel() for p in model.parameters()) +print(f"Loaded Transformer+CNN model") +print(f"Total parameters: {total_params:,}") +print(f"Number of ASL signs: {num_classes}") +print(f"Sample signs: {label_encoder.classes_[:10]}") + +# Setup MediaPipe +BaseOptions = mp.tasks.BaseOptions +HandLandmarker = mp.tasks.vision.HandLandmarker +HandLandmarkerOptions = mp.tasks.vision.HandLandmarkerOptions +VisionRunningMode = mp.tasks.vision.RunningMode + +options = HandLandmarkerOptions( + base_options=BaseOptions(model_asset_path='hand_landmarker.task'), + running_mode=VisionRunningMode.VIDEO, + num_hands=1, + min_hand_detection_confidence=0.5, + min_hand_presence_confidence=0.5, + min_tracking_confidence=0.5 +) + +landmarker = HandLandmarker.create_from_options(options) + + +def 
get_optimized_features(hand_landmarks): + """ + Extract optimally normalized relative coordinates from MediaPipe landmarks + Returns 77 features + """ + # Extract raw coordinates + points = np.array([[lm.x, lm.y, lm.z] for lm in hand_landmarks], dtype=np.float32) + + # Step 1: Translation invariance - center on wrist + wrist = points[0].copy() + points_centered = points - wrist + + # Step 2: Scale invariance - normalize by palm size + palm_size = np.linalg.norm(points[9] - points[0]) # wrist to middle finger base + if palm_size < 1e-6: + palm_size = 1.0 + points_normalized = points_centered / palm_size + + # Step 3: Standardization + mean = np.mean(points_normalized, axis=0) + std = np.std(points_normalized, axis=0) + 1e-8 + points_standardized = (points_normalized - mean) / std + + # Flatten base features (63 features) + features = points_standardized.flatten() + + # Step 4: Derived features + finger_tips = [4, 8, 12, 16, 20] # Thumb, Index, Middle, Ring, Pinky + + # Distances between consecutive fingertips (4 distances) + tip_distances = [] + for i in range(len(finger_tips) - 1): + dist = np.linalg.norm(points_normalized[finger_tips[i]] - points_normalized[finger_tips[i + 1]]) + tip_distances.append(dist) + + # Distance of each fingertip from palm center (5 distances) + palm_center = np.mean(points_normalized[[0, 5, 9, 13, 17]], axis=0) + tip_to_palm = [] + for tip in finger_tips: + dist = np.linalg.norm(points_normalized[tip] - palm_center) + tip_to_palm.append(dist) + + # Finger curl indicators (5 curls) + finger_curls = [] + finger_bases = [1, 5, 9, 13, 17] + for base, tip in zip(finger_bases, finger_tips): + curl = np.linalg.norm(points_normalized[tip] - points_normalized[base]) + finger_curls.append(curl) + + # Combine all features: 63 + 4 + 5 + 5 = 77 + all_features = np.concatenate([ + features, + tip_distances, + tip_to_palm, + finger_curls + ]) + + return all_features.astype(np.float32) + + +# Initialize webcam +cap = cv2.VideoCapture(0) + +if not cap.isOpened(): + print("Error: Cannot open webcam") + exit() + +# Set camera resolution for better performance +cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) +cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) + +frame_count = 0 +fps_counter = 0 +fps_start_time = cv2.getTickCount() +current_fps = 0 + +# Prediction smoothing buffer +from collections import deque + +prediction_buffer = deque(maxlen=10) + +print("\n" + "=" * 60) +print("ASL Recognition - Transformer+CNN Model") +print("=" * 60) +print("Controls:") +print(" ESC - Exit") +print(" SPACE - Clear prediction buffer") +print(" 'h' - Toggle hand landmarks visibility") +print("=" * 60 + "\n") + +show_landmarks = True + +with torch.no_grad(): + while True: + success, image = cap.read() + if not success: + print("Failed to read frame from webcam") + break + + # Flip image horizontally for mirror view + image = cv2.flip(image, 1) + + # Convert to MediaPipe format + mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=image) + + # Detect hands + results = landmarker.detect_for_video(mp_image, frame_count) + frame_count += 1 + + # Calculate FPS + fps_counter += 1 + if fps_counter >= 30: + fps_end_time = cv2.getTickCount() + time_diff = (fps_end_time - fps_start_time) / cv2.getTickFrequency() + current_fps = fps_counter / time_diff + fps_counter = 0 + fps_start_time = cv2.getTickCount() + + # Process hand landmarks if detected + if results.hand_landmarks and len(results.hand_landmarks) > 0: + hand_landmarks = results.hand_landmarks[0] + + # Draw hand landmarks if enabled + if show_landmarks: + # 
Draw connections + connections = [ + (0, 1), (1, 2), (2, 3), (3, 4), # Thumb + (0, 5), (5, 6), (6, 7), (7, 8), # Index + (0, 9), (9, 10), (10, 11), (11, 12), # Middle + (0, 13), (13, 14), (14, 15), (15, 16), # Ring + (0, 17), (17, 18), (18, 19), (19, 20), # Pinky + (5, 9), (9, 13), (13, 17) # Palm + ] + + # Get image dimensions + h, w = image.shape[:2] + + # Draw connections + for connection in connections: + start_idx, end_idx = connection + start = hand_landmarks[start_idx] + end = hand_landmarks[end_idx] + + start_point = (int(start.x * w), int(start.y * h)) + end_point = (int(end.x * w), int(end.y * h)) + + cv2.line(image, start_point, end_point, (0, 255, 0), 2) + + # Draw landmarks + for i, landmark in enumerate(hand_landmarks): + x = int(landmark.x * w) + y = int(landmark.y * h) + + # Different colors for different parts + if i == 0: # Wrist + color = (255, 0, 0) + radius = 8 + elif i in [4, 8, 12, 16, 20]: # Fingertips + color = (0, 0, 255) + radius = 6 + else: + color = (0, 255, 0) + radius = 4 + + cv2.circle(image, (x, y), radius, color, -1) + cv2.circle(image, (x, y), radius + 2, (255, 255, 255), 1) + + # Extract features + features = get_optimized_features(hand_landmarks) + + # Make prediction + input_tensor = torch.FloatTensor(features).unsqueeze(0) + output = model(input_tensor) + probabilities = torch.softmax(output, dim=1)[0] + + # Get top prediction + predicted_idx = torch.argmax(probabilities).item() + confidence = probabilities[predicted_idx].item() + predicted_sign = label_encoder.inverse_transform([predicted_idx])[0] + + # Add to buffer for smoothing + if confidence > 0.3: # Only add if confident enough + prediction_buffer.append(predicted_sign) + + # Get smoothed prediction (most common in buffer) + if len(prediction_buffer) >= 5: + from collections import Counter + + smoothed_sign = Counter(prediction_buffer).most_common(1)[0][0] + else: + smoothed_sign = predicted_sign + + # Get top 5 predictions + top5_prob, top5_idx = torch.topk(probabilities, min(5, num_classes)) + + # Display prediction area (dark semi-transparent overlay) + overlay = image.copy() + cv2.rectangle(overlay, (10, 10), (500, 280), (0, 0, 0), -1) + cv2.addWeighted(overlay, 0.7, image, 0.3, 0, image) + + # Display main prediction + cv2.putText(image, f"Sign: {smoothed_sign}", + (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3) + cv2.putText(image, f"Confidence: {confidence:.1%}", + (20, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 255), 2) + + # Display top 5 predictions + cv2.putText(image, "Top 5:", + (20, 130), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2) + + y_offset = 160 + for i, (prob, idx) in enumerate(zip(top5_prob, top5_idx)): + sign = label_encoder.inverse_transform([idx.item()])[0] + prob_val = prob.item() + + # Color code by confidence + if i == 0: + color = (0, 255, 0) # Green for top + elif prob_val > 0.1: + color = (0, 255, 255) # Yellow for decent confidence + else: + color = (128, 128, 128) # Gray for low confidence + + cv2.putText(image, f"{i + 1}. 
{sign}: {prob_val:.1%}", + (30, y_offset), cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2) + y_offset += 30 + else: + # No hand detected + cv2.putText(image, "No hand detected", + (20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 0, 255), 2) + prediction_buffer.clear() + + # Display FPS and info + info_y = image.shape[0] - 60 + cv2.putText(image, f"FPS: {current_fps:.1f}", + (20, info_y), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) + cv2.putText(image, f"Frame: {frame_count}", + (20, info_y + 25), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2) + + # Display controls at bottom right + controls_text = "ESC: Exit | SPACE: Clear | H: Landmarks" + text_size = cv2.getTextSize(controls_text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 1)[0] + cv2.putText(image, controls_text, + (image.shape[1] - text_size[0] - 10, image.shape[0] - 10), + cv2.FONT_HERSHEY_SIMPLEX, 0.5, (200, 200, 200), 1) + + # Show the image + cv2.imshow('ASL Recognition - Transformer+CNN', image) + + # Handle key presses + key = cv2.waitKey(1) & 0xFF + + if key == 27: # ESC + print("Exiting...") + break + elif key == 32: # SPACE + prediction_buffer.clear() + print("Prediction buffer cleared") + elif key == ord('h') or key == ord('H'): + show_landmarks = not show_landmarks + print(f"Hand landmarks: {'ON' if show_landmarks else 'OFF'}") + +# Cleanup +cap.release() +cv2.destroyAllWindows() +print("Recognition stopped.") \ No newline at end of file diff --git a/training.py b/training.py new file mode 100644 index 0000000..a998547 --- /dev/null +++ b/training.py @@ -0,0 +1,500 @@ +import mediapipe as mp +import numpy as np +import os +import pandas as pd +import json +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import LabelEncoder +import pickle +import torch +import torch.nn as nn +import torch.optim as optim +from torch.utils.data import Dataset, DataLoader +import torch.nn.functional as F +import math +from pathlib import Path + + +# Load the dataset +def load_kaggle_asl_data(base_path='asl_kaggle'): + """ + Load data from Kaggle ASL dataset format + base_path should contain: + - train.csv + - train_landmark_files/ directory + - sign_to_prediction_index_map.json + """ + + # Load train.csv + train_df = pd.read_csv(os.path.join(base_path, 'train.csv')) + + # Load sign mapping + with open(os.path.join(base_path, 'sign_to_prediction_index_map.json'), 'r') as f: + sign_to_idx = json.load(f) + + print(f"Total sequences: {len(train_df)}") + print(f"Unique signs: {len(sign_to_idx)}") + print(f"Signs: {list(sign_to_idx.keys())[:10]}...") # Show first 10 + + return train_df, sign_to_idx + + +def extract_hand_landmarks_from_parquet(parquet_path): + """ + Extract hand landmarks from a parquet file + The file contains landmarks for face, left_hand, pose, right_hand + We only care about hand landmarks + """ + df = pd.read_parquet(parquet_path) + + # Filter for hand landmarks only (left_hand or right_hand) + # For ASL, we'll use whichever hand is dominant in the sequence + left_hand = df[df['type'] == 'left_hand'] + right_hand = df[df['type'] == 'right_hand'] + + # Use the hand with more detected landmarks + if len(left_hand) > len(right_hand): + hand_df = left_hand + elif len(right_hand) > 0: + hand_df = right_hand + else: + return None # No hand detected + + # Get unique frames + frames = hand_df['frame'].unique() + + # We'll use the middle frame (most stable) or average across frames + # For now, let's average the landmarks across all frames + landmarks_list = [] + + for landmark_idx in range(21): # MediaPipe has 21 
hand landmarks + landmark_data = hand_df[hand_df['landmark_index'] == landmark_idx] + + if len(landmark_data) == 0: + # Missing landmark, use zeros + landmarks_list.append([0.0, 0.0, 0.0]) + else: + # Average across frames + x = landmark_data['x'].mean() + y = landmark_data['y'].mean() + z = landmark_data['z'].mean() + landmarks_list.append([x, y, z]) + + return np.array(landmarks_list, dtype=np.float32) + + +def get_optimized_features(landmarks_array): + """ + Extract optimally normalized relative coordinates from landmark array + landmarks_array: (21, 3) numpy array + Returns 77 features + """ + if landmarks_array is None: + return None + + points = landmarks_array.copy() + + # Translation invariance + wrist = points[0].copy() + points_centered = points - wrist + + # Scale invariance + palm_size = np.linalg.norm(points[9] - points[0]) + if palm_size < 1e-6: + palm_size = 1.0 + points_normalized = points_centered / palm_size + + # Standardization + mean = np.mean(points_normalized, axis=0) + std = np.std(points_normalized, axis=0) + 1e-8 + points_standardized = (points_normalized - mean) / std + + features = points_standardized.flatten() + + # Derived features + finger_tips = [4, 8, 12, 16, 20] + + tip_distances = [] + for i in range(len(finger_tips) - 1): + dist = np.linalg.norm(points_normalized[finger_tips[i]] - points_normalized[finger_tips[i + 1]]) + tip_distances.append(dist) + + palm_center = np.mean(points_normalized[[0, 5, 9, 13, 17]], axis=0) + tip_to_palm = [] + for tip in finger_tips: + dist = np.linalg.norm(points_normalized[tip] - palm_center) + tip_to_palm.append(dist) + + finger_curls = [] + finger_bases = [1, 5, 9, 13, 17] + for base, tip in zip(finger_bases, finger_tips): + curl = np.linalg.norm(points_normalized[tip] - points_normalized[base]) + finger_curls.append(curl) + + all_features = np.concatenate([ + features, + tip_distances, + tip_to_palm, + finger_curls + ]) + + return all_features.astype(np.float32) + + +# Load dataset +print("Loading Kaggle ASL dataset...") +base_path = 'asl_kaggle' # Change this to your dataset path +train_df, sign_to_idx = load_kaggle_asl_data(base_path) + +# Process landmarks +X = [] +y = [] + +print("\nProcessing landmark files...") +for idx, row in train_df.iterrows(): + if idx % 1000 == 0: + print(f"Processed {idx}/{len(train_df)} sequences...") + + # Construct full path + parquet_path = os.path.join(base_path, row['path']) + + if not os.path.exists(parquet_path): + continue + + # Extract landmarks + landmarks = extract_hand_landmarks_from_parquet(parquet_path) + + if landmarks is None: + continue + + # Get features + features = get_optimized_features(landmarks) + + if features is None: + continue + + X.append(features) + y.append(row['sign']) + +print(f"\nSuccessfully processed {len(X)} sequences") + +if len(X) == 0: + print("ERROR: No valid sequences found! 
Check your dataset path.") + exit() + +X = np.array(X, dtype=np.float32) +y = np.array(y) + +print(f"Feature vector size: {X.shape[1]} dimensions") + +# Clean data +if np.isnan(X).any(): + print("WARNING: NaN values detected, removing affected samples...") + mask = ~np.isnan(X).any(axis=1) + X = X[mask] + y = y[mask] + +if np.isinf(X).any(): + print("WARNING: Inf values detected, removing affected samples...") + mask = ~np.isinf(X).any(axis=1) + X = X[mask] + y = y[mask] + +# Encode labels using the provided mapping +label_encoder = LabelEncoder() +y_encoded = label_encoder.fit_transform(y) +num_classes = len(label_encoder.classes_) + +print(f"\nNumber of classes: {num_classes}") +print(f"Sample classes: {label_encoder.classes_[:20]}...") + +# Split data +X_train, X_test, y_train, y_test = train_test_split( + X, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded +) + + +# PyTorch Dataset +class ASLDataset(Dataset): + def __init__(self, X, y): + self.X = torch.FloatTensor(X) + self.y = torch.LongTensor(y) + + def __len__(self): + return len(self.X) + + def __getitem__(self, idx): + return self.X[idx], self.y[idx] + + +train_dataset = ASLDataset(X_train, y_train) +test_dataset = ASLDataset(X_test, y_test) + +train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=4) +test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4) + + +# Positional Encoding for Transformer +class PositionalEncoding(nn.Module): + def __init__(self, d_model, max_len=100): + super(PositionalEncoding, self).__init__() + + pe = torch.zeros(max_len, d_model) + position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) + div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) + + pe[:, 0::2] = torch.sin(position * div_term) + pe[:, 1::2] = torch.cos(position * div_term) + + pe = pe.unsqueeze(0) + self.register_buffer('pe', pe) + + def forward(self, x): + return x + self.pe[:, :x.size(1), :] + + +# Multi-Head Self-Attention Transformer + CNN Hybrid +class TransformerCNN_ASL(nn.Module): + def __init__(self, input_dim=77, num_classes=250, d_model=512, nhead=8, num_layers=6, dim_feedforward=2048): + super(TransformerCNN_ASL, self).__init__() + + self.input_dim = input_dim + self.d_model = d_model + + # Input projection + self.input_projection = nn.Linear(input_dim, d_model) + self.input_norm = nn.LayerNorm(d_model) + + # Positional encoding + self.pos_encoder = PositionalEncoding(d_model, max_len=100) + + # Transformer Encoder with Self-Attention + encoder_layer = nn.TransformerEncoderLayer( + d_model=d_model, + nhead=nhead, + dim_feedforward=dim_feedforward, + dropout=0.1, + activation='gelu', + batch_first=True, + norm_first=True + ) + self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers) + + # CNN Blocks for pattern detection + self.conv1 = nn.Conv1d(d_model, 1024, kernel_size=3, padding=1) + self.bn1 = nn.BatchNorm1d(1024) + self.pool1 = nn.MaxPool1d(2) + self.dropout1 = nn.Dropout(0.3) + + self.conv2 = nn.Conv1d(1024, 2048, kernel_size=3, padding=1) + self.bn2 = nn.BatchNorm1d(2048) + self.pool2 = nn.MaxPool1d(2) + self.dropout2 = nn.Dropout(0.3) + + self.conv3 = nn.Conv1d(2048, 4096, kernel_size=3, padding=1) + self.bn3 = nn.BatchNorm1d(4096) + self.pool3 = nn.AdaptiveMaxPool1d(1) # Global pooling + self.dropout3 = nn.Dropout(0.4) + + # Fully connected layers + self.fc1 = nn.Linear(4096, 4096) + self.bn_fc1 = nn.BatchNorm1d(4096) + self.dropout_fc1 = nn.Dropout(0.5) + + self.fc2 = 
nn.Linear(4096, 2048) + self.bn_fc2 = nn.BatchNorm1d(2048) + self.dropout_fc2 = nn.Dropout(0.4) + + self.fc3 = nn.Linear(2048, 1024) + self.bn_fc3 = nn.BatchNorm1d(1024) + self.dropout_fc3 = nn.Dropout(0.3) + + self.fc4 = nn.Linear(1024, num_classes) + + def forward(self, x): + batch_size = x.size(0) + + # Project to d_model + x = self.input_projection(x) + x = self.input_norm(x) + x = x.unsqueeze(1) + + # Add positional encoding + x = self.pos_encoder(x) + + # Transformer encoder with self-attention + x = self.transformer_encoder(x) + + # Reshape for CNN + x = x.permute(0, 2, 1) + + # CNN pattern detection + x = F.gelu(self.bn1(self.conv1(x))) + x = self.pool1(x) + x = self.dropout1(x) + + x = F.gelu(self.bn2(self.conv2(x))) + x = self.pool2(x) + x = self.dropout2(x) + + x = F.gelu(self.bn3(self.conv3(x))) + x = self.pool3(x) + x = self.dropout3(x) + + # Flatten + x = x.view(batch_size, -1) + + # Fully connected layers + x = F.gelu(self.bn_fc1(self.fc1(x))) + x = self.dropout_fc1(x) + + x = F.gelu(self.bn_fc2(self.fc2(x))) + x = self.dropout_fc2(x) + + x = F.gelu(self.bn_fc3(self.fc3(x))) + x = self.dropout_fc3(x) + + x = self.fc4(x) + + return x + + +# Initialize model +device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') +print(f"\nUsing device: {device}") + +model = TransformerCNN_ASL( + input_dim=X.shape[1], + num_classes=num_classes, + d_model=512, + nhead=8, + num_layers=6, + dim_feedforward=2048 +).to(device) + +# Count parameters +total_params = sum(p.numel() for p in model.parameters()) +trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad) +print(f"Total parameters: {total_params:,}") +print(f"Trainable parameters: {trainable_params:,}") + +if total_params > 50_000_000: + print(f"WARNING: Model has {total_params:,} parameters, exceeding 50M limit!") +else: + print(f"Model is within 50M parameter limit ✓") + +# Loss and optimizer +criterion = nn.CrossEntropyLoss(label_smoothing=0.1) +optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4) + +# Cosine annealing learning rate scheduler +scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2) + + +# Training function +def train_epoch(model, loader, criterion, optimizer, device): + model.train() + total_loss = 0 + correct = 0 + total = 0 + + for X_batch, y_batch in loader: + X_batch, y_batch = X_batch.to(device), y_batch.to(device) + + optimizer.zero_grad() + outputs = model(X_batch) + loss = criterion(outputs, y_batch) + loss.backward() + + # Gradient clipping + torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0) + + optimizer.step() + + total_loss += loss.item() + _, predicted = outputs.max(1) + total += y_batch.size(0) + correct += predicted.eq(y_batch).sum().item() + + return total_loss / len(loader), 100. * correct / total + + +# Evaluation function +def evaluate(model, loader, device): + model.eval() + correct = 0 + total = 0 + + with torch.no_grad(): + for X_batch, y_batch in loader: + X_batch, y_batch = X_batch.to(device), y_batch.to(device) + outputs = model(X_batch) + _, predicted = outputs.max(1) + total += y_batch.size(0) + correct += predicted.eq(y_batch).sum().item() + + return 100. 
* correct / total + + +# Dynamic epoch calculation +def calculate_epochs(dataset_size): + if dataset_size < 1000: + return 200 + elif dataset_size < 5000: + return 150 + elif dataset_size < 10000: + return 100 + elif dataset_size < 50000: + return 75 + else: + return 50 + + +num_epochs = calculate_epochs(len(X_train)) +print(f"\nDynamic epoch calculation: {num_epochs} epochs for {len(X_train)} training samples") + +# Early stopping +patience = 20 +best_acc = 0 +patience_counter = 0 + +print("\nStarting training with Transformer + CNN architecture...") + +for epoch in range(num_epochs): + train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device) + test_acc = evaluate(model, test_loader, device) + + scheduler.step() + + if test_acc > best_acc: + best_acc = test_acc + patience_counter = 0 + # Save best model + torch.save({ + 'model_state_dict': model.state_dict(), + 'label_encoder': label_encoder, + 'num_classes': num_classes, + 'input_dim': X.shape[1], + 'sign_to_idx': sign_to_idx, + 'model_config': { + 'd_model': 512, + 'nhead': 8, + 'num_layers': 6, + 'dim_feedforward': 2048 + } + }, 'asl_kaggle_transformer.pth') + else: + patience_counter += 1 + + if (epoch + 1) % 5 == 0: + current_lr = optimizer.param_groups[0]['lr'] + print( + f"Epoch {epoch + 1}/{num_epochs} | Loss: {train_loss:.4f} | Train: {train_acc:.2f}% | Test: {test_acc:.2f}% | Best: {best_acc:.2f}% | LR: {current_lr:.6f}") + + # Early stopping + if patience_counter >= patience: + print(f"\nEarly stopping triggered at epoch {epoch + 1}") + break + +print(f"\nTraining complete! Best test accuracy: {best_acc:.2f}%") +print("Model saved to asl_kaggle_transformer.pth") \ No newline at end of file
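
Dataset layout assumed by training.py (inferred only from how load_kaggle_asl_data() and extract_hand_landmarks_from_parquet() read the files; the exact nesting under train_landmark_files/ is whatever relative path train.csv stores in its path column):

    asl_kaggle/
        train.csv                            # columns read: path, sign
        sign_to_prediction_index_map.json    # sign -> prediction index map
        train_landmark_files/ ...            # parquet files referenced by train.csv's path column;
                                             # columns read per file: type, frame, landmark_index, x, y, z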
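Both files build the same 77-dimensional feature vector: 63 standardized landmark coordinates plus 4 fingertip-to-fingertip distances, 5 fingertip-to-palm-center distances, and 5 finger-curl lengths. The snippet below is a standalone sketch, not part of the patch: sketch_features is a hypothetical stand-in for get_optimized_features that runs the same normalization steps on a synthetic 21x3 landmark array, so the 63 + 4 + 5 + 5 = 77 layout can be checked without a webcam or the Kaggle data.

    import numpy as np

    def sketch_features(points):
        # points: (21, 3) array of x, y, z hand landmarks (synthetic here)
        points = points.astype(np.float32)
        centered = points - points[0]                     # translation invariance: wrist at origin
        palm = np.linalg.norm(points[9] - points[0])      # wrist to middle-finger base
        palm = palm if palm > 1e-6 else 1.0
        norm = centered / palm                            # scale invariance
        std = (norm - norm.mean(axis=0)) / (norm.std(axis=0) + 1e-8)  # standardization -> 63 values
        tips, bases = [4, 8, 12, 16, 20], [1, 5, 9, 13, 17]
        palm_center = norm[[0, 5, 9, 13, 17]].mean(axis=0)
        tip_gaps = [np.linalg.norm(norm[a] - norm[b]) for a, b in zip(tips, tips[1:])]  # 4 distances
        tip_to_palm = [np.linalg.norm(norm[t] - palm_center) for t in tips]             # 5 distances
        curls = [np.linalg.norm(norm[t] - norm[b]) for b, t in zip(bases, tips)]        # 5 curls
        return np.concatenate([std.flatten(), tip_gaps, tip_to_palm, curls]).astype(np.float32)

    features = sketch_features(np.random.rand(21, 3))
    assert features.shape == (77,)   # 63 + 4 + 5 + 5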