"""Train a 1D-CNN sign classifier on per-clip landmark parquet files.

Pipeline: read ``train.csv`` -> preprocess every parquet clip into a fixed-size
landmark tensor -> stratified train/validation split -> train ``ASLClassifier``.
"""

import os
from concurrent.futures import ProcessPoolExecutor

import numpy as np
import polars as pl
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm

# --- CONFIGURATION ---
BASE_PATH = "asl_kaggle"
TARGET_FRAMES = 22

# Number of landmark rows each frame is reshaped into by load_and_preprocess.
LANDMARKS_PER_FRAME = 543

# Lip landmarks (approximate indices for high-value face points).
LIPS = [61, 146, 91, 181, 84, 17, 314, 405, 321, 375, 291, 308, 324, 318, 402,
        317, 14, 87, 178, 88, 95]
# Every landmark slot from 468 to 542, i.e. all 75 slots that follow index 467
# once a frame is sorted by (type, landmark_index).
# NOTE(review): presumably left hand, pose and right hand in that order, so this
# covers more than the hands alone - confirm against the dataset layout.
HANDS = list(range(468, LANDMARKS_PER_FRAME))
SELECTED_INDICES = LIPS + HANDS
NUM_FEATS = len(SELECTED_INDICES) * 3  # X, Y, Z for each selected point

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# --- DATA PROCESSING ---
def load_kaggle_metadata(base_path):
    """Read ``train.csv`` from *base_path* and return it as a polars DataFrame."""
    return pl.read_csv(os.path.join(base_path, "train.csv"))


def load_and_preprocess(path, base_path=BASE_PATH, target_frames=TARGET_FRAMES):
    """Load one landmark parquet file and turn it into a fixed-size array.

    Steps:
      1. Subtract, per frame, the coordinates of face landmark 0 (the anchor)
         from every landmark; missing values (null or NaN) become 0.0.
      2. Reshape to (frames, 543, 3) and keep only ``SELECTED_INDICES``.
      3. Resample along the frame axis to exactly *target_frames* frames by
         nearest-index selection.

    Args:
        path: Parquet path relative to *base_path*.
        base_path: Dataset root directory.
        target_frames: Number of frames in the returned array.

    Returns:
        NumPy array of shape (target_frames, len(SELECTED_INDICES), 3).

    Raises:
        ValueError: If the file has no rows, or its row count is not a
            multiple of 543 (so it cannot be split into whole frames).
    """
    parquet_path = os.path.join(base_path, path)
    df = pl.read_parquet(parquet_path)

    # 1. Spatial normalization: one anchor row (face landmark 0) per frame.
    anchors = df.filter(
        (pl.col("type") == "face") & (pl.col("landmark_index") == 0)
    ).select([
        pl.col("frame"),
        pl.col("x").alias("nx"),
        pl.col("y").alias("ny"),
        pl.col("z").alias("nz"),
    ])

    processed = (
        df.join(anchors, on="frame", how="left")
        .with_columns([
            # The difference keeps the left operand's column name, so this
            # overwrites x/y/z in place. polars treats NaN and null as distinct,
            # hence both fills: either kind of missing value becomes 0.0.
            (pl.col(axis) - pl.col(f"n{axis}")).fill_nan(0.0).fill_null(0.0)
            for axis in ("x", "y", "z")
        ])
        .sort(["frame", "type", "landmark_index"])
    )

    # 2. Reshape & feature selection.
    coords = processed.select(["x", "y", "z"]).to_numpy()
    n_rows = coords.shape[0]
    if n_rows == 0 or n_rows % LANDMARKS_PER_FRAME != 0:
        # Fail with the offending file name instead of a bare reshape/index
        # error surfacing from a worker process.
        raise ValueError(
            f"{parquet_path}: expected a non-empty multiple of "
            f"{LANDMARKS_PER_FRAME} landmark rows, got {n_rows}"
        )
    raw_tensor = coords.reshape(-1, LANDMARKS_PER_FRAME, 3)
    reduced_tensor = raw_tensor[:, SELECTED_INDICES, :]

    # 3. Temporal normalization: evenly spaced frame indices, rounded to the
    # nearest existing frame (short clips repeat frames, long clips skip some).
    curr_len = reduced_tensor.shape[0]
    indices = np.linspace(0, curr_len - 1, num=target_frames).round().astype(int)
    return reduced_tensor[indices]


