chatgpt, I trust you bro you got ts

2026-01-10 22:32:58 -06:00
parent f6b69eadc2
commit abf57f95de


@@ -1,256 +1,162 @@
import mediapipe as mp
import numpy as np
import os
import pandas as pd
import json
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import pickle
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
import math
from pathlib import Path

# GPU Configuration
print("=" * 50)
print("GPU CONFIGURATION")
print("=" * 50)
# Check CUDA availability
if torch.cuda.is_available():
print(f"✓ CUDA is available!")
print(f"✓ GPU Device: {torch.cuda.get_device_name(0)}")
print(f"✓ CUDA Version: {torch.version.cuda}")
print(f"✓ Number of GPUs: {torch.cuda.device_count()}")
print(f"✓ Current GPU Memory: {torch.cuda.get_device_properties(0).total_memory / 1024 ** 3:.2f} GB")
# Set default GPU device
torch.cuda.set_device(0)
device = torch.device('cuda:0')
# Enable cuDNN benchmark for better performance
torch.backends.cudnn.benchmark = True
torch.backends.cudnn.enabled = True
print(f"✓ cuDNN benchmark mode: enabled")
else:
print("✗ CUDA is NOT available. Using CPU.")
print(" Make sure you have:")
print(" 1. NVIDIA GPU")
print(" 2. CUDA toolkit installed")
print(" 3. PyTorch with CUDA support")
device = torch.device('cpu')
print("=" * 50)
print()
# Load the dataset
def load_kaggle_asl_data(base_path='asl_kaggle'):
"""
Load data from Kaggle ASL dataset format
base_path should contain:
- train.csv
- train_landmark_files/ directory
- sign_to_prediction_index_map.json
"""
train_df = pd.read_csv(os.path.join(base_path, 'train.csv'))
with open(os.path.join(base_path, 'sign_to_prediction_index_map.json'), 'r') as f:
sign_to_idx = json.load(f)
print(f"Total sequences: {len(train_df)}")
print(f"Unique signs: {len(sign_to_idx)}")
print(f"Signs: {list(sign_to_idx.keys())[:10]}...")
return train_df, sign_to_idx
def extract_hand_landmarks_from_parquet(parquet_path):
"""Extract hand landmarks from a parquet file"""
df = pd.read_parquet(parquet_path)
left_hand = df[df['type'] == 'left_hand']
right_hand = df[df['type'] == 'right_hand']
if len(left_hand) > len(right_hand):
hand_df = left_hand
elif len(right_hand) > 0:
hand_df = right_hand
else:
return None
landmarks_list = []
for landmark_idx in range(21):
landmark_data = hand_df[hand_df['landmark_index'] == landmark_idx]
if len(landmark_data) == 0:
landmarks_list.append([0.0, 0.0, 0.0])
else:
x = landmark_data['x'].mean()
y = landmark_data['y'].mean()
z = landmark_data['z'].mean()
landmarks_list.append([x, y, z])
return np.array(landmarks_list, dtype=np.float32)
def get_optimized_features(landmarks_array):
"""Extract optimally normalized relative coordinates from landmark array"""
if landmarks_array is None:
return None
points = landmarks_array.copy()
wrist = points[0].copy()
points_centered = points - wrist
palm_size = np.linalg.norm(points[9] - points[0])
if palm_size < 1e-6:
palm_size = 1.0
points_normalized = points_centered / palm_size
mean = np.mean(points_normalized, axis=0)
std = np.std(points_normalized, axis=0) + 1e-8
points_standardized = (points_normalized - mean) / std
features = points_standardized.flatten()
finger_tips = [4, 8, 12, 16, 20]
tip_distances = []
for i in range(len(finger_tips) - 1):
dist = np.linalg.norm(points_normalized[finger_tips[i]] - points_normalized[finger_tips[i + 1]])
tip_distances.append(dist)
palm_center = np.mean(points_normalized[[0, 5, 9, 13, 17]], axis=0)
tip_to_palm = []
for tip in finger_tips:
dist = np.linalg.norm(points_normalized[tip] - palm_center)
tip_to_palm.append(dist)
finger_curls = []
finger_bases = [1, 5, 9, 13, 17]
for base, tip in zip(finger_bases, finger_tips):
curl = np.linalg.norm(points_normalized[tip] - points_normalized[base])
finger_curls.append(curl)
all_features = np.concatenate([
features,
tip_distances,
tip_to_palm,
finger_curls
])
return all_features.astype(np.float32)
# Load dataset
print("Loading Kaggle ASL dataset...")
base_path = 'asl_kaggle'
train_df, sign_to_idx = load_kaggle_asl_data(base_path)
# Process landmarks with parallel processing
from multiprocessing import Pool, cpu_count
from functools import partial

