chatgpt, i hate u

This commit is contained in:
2026-01-10 22:47:23 -06:00
parent abf57f95de
commit c209e036cb

View File

@@ -5,7 +5,6 @@ import os
import json import json
import math import math
import time import time
import pickle
import numpy as np import numpy as np
import pandas as pd import pandas as pd
@@ -25,13 +24,12 @@ from functools import partial
# =============================== # ===============================
# Run on the first CUDA GPU when PyTorch can see one, otherwise on the CPU.
_device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(_device_name)
print(f"Using device: {device}")

if device.type == "cuda":
    print("GPU:", torch.cuda.get_device_name(0))
    # Let cuDNN auto-tune its kernels for the input shapes it encounters.
    torch.backends.cudnn.benchmark = True
# =============================== # ===============================
# DATA LOADING # DATA LOADING & FEATURE EXTRACTION
# =============================== # ===============================
def load_kaggle_asl_data(base_path): def load_kaggle_asl_data(base_path):
train_df = pd.read_csv(os.path.join(base_path, "train.csv")) train_df = pd.read_csv(os.path.join(base_path, "train.csv"))
@@ -39,13 +37,12 @@ def load_kaggle_asl_data(base_path):
sign_to_idx = json.load(f) sign_to_idx = json.load(f)
return train_df, sign_to_idx return train_df, sign_to_idx
def extract_hand_landmarks_from_parquet(path): def extract_hand_landmarks_from_parquet(path):
df = pd.read_parquet(path) df = pd.read_parquet(path)
left = df[df["type"] == "left_hand"] left = df[df["type"] == "left_hand"]
right = df[df["type"] == "right_hand"] right = df[df["type"] == "right_hand"]
hand = None
if len(left) > 0: if len(left) > 0:
hand = left hand = left
elif len(right) > 0: elif len(right) > 0:
@@ -53,67 +50,56 @@ def extract_hand_landmarks_from_parquet(path):
else: else:
return None return None
landmarks = [] # Keep all frames
for i in range(21): frames = sorted(hand['frame'].unique())
lm = hand[hand["landmark_index"] == i] landmarks_seq = []
if len(lm) == 0:
landmarks.append([0.0, 0.0, 0.0])
else:
landmarks.append([
lm["x"].mean(),
lm["y"].mean(),
lm["z"].mean()
])
return np.array(landmarks, dtype=np.float32) for frame in frames:
lm_frame = hand[hand['frame'] == frame]
lm_list = []
for i in range(21):
lm = lm_frame[lm_frame['landmark_index'] == i]
if len(lm) == 0:
lm_list.append([0.0, 0.0, 0.0])
else:
lm_list.append([
lm['x'].mean(),
lm['y'].mean(),
lm['z'].mean()
])
landmarks_seq.append(lm_list)
return np.array(landmarks_seq, dtype=np.float32) # (T, 21, 3)
def get_features_sequence(landmarks_seq, max_frames=100):
    """Convert a per-frame landmark sequence into a fixed-length feature matrix.

    Every frame is centred on landmark 0 (the wrist) and divided by the
    distance from the wrist to landmark 9, then flattened to one row.
    The rows are zero-padded at the end, or truncated, to ``max_frames``.

    Args:
        landmarks_seq: array-like of shape ``(T, L, 3)`` with ``L >= 10``,
            or ``None`` when no hand was found.
        max_frames: number of rows in the returned matrix.

    Returns:
        ``float32`` array of shape ``(max_frames, L * 3)``, or ``None`` when
        ``landmarks_seq`` is ``None`` or contains no frames.
    """
    if landmarks_seq is None:
        return None

    # Work on a float32 copy so integer input divides correctly and the
    # caller's array is never modified.
    seq = np.asarray(landmarks_seq, dtype=np.float32)
    if seq.shape[0] == 0:
        # Nothing to normalise; treat like "no hand" so the sample is skipped.
        return None

    # Center on wrist
    points = seq - seq[:, 0:1, :]

    # Per-frame scale. A tiny or non-finite scale falls back to 1.0 so that a
    # single bad reference landmark does not wipe out the rest of the frame.
    scale = np.linalg.norm(points[:, 9, :], axis=1)
    with np.errstate(invalid="ignore"):
        usable = np.isfinite(scale) & (scale >= 1e-6)
    scale = np.where(usable, scale, 1.0).astype(np.float32)
    points = points / scale[:, np.newaxis, np.newaxis]

    # Missing detections arrive as NaN; zero them so they cannot turn the
    # training loss into NaN further down the pipeline.
    points = np.nan_to_num(points, nan=0.0, posinf=0.0, neginf=0.0)

    # Flatten per frame
    frames = points.reshape(points.shape[0], -1)

    # Pad or truncate
    n_frames = frames.shape[0]
    if n_frames >= max_frames:
        return frames[:max_frames]
    padded = np.zeros((max_frames, frames.shape[1]), dtype=np.float32)
    padded[:n_frames] = frames
    return padded  # (max_frames, L * 3)
