|
|
import gradio as gr |
|
|
import av |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import tempfile |
|
|
import os |
|
|
|
|
|
def sample_frame_indices(num_frames, fps, total_frames):
    """
    Fallback sampling that spreads frame picks evenly across the video.

    Args:
        num_frames (int): Number of frames to sample.
        fps (float): Frames per second (unused by this basic strategy;
            kept for interface compatibility with callers).
        total_frames (int): Total frames in the video.

    Returns:
        list: Frame indices, at most ``num_frames`` long.
    """
    # Short video: there is nothing to choose between, return every frame.
    if total_frames <= num_frames:
        return list(range(total_frames))

    # Otherwise pick evenly spaced positions over [0, total_frames - 1].
    evenly_spaced = np.linspace(0, total_frames - 1, num_frames, dtype=int)
    return evenly_spaced.tolist()
|
|
|
|
|
def sample_frame_indices_efficient_segments(num_frames, segment_duration, num_segments, container):
    """
    Enhanced frame sampling strategy that distributes frames across temporal segments
    of the video for better temporal coverage and content diversity.

    Args:
        num_frames (int): Total number of frames to sample
        segment_duration (float): Duration of each segment in seconds
        num_segments (int): Number of segments to sample from
        container (av.container): PyAV container object

    Returns:
        list: Exactly num_frames frame indices (sorted; may contain duplicates
        when the video is too short to supply distinct frames)
    """
    video_stream = container.streams.video[0]
    # average_rate can be None or 0 for streams with missing metadata; guard
    # before converting/dividing so we fall back instead of raising
    # TypeError/ZeroDivisionError (matches the fps guard in process_video).
    avg_rate = video_stream.average_rate
    video_fps = float(avg_rate) if avg_rate else 0.0
    total_video_frames = video_stream.frames
    video_duration = total_video_frames / video_fps if video_fps > 0 else 0

    # Degenerate cases: fewer frames than requested, or unusable metadata.
    if total_video_frames < num_frames or video_duration <= 0:
        return sample_frame_indices(num_frames, 4, total_video_frames)

    # Distribute frames across segments; the first `extra_frames` segments
    # each take one additional frame so the totals add up to num_frames.
    base_frames_per_segment = num_frames // num_segments
    extra_frames = num_frames % num_segments

    # Cap segment length so the segments cannot cover more than ~80% of the
    # video when evenly spread, preserving temporal gaps between them.
    max_segment_duration = video_duration / num_segments * 0.8
    effective_segment_duration = min(segment_duration, max_segment_duration)

    # Segments shorter than half a second carry too little content; fall back.
    if effective_segment_duration < 0.5:
        return sample_frame_indices(num_frames, 4, total_video_frames)

    # Evenly space the segment start times across the usable timeline.
    if num_segments == 1:
        segment_starts = [0]
    else:
        max_start_time = max(0, video_duration - effective_segment_duration)
        segment_starts = np.linspace(0, max_start_time, num_segments)

    all_indices = []
    frames_collected = 0

    for i, start_time in enumerate(segment_starts):
        segment_frames = base_frames_per_segment + (1 if i < extra_frames else 0)

        if segment_frames == 0:
            continue

        # Convert the segment's time window into a frame-index window.
        start_frame = int(start_time * video_fps)
        end_frame = min(int((start_time + effective_segment_duration) * video_fps), total_video_frames)

        # Ensure a non-empty window (~0.5 s) when rounding collapses it near
        # the end of the video.
        if start_frame >= end_frame:
            end_frame = min(start_frame + int(0.5 * video_fps), total_video_frames)

        end_frame = min(end_frame, total_video_frames)

        if segment_frames == 1:
            # Single frame requested: take the middle of the window.
            frame_idx = start_frame + (end_frame - start_frame) // 2
            segment_indices = [min(frame_idx, total_video_frames - 1)]
        elif end_frame - start_frame <= segment_frames:
            # Window smaller than the request: repeat frames to fill the quota.
            available_frames = list(range(start_frame, end_frame))
            while len(available_frames) < segment_frames and available_frames:
                available_frames.extend(available_frames[:segment_frames - len(available_frames)])
            segment_indices = available_frames[:segment_frames]
        else:
            # Normal case: evenly spaced frames inside the segment window.
            segment_indices = np.linspace(start_frame, end_frame - 1, segment_frames, dtype=int).tolist()

        all_indices.extend(segment_indices)
        frames_collected += len(segment_indices)

        # Stop early once the quota is met.
        if frames_collected >= num_frames:
            break

    all_indices = np.array(all_indices)

    # Correct any over/under-collection so exactly num_frames are returned.
    if len(all_indices) != num_frames:
        if len(all_indices) > num_frames:
            # Too many: subsample uniformly.
            step = len(all_indices) / num_frames
            selected_indices = [all_indices[int(i * step)] for i in range(num_frames)]
            all_indices = np.array(selected_indices)
        else:
            # Too few: pad by cycling through what was already collected.
            needed = num_frames - len(all_indices)
            if len(all_indices) > 0:
                additional_indices = []
                for i in range(needed):
                    additional_indices.append(all_indices[i % len(all_indices)])
                all_indices = np.concatenate([all_indices, additional_indices])
            else:
                # Nothing collected at all: fall back to uniform sampling.
                return sample_frame_indices(num_frames, 4, total_video_frames)

    # Keep indices within bounds and in temporal order.
    all_indices = np.clip(all_indices, 0, total_video_frames - 1)
    all_indices = np.sort(all_indices)

    # Internal invariant sanity check (the branches above guarantee this).
    assert len(all_indices) == num_frames, f"Expected {num_frames} frames, got {len(all_indices)}"

    return all_indices.tolist()