def process_single_sequence(row, base_path):
"""Process a single sequence - designed for parallel execution"""
parquet_path = os.path.join(base_path, row['path'])
if not os.path.exists(parquet_path):
return None, None
try:
landmarks = extract_hand_landmarks_from_parquet(parquet_path)
if landmarks is None:
return None, None
features = get_optimized_features(landmarks)
if features is None:
return None, None
return features, row['sign']
except Exception as e:
return None, None

print("\nProcessing landmark files with parallel processing...")
print(f"Using {cpu_count()} CPU cores")
# Convert DataFrame rows to list for parallel processing
rows_list = [row for _, row in train_df.iterrows()]
# Create partial function with base_path
process_func = partial(process_single_sequence, base_path=base_path)
# Process in parallel with progress updates
X = []
y = []
batch_size = 1000
with Pool(processes=cpu_count()) as pool:
for i in range(0, len(rows_list), batch_size):
batch = rows_list[i:i + batch_size]
results = pool.map(process_func, batch)
for features, sign in results:
if features is not None and sign is not None:
X.append(features)
y.append(sign)
print(f"Processed {min(i + batch_size, len(rows_list))}/{len(rows_list)} sequences... (Valid: {len(X)})")
print(f"\nSuccessfully processed {len(X)} sequences")
if len(X) == 0:
print("ERROR: No valid sequences found! Check your dataset path.")
exit()
X = np.array(X, dtype=np.float32)
y = np.array(y)
print(f"Feature vector size: {X.shape[1]} dimensions")
# Clean data
if np.isnan(X).any():
print("WARNING: NaN values detected, removing affected samples...")
mask = ~np.isnan(X).any(axis=1)
X = X[mask]
y = y[mask]
if np.isinf(X).any():
print("WARNING: Inf values detected, removing affected samples...")
mask = ~np.isinf(X).any(axis=1)
X = X[mask]
y = y[mask]
# Encode labels
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(y)
num_classes = len(label_encoder.classes_)
print(f"\nNumber of classes: {num_classes}")
print(f"Sample classes: {label_encoder.classes_[:20]}...")
# Split data
X_train, X_test, y_train, y_test = train_test_split(
X, y_encoded, test_size=0.2, random_state=42, stratify=y_encoded
)
# PyTorch Dataset
class ASLDataset(Dataset):
def __init__(self, X, y):
self.X = torch.FloatTensor(X)
self.y = torch.LongTensor(y)
def __len__(self):
return len(self.X)

# ===============================
# IMPORTS
# ===============================
import os
import json
import math
import time
import pickle
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from multiprocessing import Pool, cpu_count
from functools import partial

# ===============================
# GPU SETUP
# ===============================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

if device.type == "cuda":
    print("GPU:", torch.cuda.get_device_name(0))
    torch.backends.cudnn.benchmark = True

# ===============================
# DATA LOADING
# ===============================
def load_kaggle_asl_data(base_path):
    train_df = pd.read_csv(os.path.join(base_path, "train.csv"))
    with open(os.path.join(base_path, "sign_to_prediction_index_map.json")) as f:
        sign_to_idx = json.load(f)
    return train_df, sign_to_idx

def extract_hand_landmarks_from_parquet(path):
    df = pd.read_parquet(path)
    left = df[df["type"] == "left_hand"]
    right = df[df["type"] == "right_hand"]

    if len(left) > 0:
        hand = left
    elif len(right) > 0:
        hand = right
    else:
        return None

    landmarks = []
    for i in range(21):
        lm = hand[hand["landmark_index"] == i]
        if len(lm) == 0:
            landmarks.append([0.0, 0.0, 0.0])
        else:
            landmarks.append([
                lm["x"].mean(),
                lm["y"].mean(),
                lm["z"].mean()
            ])
    return np.array(landmarks, dtype=np.float32)

def get_features(landmarks):
    if landmarks is None:
        return None

    wrist = landmarks[0]
    points = landmarks - wrist

    scale = np.linalg.norm(points[9])
    if scale < 1e-6:
        scale = 1.0
    points /= scale

    mean = points.mean(axis=0)
    std = points.std(axis=0) + 1e-6
    points = (points - mean) / std

    features = points.flatten()

    tips = [4, 8, 12, 16, 20]
    bases = [1, 5, 9, 13, 17]

    tip_dist = []
    curl = []
    for b, t in zip(bases, tips):
        curl.append(np.linalg.norm(points[t] - points[b]))
    for i in range(len(tips) - 1):
        tip_dist.append(np.linalg.norm(points[tips[i]] - points[tips[i+1]]))

    return np.concatenate([features, tip_dist, curl]).astype(np.float32)

