#!/usr/bin/env python3
# capture_sequence.py
# Record N short sequences per label with MediaPipe Holistic and build per-frame features:
#   RightHand(63) + LeftHand(63) + Face(468*3=1404) + Pose(33*4=132) + face-relative hand extras(8) = 1670 dims
# Requirements: numpy, opencv-python, mediapipe
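#
# Example usage (defaults below: --camera 0, --count 100 for train / 20 for val):
#   python capture_sequence.py --label A --split train
#   python capture_sequence.py --label A --split val --seconds 1.0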

import argparse, os, time, math, re        # stdlib: args, filesystem, timing, trig, regex
from pathlib import Path                   # pathlib for portable paths
import numpy as np, cv2, mediapipe as mp   # core libs: arrays, webcam/GUI, landmarks

mp_holistic = mp.solutions.holistic        # alias to the Holistic solution entry


# ---------- geometry / normalization ----------
def _angle(v):
    """
    Return atan2(y, x) of a 2D vector.
    Used to compute the orientation of a segment in the image plane.
    """
    return math.atan2(v[1], v[0])  # angle in radians for rotation normalization


def _rot2d(t):
    """
    Build a 2×2 rotation matrix for angle t (radians).
    Used to rotate landmark sets into a canonical frame.
    """
    c, s = math.cos(t), math.sin(t)                       # precompute cos/sin for speed
    return np.array([[c, -s], [s, c]], dtype=np.float32)  # standard 2D rotation matrix
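
# Illustrative check (not used by the pipeline): rotating +X by 90° gives +Y,
#   _rot2d(math.pi / 2) @ np.array([1.0, 0.0])  ->  approximately [0.0, 1.0]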


def normalize_hand(pts, handed=None):
    """
    Normalize a (21,3) hand landmark array:
      1) translate so the wrist (idx 0) is at the origin
      2) mirror X for left hands so both hands look like a right hand
      3) rotate so the wrist->middle-MCP vector (idx 9) points +Y
      4) scale by the max pairwise XY distance so size is comparable across frames
    Returns: (21,3) float32
    """
    pts = pts.astype(np.float32).copy()    # float32 copy (avoid mutating the caller)
    pts[:, :2] -= pts[0, :2]               # translate: wrist to origin (stabilizes position)
    if handed and str(handed).lower().startswith("left"):
        pts[:, 0] *= -1.0                  # mirror X for left hand to canonicalize handedness
    v = pts[9, :2]                         # vector from wrist to middle MCP (index 9)
    R = _rot2d(math.pi/2 - _angle(v))      # rotate so this vector points up (+Y)
    pts[:, :2] = pts[:, :2] @ R.T          # apply rotation to XY (keep Z as-is for now)
    xy = pts[:, :2]                        # convenience view
    d = np.linalg.norm(xy[None, :, :] - xy[:, None, :], axis=-1).max()  # max pairwise XY distance (scale)
    d = 1.0 if d < 1e-6 else float(d)      # avoid divide-by-zero on degenerate frames
    pts[:, :2] /= d; pts[:, 2] /= d        # isotropic scale: XY and Z by the same factor
    return pts                             # normalized hand landmarks
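
# Illustrative invariants (hold for any valid (21,3) input, by the steps above):
#   out = normalize_hand(pts)
#   out[0, :2]  ->  [0, 0]   # wrist sits at the origin
#   out[9, 0]   ->  ~0       # wrist->middle-MCP is aligned with +Y (out[9, 1] > 0)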


def normalize_face(face):
    """
    Normalize a (468,3) face mesh:
      1) center at the midpoint between the outer eye corners (33, 263)
      2) scale by the inter-ocular distance
      3) rotate so the eye line is horizontal
    Returns: (468,3) float32
    """
    f = face.astype(np.float32).copy()                # safe copy
    left = f[33, :2]; right = f[263, :2]              # outer eye corners per MediaPipe indexing
    center = 0.5 * (left + right)                     # midpoint between the eyes anchors the face
    f[:, :2] -= center[None, :]                       # translate to center
    eye_vec = right - left                            # vector from left to right eye
    eye_dist = float(np.linalg.norm(eye_vec)) or 1.0  # scale factor; avoid zero
    f[:, :2] /= eye_dist; f[:, 2] /= eye_dist         # scale all dims consistently
    R = _rot2d(-_angle(eye_vec))                      # rotate so the eye line aligns with +X
    f[:, :2] = f[:, :2] @ R.T                         # apply rotation to XY
    return f


def normalize_pose(pose):
    """
    Normalize a (33,4) pose landmark array (x, y, z, visibility):
      1) center at the shoulder midpoint (11, 12)
      2) scale by the shoulder width
      3) rotate so the shoulders are horizontal
    The visibility channel ([:, 3]) is preserved as-is.
    Returns: (33,4) float32
    """
    p = pose.astype(np.float32).copy()          # copy to avoid mutating the input
    ls = p[11, :2]; rs = p[12, :2]              # left/right shoulder in XY
    center = 0.5 * (ls + rs)                    # mid-shoulder anchor
    p[:, :2] -= center[None, :]                 # translate to center
    sw_vec = rs - ls                            # shoulder vector (scale + rotation anchor)
    sw = float(np.linalg.norm(sw_vec)) or 1.0   # shoulder width (avoid zero)
    p[:, :2] /= sw; p[:, 2] /= sw               # scale pose consistently
    R = _rot2d(-_angle(sw_vec))                 # rotate so the shoulders are horizontal
    p[:, :2] = p[:, :2] @ R.T                   # apply rotation to XY
    return p


def face_frame_transform(face_pts):
    """
    Compute the transform that maps image XY into the normalized face frame
    (same definition as in normalize_face).
    Returns:
        center  : (2,) eye midpoint
        eye_dist: scalar inter-ocular distance
        R       : 2×2 rotation aligning the eye line with +X
    Use downstream as: v' = ((v - center) / eye_dist) @ R.T
    """
    left = face_pts[33, :2]; right = face_pts[263, :2]  # reference points: eye corners
    center = 0.5 * (left + right)                       # face center
    eye_vec = right - left                              # direction of the eye line
    eye_dist = float(np.linalg.norm(eye_vec)) or 1.0    # scale of the face
    theta = _angle(eye_vec)                             # angle of the eye line
    R = _rot2d(-theta)                                  # rotation to align with +X
    return center, eye_dist, R


def to_face_frame(pt_xy, center, eye_dist, R):
    """
    Transform a 2D point from image space into the normalized face frame.
    Inputs come from face_frame_transform().
    """
    v = (pt_xy - center) / eye_dist      # translate + scale
    return (v @ R.T).astype(np.float32)  # rotate into the face frame
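
# Consistency note (follows directly from the two definitions above): for any face,
#   c, s, R = face_frame_transform(face_pts)
#   to_face_frame(face_pts[i, :2], c, s, R) matches normalize_face(face_pts)[i, :2]
# up to floating-point error.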


# ---------- utils ----------
def next_idx(folder: Path, prefix="clip_"):
    """
    Scan a folder for files like 'clip_###.npz' and return the next index.
    Keeps saved clips sequential without collisions.
    """
    pat = re.compile(rf"^{re.escape(prefix)}(\d+)\.npz$")  # matches the clip index
    mx = 0                                                 # track the max index seen
    if folder.exists():                                    # only if the folder exists
        for n in os.listdir(folder):                       # iterate files
            m = pat.match(n)                               # regex match
            if m:
                mx = max(mx, int(m.group(1)))              # update max on matches
    return mx + 1                                          # next available index
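
# e.g., a folder containing clip_001.npz and clip_007.npz -> next_idx(folder) == 8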


def countdown(cap, seconds=3):
    """
    Show a full-screen countdown overlay before recording starts.
    Press 'q' to abort during the countdown.
    """
    for i in range(seconds, 0, -1):  # 3..2..1
        start = time.time()          # ensure ~1 s display per number
        while time.time() - start < 1.0:
            ok, frame = cap.read()   # read a frame
            if not ok:
                continue             # skip if the camera hiccups
            h, w = frame.shape[:2]   # frame size for centering text
            text = str(i)            # the digit to render
            (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 5, 10)  # size of the big number
            cv2.putText(frame, text, ((w - tw) // 2, (h + th) // 2),
                        cv2.FONT_HERSHEY_SIMPLEX, 5, (0, 0, 255), 10, cv2.LINE_AA)  # big red numeral
            msg = "Starting in..."   # helper message above the number
            (mw, mh), _ = cv2.getTextSize(msg, cv2.FONT_HERSHEY_SIMPLEX, 1.2, 3)
            cv2.putText(frame, msg, ((w - mw) // 2, (h // 2) - th - 20),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 255), 3, cv2.LINE_AA)
            cv2.imshow("sequence capture", frame)  # show the overlay
            if cv2.waitKey(1) & 0xFF == ord('q'):  # allow abort
                cap.release(); cv2.destroyAllWindows()
                raise SystemExit("Aborted during countdown")


def draw_progress_bar(img, frac_remaining, bar_h=16, margin=12):
    """
    Draw a simple progress bar at the top of the frame.
    frac_remaining in [0, 1] indicates the time left in the clip.
    """
    h, w = img.shape[:2]                                         # image dimensions
    x0, x1 = margin, w - margin                                  # horizontal extent
    y0, y1 = margin, margin + bar_h                              # vertical extent
    cv2.rectangle(img, (x0, y0), (x1, y1), (40, 40, 40), -1)     # dark background bar
    cv2.rectangle(img, (x0, y0), (x1, y1), (90, 90, 90), 2)      # border
    rem_w = int((x1 - x0) * max(0.0, min(1.0, frac_remaining)))  # filled width, clamped
    if rem_w > 0:
        cv2.rectangle(img, (x0, y0), (x0 + rem_w, y1), (0, 200, 0), -1)  # green fill


# ---------- holistic wrapper ----------
class HolisticDetector:
    """
    Thin wrapper around MediaPipe Holistic: fix the configuration once and expose process().
    """
    def __init__(self, det_conf=0.5, track_conf=0.5, model_complexity=1):
        # Build the Holistic detector with steady defaults; smooth_landmarks helps temporal stability.
        self.h = mp_holistic.Holistic(
            static_image_mode=False,             # realtime video stream
            model_complexity=model_complexity,   # 0=fastest, 2=most accurate
            smooth_landmarks=True,               # temporal smoothing reduces jitter
            enable_segmentation=False,           # not needed; saves compute
            refine_face_landmarks=False,         # faster; we only need the coarse face
            min_detection_confidence=det_conf,   # detection threshold
            min_tracking_confidence=track_conf,  # tracking threshold
        )

    def process(self, rgb):
        """
        Run landmark detection on an RGB frame and return the MediaPipe results object.
        """
        return self.h.process(rgb)  # delegate to MediaPipe
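
# Minimal usage sketch (assumes `frame` is a BGR image from cv2.VideoCapture):
#   det = HolisticDetector()
#   res = det.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
#   res.right_hand_landmarks / res.left_hand_landmarks / res.face_landmarks / res.pose_landmarks
#   (each attribute is None when that part is not detected)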


# ---------- main ----------
def main():
    """
    CLI entry: capture N clips of length --seconds for a given --label and --split,
    saving per-frame 1670-D features into sequences/<split>/<label>/clip_XXX.npz.
    """
    ap = argparse.ArgumentParser()  # CLI flag parsing
    ap.add_argument("--label", required=True, help="Class label (e.g., A, B, Mother, Father, etc.)")
    ap.add_argument("--split", required=True, choices=["train", "val"])
    ap.add_argument("--seconds", type=float, default=0.8)
    ap.add_argument("--camera", type=int, default=0)
    ap.add_argument("--width", type=int, default=640)
    ap.add_argument("--height", type=int, default=480)
    ap.add_argument("--count", type=int, default=None)
    ap.add_argument("--det-thresh", type=float, default=0.5)
    ap.add_argument("--holistic-complexity", type=int, default=1, choices=[0, 1, 2])
    args = ap.parse_args()  # finalize args

    L = args.label.strip()                      # normalized label string
    if len(L) == 0 or ("/" in L or "\\" in L):  # basic validation to keep paths clean
        raise SystemExit("Use a non-empty label without slashes")
    if args.count is None:                      # default count per split for convenience
        args.count = 100 if args.split == "train" else 20

    out_dir = Path("sequences") / args.split / L  # where clip_*.npz files go
    out_dir.mkdir(parents=True, exist_ok=True)    # ensure the directory exists
    idx = next_idx(out_dir)                       # next clip index to use

    det = HolisticDetector(args.det_thresh, args.det_thresh, args.holistic_complexity)  # detector
    cap = cv2.VideoCapture(args.camera)           # open the camera device
    if not cap.isOpened():                        # fail early if missing
        raise SystemExit(f"Could not open camera {args.camera}")
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, args.width)    # set capture width
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, args.height)  # set capture height

    print(f"Recording {args.count} clips for {L}/{args.split}, {args.seconds}s each. "
          f"(R+L hands + face + pose + face-relative extras)")
    countdown(cap, 3)                             # give the operator time to get ready

    for n in range(args.count):  # loop over the requested clips
        seq_X = []               # holds per-frame features
        start_t = time.time(); end_t = start_t + args.seconds  # fixed-length recording window

        while True:              # per-frame capture loop
            now = time.time()
            if now >= end_t:
                break            # stop after the desired duration
            ok, fr = cap.read()  # grab a frame
            if not ok:
                break            # camera yielded nothing; end the clip

            rgb = cv2.cvtColor(fr, cv2.COLOR_BGR2RGB)  # MediaPipe expects RGB
            res = det.process(rgb)                     # run landmark detection

            # hands
            right_pts = left_pts = None  # initialize as missing
            if res.right_hand_landmarks is not None:
                right_pts = np.array([[lm.x, lm.y, lm.z]
                                      for lm in res.right_hand_landmarks.landmark], np.float32)  # (21,3)
            if res.left_hand_landmarks is not None:
                left_pts = np.array([[lm.x, lm.y, lm.z]
                                     for lm in res.left_hand_landmarks.landmark], np.float32)    # (21,3)

            # face
            face_pts = None
            if res.face_landmarks is not None:  # 468 face landmarks
                face_pts = np.array([[lm.x, lm.y, lm.z] for lm in res.face_landmarks.landmark], np.float32)

            # pose
            pose_arr = None
            if res.pose_landmarks is not None:  # 33 pose landmarks with visibility
                pose_arr = np.array([[lm.x, lm.y, lm.z, lm.visibility]
                                     for lm in res.pose_landmarks.landmark], np.float32)

            # Build the feature: require the face plus at least one hand (pose optional)
            if face_pts is not None and (right_pts is not None or left_pts is not None):
                f_norm = normalize_face(face_pts)  # canonicalize face geometry -> (468,3)

                # face-frame transform, used to express hand positions relative to the face
                f_center, f_scale, f_R = face_frame_transform(face_pts)

                def hand_face_extras(hand_pts):
                    """
                    For a hand, return [wrist_x, wrist_y, tip_x, tip_y] in the face frame.
                    If the hand is missing, return zeros; this keeps the dims fixed while
                    preserving the coarse spatial relation of hand to face when present.
                    """
                    if hand_pts is None:
                        return np.zeros(4, np.float32)  # missing hand -> zeros
                    wrist_xy = hand_pts[0, :2]          # wrist point
                    tip_xy = hand_pts[8, :2]            # index fingertip (salient for pointing)
                    w = to_face_frame(wrist_xy, f_center, f_scale, f_R)  # project into the face frame
                    t = to_face_frame(tip_xy, f_center, f_scale, f_R)
                    return np.array([w[0], w[1], t[0], t[1]], np.float32)  # pack the features

                rh_ex = hand_face_extras(right_pts)  # (4,) right extras
                lh_ex = hand_face_extras(left_pts)   # (4,) left extras

                rh = normalize_hand(right_pts, "Right").reshape(-1) if right_pts is not None else np.zeros(63, np.float32)  # (63,)
                lh = normalize_hand(left_pts, "Left").reshape(-1) if left_pts is not None else np.zeros(63, np.float32)     # (63,)
                p_norm = normalize_pose(pose_arr).reshape(-1) if pose_arr is not None else np.zeros(33 * 4, np.float32)     # (132,)

                feat = np.concatenate([rh, lh, f_norm.reshape(-1), p_norm, rh_ex, lh_ex], axis=0)  # (1670,) full feature
                seq_X.append(feat)  # push this frame's feature vector
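
            # Layout of a saved frame vector `feat` (matches the header comment):
            #   feat[0:63]      right hand (21*3)    feat[63:126]     left hand (21*3)
            #   feat[126:1530]  face (468*3)         feat[1530:1662]  pose (33*4)
            #   feat[1662:1670] extras: [rh wrist_xy, rh tip_xy, lh wrist_xy, lh tip_xy]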

            # Optional fingertip markers for visual feedback. Use the raw landmark
            # coordinates (already normalized to [0,1] image space) so the dot lands
            # on the fingertip; normalize_hand() output is wrist-centered and would not.
            if right_pts is not None:
                pt = right_pts[8, :2]  # raw index-tip position in [0,1] image coords
                cv2.circle(fr, (int(fr.shape[1] * pt[0]), int(fr.shape[0] * pt[1])), 6, (0, 255, 0), -1)  # green dot
            if left_pts is not None:
                pt = left_pts[8, :2]
                cv2.circle(fr, (int(fr.shape[1] * pt[0]), int(fr.shape[0] * pt[1])), 6, (255, 0, 0), -1)  # blue dot (BGR)

            # UI overlay (progress + label)
            frac_remaining = (end_t - now) / max(1e-6, args.seconds)  # progress-bar fraction
            draw_progress_bar(fr, frac_remaining, bar_h=16, margin=12)
            cv2.putText(fr, f"{L} {args.split} Clip {n+1}/{args.count}",
                        (20, 40), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 0), 2, cv2.LINE_AA)
            cv2.imshow("sequence capture", fr)     # show the live preview
            if cv2.waitKey(1) & 0xFF == ord('q'):  # allow stopping the whole session
                cap.release(); cv2.destroyAllWindows(); return

        # After the clip duration, save if we collected any valid frames
        if seq_X:
            X = np.stack(seq_X, 0).astype(np.float32)  # stack into a (T, 1670) array
            path = out_dir / f"clip_{idx:03d}.npz"     # next filename
            np.savez_compressed(path, X=X)             # compressed .npz with key 'X'
            print(f"💾 saved {path} frames={X.shape[0]} dims={X.shape[1]}")
            idx += 1                                   # advance the index
        else:
            print("⚠️ Not enough frames with face + any hand; skipped clip.")  # guardrail

    print("✅ Done recording.")              # session complete
    cap.release(); cv2.destroyAllWindows()  # clean up resources


if __name__ == "__main__":
    main()  # run CLI
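
# Note: a saved clip can be read back with np.load(<path>)["X"], which yields a
# (T, 1670) float32 array with one row per captured frame.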