|
|
|
|
|
def extract_frames_at_indices(video_path, frame_indices):
    """
    Extract frames from video at specified indices.

    Args:
        video_path (str): Path to video file
        frame_indices (list): List of frame indices to extract (may contain
            duplicates, e.g. when the sampler padded a short video)

    Returns:
        list: List of PIL Images, one per entry in frame_indices in the same
        order (duplicate indices yield the same image multiple times)
    """
    # Decode each unique index once into a lookup table. The previous
    # implementation collapsed duplicates via set() and appended in decode
    # order, returning fewer frames than requested for padded index lists;
    # the caller then silently truncated via zip().
    remaining = set(frame_indices)
    images = {}

    container = av.open(video_path)
    try:
        frame_idx = 0
        # Sequential decode; we stop as soon as every wanted index is seen.
        for frame in container.decode(video=0):
            if frame_idx in remaining:
                images[frame_idx] = frame.to_image()
                remaining.remove(frame_idx)
                if not remaining:
                    break
            frame_idx += 1
    finally:
        # Close the container even if decoding raises mid-stream.
        container.close()

    # Preserve the caller's ordering and multiplicity; skip indices that were
    # never reached (e.g. an index beyond the decodable stream length).
    return [images[i] for i in frame_indices if i in images]
|
|
|
|
|
def process_video(video_file, num_frames, segment_duration, num_segments):
    """
    Main processing function for Gradio interface.

    Args:
        video_file: Uploaded video file (path string from gr.Video)
        num_frames (int): Number of frames to sample
        segment_duration (float): Duration of each segment in seconds
        num_segments (int): Number of segments

    Returns:
        tuple: (frames list, info string, indices list)
    """
    if video_file is None:
        return [], "Please upload a video file", []

    try:
        # Probe metadata and compute the sampling plan. The inner finally
        # guarantees the container is closed even when sampling raises
        # (previously close() was skipped on error, leaking the handle).
        container = av.open(video_file)
        try:
            video_stream = container.streams.video[0]

            video_fps = float(video_stream.average_rate)
            total_frames = video_stream.frames
            video_duration = total_frames / video_fps if video_fps > 0 else 0

            frame_indices = sample_frame_indices_efficient_segments(
                num_frames, segment_duration, num_segments, container
            )
        finally:
            container.close()

        # Decode only the selected frames (reopens the file internally).
        frames = extract_frames_at_indices(video_file, frame_indices)

        info = f"""
**Video Information:**
- Total frames: {total_frames}
- FPS: {video_fps:.2f}
- Duration: {video_duration:.2f} seconds
"""

        # Pair each decoded frame with a human-readable gallery caption.
        labeled_frames = []
        for i, (frame, idx) in enumerate(zip(frames, frame_indices)):
            # Copy so downstream UI mutations cannot touch the decoded frame.
            frame_copy = frame.copy()
            labeled_frames.append((frame_copy, f"Frame {idx} (Sample {i+1}/{num_frames})"))

        return labeled_frames, info, frame_indices

    except Exception as e:
        # Top-level UI boundary: surface the error instead of crashing Gradio.
        return [], f"Error processing video: {str(e)}", []
