#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Stream/Video Sync Detection with FCN-SyncNet
Detect audio-video sync offset in video files or live HLS streams.
Uses the trained FCN model (epoch 2) with calibration for accurate results.
Usage:
# Video file
python test_sync_detection.py --video path/to/video.mp4
# HLS stream
python test_sync_detection.py --hls http://example.com/stream.m3u8 --duration 15
# Compare FCN with Original SyncNet
python test_sync_detection.py --video video.mp4 --compare
# Original SyncNet only
python test_sync_detection.py --video video.mp4 --original
# With verbose output
python test_sync_detection.py --video video.mp4 --verbose
# Custom model
python test_sync_detection.py --video video.mp4 --model checkpoints/custom.pth
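# JSON output (machine-readable)
python test_sync_detection.py --video video.mp4 --json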
"""
import os
import sys
import argparse
import torch
import time
# Enable UTF-8 output on Windows
if sys.platform == 'win32':
import io
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')
def load_model(model_path=None, device='cpu'):
"""Load the FCN-SyncNet model with trained weights."""
from SyncNetModel_FCN import StreamSyncFCN
# Default to our best trained model
if model_path is None:
model_path = 'checkpoints/syncnet_fcn_epoch2.pth'
# Check if it's a checkpoint file (.pth) or original syncnet model
if model_path.endswith('.pth') and os.path.exists(model_path):
# Load our trained FCN checkpoint
model = StreamSyncFCN(
max_offset=15,
pretrained_syncnet_path=None,
auto_load_pretrained=False
)
checkpoint = torch.load(model_path, map_location=device)
# Load only encoder weights (skip mismatched head)
if 'model_state_dict' in checkpoint:
state_dict = checkpoint['model_state_dict']
encoder_state = {k: v for k, v in state_dict.items()
if 'audio_encoder' in k or 'video_encoder' in k}
model.load_state_dict(encoder_state, strict=False)
epoch = checkpoint.get('epoch', '?')
print(f"βœ“ Loaded trained FCN model (epoch {epoch})")
else:
model.load_state_dict(checkpoint, strict=False)
print(f"βœ“ Loaded model weights")
elif os.path.exists(model_path):
# Load original SyncNet pretrained model
model = StreamSyncFCN(
pretrained_syncnet_path=model_path,
auto_load_pretrained=True
)
print(f"βœ“ Loaded pretrained SyncNet from: {model_path}")
else:
print(f"⚠ Model not found: {model_path}")
print(" Using random initialization (results may be unreliable)")
model = StreamSyncFCN(
pretrained_syncnet_path=None,
auto_load_pretrained=False
)
model.eval()
return model.to(device)
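# Example (illustrative) of loading weights outside this script; the paths
# follow the defaults documented above:
#
#   device = 'cuda' if torch.cuda.is_available() else 'cpu'
#   model = load_model(device=device)                            # epoch-2 default
#   model = load_model('checkpoints/custom.pth', device=device)  # custom weights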
def load_original_syncnet(model_path='data/syncnet_v2.model', device='cpu'):
"""Load the original SyncNet model for comparison."""
from SyncNetInstance import SyncNetInstance
model = SyncNetInstance()
model.loadParameters(model_path)
print(f"βœ“ Loaded Original SyncNet from: {model_path}")
return model
def run_original_syncnet(model, video_path, verbose=False):
"""
Run original SyncNet on a video file.
Returns:
        dict with offset_frames, offset_seconds, confidence, min_dist, processing_time
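        Example shape (numbers are illustrative only):
            {'offset_frames': 3, 'offset_seconds': 0.12, 'confidence': 7.1,
             'min_dist': 5.4, 'processing_time': 42.0}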
"""
# Create required options object
opt = argparse.Namespace()
opt.tmp_dir = 'data/work/pytmp'
opt.reference = 'original_test'
opt.batch_size = 20
opt.vshift = 15
start_time = time.time()
# Run evaluation
offset, confidence, dist = model.evaluate(opt, video_path)
elapsed = time.time() - start_time
return {
'offset_frames': offset,
        'offset_seconds': offset / 25.0,  # SyncNet operates on 25 fps video
'confidence': confidence,
'min_dist': dist,
'processing_time': elapsed
}
def apply_calibration(raw_offset, calibration_offset=3, calibration_scale=-0.5, reference_raw=-15):
"""
Apply linear calibration to raw model output.
Calibration formula: calibrated = offset + scale * (raw - reference)
Default: calibrated = 3 + (-0.5) * (raw - (-15))
This corrects for systematic bias in the FCN model's predictions.
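    Worked examples with the defaults (pure arithmetic):
    >>> apply_calibration(-15)
    3.0
    >>> apply_calibration(-9)
    0.0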
"""
return calibration_offset + calibration_scale * (raw_offset - reference_raw)
def detect_sync(video_path=None, hls_url=None, duration=10, model=None,
verbose=False, use_calibration=True):
"""
Detect audio-video sync offset.
Args:
video_path: Path to video file
hls_url: HLS stream URL (.m3u8)
duration: Capture duration for HLS (seconds)
model: Pre-loaded model (optional)
verbose: Print detailed output
use_calibration: Apply calibration correction
Returns:
        dict with offset_frames, offset_seconds, confidence, raw_offset, processing_time
"""
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Load model if not provided
if model is None:
model = load_model(device=device)
start_time = time.time()
# Process video or HLS
if video_path:
# Use the same method as detect_sync.py for consistency
if use_calibration:
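            # These constants mirror apply_calibration()'s defaults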
offset, confidence, raw_offset = model.detect_offset_correlation(
video_path,
calibration_offset=3,
calibration_scale=-0.5,
calibration_baseline=-15,
verbose=verbose
)
else:
raw_offset, confidence = model.process_video_file(
video_path,
verbose=verbose
)
offset = raw_offset
elif hls_url:
raw_offset, confidence = model.process_hls_stream(
hls_url,
segment_duration=duration,
verbose=verbose
)
if use_calibration:
offset = apply_calibration(raw_offset)
else:
offset = raw_offset
else:
raise ValueError("Must provide either video_path or hls_url")
elapsed = time.time() - start_time
return {
        'offset_frames': round(offset),
        'offset_seconds': offset / 25.0,  # frames to seconds at 25 fps
        'confidence': confidence,
        'raw_offset': raw_offset,  # assigned on every non-raising branch above
'processing_time': elapsed
}
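# Example (illustrative) of calling detect_sync() programmatically; the clip
# path below is hypothetical:
#
#   result = detect_sync(video_path='clips/sample.mp4', verbose=True)
#   print(f"{result['offset_frames']:+d} frames ({result['offset_seconds']:+.3f}s)")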
def print_results(result, source_name, model_name="FCN-SyncNet"):
"""Print formatted results."""
offset = result['offset_frames']
offset_sec = result['offset_seconds']
confidence = result['confidence']
elapsed = result['processing_time']
print()
print("=" * 60)
print(f" {model_name} Detection Result")
print("=" * 60)
print(f" Source: {source_name}")
print(f" Offset: {offset:+d} frames ({offset_sec:+.3f}s)")
print(f" Confidence: {confidence:.6f}")
print(f" Time: {elapsed:.2f}s")
print("=" * 60)
    # Interpretation (offsets within ±1 frame are treated as in sync)
    if offset > 1:
        print(f"  → Audio is {offset} frames AHEAD of video")
        print(f"    (delay audio by {abs(offset_sec):.3f}s to fix)")
    elif offset < -1:
        print(f"  → Audio is {abs(offset)} frames BEHIND video")
        print(f"    (advance audio by {abs(offset_sec):.3f}s to fix)")
    else:
        print("  ✓ Audio and video are IN SYNC")
print()
def print_comparison(fcn_result, original_result, source_name):
"""Print side-by-side comparison of both models."""
print()
print("β•”" + "═" * 70 + "β•—")
print("β•‘" + " Model Comparison Results".center(70) + "β•‘")
print("β•š" + "═" * 70 + "╝")
print()
print(f" Source: {source_name}")
print()
print(" " + "-" * 66)
print(f" {'Metric':<20} {'FCN-SyncNet':>20} {'Original SyncNet':>20}")
print(" " + "-" * 66)
fcn_off = fcn_result['offset_frames']
orig_off = original_result['offset_frames']
print(f" {'Offset (frames)':<20} {fcn_off:>+20d} {orig_off:>+20d}")
print(f" {'Offset (seconds)':<20} {fcn_result['offset_seconds']:>+20.3f} {original_result['offset_seconds']:>+20.3f}")
print(f" {'Confidence':<20} {fcn_result['confidence']:>20.4f} {original_result['confidence']:>20.4f}")
print(f" {'Time (seconds)':<20} {fcn_result['processing_time']:>20.2f} {original_result['processing_time']:>20.2f}")
print(" " + "-" * 66)
# Agreement check
diff = abs(fcn_off - orig_off)
    if diff == 0:
        print("  ✓ Both models AGREE perfectly!")
    elif diff <= 2:
        print(f"  ≈ Models differ by {diff} frame(s) (close agreement)")
    else:
        print(f"  ✗ Models differ by {diff} frames")
print()
def main():
parser = argparse.ArgumentParser(
description='FCN-SyncNet - Audio-Video Sync Detection',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
Video file: python test_sync_detection.py --video video.mp4
HLS stream: python test_sync_detection.py --hls http://stream.m3u8 --duration 15
Compare: python test_sync_detection.py --video video.mp4 --compare
Original: python test_sync_detection.py --video video.mp4 --original
Verbose: python test_sync_detection.py --video video.mp4 --verbose
"""
)
parser.add_argument('--video', type=str, help='Path to video file')
parser.add_argument('--hls', type=str, help='HLS stream URL (.m3u8)')
parser.add_argument('--model', type=str, default=None,
help='Model checkpoint (default: checkpoints/syncnet_fcn_epoch2.pth)')
parser.add_argument('--duration', type=int, default=10,
help='Duration for HLS capture (seconds, default: 10)')
parser.add_argument('--verbose', '-v', action='store_true',
help='Show detailed processing info')
parser.add_argument('--no-calibration', action='store_true',
help='Disable calibration correction')
parser.add_argument('--json', action='store_true',
help='Output results as JSON')
parser.add_argument('--compare', action='store_true',
help='Compare FCN-SyncNet with Original SyncNet')
parser.add_argument('--original', action='store_true',
help='Use Original SyncNet only (not FCN)')
args = parser.parse_args()
# Validate input
if not args.video and not args.hls:
print("Error: Please provide either --video or --hls")
parser.print_help()
return 1
# Original SyncNet doesn't support HLS
if args.hls and (args.original or args.compare):
print("Error: Original SyncNet does not support HLS streams")
print(" Use --video for comparison mode")
return 1
if not args.json:
print()
        if args.original:
            title = "Original SyncNet - Audio-Video Sync Detection"
        elif args.compare:
            title = "Sync Detection - FCN vs Original SyncNet"
        else:
            title = "FCN-SyncNet - Audio-Video Sync Detection"
        print("╔" + "═" * 62 + "╗")
        print("║" + title.center(62) + "║")
        print("╚" + "═" * 62 + "╝")
print()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
if not args.json:
print(f"Device: {device}")
try:
source = os.path.basename(args.video) if args.video else args.hls
# Run Original SyncNet only
if args.original:
original_model = load_original_syncnet()
if not args.json:
print(f"\nProcessing: {args.video}")
result = run_original_syncnet(original_model, args.video, args.verbose)
if args.json:
import json
result['source'] = source
result['model'] = 'original_syncnet'
print(json.dumps(result, indent=2))
else:
print_results(result, source, "Original SyncNet")
return 0
# Run comparison mode
if args.compare:
# Load both models
fcn_model = load_model(args.model, device)
original_model = load_original_syncnet()
if not args.json:
print(f"\nProcessing: {args.video}")
print("\n[1/2] Running FCN-SyncNet...")
fcn_result = detect_sync(
video_path=args.video,
model=fcn_model,
verbose=args.verbose,
use_calibration=not args.no_calibration
)
if not args.json:
print("[2/2] Running Original SyncNet...")
original_result = run_original_syncnet(original_model, args.video, args.verbose)
if args.json:
import json
output = {
'source': source,
'fcn_syncnet': fcn_result,
'original_syncnet': original_result
}
print(json.dumps(output, indent=2))
else:
print_comparison(fcn_result, original_result, source)
return 0
# Default: FCN-SyncNet only
model = load_model(args.model, device)
if args.video:
if not args.json:
print(f"\nProcessing: {args.video}")
result = detect_sync(
video_path=args.video,
model=model,
verbose=args.verbose,
use_calibration=not args.no_calibration
)
else: # HLS
if not args.json:
print(f"\nProcessing HLS: {args.hls}")
print(f"Capturing {args.duration} seconds...")
result = detect_sync(
hls_url=args.hls,
duration=args.duration,
model=model,
verbose=args.verbose,
use_calibration=not args.no_calibration
)
# Output results
if args.json:
import json
result['source'] = source
print(json.dumps(result, indent=2))
else:
print_results(result, source)
return 0
    except FileNotFoundError as e:
        print(f"\n✗ Error: File not found - {e}")
return 1
except Exception as e:
print(f"\nβœ— Error: {e}")
if args.verbose:
import traceback
traceback.print_exc()
return 1
if __name__ == "__main__":
sys.exit(main())