# --- MODEL ARCHITECTURE ---
class ASLClassifier(nn.Module):
    """Two-layer temporal 1D CNN followed by global average pooling and a linear head.

    Args:
        num_classes: Number of output logits.
        target_frames: Accepted for interface compatibility; not used by the
            layers (the adaptive pooling makes the head length-independent).
        num_feats: Number of input channels, i.e. flattened landmark
            coordinates per frame.
    """

    def __init__(self, num_classes, target_frames=TARGET_FRAMES, num_feats=NUM_FEATS):
        super().__init__()
        self.conv1 = nn.Conv1d(num_feats, 256, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm1d(256)
        self.conv2 = nn.Conv1d(256, 512, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(512)
        self.pool = nn.MaxPool1d(2)
        self.dropout = nn.Dropout(0.5)
        self.fc = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return class logits of shape (Batch, num_classes).

        Args:
            x: Tensor of shape (Batch, Frames, Selected_Landmarks, 3).
        """
        # Flatten landmarks/coords per frame. reshape (not view) so that
        # non-contiguous inputs, e.g. slices or permuted tensors, are accepted.
        x = x.reshape(x.shape[0], x.shape[1], -1)
        x = x.transpose(1, 2)  # (Batch, Features, Time), as Conv1d expects

        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool(x)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool(x)

        # Collapse the remaining time axis to one vector per sample.
        x = F.adaptive_avg_pool1d(x, 1).squeeze(-1)
        x = self.dropout(x)
        return self.fc(x)


# --- TRAINING FUNCTION ---
def train_model(model, train_loader, val_loader, epochs=20, lr=0.001):
    """Train *model* with Adam and cross-entropy, validating after every epoch.

    The learning rate is halved when the validation loss has not improved for
    3 epochs. Whenever validation accuracy reaches a new best, the model's
    state dict is written to ``best_asl_model.pth`` in the working directory.
    Batches are moved to the module-level ``device``; the model is expected to
    be on that device already.

    Args:
        model: Network mapping an input batch to class logits.
        train_loader: Iterable of (inputs, labels) training batches.
        val_loader: Iterable of (inputs, labels) validation batches.
        epochs: Number of passes over *train_loader*.
        lr: Initial Adam learning rate.

    Returns:
        Best validation accuracy seen, in percent.

    Raises:
        ValueError: If either loader yields no batches.
    """
    # Check up front: otherwise an empty validation loader only fails with a
    # ZeroDivisionError after a whole training epoch has already run.
    if len(train_loader) == 0 or len(val_loader) == 0:
        raise ValueError("train_loader and val_loader must each yield at least one batch")

    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, mode='min', patience=3, factor=0.5
    )

    best_val_acc = 0.0

    for epoch in range(epochs):
        # Training phase
        model.train()
        train_loss = 0.0
        train_correct = 0
        train_total = 0

        pbar = tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs} [Train]")
        for inputs, labels in pbar:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()
            predicted = outputs.argmax(dim=1)
            train_total += labels.size(0)
            train_correct += (predicted == labels).sum().item()

            pbar.set_postfix({
                'loss': f'{loss.item():.4f}',
                'acc': f'{100 * train_correct / train_total:.2f}%',
            })

        train_acc = 100 * train_correct / train_total
        avg_train_loss = train_loss / len(train_loader)

        # Validation phase
        model.eval()
        val_loss = 0.0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for inputs, labels in tqdm(val_loader, desc=f"Epoch {epoch + 1}/{epochs} [Val]"):
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)

                val_loss += loss.item()
                predicted = outputs.argmax(dim=1)
                val_total += labels.size(0)
                val_correct += (predicted == labels).sum().item()

        val_acc = 100 * val_correct / val_total
        avg_val_loss = val_loss / len(val_loader)

        # The scheduler watches validation loss (mode='min').
        scheduler.step(avg_val_loss)

        print(f"\nEpoch {epoch + 1}/{epochs}:")
        print(f" Train Loss: {avg_train_loss:.4f} | Train Acc: {train_acc:.2f}%")
        print(f" Val Loss: {avg_val_loss:.4f} | Val Acc: {val_acc:.2f}%")

        # Save best model (selected on validation accuracy)
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_asl_model.pth')
            print(f" ✓ New best model saved! (Val Acc: {val_acc:.2f}%)")
        print()

    print(f"Training complete! Best validation accuracy: {best_val_acc:.2f}%")
    return best_val_acc


# --- EXECUTION ---
def main():
    """Build the dataset from ``BASE_PATH``, then train and checkpoint the model."""
    asl_data = load_kaggle_metadata(BASE_PATH)

    # Process all files
    paths = asl_data["path"].to_list()
    labels = asl_data["sign"].to_list()

    # Create label mapping (sorted so class indices are stable across runs)
    unique_signs = sorted(set(labels))
    sign_to_idx = {sign: idx for idx, sign in enumerate(unique_signs)}
    label_indices = [sign_to_idx[sign] for sign in labels]

    print(f"Processing {len(paths)} files in parallel...")
    with ProcessPoolExecutor() as executor:
        # chunksize > 1 batches tasks per worker round-trip, which cuts the
        # inter-process overhead for long iterables; result order is unchanged.
        results = list(
            tqdm(
                executor.map(load_and_preprocess, paths, chunksize=64),
                total=len(paths),
            )
        )

    # Create tensors
    X = torch.tensor(np.array(results), dtype=torch.float32)
    y = torch.tensor(label_indices, dtype=torch.long)

    print(f"Dataset Tensor Shape: {X.shape}")
    print(f"Labels Tensor Shape: {y.shape}")
    print(f"Number of unique signs: {len(unique_signs)}")

    # Train/Val split
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Create DataLoaders
    train_dataset = TensorDataset(X_train, y_train)
    val_dataset = TensorDataset(X_val, y_val)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    print(f"Train samples: {len(train_dataset)}")
    print(f"Val samples: {len(val_dataset)}")

    # Initialize and train model
    model = ASLClassifier(num_classes=len(unique_signs))
    model.to(device)

    print(f"\nModel initialized with {sum(p.numel() for p in model.parameters()):,} parameters")
    print("Starting training...\n")

    train_model(model, train_loader, val_loader, epochs=20, lr=0.002)


if __name__ == "__main__":
    main()