def process_row(row, base_path, max_frames=100):
    """Load one training sample and turn it into ``(features, sign)``.

    Args:
        row: mapping-like record providing ``"path"`` (parquet file relative
            to ``base_path``) and ``"sign"`` (the label).
        base_path: root directory of the dataset.
        max_frames: sequence length handed to ``get_features_sequence``.

    Returns:
        ``(feat_seq, sign)`` on success; ``feat_seq`` is ``None`` when no hand
        was found. ``(None, None)`` when the file is missing or processing
        fails, so callers can simply skip the sample.
    """
    import logging  # function-scope import keeps the block self-contained

    path = os.path.join(base_path, row["path"])
    if not os.path.exists(path):
        return None, None

    try:
        lm_seq = extract_hand_landmarks_from_parquet(path)
        feat_seq = get_features_sequence(lm_seq, max_frames)
        return feat_seq, row["sign"]
    except Exception as exc:
        # Stay best-effort (one bad file must not abort the whole run), but
        # catch only Exception so Ctrl-C / SystemExit still propagate, and
        # report the failure instead of hiding it.
        logging.getLogger(__name__).warning("Skipping %s: %r", path, exc)
        return None, None
# =============================== # ===============================
# LOAD + PROCESS DATA # LOAD + PROCESS DATA
# =============================== # ===============================
@@ -123,18 +109,17 @@ train_df, sign_to_idx = load_kaggle_asl_data(base_path)
# Materialise the rows once so they can be shipped to the worker processes.
rows = [row for _, row in train_df.iterrows()]
func = partial(process_row, base_path=base_path, max_frames=100)

# NOTE(review): this pool is created at import time; on platforms that start
# workers with "spawn" the script presumably needs an
# `if __name__ == "__main__":` guard around it -- confirm for the target OS.
X, y = [], []
with Pool(cpu_count()) as pool:
    # imap yields results in input order as they complete, so the full list
    # of per-row results is never held in memory next to X.
    for feat_seq, sign in pool.imap(func, rows, chunksize=64):
        if feat_seq is not None:
            X.append(feat_seq)
            y.append(sign)

if not X:
    # np.stack([]) would fail with an unhelpful message; say what happened.
    raise ValueError(
        f"No usable samples were extracted from {len(rows)} rows under {base_path!r}"
    )

X = np.stack(X)  # (N, T, 63)
y = np.array(y)

