#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
Stream/Video Sync Detection with FCN-SyncNet

Detect audio-video sync offset in video files or live HLS streams.
Uses the trained FCN model (epoch 2) with calibration for accurate results.

Usage:
    # Video file
    python test_sync_detection.py --video path/to/video.mp4

    # HLS stream
    python test_sync_detection.py --hls http://example.com/stream.m3u8 --duration 15

    # Compare FCN with Original SyncNet
    python test_sync_detection.py --video video.mp4 --compare

    # Original SyncNet only
    python test_sync_detection.py --video video.mp4 --original

    # With verbose output
    python test_sync_detection.py --video video.mp4 --verbose

    # Custom model
    python test_sync_detection.py --video video.mp4 --model checkpoints/custom.pth
"""

import os
import sys
import argparse
import time

import torch

# Enable UTF-8 output on Windows
if sys.platform == 'win32':
    import io
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')


def load_model(model_path=None, device='cpu'):
    """Load the FCN-SyncNet model with trained weights."""
    from SyncNetModel_FCN import StreamSyncFCN

    # Default to our best trained model
    if model_path is None:
        model_path = 'checkpoints/syncnet_fcn_epoch2.pth'

    # Check if it's a trained FCN checkpoint (.pth) or an original SyncNet model
    if model_path.endswith('.pth') and os.path.exists(model_path):
        # Load our trained FCN checkpoint
        model = StreamSyncFCN(
            max_offset=15,
            pretrained_syncnet_path=None,
            auto_load_pretrained=False
        )
        checkpoint = torch.load(model_path, map_location=device)

        # Load only encoder weights (skip mismatched head)
        if 'model_state_dict' in checkpoint:
            state_dict = checkpoint['model_state_dict']
            encoder_state = {k: v for k, v in state_dict.items()
                             if 'audio_encoder' in k or 'video_encoder' in k}
            model.load_state_dict(encoder_state, strict=False)
            epoch = checkpoint.get('epoch', '?')
            print(f"✓ Loaded trained FCN model (epoch {epoch})")
        else:
            model.load_state_dict(checkpoint, strict=False)
            print("✓ Loaded model weights")
    elif os.path.exists(model_path):
        # Load original SyncNet pretrained model
        model = StreamSyncFCN(
            pretrained_syncnet_path=model_path,
            auto_load_pretrained=True
        )
        print(f"✓ Loaded pretrained SyncNet from: {model_path}")
    else:
        print(f"✗ Model not found: {model_path}")
        print("  Using random initialization (results may be unreliable)")
        model = StreamSyncFCN(
            pretrained_syncnet_path=None,
            auto_load_pretrained=False
        )

    model.eval()
    return model.to(device)
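

# Example (an illustrative sketch, not part of the CLI flow): loading the model
# programmatically onto the GPU when one is available. The checkpoint path is
# the default referenced in load_model(); substitute your own .pth file.
#
#     device = 'cuda' if torch.cuda.is_available() else 'cpu'
#     model = load_model('checkpoints/syncnet_fcn_epoch2.pth', device=device)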


def load_original_syncnet(model_path='data/syncnet_v2.model', device='cpu'):
    """Load the original SyncNet model for comparison."""
    from SyncNetInstance import SyncNetInstance

    model = SyncNetInstance()
    model.loadParameters(model_path)
    print(f"✓ Loaded Original SyncNet from: {model_path}")
    return model


def run_original_syncnet(model, video_path, verbose=False):
    """
    Run original SyncNet on a video file.

    Returns:
        dict with offset_frames, offset_seconds, confidence, min_dist, processing_time
    """
    import argparse

    # Build the options namespace expected by SyncNetInstance.evaluate()
    opt = argparse.Namespace()
    opt.tmp_dir = 'data/work/pytmp'
    opt.reference = 'original_test'
    opt.batch_size = 20
    opt.vshift = 15

    start_time = time.time()

    # Run evaluation
    offset, confidence, dist = model.evaluate(opt, video_path)

    elapsed = time.time() - start_time

    return {
        'offset_frames': offset,
        'offset_seconds': offset / 25.0,
        'confidence': confidence,
        'min_dist': dist,
        'processing_time': elapsed
    }


def apply_calibration(raw_offset, calibration_offset=3, calibration_scale=-0.5, reference_raw=-15):
    """
    Apply linear calibration to raw model output.

    Calibration formula: calibrated = offset + scale * (raw - reference)
    Default:             calibrated = 3 + (-0.5) * (raw - (-15))

    This corrects for systematic bias in the FCN model's predictions.
    """
    return calibration_offset + calibration_scale * (raw_offset - reference_raw)
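

# Worked example of the calibration above (the raw value is illustrative, not
# from measured data): a raw output of -11 frames maps to
#     3 + (-0.5) * (-11 - (-15)) = 3 - 2 = +1 frame,
# i.e. a small audio-ahead offset once the systematic bias is corrected.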


def detect_sync(video_path=None, hls_url=None, duration=10, model=None,
                verbose=False, use_calibration=True):
    """
    Detect audio-video sync offset.

    Args:
        video_path: Path to video file
        hls_url: HLS stream URL (.m3u8)
        duration: Capture duration for HLS (seconds)
        model: Pre-loaded model (optional)
        verbose: Print detailed output
        use_calibration: Apply calibration correction

    Returns:
        dict with offset_frames, offset_seconds, confidence, raw_offset, processing_time
    """
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # Load model if not provided
    if model is None:
        model = load_model(device=device)

    start_time = time.time()

    # Process video file or HLS stream
    if video_path:
        # Use the same method as detect_sync.py for consistency
        if use_calibration:
            offset, confidence, raw_offset = model.detect_offset_correlation(
                video_path,
                calibration_offset=3,
                calibration_scale=-0.5,
                calibration_baseline=-15,
                verbose=verbose
            )
        else:
            raw_offset, confidence = model.process_video_file(
                video_path,
                verbose=verbose
            )
            offset = raw_offset
    elif hls_url:
        raw_offset, confidence = model.process_hls_stream(
            hls_url,
            segment_duration=duration,
            verbose=verbose
        )
        if use_calibration:
            offset = apply_calibration(raw_offset)
        else:
            offset = raw_offset
    else:
        raise ValueError("Must provide either video_path or hls_url")

    elapsed = time.time() - start_time

    return {
        'offset_frames': round(offset),
        'offset_seconds': offset / 25.0,
        'confidence': confidence,
        'raw_offset': raw_offset,  # raw model output, before calibration
        'processing_time': elapsed
    }
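

# Example programmatic use (a sketch; 'clip.mp4' is a placeholder path):
#
#     result = detect_sync(video_path='clip.mp4')
#     print(result['offset_frames'], result['offset_seconds'], result['confidence'])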


def print_results(result, source_name, model_name="FCN-SyncNet"):
    """Print formatted results."""
    offset = result['offset_frames']
    offset_sec = result['offset_seconds']
    confidence = result['confidence']
    elapsed = result['processing_time']

    print()
    print("=" * 60)
    print(f"  {model_name} Detection Result")
    print("=" * 60)
    print(f"  Source:     {source_name}")
    print(f"  Offset:     {offset:+d} frames ({offset_sec:+.3f}s)")
    print(f"  Confidence: {confidence:.6f}")
    print(f"  Time:       {elapsed:.2f}s")
    print("=" * 60)

    # Interpretation
    if offset > 1:
        print(f"  → Audio is {offset} frames AHEAD of video")
        print(f"    (delay audio by {abs(offset_sec):.3f}s to fix)")
    elif offset < -1:
        print(f"  → Audio is {abs(offset)} frames BEHIND video")
        print(f"    (advance audio by {abs(offset_sec):.3f}s to fix)")
    else:
        print("  ✓ Audio and video are IN SYNC")
    print()


def print_comparison(fcn_result, original_result, source_name):
    """Print side-by-side comparison of both models."""
    print()
    print("╔" + "═" * 70 + "╗")
    print("║" + " Model Comparison Results".center(70) + "║")
    print("╚" + "═" * 70 + "╝")
    print()
    print(f"  Source: {source_name}")
    print()
    print("  " + "-" * 66)
    print(f"  {'Metric':<20} {'FCN-SyncNet':>20} {'Original SyncNet':>20}")
    print("  " + "-" * 66)

    fcn_off = fcn_result['offset_frames']
    orig_off = original_result['offset_frames']

    print(f"  {'Offset (frames)':<20} {fcn_off:>+20d} {orig_off:>+20d}")
    print(f"  {'Offset (seconds)':<20} {fcn_result['offset_seconds']:>+20.3f} {original_result['offset_seconds']:>+20.3f}")
    print(f"  {'Confidence':<20} {fcn_result['confidence']:>20.4f} {original_result['confidence']:>20.4f}")
    print(f"  {'Time (seconds)':<20} {fcn_result['processing_time']:>20.2f} {original_result['processing_time']:>20.2f}")
    print("  " + "-" * 66)

    # Agreement check
    diff = abs(fcn_off - orig_off)
    if diff == 0:
        print("  ✓ Both models AGREE perfectly!")
    elif diff <= 2:
        print(f"  ✓ Models differ by {diff} frame(s) (close agreement)")
    else:
        print(f"  ⚠ Models differ by {diff} frames")
    print()


def main():
    parser = argparse.ArgumentParser(
        description='FCN-SyncNet - Audio-Video Sync Detection',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  Video file:  python test_sync_detection.py --video video.mp4
  HLS stream:  python test_sync_detection.py --hls http://stream.m3u8 --duration 15
  Compare:     python test_sync_detection.py --video video.mp4 --compare
  Original:    python test_sync_detection.py --video video.mp4 --original
  Verbose:     python test_sync_detection.py --video video.mp4 --verbose
"""
    )
    parser.add_argument('--video', type=str, help='Path to video file')
    parser.add_argument('--hls', type=str, help='HLS stream URL (.m3u8)')
    parser.add_argument('--model', type=str, default=None,
                        help='Model checkpoint (default: checkpoints/syncnet_fcn_epoch2.pth)')
    parser.add_argument('--duration', type=int, default=10,
                        help='Duration for HLS capture (seconds, default: 10)')
    parser.add_argument('--verbose', '-v', action='store_true',
                        help='Show detailed processing info')
    parser.add_argument('--no-calibration', action='store_true',
                        help='Disable calibration correction')
    parser.add_argument('--json', action='store_true',
                        help='Output results as JSON')
    parser.add_argument('--compare', action='store_true',
                        help='Compare FCN-SyncNet with Original SyncNet')
    parser.add_argument('--original', action='store_true',
                        help='Use Original SyncNet only (not FCN)')
    args = parser.parse_args()

    # Validate input
    if not args.video and not args.hls:
        print("Error: Please provide either --video or --hls")
        parser.print_help()
        return 1

    # Original SyncNet doesn't support HLS
    if args.hls and (args.original or args.compare):
        print("Error: Original SyncNet does not support HLS streams")
        print("       Use --video for comparison mode")
        return 1

    if not args.json:
        print()
        if args.original:
            print("╔" + "═" * 62 + "╗")
            print("║" + "Original SyncNet - Audio-Video Sync Detection".center(62) + "║")
            print("╚" + "═" * 62 + "╝")
        elif args.compare:
            print("╔" + "═" * 62 + "╗")
            print("║" + "Sync Detection - FCN vs Original SyncNet".center(62) + "║")
            print("╚" + "═" * 62 + "╝")
        else:
            print("╔" + "═" * 62 + "╗")
            print("║" + "FCN-SyncNet - Audio-Video Sync Detection".center(62) + "║")
            print("╚" + "═" * 62 + "╝")
        print()

    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    if not args.json:
        print(f"Device: {device}")

    try:
        source = os.path.basename(args.video) if args.video else args.hls

        # Run Original SyncNet only
        if args.original:
            original_model = load_original_syncnet()
            if not args.json:
                print(f"\nProcessing: {args.video}")

            result = run_original_syncnet(original_model, args.video, args.verbose)

            if args.json:
                import json
                result['source'] = source
                result['model'] = 'original_syncnet'
                print(json.dumps(result, indent=2))
            else:
                print_results(result, source, "Original SyncNet")
            return 0

        # Run comparison mode
        if args.compare:
            # Load both models
            fcn_model = load_model(args.model, device)
            original_model = load_original_syncnet()

            if not args.json:
                print(f"\nProcessing: {args.video}")
                print("\n[1/2] Running FCN-SyncNet...")

            fcn_result = detect_sync(
                video_path=args.video,
                model=fcn_model,
                verbose=args.verbose,
                use_calibration=not args.no_calibration
            )

            if not args.json:
                print("[2/2] Running Original SyncNet...")

            original_result = run_original_syncnet(original_model, args.video, args.verbose)

            if args.json:
                import json
                output = {
                    'source': source,
                    'fcn_syncnet': fcn_result,
                    'original_syncnet': original_result
                }
                print(json.dumps(output, indent=2))
            else:
                print_comparison(fcn_result, original_result, source)
            return 0

        # Default: FCN-SyncNet only
        model = load_model(args.model, device)

        if args.video:
            if not args.json:
                print(f"\nProcessing: {args.video}")
            result = detect_sync(
                video_path=args.video,
                model=model,
                verbose=args.verbose,
                use_calibration=not args.no_calibration
            )
        else:  # HLS
            if not args.json:
                print(f"\nProcessing HLS: {args.hls}")
                print(f"Capturing {args.duration} seconds...")
            result = detect_sync(
                hls_url=args.hls,
                duration=args.duration,
                model=model,
                verbose=args.verbose,
                use_calibration=not args.no_calibration
            )
            source = args.hls

        # Output results
        if args.json:
            import json
            result['source'] = source
            print(json.dumps(result, indent=2))
        else:
            print_results(result, source)
        return 0

    except FileNotFoundError:
        print(f"\n✗ Error: File not found - {args.video or args.hls}")
        return 1
    except Exception as e:
        print(f"\n✗ Error: {e}")
        if args.verbose:
            import traceback
            traceback.print_exc()
        return 1


if __name__ == "__main__":
    sys.exit(main())