94 lines
4.4 KiB
Python
94 lines
4.4 KiB
Python
#!/usr/bin/env python3
|
|
# Build fixed-length (N frames) dataset from sequences/<split>/<CLASS>/clip_*.npz
|
|
|
|
import argparse, os, glob, json # stdlib utilities
|
|
from pathlib import Path
|
|
import numpy as np # arrays
|
|
|
|
def resample_sequence(X, N=32):
|
|
"""
|
|
Linearly resample a variable-length sequence (T,F) to exactly (N,F) over the frame index.
|
|
This preserves temporal order and distributes frames evenly across the clip.
|
|
"""
|
|
T = len(X) # original number of frames
|
|
if T == 0: return np.zeros((N, X.shape[1]), np.float32) # empty → zeros
|
|
if T == 1: return np.repeat(X, N, axis=0) # single frame → tile N times
|
|
src = np.linspace(0, T-1, num=T, dtype=np.float32) # original frame positions
|
|
dst = np.linspace(0, T-1, num=N, dtype=np.float32) # desired positions
|
|
out = np.zeros((N, X.shape[1]), np.float32) # allocate result
|
|
for d in range(X.shape[1]): # interpolate each feature independently
|
|
out[:, d] = np.interp(dst, src, X[:, d]) # linear interpolation
|
|
return out
|
|
|
|
def load_classes(seq_root: Path):
|
|
"""
|
|
Discover class subfolders under sequences/train/.
|
|
Ignores hidden/system directories. Returns sorted list of class names.
|
|
"""
|
|
train_dir = seq_root / "train"
|
|
if not train_dir.exists():
|
|
raise SystemExit(f"Missing folder: {train_dir}")
|
|
classes = sorted([
|
|
p.name for p in train_dir.iterdir()
|
|
if p.is_dir() and not p.name.startswith(".")
|
|
])
|
|
if not classes:
|
|
raise SystemExit("No classes found in sequences/train/ (folders should be class names like Mother, Father, etc.)")
|
|
return classes
|
|
|
|
def collect_split(seq_root: Path, split: str, classes, N):
|
|
"""
|
|
Collect all clips for a given split ('train' or 'val'):
|
|
- Load each clip_*.npz
|
|
- Resample to (N,F)
|
|
- Stack into X (num_clips, N, F) and y (num_clips,)
|
|
"""
|
|
Xs, ys = [], []
|
|
for ci, cls in enumerate(classes): # class index, name
|
|
for f in sorted(glob.glob(str(seq_root / split / cls / "clip_*.npz"))): # iterate clips
|
|
d = np.load(f) # load .npz
|
|
Xi = d["X"].astype(np.float32) # (T,F) features
|
|
XiN = resample_sequence(Xi, N) # (N,F) resampled
|
|
Xs.append(XiN); ys.append(ci) # add to lists
|
|
if Xs:
|
|
X = np.stack(Xs, 0); y = np.array(ys, np.int64) # stack arrays
|
|
else:
|
|
X = np.zeros((0, N, 1), np.float32); y = np.zeros((0,), np.int64) # empty split guard
|
|
return X, y
|
|
|
|
def main():
|
|
"""
|
|
CLI: read sequences/*/*/clip_*.npz, resample to --frames, and write dataset arrays and metadata.
|
|
"""
|
|
ap = argparse.ArgumentParser()
|
|
ap.add_argument("--in", dest="in_dir", default="sequences") # source root
|
|
ap.add_argument("--out", default="landmarks_seq32") # destination folder
|
|
ap.add_argument("--frames", type=int, default=32) # target frames per clip
|
|
args = ap.parse_args()
|
|
|
|
seq_root = Path(args.in_dir) # resolve input root
|
|
outdir = Path(args.out); outdir.mkdir(parents=True, exist_ok=True)
|
|
|
|
classes = load_classes(seq_root) # discover class names
|
|
trX, trY = collect_split(seq_root, "train", classes, args.frames) # build train split
|
|
vaX, vaY = collect_split(seq_root, "val", classes, args.frames) # build val split
|
|
|
|
if trX.size == 0 and vaX.size == 0: # sanity check
|
|
raise SystemExit("Found no clips. Did you run capture and save any clip_*.npz files?")
|
|
|
|
np.save(outdir/"train_X.npy", trX) # save arrays
|
|
np.save(outdir/"train_y.npy", trY)
|
|
np.save(outdir/"val_X.npy", vaX)
|
|
np.save(outdir/"val_y.npy", vaY)
|
|
json.dump(classes, open(outdir/"class_names.json", "w")) # save labels
|
|
|
|
# Detect true feature dimension from data (in case it changes)
|
|
input_dim = int(trX.shape[-1] if trX.size else vaX.shape[-1])
|
|
json.dump({"frames": args.frames, "input_dim": input_dim}, open(outdir/"meta.json","w"))
|
|
|
|
print(f"Saved dataset → {outdir}")
|
|
print(f" train {trX.shape}, val {vaX.shape}, classes={classes}, input_dim={input_dim}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|