import cv2
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import numpy as np
from mediapipe.framework.formats import landmark_pb2
import argparse
import os
import csv


# --- NEW: Helper function to create the landmark-to-feature map ---
def create_landmark_map():
    """Creates a mapping from landmark index to facial feature name."""
    landmark_map = {}

    # Define the connection groups from MediaPipe's face_mesh solutions
    connection_groups = {
        'lips': mp.solutions.face_mesh.FACEMESH_LIPS,
        'left_eye': mp.solutions.face_mesh.FACEMESH_LEFT_EYE,
        'right_eye': mp.solutions.face_mesh.FACEMESH_RIGHT_EYE,
        'left_eyebrow': mp.solutions.face_mesh.FACEMESH_LEFT_EYEBROW,
        'right_eyebrow': mp.solutions.face_mesh.FACEMESH_RIGHT_EYEBROW,
        'face_oval': mp.solutions.face_mesh.FACEMESH_FACE_OVAL,
        'left_iris': mp.solutions.face_mesh.FACEMESH_LEFT_IRIS,
        'right_iris': mp.solutions.face_mesh.FACEMESH_RIGHT_IRIS,
    }

    # Populate the map by iterating through the connection groups
    for part_name, connections in connection_groups.items():
        for connection in connections:
            landmark_map[connection[0]] = part_name
            landmark_map[connection[1]] = part_name

    return landmark_map


# --- Helper Function to Draw Landmarks ---
def draw_landmarks_on_image(rgb_image, detection_result):
    """Draws face landmarks on a single image frame."""
    face_landmarks_list = detection_result.face_landmarks
    annotated_image = np.copy(rgb_image)

    # Loop through the detected faces to visualize.
    for face_landmarks in face_landmarks_list:
        face_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
        face_landmarks_proto.landmark.extend([
            landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z)
            for landmark in face_landmarks
        ])

        mp.solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_TESSELATION,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles
            .get_default_face_mesh_tesselation_style())
        mp.solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_CONTOURS,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles
            .get_default_face_mesh_contours_style())
        mp.solutions.drawing_utils.draw_landmarks(
            image=annotated_image,
            landmark_list=face_landmarks_proto,
            connections=mp.solutions.face_mesh.FACEMESH_IRISES,
            landmark_drawing_spec=None,
            connection_drawing_spec=mp.solutions.drawing_styles
            .get_default_face_mesh_iris_connections_style())

    return annotated_image


def main():
    parser = argparse.ArgumentParser(description='Process a video to detect and draw face landmarks.')
    parser.add_argument('input_video', help='The path to the input video file.')
    args = parser.parse_args()

    input_video_path = args.input_video
    base_name, extension = os.path.splitext(input_video_path)
    output_video_path = f"{base_name}_annotated{extension}"
    output_csv_path = f"{base_name}_landmarks.csv"

    # --- NEW: Create the landmark map ---
    landmark_to_part_map = create_landmark_map()

    # --- Configuration & Setup ---
    model_path = 'face_landmarker.task'
    base_options = python.BaseOptions(model_asset_path=model_path)
    options = vision.FaceLandmarkerOptions(base_options=base_options,
                                           output_face_blendshapes=True,
                                           output_facial_transformation_matrixes=True,
                                           num_faces=1)
    detector = vision.FaceLandmarker.create_from_options(options)

    # --- Video and CSV Setup ---
    cap = cv2.VideoCapture(input_video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video file {input_video_path}")
        return

    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_video_path, fourcc, fps, (frame_width, frame_height))

    # Open CSV file for writing
    with open(output_csv_path, 'w', newline='') as csvfile:
        csv_writer = csv.writer(csvfile)
        # NEW: Write the updated header row
        csv_writer.writerow(['frame', 'face', 'landmark_index', 'face_part', 'x', 'y', 'z'])

        print(f"Processing video: {input_video_path} šŸ“¹")
        frame_number = 0
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frame_number += 1

            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
            detection_result = detector.detect(mp_image)

            # Write landmark data to CSV
            if detection_result.face_landmarks:
                for face_index, face_landmarks in enumerate(detection_result.face_landmarks):
                    for landmark_index, landmark in enumerate(face_landmarks):
                        # NEW: Look up the face part name from the map
                        face_part = landmark_to_part_map.get(landmark_index, 'unknown')
                        # NEW: Write the new column to the CSV row
                        csv_writer.writerow([frame_number, face_index, landmark_index,
                                             face_part, landmark.x, landmark.y, landmark.z])

            # Draw landmarks on the frame for the video
            annotated_frame = draw_landmarks_on_image(rgb_frame, detection_result)
            bgr_annotated_frame = cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR)
            out.write(bgr_annotated_frame)

    # Release everything when the job is finished
    cap.release()
    out.release()
    cv2.destroyAllWindows()

    print("\nāœ… Processing complete.")
    print(f"Annotated video saved to: {output_video_path}")
    print(f"Landmarks CSV saved to: {output_csv_path}")


if __name__ == '__main__':
    main()