print("Samples:", len(X))
print("Sequence shape:", X.shape[1:])
# =============================== # ===============================
# LABEL ENCODING # LABEL ENCODING
@@ -142,6 +127,7 @@ print("Feature dim:", X.shape[1])
# Map the sign names to contiguous integer class ids; the fitted encoder is
# kept so predictions can be translated back to sign names.
le = LabelEncoder().fit(y)
y = le.transform(y)
num_classes = len(le.classes_)
print("Num classes:", num_classes)
# =============================== # ===============================
# SPLIT # SPLIT
@@ -153,7 +139,7 @@ X_train, X_test, y_train, y_test = train_test_split(
# =============================== # ===============================
# DATASET # DATASET
# =============================== # ===============================
class ASLDataset(Dataset): class ASLSequenceDataset(Dataset):
def __init__(self, X, y): def __init__(self, X, y):
self.X = torch.tensor(X, dtype=torch.float32) self.X = torch.tensor(X, dtype=torch.float32)
self.y = torch.tensor(y, dtype=torch.long) self.y = torch.tensor(y, dtype=torch.long)
@@ -164,118 +150,92 @@ class ASLDataset(Dataset):
def __getitem__(self, idx): def __getitem__(self, idx):
return self.X[idx], self.y[idx] return self.X[idx], self.y[idx]
# Page-locked host memory only speeds up host-to-GPU copies. Requesting it on
# a CPU-only run brings no benefit and makes DataLoader emit a warning.
_pin_memory = device.type == "cuda"

# Training batches are reshuffled every epoch; evaluation order stays fixed.
train_loader = DataLoader(
    ASLSequenceDataset(X_train, y_train),
    batch_size=64,
    shuffle=True,
    pin_memory=_pin_memory,
)
test_loader = DataLoader(
    ASLSequenceDataset(X_test, y_test),
    batch_size=64,
    shuffle=False,
    pin_memory=_pin_memory,
)
# =============================== # ===============================
# MODEL (FIXED) # TRANSFORMER MODEL
# =============================== # ===============================
class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a batch-first sequence.

    Even feature columns receive ``sin(pos * w_k)`` and odd columns
    ``cos(pos * w_k)`` with ``w_k = 10000 ** (-2k / d_model)``. The table is
    stored as the non-trainable buffer ``pe`` of shape
    ``(1, max_len, d_model)``.

    Args:
        d_model: feature width of the sequences (odd widths are supported).
        max_len: longest sequence length that can be encoded.
    """

    def __init__(self, d_model, max_len=100):
        super().__init__()
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(
            torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
        )
        angles = position * div_term  # (max_len, ceil(d_model / 2))

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one fewer odd column than even column,
        # so trim the cosine table to fit rather than raising a shape error.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x + pe[:, :T]`` for ``x`` of shape ``(B, T, d_model)``."""
        seq_len = x.size(1)
        if seq_len > self.pe.size(1):
            raise ValueError(
                f"sequence length {seq_len} exceeds max_len {self.pe.size(1)}"
            )
        return x + self.pe[:, :seq_len, :]
class TransformerASL(nn.Module):
    """Transformer encoder that classifies a landmark sequence into a sign.

    Pipeline: linear projection -> LayerNorm -> positional encoding ->
    Transformer encoder -> mean over time -> MLP head.

    Frames whose input features are all exactly zero are treated as padding:
    they are hidden from self-attention and left out of the temporal mean, so
    short clips are not diluted by their zero padding.

    Args:
        input_dim: number of features per frame.
        num_classes: number of output classes.
        d_model: width of the Transformer.
        nhead: number of attention heads.
        num_layers: number of encoder layers.
    """

    def __init__(self, input_dim, num_classes, d_model=256, nhead=8, num_layers=4):
        super().__init__()
        self.proj = nn.Linear(input_dim, d_model)
        self.norm = nn.LayerNorm(d_model)
        self.pos = PositionalEncoding(d_model)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=1024,
            dropout=0.1,
            activation='gelu',
            batch_first=True,
            norm_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)

        self.fc = nn.Sequential(
            nn.Linear(d_model, 512),
            nn.BatchNorm1d(512),
            nn.GELU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Return class logits ``(B, num_classes)`` for ``x`` of shape ``(B, T, input_dim)``."""
        # True where a frame carries no data at all (zero padding).
        pad_mask = x.abs().sum(dim=-1) == 0  # (B, T)
        # A sample with every frame masked would give attention nothing to
        # look at (NaN softmax); un-mask such samples entirely.
        pad_mask = pad_mask & ~pad_mask.all(dim=1, keepdim=True)

        x = self.proj(x)
        x = self.norm(x)
        x = self.pos(x)
        x = self.encoder(x, src_key_padding_mask=pad_mask)  # (B, T, d_model)

        # Temporal average over real frames only. masked_fill (rather than a
        # multiply) also discards any non-finite values at padded positions.
        x = x.masked_fill(pad_mask.unsqueeze(-1), 0.0)
        n_valid = (~pad_mask).sum(dim=1, keepdim=True).clamp(min=1).to(x.dtype)
        x = x.sum(dim=1) / n_valid

        return self.fc(x)
# The per-frame feature width is the last axis of the stacked sample array.
model = TransformerASL(input_dim=X.shape[2], num_classes=num_classes)
model = model.to(device)

_n_params = sum(p.numel() for p in model.parameters())
print("Parameters:", _n_params)
# =============================== # ===============================
# TRAINING SETUP # TRAIN SETUP
# =============================== # ===============================
# Cross-entropy with label smoothing, AdamW, and a cosine schedule that
# restarts every 10 scheduler steps.
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.AdamW(
    model.parameters(),
    lr=3e-4,
    weight_decay=1e-4,
)
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer,
    T_0=10,
)
# =============================== # ===============================
# TRAIN / EVAL # TRAIN / EVAL FUNCTIONS
# =============================== # ===============================
def train_epoch():
    """Run one optimisation pass over ``train_loader``.

    Uses the module-level ``model``, ``criterion``, ``optimizer``, ``device``
    and ``train_loader``. Gradients are clipped to a global norm of 1.0
    before every optimizer step.

    Returns:
        ``(mean_loss, accuracy_percent)``. The loss is averaged per sample,
        so a smaller final batch is not over-weighted. An empty loader yields
        ``(nan, 0.0)`` instead of raising ``ZeroDivisionError``.
    """
    model.train()
    seen, hits, loss_total = 0, 0, 0.0

    for x, y in train_loader:
        # non_blocking lets copies from pinned host memory overlap with GPU
        # work; it has no effect for CPU tensors.
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)

        optimizer.zero_grad(set_to_none=True)
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        batch_size = y.size(0)
        loss_total += loss.item() * batch_size
        hits += (logits.argmax(1) == y).sum().item()
        seen += batch_size

    if seen == 0:
        return float("nan"), 0.0
    return loss_total / seen, 100 * hits / seen
@torch.no_grad()
def evaluate():
    """Return top-1 accuracy (in percent) of ``model`` over ``test_loader``.

    Uses the module-level ``model``, ``device`` and ``test_loader``; the model
    is switched to eval mode and no gradients are recorded. An empty loader
    yields ``0.0`` instead of raising ``ZeroDivisionError``.
    """
    model.eval()
    seen, hits = 0, 0

    for x, y in test_loader:
        # non_blocking lets copies from pinned host memory overlap with GPU
        # work; it has no effect for CPU tensors.
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)
        logits = model(x)
        hits += (logits.argmax(1) == y).sum().item()
        seen += y.size(0)

    if seen == 0:
        return 0.0
    return 100 * hits / seen
# =============================== # ===============================
# TRAIN LOOP # TRAIN LOOP
@@ -289,24 +249,16 @@ for epoch in range(epochs):
loss, train_acc = train_epoch() loss, train_acc = train_epoch()
test_acc = evaluate() test_acc = evaluate()
scheduler.step() scheduler.step()
print(f"Epoch {epoch+1}/{epochs} | Loss {loss:.4f} | Train {train_acc:.2f}% | Test {test_acc:.2f}%")
print(f"Epoch {epoch+1}/{epochs} | "
f"Loss {loss:.4f} | "
f"Train {train_acc:.2f}% | "
f"Test {test_acc:.2f}%")
if test_acc > best_acc: if test_acc > best_acc:
best_acc = test_acc best_acc = test_acc
wait = 0 wait = 0
torch.save({ torch.save({"model": model.state_dict(), "label_encoder": le}, "asl_transformer_full.pth")
"model": model.state_dict(),
"label_encoder": le
}, "asl_transformer_fixed.pth")
else: else:
wait += 1 wait += 1
if wait >= patience:
if wait >= patience: print("Early stopping")
print("Early stopping") break
break
print("Best accuracy:", best_acc) print("Best accuracy:", best_acc)