| """ |
| Vehicle Detection, Tracking, Counting, and Speed Estimation System |
| =================================================================== |
| |
| A comprehensive computer vision pipeline for analyzing traffic videos, |
| detecting vehicles, tracking their movement, counting them, and estimating |
| their speeds using YOLO object detection and perspective transformation. |
| |
| Authors: |
| - Abhay Gupta (0205CC221005) |
| - Aditi Lakhera (0205CC221011) |
| - Balraj Patel (0205CC221049) |
| - Bhumika Patel (0205CC221050) |
| |
| Technical Approach: |
| - YOLO for real-time object detection |
| - ByteTrack for multi-object tracking |
| - Perspective transformation for speed calculation |
| - Line zones for vehicle counting |
| """ |
|
|
| import sys |
| import logging |
| from pathlib import Path |
| from typing import Dict, Optional, Callable |
| from time import time |
|
|
| import cv2 |
| import numpy as np |
| import supervision as sv |
| from ultralytics import YOLO |
|
|
| from src import FrameAnnotator, VehicleSpeedEstimator, PerspectiveTransformer |
| from src.exceptions import ( |
| VideoProcessingError, |
| ModelLoadError, |
| ConfigurationError |
| ) |
| from config import VehicleDetectionConfig |
|
|
| |
# Configure the root logger at import time so every module in the pipeline
# emits timestamped, level-tagged records at INFO and above.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-scoped logger used by everything defined in this file.
logger = logging.getLogger(__name__)
|
|
|
|
class VehicleDetectionPipeline:
    """
    Main pipeline for vehicle detection, tracking, counting, and speed estimation.

    This class orchestrates the entire processing workflow, from loading the model
    to processing each frame and generating the output video.

    The YOLO model is loaded once in the constructor; the video-specific
    components (tracker, line zone, speed estimator, annotator) are rebuilt at
    the start of every ``process_video`` call.
    """

    def __init__(self, config: "VehicleDetectionConfig"):
        """
        Initialize the detection pipeline.

        Args:
            config: Configuration object with all parameters

        Raises:
            ModelLoadError: If the model cannot be loaded or the detection
                thresholds cannot be read from the configuration
        """
        self.config = config
        self.model = None
        self.tracker = None
        self.line_zone = None
        self.speed_estimator = None
        self.annotator = None
        self.video_info = None
        # Keyword arguments forwarded to every model call; the thresholds are
        # filled in by _initialize_components().
        self._inference_kwargs = {"verbose": False}

        logger.info(f"Initializing pipeline with config: {config}")
        self._initialize_components()

    def _initialize_components(self) -> None:
        """
        Load the YOLO model and capture the inference thresholds.

        Raises:
            ModelLoadError: If anything goes wrong while loading the model or
                reading the thresholds from the configuration
        """
        try:
            logger.info(f"Loading YOLO model: {self.config.model_path}")
            self.model = YOLO(self.config.model_path)
            # Ultralytics takes thresholds as prediction arguments; assigning
            # ``model.conf`` / ``model.iou`` attributes has no effect on
            # inference, so they are stored here and passed on every call.
            self._inference_kwargs = {
                "conf": self.config.confidence_threshold,
                "iou": self.config.iou_threshold,
                "verbose": False,
            }
            logger.info("Model loaded successfully")

        except Exception as e:
            logger.error(f"Failed to load model: {e}")
            raise ModelLoadError(
                f"Could not load YOLO model from {self.config.model_path}: {e}"
            ) from e

    def _setup_video_components(self, video_path: str) -> None:
        """
        Set up video-specific components.

        Builds the tracker, the counting line, the perspective transformer,
        the speed estimator and the frame annotator from the video metadata
        and the configuration.

        Args:
            video_path: Path to input video

        Raises:
            VideoProcessingError: If the video metadata cannot be read or any
                component fails to initialize
        """
        try:
            self.video_info = sv.VideoInfo.from_video_path(video_path)
            logger.info(
                f"Video info: {self.video_info.width}x{self.video_info.height} "
                f"@ {self.video_info.fps}fps"
            )

            self.tracker = sv.ByteTrack(
                frame_rate=self.video_info.fps,
                track_activation_threshold=self.config.confidence_threshold
            )
            logger.info("Tracker initialized")

            # Horizontal counting line at y=line_y, inset by line_offset
            # pixels from both the left and right frame edges.
            line_start = sv.Point(
                x=self.config.line_offset,
                y=self.config.line_y
            )
            line_end = sv.Point(
                x=self.video_info.width - self.config.line_offset,
                y=self.config.line_y
            )

            self.line_zone = sv.LineZone(
                start=line_start,
                end=line_end,
                triggering_anchors=(sv.Position.BOTTOM_CENTER,)
            )
            logger.info(f"Line zone created at y={self.config.line_y}")

            source_pts = np.array(self.config.source_points, dtype=np.float32)
            target_pts = np.array(self.config.target_points, dtype=np.float32)

            transformer = PerspectiveTransformer(
                source_points=source_pts,
                target_points=target_pts
            )
            logger.info("Perspective transformer initialized")

            self.speed_estimator = VehicleSpeedEstimator(
                fps=self.video_info.fps,
                transformer=transformer,
                history_duration=self.config.speed_history_seconds,
                speed_unit=self.config.speed_unit
            )
            logger.info("Speed estimator initialized")

            self.annotator = FrameAnnotator(
                video_resolution=(self.video_info.width, self.video_info.height),
                show_boxes=self.config.enable_boxes,
                show_labels=self.config.enable_labels,
                show_traces=self.config.enable_traces,
                show_line_zones=self.config.enable_line_zones,
                trace_length=self.config.trace_length,
                zone_polygon=source_pts
            )
            logger.info("Frame annotator initialized")

        except Exception as e:
            logger.error(f"Failed to setup video components: {e}")
            raise VideoProcessingError(
                f"Error setting up video processing: {e}"
            ) from e

    def _process_single_frame(self, frame: np.ndarray) -> tuple:
        """
        Process a single video frame.

        Runs detection, tracking, line-crossing counting and speed estimation,
        then draws the annotations.

        Args:
            frame: Input video frame

        Returns:
            Tuple of (annotated_frame, detections)
        """
        # Thresholds are passed per call so the configured values take effect.
        results = self.model(frame, **self._inference_kwargs)[0]
        detections = sv.Detections.from_ultralytics(results)

        detections = self.tracker.update_with_detections(detections)

        # Updates the in/out counters held by the line zone.
        self.line_zone.trigger(detections)

        detections = self.speed_estimator.estimate(detections)

        labels = self._create_labels(detections)

        annotated_frame = self.annotator.draw_annotations(
            frame=frame,
            detections=detections,
            labels=labels,
            line_zones=[self.line_zone]
        )

        return annotated_frame, detections

    def _create_labels(self, detections: "sv.Detections") -> list:
        """
        Create display labels for detected vehicles.

        Each label has the form ``"<class> #<tracker_id>"``, followed by
        ``" <speed><unit>"`` when a positive speed is available. The class
        falls back to ``"Vehicle"`` when no class names are present.

        Args:
            detections: Detection results

        Returns:
            List of label strings, one per tracked detection; empty when the
            detections carry no tracker ids
        """
        tracker_ids = getattr(detections, "tracker_id", None)
        if tracker_ids is None:
            return []

        data = detections.data
        class_names = data["class_name"] if "class_name" in data else None
        speeds = data["speed"] if "speed" in data else None

        labels = []
        for idx, tracker_id in enumerate(tracker_ids):
            class_name = class_names[idx] if class_names is not None else "Vehicle"

            speed_text = ""
            # Non-positive speeds are left off the label.
            if speeds is not None and speeds[idx] > 0:
                speed_text = f" {speeds[idx]:.0f}{self.config.speed_unit}"

            labels.append(f"{class_name} #{tracker_id}{speed_text}")

        return labels

    def _open_display_window(self) -> bool:
        """
        Try to create the preview window sized to the video resolution.

        Returns:
            True when the window was created, False when creation failed
            (a warning is logged in that case)
        """
        try:
            cv2.namedWindow(self.config.window_name, cv2.WINDOW_NORMAL)
            cv2.resizeWindow(
                self.config.window_name,
                self.video_info.width,
                self.video_info.height
            )
        except Exception as e:
            logger.warning(f"Could not create display window (headless environment?): {e}")
            return False
        return True

    def _show_frame(self, frame: np.ndarray) -> bool:
        """
        Show a frame in the preview window and poll for a stop request.

        Args:
            frame: Annotated frame to display

        Returns:
            True when the user pressed 'q' or closed the window
        """
        cv2.imshow(self.config.window_name, frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            logger.info("Processing interrupted by user")
            return True

        if cv2.getWindowProperty(
            self.config.window_name,
            cv2.WND_PROP_VISIBLE
        ) < 1:
            logger.info("Window closed by user")
            return True

        return False

    def process_video(
        self,
        progress_callback: Optional[Callable[[float], None]] = None
    ) -> Dict:
        """
        Process the entire video.

        Frames that raise during processing are logged and skipped; they are
        neither written to the output nor counted in ``frames_processed``.

        Args:
            progress_callback: Optional callback for progress updates; called
                with the fraction of frames completed (capped at 1.0) when the
                total frame count is known

        Returns:
            Dictionary with processing statistics: ``total_count``,
            ``in_count``, ``out_count``, ``avg_speed``, ``max_speed``,
            ``min_speed``, ``frames_processed``, ``processing_time`` and
            ``fps``

        Raises:
            VideoProcessingError: If video processing fails
        """
        start_time = time()

        try:
            if not Path(self.config.input_video).exists():
                raise VideoProcessingError(f"Input video not found: {self.config.input_video}")

            self._setup_video_components(self.config.input_video)

            output_path = Path(self.config.output_video)
            output_path.parent.mkdir(parents=True, exist_ok=True)

            frame_count = 0
            total_frames = self.video_info.total_frames or 0
            all_speeds = []

            # Tracked locally so that falling back to headless mode does not
            # modify the caller's configuration object.
            display_enabled = bool(self.config.display_enabled)
            if display_enabled:
                display_enabled = self._open_display_window()

            logger.info("Starting video processing...")
            frame_generator = sv.get_video_frames_generator(self.config.input_video)

            try:
                with sv.VideoSink(self.config.output_video, self.video_info) as sink:
                    for frame_index, frame in enumerate(frame_generator):
                        try:
                            annotated_frame, detections = self._process_single_frame(frame)

                            if "speed" in detections.data:
                                all_speeds.extend(
                                    float(s) for s in detections.data["speed"] if s > 0
                                )

                            sink.write_frame(annotated_frame)

                            # Count as soon as the frame is written so the
                            # total matches the output even when the user
                            # stops on this frame.
                            frame_count += 1
                            if progress_callback and total_frames > 0:
                                progress_callback(min(frame_count / total_frames, 1.0))

                            if display_enabled and self._show_frame(annotated_frame):
                                break

                        except Exception as e:
                            # Best effort: one bad frame must not abort the run.
                            logger.warning(f"Error processing frame {frame_index}: {e}")
                            continue
            finally:
                # Close the preview window even when the loop exits abnormally.
                if display_enabled:
                    cv2.destroyAllWindows()

            processing_time = time() - start_time
            stats = {
                'total_count': self.line_zone.in_count + self.line_zone.out_count,
                'in_count': self.line_zone.in_count,
                'out_count': self.line_zone.out_count,
                'avg_speed': float(np.mean(all_speeds)) if all_speeds else 0.0,
                'max_speed': float(np.max(all_speeds)) if all_speeds else 0.0,
                'min_speed': float(np.min(all_speeds)) if all_speeds else 0.0,
                'frames_processed': frame_count,
                'processing_time': processing_time,
                'fps': frame_count / processing_time if processing_time > 0 else 0
            }

            logger.info(f"Processing complete: {frame_count} frames in {processing_time:.2f}s")
            logger.info(
                f"Vehicles counted: {stats['total_count']} "
                f"(In: {stats['in_count']}, Out: {stats['out_count']})"
            )

            return stats

        except VideoProcessingError as e:
            # Already a domain error with a meaningful message; re-raise as-is
            # instead of wrapping it a second time.
            logger.error(f"Video processing failed: {e}", exc_info=True)
            raise
        except Exception as e:
            logger.error(f"Video processing failed: {e}", exc_info=True)
            raise VideoProcessingError(f"Failed to process video: {e}") from e
|
|
|
|
def process_video(
    config: VehicleDetectionConfig,
    progress_callback: Optional[Callable[[float], None]] = None
) -> Dict:
    """
    Build a pipeline for ``config`` and run it over the configured video.

    Args:
        config: Configuration object handed to VehicleDetectionPipeline
        progress_callback: Optional progress callback forwarded to the
            pipeline's ``process_video`` method

    Returns:
        The statistics dictionary returned by the pipeline
    """
    return VehicleDetectionPipeline(config).process_video(progress_callback)
|
|
|
|
def main():
    """
    Command-line entry point.

    Builds a default configuration, processes the video and prints a summary
    of the resulting statistics.

    Returns:
        0 on success; 1 when interrupted from the keyboard or on any error
    """
    rule = "=" * 60

    try:
        for banner_line in (rule, "Vehicle Speed Estimation & Counting System", rule):
            logger.info(banner_line)

        config = VehicleDetectionConfig()
        logger.info(f"Configuration: {config}")

        stats = process_video(config)

        # Assemble the report first, then emit it one line per print call.
        report = (
            "\n" + rule,
            "PROCESSING RESULTS",
            rule,
            f"Output saved to: {config.output_video}",
            "\nVehicle Count:",
            f" Total: {stats['total_count']}",
            f" In: {stats['in_count']}",
            f" Out: {stats['out_count']}",
            f"\nSpeed Statistics ({config.speed_unit}):",
            f" Average: {stats['avg_speed']:.1f}",
            f" Maximum: {stats['max_speed']:.1f}",
            f" Minimum: {stats['min_speed']:.1f}",
            "\nProcessing Info:",
            f" Frames: {stats['frames_processed']}",
            f" Time: {stats['processing_time']:.2f}s",
            f" FPS: {stats['fps']:.1f}",
            rule,
        )
        for report_line in report:
            print(report_line)

    except KeyboardInterrupt:
        logger.info("Processing interrupted by user")
        return 1
    except Exception as e:
        logger.error(f"Fatal error: {e}", exc_info=True)
        print(f"\n❌ Error: {e}", file=sys.stderr)
        return 1

    return 0
|
|
|
|
# Run the CLI only when executed as a script; main()'s return value becomes
# the process exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)
|
|