#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Fully Convolutional SyncNet Instance for Inference

This module provides inference capabilities for the FCN-SyncNet model,
including variable-length input processing and temporal sync prediction.

Key improvements over the original SyncNet instance:
1. Processes entire sequences at once (no fixed windows)
2. Returns frame-by-frame sync predictions
3. Better temporal smoothing
4. Confidence estimation per frame

Author: Enhanced version
Date: 2025-11-22
"""

import torch
import torch.nn.functional as F
import numpy as np
import time, os, math, glob, subprocess
import cv2
import python_speech_features

from scipy import signal
from scipy.io import wavfile
from shutil import rmtree

from SyncNetModel_FCN import SyncNetFCN, SyncNetFCN_WithAttention


class SyncNetInstance_FCN(torch.nn.Module):
    """
    SyncNet instance for fully convolutional inference.
    Supports variable-length inputs and dense temporal predictions.
    """

    def __init__(self, model_type='fcn', embedding_dim=512, max_offset=15,
                 use_attention=False):
        super(SyncNetInstance_FCN, self).__init__()

        self.embedding_dim = embedding_dim
        self.max_offset = max_offset
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Initialize model (attention variant is optional)
        if use_attention:
            self.model = SyncNetFCN_WithAttention(
                embedding_dim=embedding_dim,
                max_offset=max_offset
            ).to(self.device)
        else:
            self.model = SyncNetFCN(
                embedding_dim=embedding_dim,
                max_offset=max_offset
            ).to(self.device)

    def loadParameters(self, path):
        """Load model parameters from a checkpoint file."""
        loaded_state = torch.load(path, map_location=self.device)

        # Handle different checkpoint formats
        if isinstance(loaded_state, dict):
            if 'model_state_dict' in loaded_state:
                state_dict = loaded_state['model_state_dict']
            elif 'state_dict' in loaded_state:
                state_dict = loaded_state['state_dict']
            else:
                state_dict = loaded_state
        else:
            state_dict = loaded_state.state_dict()

        # Try strict loading first; fall back to partial (strict=False) loading
        try:
            self.model.load_state_dict(state_dict, strict=True)
            print(f"Model loaded from {path}")
        except RuntimeError:
            print(f"Warning: Could not load all parameters from {path}")
            self.model.load_state_dict(state_dict, strict=False)

    def preprocess_audio(self, audio_path, target_length=None):
        """
        Load and preprocess an audio file.

        Args:
            audio_path: Path to audio WAV file
            target_length: Optional target length in MFCC frames

        Returns:
            mfcc_tensor: [1, 1, 13, T] - MFCC features
            sample_rate: Audio sample rate
        """
        # Load audio
        sample_rate, audio = wavfile.read(audio_path)

        # Compute MFCC features (100 frames per second)
        mfcc = python_speech_features.mfcc(audio, sample_rate)
        mfcc = mfcc.T  # [13, T]

        # Truncate or pad to the target length
        if target_length is not None:
            if mfcc.shape[1] > target_length:
                mfcc = mfcc[:, :target_length]
            elif mfcc.shape[1] < target_length:
                pad_width = target_length - mfcc.shape[1]
                mfcc = np.pad(mfcc, ((0, 0), (0, pad_width)), mode='edge')

        # Add batch and channel dimensions
        mfcc = np.expand_dims(mfcc, axis=0)   # [1, 13, T]
        mfcc = np.expand_dims(mfcc, axis=0)   # [1, 1, 13, T]

        # Convert to tensor
        mfcc_tensor = torch.FloatTensor(mfcc)

        return mfcc_tensor, sample_rate
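
    # A minimal sketch of the expected feature geometry (an assumption based on
    # the ffmpeg settings used below: 16 kHz mono audio and 25 fps video).
    # python_speech_features.mfcc uses a 10 ms hop by default, i.e. 100 MFCC
    # frames per second, so 4 MFCC frames line up with 1 video frame:
    #
    #   instance = SyncNetInstance_FCN()
    #   mfcc_tensor, sr = instance.preprocess_audio('audio.wav')  # [1, 1, 13, 4*T]
    #   video_tensor = instance.preprocess_video('frames_dir/')   # [1, 3, T, H, W]
    #   # mfcc_tensor.shape[3] // 4 and video_tensor.shape[2] should roughly
    #   # match; evaluate() trims both to the shorter of the two before the
    #   # forward pass.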
    def preprocess_video(self, video_path, target_length=None):
        """
        Load and preprocess a video file.

        Args:
            video_path: Path to video file or directory of frames
            target_length: Optional target length in frames

        Returns:
            video_tensor: [1, 3, T, H, W] - video frames
        """
        # Load video frames
        if os.path.isdir(video_path):
            # Load from a directory of JPEG frames
            flist = sorted(glob.glob(os.path.join(video_path, '*.jpg')))
            images = [cv2.imread(f) for f in flist]
        else:
            # Load from a video file
            cap = cv2.VideoCapture(video_path)
            images = []
            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                images.append(frame)
            cap.release()

        if len(images) == 0:
            raise ValueError(f"No frames found in {video_path}")

        # Truncate or pad to the target length
        if target_length is not None:
            if len(images) > target_length:
                images = images[:target_length]
            elif len(images) < target_length:
                # Pad by repeating the last frame
                last_frame = images[-1]
                images.extend([last_frame] * (target_length - len(images)))

        # Stack and normalize
        im = np.stack(images, axis=0)        # [T, H, W, 3]
        im = im.astype(float) / 255.0        # Normalize to [0, 1]

        # Rearrange to [1, 3, T, H, W]
        im = np.transpose(im, (3, 0, 1, 2))  # [3, T, H, W]
        im = np.expand_dims(im, axis=0)      # [1, 3, T, H, W]

        # Convert to tensor
        video_tensor = torch.FloatTensor(im)

        return video_tensor

    def evaluate(self, opt, videofile):
        """
        Evaluate sync for a video file.
        Returns frame-by-frame sync predictions.

        Args:
            opt: Options object with configuration (uses opt.tmp_dir, opt.reference)
            videofile: Path to video file

        Returns:
            offsets: [T] - predicted offset for each frame
            confidences: [T] - confidence for each frame
            sync_probs: [2K+1, T] - full probability distribution over offsets
        """
        self.model.eval()

        # Create a clean temporary directory
        if os.path.exists(os.path.join(opt.tmp_dir, opt.reference)):
            rmtree(os.path.join(opt.tmp_dir, opt.reference))
        os.makedirs(os.path.join(opt.tmp_dir, opt.reference))

        # Extract frames and audio
        print("Extracting frames and audio...")
        frames_path = os.path.join(opt.tmp_dir, opt.reference)
        audio_path = os.path.join(opt.tmp_dir, opt.reference, 'audio.wav')

        # Extract frames
        command = (f"ffmpeg -y -i \"{videofile}\" -threads 1 -f image2 "
                   f"\"{os.path.join(frames_path, '%06d.jpg')}\"")
        subprocess.call(command, shell=True, stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL)

        # Extract audio (16 kHz mono PCM)
        command = (f"ffmpeg -y -i \"{videofile}\" -async 1 -ac 1 -vn "
                   f"-acodec pcm_s16le -ar 16000 \"{audio_path}\"")
        subprocess.call(command, shell=True, stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL)

        # Preprocess audio and video
        print("Loading and preprocessing data...")
        audio_tensor, sample_rate = self.preprocess_audio(audio_path)
        video_tensor = self.preprocess_video(frames_path)

        # Check length consistency
        audio_duration = audio_tensor.shape[3] / 100.0  # MFCC at 100 frames/sec
        video_duration = video_tensor.shape[2] / 25.0   # Video at 25 fps
        if abs(audio_duration - video_duration) > 0.1:
            print(f"WARNING: Audio ({audio_duration:.2f}s) and video "
                  f"({video_duration:.2f}s) lengths differ")

        # Align lengths (use the shorter stream)
        min_length = min(
            video_tensor.shape[2],       # video frames
            audio_tensor.shape[3] // 4   # audio frames (4:1 ratio)
        )
        video_tensor = video_tensor[:, :, :min_length, :, :]
        audio_tensor = audio_tensor[:, :, :, :min_length * 4]

        print(f"Processing {min_length} frames...")

        # Forward pass
        tS = time.time()
        with torch.no_grad():
            sync_probs, audio_feat, video_feat = self.model(
                audio_tensor.to(self.device),
                video_tensor.to(self.device)
            )
        print(f'Compute time: {time.time() - tS:.3f} sec')

        # Compute offsets and confidences
        offsets, confidences = self.model.compute_offset(sync_probs)

        # Convert to numpy
        offsets = offsets.cpu().numpy()[0]          # [T]
        confidences = confidences.cpu().numpy()[0]  # [T]
        sync_probs = sync_probs.cpu().numpy()[0]    # [2K+1, T]

        # Apply temporal smoothing to the confidences
        confidences_smooth = signal.medfilt(confidences, kernel_size=9)

        # Compute overall statistics
        median_offset = np.median(offsets)
        mean_confidence = np.mean(confidences_smooth)

        # Find the consensus offset (histogram mode)
        offset_hist, offset_bins = np.histogram(offsets, bins=2 * self.max_offset + 1)
        consensus_offset = offset_bins[np.argmax(offset_hist)]

        # Print results
        np.set_printoptions(formatter={'float': '{: 0.3f}'.format})
        print('\nFrame-wise confidence (smoothed):')
        print(confidences_smooth)
        print(f'\nConsensus offset: \t{consensus_offset:.1f} frames')
        print(f'Median offset: \t\t{median_offset:.1f} frames')
        print(f'Mean confidence: \t{mean_confidence:.3f}')

        return offsets, confidences_smooth, sync_probs
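
    # A minimal programmatic usage sketch (the paths and checkpoint name are
    # hypothetical; `opt` only needs the attributes evaluate() actually reads,
    # namely tmp_dir and reference):
    #
    #   from argparse import Namespace
    #   opt = Namespace(tmp_dir='data/tmp', reference='clip01')
    #   syncnet = SyncNetInstance_FCN(max_offset=15)
    #   syncnet.loadParameters('data/syncnet_fcn.model')  # hypothetical checkpoint
    #   offsets, confidences, sync_probs = syncnet.evaluate(opt, 'clip01.mp4')
    #   # offsets[t] is the per-frame audio-video offset (in video frames) and
    #   # confidences[t] the corresponding confidence, following the convention
    #   # of self.model.compute_offset.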
    def evaluate_batch(self, opt, videofile, chunk_size=100, overlap=10):
        """
        Evaluate long videos in chunks with overlap for consistency.

        Args:
            opt: Options object
            videofile: Path to video file
            chunk_size: Number of frames per chunk
            overlap: Number of overlapping frames between chunks

        Returns:
            offsets: [T] - predicted offset for each frame
            confidences: [T] - confidence for each frame
        """
        self.model.eval()

        # Create a clean temporary directory
        if os.path.exists(os.path.join(opt.tmp_dir, opt.reference)):
            rmtree(os.path.join(opt.tmp_dir, opt.reference))
        os.makedirs(os.path.join(opt.tmp_dir, opt.reference))

        # Extract frames and audio
        frames_path = os.path.join(opt.tmp_dir, opt.reference)
        audio_path = os.path.join(opt.tmp_dir, opt.reference, 'audio.wav')

        # Extract frames
        command = (f"ffmpeg -y -i \"{videofile}\" -threads 1 -f image2 "
                   f"\"{os.path.join(frames_path, '%06d.jpg')}\"")
        subprocess.call(command, shell=True, stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL)

        # Extract audio (16 kHz mono PCM)
        command = (f"ffmpeg -y -i \"{videofile}\" -async 1 -ac 1 -vn "
                   f"-acodec pcm_s16le -ar 16000 \"{audio_path}\"")
        subprocess.call(command, shell=True, stdout=subprocess.DEVNULL,
                        stderr=subprocess.DEVNULL)

        # Preprocess audio and video
        audio_tensor, sample_rate = self.preprocess_audio(audio_path)
        video_tensor = self.preprocess_video(frames_path)

        # Process in chunks
        all_offsets = []
        all_confidences = []

        stride = chunk_size - overlap
        num_chunks = (video_tensor.shape[2] - overlap) // stride + 1

        for chunk_idx in range(num_chunks):
            start_idx = chunk_idx * stride
            end_idx = min(start_idx + chunk_size, video_tensor.shape[2])

            # Extract chunk (audio indexed via the 4:1 MFCC-to-video frame ratio)
            video_chunk = video_tensor[:, :, start_idx:end_idx, :, :]
            audio_chunk = audio_tensor[:, :, :, start_idx * 4:end_idx * 4]

            # Forward pass
            with torch.no_grad():
                sync_probs, _, _ = self.model(
                    audio_chunk.to(self.device),
                    video_chunk.to(self.device)
                )

            # Compute offsets for this chunk and drop the batch dimension
            offsets, confidences = self.model.compute_offset(sync_probs)
            chunk_offsets = offsets.cpu().numpy()[0]          # [T_chunk]
            chunk_confidences = confidences.cpu().numpy()[0]  # [T_chunk]

            if chunk_idx > 0:
                # Average predictions over the overlapping region
                all_offsets[-overlap:] = (
                    np.asarray(all_offsets[-overlap:]) + chunk_offsets[:overlap]
                ) / 2
                all_confidences[-overlap:] = (
                    np.asarray(all_confidences[-overlap:]) + chunk_confidences[:overlap]
                ) / 2

                # Append the non-overlapping part
                all_offsets.extend(chunk_offsets[overlap:])
                all_confidences.extend(chunk_confidences[overlap:])
            else:
                all_offsets.extend(chunk_offsets)
                all_confidences.extend(chunk_confidences)

        offsets = np.array(all_offsets)
        confidences = np.array(all_confidences)

        return offsets, confidences
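
    # Worked example of the chunking arithmetic above (illustrative numbers):
    # with chunk_size=100 and overlap=10, the stride is 90 frames, so a
    # 500-frame video yields (500 - 10) // 90 + 1 = 6 chunks starting at frames
    # 0, 90, 180, 270, 360 and 450. Each chunk's first 10 frames are averaged
    # with the last 10 frames of the running prediction, which suppresses jumps
    # at chunk boundaries.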
    def extract_features(self, opt, videofile, feature_type='both'):
        """
        Extract audio and/or video features for downstream tasks.

        Note: this method expects the frames and audio to already be present in
        opt.tmp_dir/opt.reference (e.g. from a previous call to evaluate()).

        Args:
            opt: Options object
            videofile: Path to video file
            feature_type: 'audio', 'video', or 'both'

        Returns:
            features: Dictionary with 'audio' and/or 'video' feature arrays
        """
        self.model.eval()

        # Preprocess
        if feature_type in ['audio', 'both']:
            audio_path = os.path.join(opt.tmp_dir, opt.reference, 'audio.wav')
            audio_tensor, _ = self.preprocess_audio(audio_path)

        if feature_type in ['video', 'both']:
            frames_path = os.path.join(opt.tmp_dir, opt.reference)
            video_tensor = self.preprocess_video(frames_path)

        features = {}

        # Extract features
        with torch.no_grad():
            if feature_type in ['audio', 'both']:
                audio_features = self.model.forward_audio(audio_tensor.to(self.device))
                features['audio'] = audio_features.cpu().numpy()

            if feature_type in ['video', 'both']:
                video_features = self.model.forward_video(video_tensor.to(self.device))
                features['video'] = video_features.cpu().numpy()

        return features


# ==================== UTILITY FUNCTIONS ====================

def visualize_sync_predictions(offsets, confidences, save_path=None):
    """
    Visualize sync predictions over time.

    Args:
        offsets: [T] - predicted offsets
        confidences: [T] - confidence scores
        save_path: Optional path to save the plot
    """
    try:
        import matplotlib.pyplot as plt

        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8))

        # Plot offsets
        ax1.plot(offsets, linewidth=2)
        ax1.axhline(y=0, color='r', linestyle='--', alpha=0.5)
        ax1.set_xlabel('Frame')
        ax1.set_ylabel('Offset (frames)')
        ax1.set_title('Audio-Visual Sync Offset Over Time')
        ax1.grid(True, alpha=0.3)

        # Plot confidences
        ax2.plot(confidences, linewidth=2, color='green')
        ax2.set_xlabel('Frame')
        ax2.set_ylabel('Confidence')
        ax2.set_title('Sync Detection Confidence Over Time')
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=150, bbox_inches='tight')
            print(f"Visualization saved to {save_path}")
        else:
            plt.show()

    except ImportError:
        print("matplotlib not installed. Skipping visualization.")
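
# A small, hypothetical post-processing sketch (not part of the original API):
# given the per-frame outputs of evaluate(), flag frames whose offset deviates
# from the clip-level consensus by more than `tolerance` frames while the model
# is reasonably confident. The threshold values are illustrative assumptions.
def find_out_of_sync_frames(offsets, confidences, tolerance=2.0, min_confidence=0.5):
    """Return indices of frames that look out of sync under simple thresholds."""
    offsets = np.asarray(offsets, dtype=float)
    confidences = np.asarray(confidences, dtype=float)

    # Use the median offset as the consensus for the whole clip
    consensus = np.median(offsets)

    # A frame is flagged when it disagrees with the consensus and the model is
    # confident enough for the disagreement to be meaningful
    mask = (np.abs(offsets - consensus) > tolerance) & (confidences >= min_confidence)
    return np.nonzero(mask)[0]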
if __name__ == "__main__":
    import argparse

    # Parse arguments
    parser = argparse.ArgumentParser(description='FCN SyncNet Inference')
    parser.add_argument('--videofile', type=str, required=True,
                        help='Path to input video file')
    parser.add_argument('--model_path', type=str, default='data/syncnet_v2.model',
                        help='Path to model checkpoint')
    parser.add_argument('--tmp_dir', type=str, default='data/tmp',
                        help='Temporary directory for processing')
    parser.add_argument('--reference', type=str, default='test',
                        help='Reference name for this video')
    parser.add_argument('--use_attention', action='store_true',
                        help='Use attention-based model')
    parser.add_argument('--visualize', action='store_true',
                        help='Visualize results')
    parser.add_argument('--max_offset', type=int, default=15,
                        help='Maximum offset to consider (frames)')

    opt = parser.parse_args()

    # Create instance
    print("Initializing FCN SyncNet...")
    syncnet = SyncNetInstance_FCN(
        use_attention=opt.use_attention,
        max_offset=opt.max_offset
    )

    # Load model weights (if available)
    if os.path.exists(opt.model_path):
        print(f"Loading model from {opt.model_path}")
        try:
            syncnet.loadParameters(opt.model_path)
        except Exception:
            print("Warning: Could not load pretrained weights. "
                  "Using random initialization.")
    else:
        print(f"Warning: Checkpoint {opt.model_path} not found. "
              "Using random initialization.")

    # Evaluate
    print(f"\nEvaluating video: {opt.videofile}")
    offsets, confidences, sync_probs = syncnet.evaluate(opt, opt.videofile)

    # Visualize
    if opt.visualize:
        viz_path = os.path.splitext(opt.videofile)[0] + '_sync_analysis.png'
        visualize_sync_predictions(offsets, confidences, save_path=viz_path)

    print("\nDone!")