|
|
|
|
|
|
|
|
# --- Gradio UI definition ---
with gr.Blocks(title="PATS: Proficiency-Aware Temporal Sampling for Multi-View Sports Skill Assessment") as demo:
    # Project overview shown at the top of the page.
    gr.Markdown("""
# PATS: Proficiency-Aware Temporal Sampling for Multi-View Sports Skill Assessment

PATS (Proficiency-Aware Temporal Sampling) is a novel video sampling strategy designed specifically for automated sports skill assessment.
Unlike traditional methods that randomly sample frames or use uniform intervals, PATS preserves complete fundamental movements within continuous temporal segments.
The paper presenting PATS has been accepted at the 2025 4th IEEE Sport Technology and Research Workshop.

This tool showcases the PATS sampling strategy. Find out more at the project page: https://edowhite.github.io/PATS

## Core Concept
The key insight is that athletic proficiency manifests through structured temporal patterns that require observing complete, uninterrupted movements.
PATS addresses this by:

- **Extracting continuous temporal segments** rather than isolated frames
- **Preserving natural movement flow** essential for distinguishing expert from novice performance
- **Distributing multiple segments** across the video timeline to maximize information coverage

## Performance
When applied to SkillFormer on the EgoExo4D benchmark, PATS achieves:

- **Consistent improvements** across all viewing configurations (+0.65% to +3.05%)
- **Substantial domain-specific gains:** +26.22% in bouldering, +2.39% in music, +1.13% in basketball

""")

    with gr.Row():
        with gr.Column(scale=1):
            # Left column: upload widget and the three sampling controls.
            video_input = gr.Video(label="Upload Video")

            gr.Markdown("### Sampling Parameters")
            # Total frame budget passed to the sampler as `num_frames`.
            num_frames = gr.Slider(
                minimum=1,
                maximum=50,
                value=8,
                step=1,
                label="Number of Frames to Sample",
                info="Total number of frames to extract from the video"
            )

            # How many temporal segments the budget is spread across.
            num_segments = gr.Slider(
                minimum=1,
                maximum=20,
                value=4,
                step=1,
                label="Number of Segments",
                info="Number of temporal segments to divide the video into"
            )

            # Requested per-segment duration; the sampler may cap this
            # based on the actual video length.
            segment_duration = gr.Slider(
                minimum=0.5,
                maximum=10.0,
                value=2.0,
                step=0.5,
                label="Segment Duration (seconds)",
                info="Duration of each segment for sampling"
            )

            process_btn = gr.Button("Process Video", variant="primary")

        with gr.Column(scale=2):
            # Right column: metadata readout plus the sampled-frame gallery.
            info_output = gr.Markdown(label="Processing Information")
            gallery_output = gr.Gallery(
                label="Sampled Frames",
                show_label=True,
                elem_id="gallery",
                columns=4,
                rows=3,
                height="auto"
            )
            # Raw indices kept hidden; receives the third process_video output.
            indices_output = gr.JSON(label="Frame Indices", visible=False)

    # Wire the button to the processing pipeline; outputs map 1:1 to the
    # (frames, info, indices) tuple returned by process_video.
    process_btn.click(
        fn=process_video,
        inputs=[video_input, num_frames, segment_duration, num_segments],
        outputs=[gallery_output, info_output, indices_output]
    )
|
|
|
|
|
|
|
|
# Launch the Gradio server only when run as a script (not on import).
if __name__ == "__main__":
    demo.launch()