# flake8: noqa: E501
# Copyright (c) 2025 ByteDance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Model backend service for Depth Anything 3.
Provides HTTP API for model inference with persistent model loading.
"""
import gc
import os
import posixpath
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List, Optional
from urllib.parse import quote

import numpy as np
import torch
import uvicorn
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, HTMLResponse
from pydantic import BaseModel

from ..api import DepthAnything3


class InferenceRequest(BaseModel):
    """Request model for inference API."""

    image_paths: List[str]
    export_dir: Optional[str] = None
    export_format: str = "mini_npz-glb"
    extrinsics: Optional[List[List[List[float]]]] = None
    intrinsics: Optional[List[List[List[float]]]] = None
    process_res: int = 504
    process_res_method: str = "upper_bound_resize"
    export_feat_layers: List[int] = []
    align_to_input_ext_scale: bool = True
    # GLB export parameters
    conf_thresh_percentile: float = 40.0
    num_max_points: int = 1_000_000
    show_cameras: bool = True
    # Feat_vis export parameters
    feat_vis_fps: int = 15
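
# Illustrative only (not part of the original file): a JSON request body matching the
# fields above might look like the following. All paths and values are hypothetical.
#
#     {
#         "image_paths": ["/data/frames/000001.jpg", "/data/frames/000002.jpg"],
#         "export_dir": "/tmp/da3_outputs",
#         "export_format": "mini_npz-glb",
#         "process_res": 504,
#         "process_res_method": "upper_bound_resize",
#         "conf_thresh_percentile": 40.0,
#         "num_max_points": 1000000,
#         "show_cameras": true
#     }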


class InferenceResponse(BaseModel):
    """Response model for inference API."""

    success: bool
    message: str
    task_id: Optional[str] = None
    export_dir: Optional[str] = None
    export_format: str = "mini_npz-glb"
    processing_time: Optional[float] = None


class TaskStatus(BaseModel):
    """Task status model."""

    task_id: str
    status: str  # "pending", "running", "completed", "failed"
    message: str
    progress: Optional[float] = None  # 0.0 to 1.0
    created_at: float
    started_at: Optional[float] = None
    completed_at: Optional[float] = None
    export_dir: Optional[str] = None
    request: Optional[InferenceRequest] = None  # Store the original request
    # Essential task parameters
    num_images: Optional[int] = None  # Number of input images
    export_format: Optional[str] = None  # Export format
    process_res_method: Optional[str] = None  # Processing resolution method
    video_path: Optional[str] = None  # Source video path


class ModelBackend:
    """Model backend service with persistent model loading."""

    def __init__(self, model_dir: str, device: str = "cuda"):
        self.model_dir = model_dir
        self.device = device
        self.model = None
        self.model_loaded = False
        self.load_time = None
        self.load_start_time = None  # Time when model loading started
        self.load_completed_time = None  # Time when model loading completed
        self.last_used = None

    def load_model(self):
        """Load model if not already loaded."""
        if self.model_loaded and self.model is not None:
            self.last_used = time.time()
            return self.model
        try:
            print(f"Loading model from {self.model_dir}...")
            self.load_start_time = time.time()
            start_time = time.time()
            self.model = DepthAnything3.from_pretrained(self.model_dir).to(self.device)
            self.model.eval()
            self.model_loaded = True
            self.load_time = time.time() - start_time
            self.load_completed_time = time.time()
            self.last_used = time.time()
            print(f"Model loaded successfully in {self.load_time:.2f}s")
            return self.model
        except Exception as e:
            print(f"Failed to load model: {e}")
            raise e

    def get_model(self):
        """Get model, loading if necessary."""
        if not self.model_loaded:
            return self.load_model()
        self.last_used = time.time()
        return self.model

    def get_status(self) -> Dict[str, Any]:
        """Get backend status information."""
        # Calculate uptime from when model loading completed
        uptime = 0
        if self.model_loaded and self.load_completed_time:
            uptime = time.time() - self.load_completed_time
        return {
            "model_loaded": self.model_loaded,
            "model_dir": self.model_dir,
            "device": self.device,
            "load_time": self.load_time,
            "last_used": self.last_used,
            "uptime": uptime,
        }
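
# Illustrative usage sketch (not part of the original file; the checkpoint path is hypothetical):
#
#     backend = ModelBackend(model_dir="/path/to/da3_checkpoint", device="cuda")
#     model = backend.get_model()    # loads on first call, reuses the cached instance afterwards
#     print(backend.get_status())    # e.g. {"model_loaded": True, "device": "cuda", ...}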


# Global backend instance
_backend: Optional[ModelBackend] = None
_app: Optional[FastAPI] = None
_tasks: Dict[str, TaskStatus] = {}
_executor = ThreadPoolExecutor(max_workers=1)  # Restrict to single-task execution
_running_task_id: Optional[str] = None  # Currently running task ID
_task_queue: List[str] = []  # Pending task queue

# Task cleanup configuration
MAX_TASK_HISTORY = 100  # Maximum number of tasks to keep in memory
CLEANUP_INTERVAL = 300  # Cleanup interval in seconds (5 minutes)


def _process_next_task():
    """Process the next task in the queue."""
    global _task_queue, _running_task_id
    if not _task_queue or _running_task_id is not None:
        return
    # Get next task from queue
    task_id = _task_queue.pop(0)
    # Skip if the task is no longer tracked (its stored request is needed to run it)
    if task_id not in _tasks:
        return
    # Submit task to executor
    _executor.submit(_run_inference_task, task_id)


def _get_gpu_memory_info():
    """Get current GPU memory usage information."""
    if not torch.cuda.is_available():
        return None
    try:
        device = torch.cuda.current_device()
        total_memory = torch.cuda.get_device_properties(device).total_memory
        allocated_memory = torch.cuda.memory_allocated(device)
        reserved_memory = torch.cuda.memory_reserved(device)
        free_memory = total_memory - reserved_memory
        return {
            "total_gb": total_memory / 1024**3,
            "allocated_gb": allocated_memory / 1024**3,
            "reserved_gb": reserved_memory / 1024**3,
            "free_gb": free_memory / 1024**3,
            "utilization": (reserved_memory / total_memory) * 100,
        }
    except Exception as e:
        print(f"Warning: Failed to get GPU memory info: {e}")
        return None
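
# Illustrative return value (hypothetical 24 GB card; numbers are examples only):
#
#     {"total_gb": 23.69, "allocated_gb": 2.10, "reserved_gb": 2.50,
#      "free_gb": 21.19, "utilization": 10.6}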


def _cleanup_cuda_memory():
    """Helper function to perform comprehensive CUDA memory cleanup."""
    try:
        if torch.cuda.is_available():
            # Log memory before cleanup
            mem_before = _get_gpu_memory_info()
            torch.cuda.synchronize()
            torch.cuda.empty_cache()
            torch.cuda.ipc_collect()
            gc.collect()
            # Log memory after cleanup
            mem_after = _get_gpu_memory_info()
            if mem_before and mem_after:
                freed = mem_before["reserved_gb"] - mem_after["reserved_gb"]
                print(
                    f"CUDA cleanup: freed {freed:.2f}GB, "
                    f"available: {mem_after['free_gb']:.2f}GB/{mem_after['total_gb']:.2f}GB"
                )
            else:
                print("CUDA memory cleanup completed")
    except Exception as e:
        print(f"Warning: CUDA cleanup failed: {e}")


def _check_memory_availability(required_gb: float = 2.0) -> tuple[bool, str]:
    """
    Check if there's enough GPU memory available.

    Args:
        required_gb: Minimum required memory in GB

    Returns:
        Tuple of (is_available, message)
    """
    if not torch.cuda.is_available():
        return False, "CUDA is not available"
    try:
        mem_info = _get_gpu_memory_info()
        if mem_info is None:
            return True, "Cannot check memory, proceeding anyway"
        if mem_info["free_gb"] < required_gb:
            return False, (
                f"Insufficient GPU memory: {mem_info['free_gb']:.2f}GB available, "
                f"{required_gb:.2f}GB required. "
                f"Total: {mem_info['total_gb']:.2f}GB, "
                f"Used: {mem_info['reserved_gb']:.2f}GB ({mem_info['utilization']:.1f}%)"
            )
        return True, (
            f"Memory check passed: {mem_info['free_gb']:.2f}GB available, "
            f"{required_gb:.2f}GB required"
        )
    except Exception as e:
        return True, f"Memory check failed: {e}, proceeding anyway"
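
# Illustrative usage sketch (not part of the original file):
#
#     ok, msg = _check_memory_availability(required_gb=4.0)
#     if not ok:
#         raise RuntimeError(msg)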


def _estimate_memory_requirement(num_images: int, process_res: int) -> float:
    """
    Estimate GPU memory requirement in GB.

    Args:
        num_images: Number of images to process
        process_res: Processing resolution

    Returns:
        Estimated memory requirement in GB
    """
    # Rough estimation: base model (2GB) + per-image overhead
    base_memory = 2.0
    per_image_memory = (process_res / 504) ** 2 * 0.5  # Scale with resolution
    total_memory = base_memory + (
        num_images * per_image_memory * 0.1
    )  # Batch processing reduces per-image cost
    return total_memory
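
# Worked examples of the heuristic above (values follow directly from the formula):
#     100 images at process_res=504  -> 2.0 + 100 * (504/504)**2  * 0.5 * 0.1 = 7.0 GB
#     100 images at process_res=1008 -> 2.0 + 100 * (1008/504)**2 * 0.5 * 0.1 = 22.0 GB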


def _run_inference_task(task_id: str):
    """Run inference task in background thread with OOM protection."""
    global _tasks, _backend, _running_task_id, _task_queue
    model = None
    inference_started = False
    start_time = time.time()
    try:
        # Get task request
        if task_id not in _tasks or _tasks[task_id].request is None:
            print(f"[{task_id}] Task not found or request missing")
            return
        request = _tasks[task_id].request
        num_images = len(request.image_paths)
        # Set current running task
        _running_task_id = task_id
        # Update task status to running
        _tasks[task_id].status = "running"
        _tasks[task_id].started_at = start_time
        _tasks[task_id].message = f"[{task_id}] Starting inference on {num_images} frames..."
        print(f"[{task_id}] Starting inference on {num_images} frames")
        # Pre-inference cleanup to ensure maximum available memory
        print(f"[{task_id}] Pre-inference cleanup...")
        _cleanup_cuda_memory()
        # Check memory availability
        estimated_memory = _estimate_memory_requirement(num_images, request.process_res)
        mem_available, mem_msg = _check_memory_availability(estimated_memory)
        print(f"[{task_id}] {mem_msg}")
        if not mem_available:
            # Try aggressive cleanup
            print(f"[{task_id}] Insufficient memory, attempting aggressive cleanup...")
            _cleanup_cuda_memory()
            time.sleep(0.5)  # Give system time to reclaim memory
            # Check again
            mem_available, mem_msg = _check_memory_availability(estimated_memory)
            if not mem_available:
                raise RuntimeError(
                    f"Insufficient GPU memory after cleanup. {mem_msg}\n"
                    f"Suggestions:\n"
                    f"  1. Reduce process_res (current: {request.process_res})\n"
                    f"  2. Process fewer images at once (current: {num_images})\n"
                    f"  3. Clear other GPU processes"
                )
        # Get model (with error handling)
        print(f"[{task_id}] Loading model...")
        _tasks[task_id].message = f"[{task_id}] Loading model..."
        _tasks[task_id].progress = 0.1
        try:
            model = _backend.get_model()
        except RuntimeError as e:
            if "out of memory" in str(e).lower():
                _cleanup_cuda_memory()
                raise RuntimeError(
                    f"OOM during model loading: {str(e)}\n"
                    f"Try reducing the batch size or resolution."
                )
            raise
        print(f"[{task_id}] Model loaded successfully")
        _tasks[task_id].progress = 0.2
        # Prepare inference parameters
        inference_kwargs = {
            "image": request.image_paths,
            "export_format": request.export_format,
            "process_res": request.process_res,
            "process_res_method": request.process_res_method,
            "export_feat_layers": request.export_feat_layers,
            "align_to_input_ext_scale": request.align_to_input_ext_scale,
            "conf_thresh_percentile": request.conf_thresh_percentile,
            "num_max_points": request.num_max_points,
            "show_cameras": request.show_cameras,
            "feat_vis_fps": request.feat_vis_fps,
        }
        if request.export_dir:
            inference_kwargs["export_dir"] = request.export_dir
        if request.extrinsics:
            inference_kwargs["extrinsics"] = np.array(request.extrinsics, dtype=np.float32)
        if request.intrinsics:
            inference_kwargs["intrinsics"] = np.array(request.intrinsics, dtype=np.float32)
        # Run inference with timing
        inference_start_time = time.time()
        print(f"[{task_id}] Running model inference...")
        _tasks[task_id].message = f"[{task_id}] Running model inference on {num_images} images..."
        _tasks[task_id].progress = 0.3
        inference_started = True
        try:
            model.inference(**inference_kwargs)
            inference_time = time.time() - inference_start_time
            avg_time_per_image = inference_time / num_images if num_images > 0 else 0
            print(
                f"[{task_id}] Inference completed in {inference_time:.2f}s "
                f"({avg_time_per_image:.2f}s per image)"
            )
        except RuntimeError as e:
            if "out of memory" in str(e).lower():
                _cleanup_cuda_memory()
                raise RuntimeError(
                    f"OOM during inference: {str(e)}\n"
                    f"Settings: {num_images} images, resolution={request.process_res}\n"
                    f"Suggestions:\n"
                    f"  1. Reduce process_res to {int(request.process_res * 0.75)}\n"
                    f"  2. Process images in smaller batches\n"
                    f"  3. Use process_res_method='resize' instead of 'upper_bound_resize'"
                )
            raise
        _tasks[task_id].progress = 0.9
        # Post-inference cleanup
        print(f"[{task_id}] Post-inference cleanup...")
        _cleanup_cuda_memory()
        # Calculate total processing time
        total_time = time.time() - start_time
        # Update task status to completed
        _tasks[task_id].status = "completed"
        _tasks[task_id].completed_at = time.time()
        _tasks[task_id].message = (
            f"[{task_id}] Completed in {total_time:.2f}s ({avg_time_per_image:.2f}s per image)"
        )
        _tasks[task_id].progress = 1.0
        _tasks[task_id].export_dir = request.export_dir
        # Clear running state
        _running_task_id = None
        # Process next task in queue
        _process_next_task()
        print(f"[{task_id}] Task completed successfully")
        print(
            f"[{task_id}] Total time: {total_time:.2f}s, "
            f"Inference time: {inference_time:.2f}s, "
            f"Avg per image: {avg_time_per_image:.2f}s"
        )
    except Exception as e:
        # Update task status to failed
        error_msg = str(e)
        total_time = time.time() - start_time
        print(f"[{task_id}] Task failed after {total_time:.2f}s: {error_msg}")
        # Always attempt cleanup on failure
        _cleanup_cuda_memory()
        _tasks[task_id].status = "failed"
        _tasks[task_id].completed_at = time.time()
        _tasks[task_id].message = f"[{task_id}] Failed after {total_time:.2f}s: {error_msg}"
        # Clear running state
        _running_task_id = None
        # Process next task in queue
        _process_next_task()
    finally:
        # Final cleanup in finally block to ensure it always runs
        # This is critical for releasing resources even if unexpected errors occur
        try:
            if inference_started:
                print(f"[{task_id}] Final cleanup in finally block...")
                _cleanup_cuda_memory()
        except Exception as e:
            print(f"[{task_id}] Warning: Finally block cleanup failed: {e}")
        # Schedule cleanup after task completion
        _schedule_task_cleanup()


def _cleanup_old_tasks():
    """Clean up old completed/failed tasks to prevent memory buildup."""
    global _tasks
    current_time = time.time()
    tasks_to_remove = []
    # Find tasks to remove - more aggressive cleanup
    for task_id, task in _tasks.items():
        # Remove completed/failed tasks older than 10 minutes (instead of 1 hour)
        if (
            task.status in ["completed", "failed"]
            and task.completed_at
            and current_time - task.completed_at > 600
        ):  # 10 minutes
            tasks_to_remove.append(task_id)
    # Remove old tasks
    for task_id in tasks_to_remove:
        del _tasks[task_id]
        print(f"[CLEANUP] Removed old task: {task_id}")
    # If still too many tasks, remove oldest completed/failed tasks
    if len(_tasks) > MAX_TASK_HISTORY:
        completed_tasks = [
            (task_id, task)
            for task_id, task in _tasks.items()
            if task.status in ["completed", "failed"]
        ]
        completed_tasks.sort(key=lambda x: x[1].completed_at or 0)
        excess_count = len(_tasks) - MAX_TASK_HISTORY
        for i in range(min(excess_count, len(completed_tasks))):
            task_id = completed_tasks[i][0]
            del _tasks[task_id]
            print(f"[CLEANUP] Removed excess task: {task_id}")
    # Count active tasks (only pending and running)
    active_count = sum(1 for task in _tasks.values() if task.status in ["pending", "running"])
    print(
        "[CLEANUP] Task cleanup completed. "
        f"Total tasks: {len(_tasks)}, Active tasks: {active_count}"
    )


def _schedule_task_cleanup():
    """Schedule task cleanup in background."""

    def cleanup_worker():
        try:
            time.sleep(2)  # Small delay to ensure task status is updated
            _cleanup_old_tasks()
        except Exception as e:
            print(f"[CLEANUP] Cleanup worker failed: {e}")

    # Run cleanup in background thread
    _executor.submit(cleanup_worker)


#