#!/usr/bin/env python3 # Train BiGRU on (T, F=1662) sequences; reads input_dim from meta.json import os, json, argparse import numpy as np import torch, torch.nn as nn from torch.utils.data import Dataset, DataLoader def get_device(): return torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu") class SeqDataset(Dataset): def __init__(self, X, y, augment=False): self.X = X.astype(np.float32) self.y = y.astype(np.int64) self.augment = augment def __len__(self): return len(self.y) def _augment(self, seq): # Light Gaussian noise — safe for high-D features return seq + np.random.normal(0, 0.01, size=seq.shape).astype(np.float32) def __getitem__(self, i): xi = self.X[i] if self.augment: xi = self._augment(xi) return torch.from_numpy(xi).float(), int(self.y[i]) class SeqGRU(nn.Module): def __init__(self, input_dim, hidden=128, num_classes=26): super().__init__() self.gru = nn.GRU(input_dim, hidden, batch_first=True, bidirectional=True) self.head = nn.Sequential( nn.Linear(hidden*2, 128), nn.ReLU(), nn.Dropout(0.2), nn.Linear(128, num_classes), ) def forward(self, x): h,_ = self.gru(x) return self.head(h[:, -1, :]) def main(): ap = argparse.ArgumentParser() ap.add_argument("--landmarks", default="landmarks_seq32") ap.add_argument("--epochs", type=int, default=40) ap.add_argument("--batch", type=int, default=64) ap.add_argument("--lr", type=float, default=1e-3) ap.add_argument("--out", default="asl_seq32_gru.pt") args = ap.parse_args() trX = np.load(os.path.join(args.landmarks,"train_X.npy")) trY = np.load(os.path.join(args.landmarks,"train_y.npy")) vaX = np.load(os.path.join(args.landmarks,"val_X.npy")) vaY = np.load(os.path.join(args.landmarks,"val_y.npy")) classes = json.load(open(os.path.join(args.landmarks,"class_names.json"))) meta = json.load(open(os.path.join(args.landmarks,"meta.json"))) T = int(meta["frames"]) input_dim = int(meta.get("input_dim", trX.shape[-1])) print(f"Loaded: train {trX.shape} val {vaX.shape} classes={classes} input_dim={input_dim}") # Global normalization (feature-wise) X_mean = trX.reshape(-1, trX.shape[-1]).mean(axis=0, keepdims=True).astype(np.float32) X_std = trX.reshape(-1, trX.shape[-1]).std(axis=0, keepdims=True).astype(np.float32) + 1e-6 trXn = (trX - X_mean) / X_std vaXn = (vaX - X_mean) / X_std tr_ds = SeqDataset(trXn, trY, augment=True) va_ds = SeqDataset(vaXn, vaY, augment=False) tr_dl = DataLoader(tr_ds, batch_size=args.batch, shuffle=True) va_dl = DataLoader(va_ds, batch_size=args.batch, shuffle=False) device = get_device() model = SeqGRU(input_dim=input_dim, hidden=128, num_classes=len(classes)).to(device) crit = nn.CrossEntropyLoss() opt = torch.optim.AdamW(model.parameters(), lr=args.lr, weight_decay=1e-4) sch = torch.optim.lr_scheduler.CosineAnnealingLR(opt, T_max=args.epochs) best_acc, best_state = 0.0, None for epoch in range(1, args.epochs+1): model.train() tot, correct, loss_sum = 0, 0, 0.0 for xb, yb in tr_dl: xb, yb = xb.to(device), yb.to(device) opt.zero_grad(set_to_none=True) logits = model(xb) loss = crit(logits, yb) loss.backward() opt.step() loss_sum += loss.item() * yb.size(0) correct += (logits.argmax(1)==yb).sum().item() tot += yb.size(0) tr_loss = loss_sum / max(1, tot) tr_acc = correct / max(1, tot) model.eval() vtot, vcorrect = 0, 0 with torch.no_grad(): for xb, yb in va_dl: xb, yb = xb.to(device), yb.to(device) logits = model(xb) vcorrect += (logits.argmax(1)==yb).sum().item() vtot += yb.size(0) va_acc = vcorrect / max(1, vtot) sch.step() print(f"Epoch {epoch:02d}: train_loss={tr_loss:.4f} train_acc={tr_acc:.3f} val_acc={va_acc:.3f}") if va_acc > best_acc: best_acc = va_acc best_state = { "model": model.state_dict(), "classes": classes, "frames": T, "X_mean": torch.from_numpy(X_mean), "X_std": torch.from_numpy(X_std), } torch.save(best_state, args.out) print(f" ✅ Saved best → {args.out} (val_acc={best_acc:.3f})") print("Done. Best val_acc:", best_acc) if __name__ == "__main__": main()