def process_row(row, base_path):
    path = os.path.join(base_path, row["path"])
    if not os.path.exists(path):
        return None, None
    try:
        lm = extract_hand_landmarks_from_parquet(path)
        feat = get_features(lm)
        return feat, row["sign"]
    except:
        return None, None

# ===============================
# LOAD + PROCESS DATA
# ===============================
base_path = "asl_kaggle"
train_df, sign_to_idx = load_kaggle_asl_data(base_path)

rows = [row for _, row in train_df.iterrows()]
X, y = [], []

with Pool(cpu_count()) as pool:
    func = partial(process_row, base_path=base_path)
    for feat, sign in pool.map(func, rows):
        if feat is not None:
            X.append(feat)
            y.append(sign)

X = np.array(X, dtype=np.float32)
y = np.array(y)
print("Samples:", len(X))
print("Feature dim:", X.shape[1])

# ===============================
# LABEL ENCODING
# ===============================
le = LabelEncoder()
y = le.fit_transform(y)
num_classes = len(le.classes_)

# ===============================
# SPLIT
# ===============================
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, stratify=y, random_state=42
)

# ===============================
# DATASET
# ===============================
class ASLDataset(Dataset):
    def __init__(self, X, y):
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.X)
@@ -259,317 +165,148 @@ class ASLDataset(Dataset):
return self.X[idx], self.y[idx]

train_dataset = ASLDataset(X_train, y_train)
test_dataset = ASLDataset(X_test, y_test)
# Optimized DataLoader settings for GPU
num_workers = 4 if device.type == 'cuda' else 0
pin_memory = True if device.type == 'cuda' else False
batch_size = 128 if device.type == 'cuda' else 64  # Larger batch size for GPU
train_loader = DataLoader(
train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=num_workers,
pin_memory=pin_memory,
persistent_workers=True if num_workers > 0 else False
)
test_loader = DataLoader(
test_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=num_workers,
pin_memory=pin_memory,
persistent_workers=True if num_workers > 0 else False
)
print(f"\nDataLoader Configuration:")
print(f"  Batch size: {batch_size}")
print(f"  Num workers: {num_workers}")
print(f"  Pin memory: {pin_memory}")
# Positional Encoding for Transformer
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=100):
super(PositionalEncoding, self).__init__()
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer('pe', pe)
def forward(self, x):
return x + self.pe[:, :x.size(1), :]
# Multi-Head Self-Attention Transformer + CNN Hybrid
class TransformerCNN_ASL(nn.Module):
def __init__(self, input_dim=77, num_classes=250, d_model=512, nhead=8, num_layers=6, dim_feedforward=2048):
super(TransformerCNN_ASL, self).__init__()
self.input_dim = input_dim
self.d_model = d_model
self.input_projection = nn.Linear(input_dim, d_model)
self.input_norm = nn.LayerNorm(d_model)
self.pos_encoder = PositionalEncoding(d_model, max_len=100)
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=dim_feedforward,
dropout=0.1,
activation='gelu',
batch_first=True,
norm_first=True
)
self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
self.conv1 = nn.Conv1d(d_model, 1024, kernel_size=3, padding=1)
self.bn1 = nn.BatchNorm1d(1024)
self.pool1 = nn.MaxPool1d(2)
self.dropout1 = nn.Dropout(0.3)
self.conv2 = nn.Conv1d(1024, 2048, kernel_size=3, padding=1)
self.bn2 = nn.BatchNorm1d(2048)
self.pool2 = nn.MaxPool1d(2)
self.dropout2 = nn.Dropout(0.3)
self.conv3 = nn.Conv1d(2048, 4096, kernel_size=3, padding=1)
self.bn3 = nn.BatchNorm1d(4096)
self.pool3 = nn.AdaptiveMaxPool1d(1)
self.dropout3 = nn.Dropout(0.4)
self.fc1 = nn.Linear(4096, 4096)
self.bn_fc1 = nn.BatchNorm1d(4096)
self.dropout_fc1 = nn.Dropout(0.5)
self.fc2 = nn.Linear(4096, 2048)
self.bn_fc2 = nn.BatchNorm1d(2048)
self.dropout_fc2 = nn.Dropout(0.4)
self.fc3 = nn.Linear(2048, 1024)
self.bn_fc3 = nn.BatchNorm1d(1024)
self.dropout_fc3 = nn.Dropout(0.3)
self.fc4 = nn.Linear(1024, num_classes)
def forward(self, x):
batch_size = x.size(0)
x = self.input_projection(x)
x = self.input_norm(x)
x = x.unsqueeze(1)
x = self.pos_encoder(x)
x = self.transformer_encoder(x)
x = x.permute(0, 2, 1)
x = F.gelu(self.bn1(self.conv1(x)))
x = self.pool1(x)
x = self.dropout1(x)
x = F.gelu(self.bn2(self.conv2(x)))
x = self.pool2(x)
x = self.dropout2(x)
x = F.gelu(self.bn3(self.conv3(x)))
x = self.pool3(x)
x = self.dropout3(x)
x = x.view(batch_size, -1)
x = F.gelu(self.bn_fc1(self.fc1(x)))
x = self.dropout_fc1(x)
x = F.gelu(self.bn_fc2(self.fc2(x)))
x = self.dropout_fc2(x)
x = F.gelu(self.bn_fc3(self.fc3(x)))
x = self.dropout_fc3(x)
x = self.fc4(x)
return x
# Initialize model
print(f"\nInitializing model on {device}...")
model = TransformerCNN_ASL(
input_dim=X.shape[1],
num_classes=num_classes,
d_model=512,
nhead=8,
num_layers=6,
dim_feedforward=2048
).to(device)
# Count parameters
total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total parameters: {total_params:,}")
print(f"Trainable parameters: {trainable_params:,}")
if total_params > 50_000_000:
print(f"WARNING: Model has {total_params:,} parameters, exceeding 50M limit!")
else:
print(f"Model is within 50M parameter limit ✓")
# Display GPU memory usage
if device.type == 'cuda':
print(f"\nGPU Memory after model initialization:")
print(f" Allocated: {torch.cuda.memory_allocated(0) / 1024 ** 2:.2f} MB")
print(f" Cached: {torch.cuda.memory_reserved(0) / 1024 ** 2:.2f} MB")
# Loss and optimizer
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=1e-4)
# Cosine annealing learning rate scheduler
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=2)
# Training function
def train_epoch(model, loader, criterion, optimizer, device):
model.train()
total_loss = 0
correct = 0
total = 0
for X_batch, y_batch in loader:
X_batch, y_batch = X_batch.to(device, non_blocking=True), y_batch.to(device, non_blocking=True)
optimizer.zero_grad(set_to_none=True)  # More efficient than zero_grad()
outputs = model(X_batch)
loss = criterion(outputs, y_batch)
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()
total_loss += loss.item()
_, predicted = outputs.max(1)
total += y_batch.size(0)
correct += predicted.eq(y_batch).sum().item()
return total_loss / len(loader), 100. * correct / total
# Evaluation function
def evaluate(model, loader, device):
model.eval()
correct = 0
total = 0
with torch.no_grad():
for X_batch, y_batch in loader:
X_batch, y_batch = X_batch.to(device, non_blocking=True), y_batch.to(device, non_blocking=True)
outputs = model(X_batch)
_, predicted = outputs.max(1)
total += y_batch.size(0)
correct += predicted.eq(y_batch).sum().item()
return 100. * correct / total
# Dynamic epoch calculation
def calculate_epochs(dataset_size):
if dataset_size < 1000:
return 200
elif dataset_size < 5000:
return 150
elif dataset_size < 10000:
return 100
elif dataset_size < 50000:
return 75
else:
return 50
num_epochs = calculate_epochs(len(X_train))
print(f"\nDynamic epoch calculation: {num_epochs} epochs for {len(X_train)} training samples")
# Early stopping
patience = 20
best_acc = 0
patience_counter = 0
print("\nStarting training with Transformer + CNN architecture...")
print("=" * 50)
# Track training time
import time
start_time = time.time()
for epoch in range(num_epochs):
epoch_start = time.time()
train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
test_acc = evaluate(model, test_loader, device)
scheduler.step()
epoch_time = time.time() - epoch_start
if test_acc > best_acc:
best_acc = test_acc
patience_counter = 0
# Save best model
torch.save({
'model_state_dict': model.state_dict(),
'label_encoder': label_encoder,
'num_classes': num_classes,
'input_dim': X.shape[1],
'sign_to_idx': sign_to_idx,
'model_config': {
'd_model': 512,
'nhead': 8,
'num_layers': 6,
'dim_feedforward': 2048
}
}, 'asl_kaggle_transformer.pth')
else:
patience_counter += 1
if (epoch + 1) % 5 == 0:
current_lr = optimizer.param_groups[0]['lr']
print(f"Epoch {epoch + 1}/{num_epochs} | Loss: {train_loss:.4f} | "
f"Train: {train_acc:.2f}% | Test: {test_acc:.2f}% | "
f"Best: {best_acc:.2f}% | LR: {current_lr:.6f} | "
f"Time: {epoch_time:.2f}s")
if device.type == 'cuda':
print(f" GPU Memory: {torch.cuda.memory_allocated(0) / 1024 ** 2:.2f} MB")
# Early stopping
if patience_counter >= patience:
print(f"\nEarly stopping triggered at epoch {epoch + 1}")
break

total_time = time.time() - start_time
print("=" * 50)
print(f"\nTraining complete! Best test accuracy: {best_acc:.2f}%")
print(f"Total training time: {total_time / 60:.2f} minutes")
print(f"Average time per epoch: {total_time / (epoch + 1):.2f} seconds")
print("Model saved to asl_kaggle_transformer.pth")
# Final GPU memory stats
if device.type == 'cuda':
print(f"\nFinal GPU Memory Usage:")
print(f"  Allocated: {torch.cuda.memory_allocated(0) / 1024 ** 2:.2f} MB")
print(f"  Cached: {torch.cuda.memory_reserved(0) / 1024 ** 2:.2f} MB")
print(f"  Max Allocated: {torch.cuda.max_memory_allocated(0) / 1024 ** 2:.2f} MB")

train_loader = DataLoader(
    ASLDataset(X_train, y_train),
    batch_size=256,
    shuffle=True,
    pin_memory=True
)

test_loader = DataLoader(
    ASLDataset(X_test, y_test),
    batch_size=256,
    shuffle=False,
    pin_memory=True
)

# ===============================
# MODEL (FIXED)
# ===============================
class TransformerASL(nn.Module):
    def __init__(self, input_dim, num_classes):
        super().__init__()
        self.proj = nn.Linear(input_dim, 256)
        self.norm = nn.LayerNorm(256)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=256,
            nhead=8,
            dim_feedforward=1024,
            dropout=0.1,
            activation="gelu",
            batch_first=True,
            norm_first=True
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=4)

        self.fc1 = nn.Linear(256, 512)
        self.bn1 = nn.BatchNorm1d(512)
        self.drop1 = nn.Dropout(0.4)

        self.fc2 = nn.Linear(512, 256)
        self.bn2 = nn.BatchNorm1d(256)
        self.drop2 = nn.Dropout(0.3)

        self.out = nn.Linear(256, num_classes)

    def forward(self, x):
        x = self.proj(x)
        x = self.norm(x)
        x = x.unsqueeze(1)  # (B, 1, 256)
        x = self.encoder(x)
        x = x.squeeze(1)
        x = F.gelu(self.bn1(self.fc1(x)))
        x = self.drop1(x)
        x = F.gelu(self.bn2(self.fc2(x)))
        x = self.drop2(x)
        return self.out(x)

model = TransformerASL(X.shape[1], num_classes).to(device)
print("Parameters:", sum(p.numel() for p in model.parameters()))

# ===============================
# TRAINING SETUP
# ===============================
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.AdamW(model.parameters(), lr=3e-4, weight_decay=1e-4)
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, 10)

# ===============================
# TRAIN / EVAL
# ===============================
def train_epoch():
    model.train()
    total, correct, loss_sum = 0, 0, 0

    for x, y in train_loader:
        x, y = x.to(device), y.to(device)

        optimizer.zero_grad(set_to_none=True)
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        loss_sum += loss.item()
        correct += (logits.argmax(1) == y).sum().item()
        total += y.size(0)

    return loss_sum / len(train_loader), 100 * correct / total

@torch.no_grad()
def evaluate():
    model.eval()
    total, correct = 0, 0

    for x, y in test_loader:
        x, y = x.to(device), y.to(device)
        logits = model(x)
        correct += (logits.argmax(1) == y).sum().item()
        total += y.size(0)

    return 100 * correct / total

# ===============================
# TRAIN LOOP
# ===============================
best_acc = 0
patience = 15
wait = 0
epochs = 50

for epoch in range(epochs):
    loss, train_acc = train_epoch()
    test_acc = evaluate()
    scheduler.step()

    print(f"Epoch {epoch+1}/{epochs} | "
          f"Loss {loss:.4f} | "
          f"Train {train_acc:.2f}% | "
          f"Test {test_acc:.2f}%")

    if test_acc > best_acc:
        best_acc = test_acc
        wait = 0
        torch.save({
            "model": model.state_dict(),
            "label_encoder": le
        }, "asl_transformer_fixed.pth")
    else:
        wait += 1
        if wait >= patience:
            print("Early stopping")
            break

print("Best accuracy:", best_acc)