| """ | |
| AI-Enhanced Video Analysis with Gradio Live Video | |
| Features: Real-time YOLO detection, GPT queries, Vector DB storage | |
| Optimized for Hugging Face Spaces deployment | |
| """ | |
import gradio as gr
import cv2
import numpy as np
from collections import deque
import time
from datetime import datetime
import os
from threading import Lock
# Ensure Ultralytics writes settings/cache inside the project workspace
ULTRALYTICS_BASE = os.path.join(os.path.dirname(__file__), ".ultralytics")
os.environ.setdefault("ULTRALYTICS_SETTINGS_DIR", ULTRALYTICS_BASE)
os.environ.setdefault("ULTRALYTICS_CACHE_DIR", os.path.join(ULTRALYTICS_BASE, "cache"))
os.makedirs(os.environ["ULTRALYTICS_SETTINGS_DIR"], exist_ok=True)
os.makedirs(os.environ["ULTRALYTICS_CACHE_DIR"], exist_ok=True)
# AI & Vector DB imports
from openai import OpenAI
import chromadb
from chromadb.config import Settings
# YOLO import
try:
    from ultralytics import YOLO
    YOLO_AVAILABLE = True
except ImportError:
    YOLO_AVAILABLE = False
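# NOTE: YOLO('yolov8n.pt') downloads the weights on first use if they are not
# already cached, which is why the first load on a fresh Space can take
# 30-60 seconds (see the troubleshooting accordion in the UI below).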
# Global state management
class VideoAnalysisState:
    def __init__(self):
        self.lock = Lock()
        self.frame_chunks = deque(maxlen=100)
        self.chunk_id = 0
        self.detected_objects = []
        self.pending_chunks = []
        self.event_log = deque(maxlen=50)
        self.openai_client = None
        self.chroma_client = None
        self.video_collection = None
        self.model = None
        self.frames_processed = 0
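    # The mutable fields above are shared between the webcam streaming callback
    # and the query/stats handlers; shared reads and writes go through self.lock.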
    def init_openai(self, api_key):
        """Initialize OpenAI client"""
        if not api_key:
            return False
        try:
            self.openai_client = OpenAI(api_key=api_key)
            # Test the connection
            self.openai_client.models.list()
            return True
        except Exception as e:
            self.event_log.append(f"❌ OpenAI error: {str(e)[:50]}")
            return False
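    # NOTE: chromadb.Client() below creates an in-memory (ephemeral) store;
    # indexed events are lost whenever the Space restarts. For durable storage
    # you could swap in chromadb.PersistentClient(path=...) instead.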
    def init_vector_db(self):
        """Initialize ChromaDB"""
        try:
            self.chroma_client = chromadb.Client(Settings(
                anonymized_telemetry=False,
                allow_reset=True
            ))
            self.video_collection = self.chroma_client.get_or_create_collection(
                name="video_events",
                metadata={"hnsw:space": "cosine"}
            )
            return True
        except Exception as e:
            self.event_log.append(f"❌ Vector DB error: {str(e)[:50]}")
            return False
    def init_yolo(self):
        """Initialize YOLO model"""
        if YOLO_AVAILABLE and self.model is None:
            try:
                self.model = YOLO('yolov8n.pt')
                self.event_log.append("✅ YOLO model loaded")
                return True
            except Exception as e:
                self.event_log.append(f"❌ YOLO error: {str(e)[:50]}")
                return False
        return self.model is not None
# Global state
state = VideoAnalysisState()
def get_dominant_color(image_region):
    """Get dominant color from image region"""
    if image_region.size == 0:
        return "unknown"
    hsv = cv2.cvtColor(image_region, cv2.COLOR_BGR2HSV)
    h = np.mean(hsv[:, :, 0])
    s = np.mean(hsv[:, :, 1])
    v = np.mean(hsv[:, :, 2])
    if s < 40:
        if v < 50:
            return "black"
        elif v > 200:
            return "white"
        else:
            return "gray"
    if h < 10 or h > 160:
        return "red"
    elif h < 25:
        return "orange"
    elif h < 35:
        return "yellow"
    elif h < 85:
        return "green"
    elif h < 125:
        return "blue"
    elif h < 155:
        return "purple"
    else:
        return "pink"
def process_frame(frame):
    """Process video frame with YOLO detection"""
    if frame is None:
        return gr.update(value=None, visible=False)
    if state.model is None:
        return gr.update(value=frame, visible=True)
    # Convert incoming RGB frame to BGR for OpenCV/YOLO processing
    frame_bgr = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
    try:
        # Run YOLO detection
        results = state.model(frame_bgr, conf=0.4, verbose=False)
        detected_objects = []
        events_text = []
        for r in results:
            boxes = r.boxes
            if boxes is not None:
                for box in boxes:
                    x1, y1, x2, y2 = map(int, box.xyxy[0].tolist())
                    conf = box.conf[0].item()
                    cls = int(box.cls[0].item())
                    label = state.model.names[cls]
                    # Get color
                    try:
                        roi = frame_bgr[y1:y2, x1:x2]
                        color = get_dominant_color(roi)
                    except Exception:
                        color = "unknown"
                    detected_objects.append({
                        'label': label,
                        'color': color,
                        'confidence': conf,
                        'bbox': (x1, y1, x2, y2)
                    })
                    events_text.append(f"{color} {label}")
                    # Draw bounding box
                    cv2.rectangle(frame_bgr, (x1, y1), (x2, y2), (0, 255, 0), 2)
                    # Draw label
                    text = f"{color} {label} {conf:.2f}"
                    cv2.putText(frame_bgr, text, (x1, y1 - 10),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
        # Update state (thread-safe)
        with state.lock:
            state.detected_objects = detected_objects
            state.frames_processed += 1
            # Create a chunk every 30 frames (~1 second at 30 fps);
            # chunk_id doubles as a frame counter, so ids advance in steps of 30
            if state.chunk_id % 30 == 0 and events_text:
                chunk_description = f"At {datetime.now().strftime('%H:%M:%S')}: Detected {', '.join(events_text)}"
                state.frame_chunks.append({
                    'id': state.chunk_id,
                    'timestamp': time.time(),
                    'description': chunk_description,
                    'objects': detected_objects.copy()
                })
                state.pending_chunks.append({
                    'id': state.chunk_id,
                    'description': chunk_description,
                    'timestamp': time.time(),
                    'object_count': len(detected_objects)
                })
            state.chunk_id += 1
            chunk_count = len(state.frame_chunks)
        # Add stats overlay
        cv2.putText(frame_bgr, f"Objects: {len(detected_objects)} | Chunks: {chunk_count}",
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        # Convert back to RGB for display in Gradio
        frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
        return gr.update(value=frame_rgb, visible=True)
    except Exception as e:
        state.event_log.append(f"❌ Frame error: {str(e)[:50]}")
        return gr.update(value=frame, visible=True)
def get_embedding(text):
    """Get embeddings from OpenAI"""
    if not state.openai_client:
        return None
    try:
        response = state.openai_client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    except Exception as e:
        state.event_log.append(f"❌ Embedding error: {str(e)[:50]}")
        return None
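# NOTE: text-embedding-3-small returns 1536-dimensional vectors by default;
# every vector added to the Chroma collection must share that dimensionality.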
def process_pending_chunks():
    """Process chunks waiting to be embedded"""
    with state.lock:
        if not state.pending_chunks or not state.video_collection:
            return 0
        chunks_to_process = state.pending_chunks[:5]
    processed = 0
    for chunk in chunks_to_process:
        try:
            embedding = get_embedding(chunk['description'])
            if embedding:
                state.video_collection.add(
                    documents=[chunk['description']],
                    embeddings=[embedding],
                    ids=[f"chunk_{chunk['id']}"],
                    metadatas=[{
                        'timestamp': chunk['timestamp'],
                        'object_count': chunk['object_count']
                    }]
                )
                with state.lock:
                    state.pending_chunks.remove(chunk)
                processed += 1
        except Exception as e:
            state.event_log.append(f"❌ Embed error: {str(e)[:30]}")
            break
    return processed
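# Pending chunks are embedded lazily (at most five per call, triggered from
# query time) rather than inside the frame loop, so a slow OpenAI round-trip
# never stalls the live video stream.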
def query_with_ai(question):
    """Answer questions using GPT with vector database context"""
    if not state.openai_client:
        return "⚠️ Please enter your OpenAI API key first."
    if not question or not question.strip():
        return "⚠️ Please enter a question."
    try:
        # Process pending chunks
        with state.lock:
            has_pending = len(state.pending_chunks) > 0
        if has_pending:
            processed = process_pending_chunks()
            if processed > 0:
                state.event_log.append(f"✅ Embedded {processed} chunks")
        # Get context from vector DB
        context_docs = []
        if state.video_collection:
            question_embedding = get_embedding(question)
            if question_embedding:
                results = state.video_collection.query(
                    query_embeddings=[question_embedding],
                    n_results=5
                )
                if results and results['documents']:
                    context_docs = results['documents'][0]
        context = "\n".join(context_docs) if context_docs else "No video events stored yet."
        # Get current state
        with state.lock:
            current_objects = state.detected_objects.copy()
            frames_seen = state.frames_processed
        if current_objects:
            obj_descriptions = [f"{o['color']} {o['label']}" for o in current_objects]
            current_state = f"Currently visible: {', '.join(obj_descriptions)}"
        else:
            if frames_seen > 0:
                current_state = "Video stream active but no objects detected in the latest frame."
            else:
                current_state = "No video frames processed yet."
        if not context_docs and frames_seen > 0:
            context = "Video stream active, waiting for notable detections to log."
        # Create prompt
        prompt = f"""You are a video analysis assistant. Answer the question based on the video footage context.

Video Event History (from vector database):
{context}

Current Frame:
{current_state}

Question: {question}

Provide a concise, helpful answer based on the video data."""
        # Call GPT
        response = state.openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful video analysis assistant."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            max_tokens=200
        )
        answer = response.choices[0].message.content
        state.event_log.append("✅ Query answered")
        return f"**AI Answer:**\n\n{answer}"
    except Exception as e:
        error_msg = f"Error querying AI: {str(e)}"
        state.event_log.append(f"❌ Query error: {str(e)[:30]}")
        return error_msg
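# Example questions this flow handles: "What objects appeared in the last
# 30 seconds?" or "Was there a red object in view?". Answers combine the
# retrieved chunk history with the live-detection snapshot built above.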
def setup_api_key(api_key):
    """Setup OpenAI API key and initialize services"""
    if not api_key or not api_key.strip():
        return "⚠️ Please enter a valid API key", get_stats()
    success = state.init_openai(api_key)
    if success:
        state.init_vector_db()
        state.init_yolo()
        return "✅ OpenAI connected! Vector DB initialized!", get_stats()
    else:
        return "❌ Failed to connect to OpenAI. Check your API key.", get_stats()
def get_stats():
    """Get current system statistics"""
    with state.lock:
        chunks = len(state.frame_chunks)
        objects = len(state.detected_objects)
        pending = len(state.pending_chunks)
    vector_count = 0
    if state.video_collection:
        try:
            vector_count = state.video_collection.count()
        except Exception:
            vector_count = 0
    stats = f"""**System Status:**
- Chunks Stored: {chunks}
- Current Objects: {objects}
- Pending Embeddings: {pending}
- Vector DB Entries: {vector_count}
"""
    return stats
def get_current_detections():
    """Get list of currently detected objects"""
    with state.lock:
        current = state.detected_objects.copy()
    if not current:
        return "No objects detected"
    output = "**Current Detections:**\n\n"
    for i, obj in enumerate(current):
        output += f"{i+1}. {obj['color']} {obj['label']} ({obj['confidence']:.2f})\n"
    return output
def get_recent_chunks():
    """Get recent video chunks"""
    with state.lock:
        recent = list(state.frame_chunks)[-5:]
    if not recent:
        return "No chunks yet - start the video!"
    output = "**Recent Video Chunks:**\n\n"
    for chunk in recent:
        output += f"[{chunk['id']}] {chunk['description']}\n\n"
    return output
def get_event_log():
    """Get recent event log"""
    with state.lock:
        events = list(state.event_log)[-10:]
    if not events:
        return "No events yet"
    return "\n".join(events)
# Initialize YOLO on startup
state.init_yolo()
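# Loading the model at import time means the first visitor does not pay the
# download/initialization cost on their first streamed frame.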
# Build Gradio interface
with gr.Blocks(title="AI Video Analysis", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🎥 AI-Enhanced Video Analysis")
    gr.Markdown("*Real-time object detection with GPT queries and vector database storage*")
    with gr.Row():
        # Left column - Video and controls
        with gr.Column(scale=2):
            gr.Markdown("## 📹 Live Video Feed")
            # API Key setup
            with gr.Row():
                api_key_input = gr.Textbox(
                    label="OpenAI API Key",
                    type="password",
                    placeholder="sk-...",
                    scale=3
                )
                setup_btn = gr.Button("Connect", scale=1, variant="primary")
            api_status = gr.Markdown("⚠️ Enter your OpenAI API key to enable AI features")
            # Live Video Stream
            if YOLO_AVAILABLE:
                processed_feed = gr.Image(
                    label="YOLO Detection Feed",
                    interactive=False,
                    type="numpy",
                    visible=False
                )
                webcam_stream = gr.Image(
                    label="Webcam Stream",
                    sources=["webcam"],
                    streaming=True,
                    type="numpy"
                )
                webcam_stream.stream(
                    fn=process_frame,
                    inputs=webcam_stream,
                    outputs=processed_feed
                )
                gr.Markdown("📹 Start the webcam to reveal the YOLO view above. Detections update in real time, and frames are chunked every ~1 second!")
            else:
                gr.Markdown("❌ YOLO not available. Install with: `pip install ultralytics`")
            # Troubleshooting
            with gr.Accordion("⚠️ Connection Troubleshooting", open=False):
                gr.Markdown("""
                **If video doesn't connect:**
                1. **Allow camera permissions** in your browser
                2. **Use HTTPS** - Hugging Face Spaces provides this automatically
                3. **Try Chrome/Edge** - Best webcam streaming support
                4. **Wait 30-60 seconds** on first load for the YOLO model download
                5. **Check browser console** for errors (F12)

                Live streaming uses browser-based webcam APIs; ensure camera access is allowed.
                """)
        # Right column - AI Query and Stats
        with gr.Column(scale=1):
            gr.Markdown("## 🤖 AI Query Interface")
            query_input = gr.Textbox(
                label="Ask about the video",
                placeholder="e.g., What objects appeared in the last 30 seconds?",
                lines=3
            )
            query_btn = gr.Button("🔍 Ask AI", variant="primary")
            query_output = gr.Markdown("*AI response will appear here*")
            gr.Markdown("---")
            # Stats
            stats_display = gr.Markdown(value=get_stats, every=10)
            refresh_btn = gr.Button("🔄 Refresh Stats", size="sm")
            gr.Markdown("---")
            # Current detections
            detections_display = gr.Markdown(
                value=get_current_detections,
                every=10
            )
            gr.Markdown("---")
            # Recent chunks
            chunks_display = gr.Markdown(
                value=get_recent_chunks,
                every=10
            )
            gr.Markdown("---")
            # Event log
            gr.Markdown("### 📋 Event Log")
            log_display = gr.Markdown(
                value=get_event_log,
                every=10
            )
    # How it works
    with gr.Accordion("ℹ️ How This Works", open=False):
        gr.Markdown("""
        ### 🎯 Features:

        **1. Real-time Object Detection:**
        - YOLOv8 detects objects in your webcam feed
        - Color detection identifies object colors
        - Bounding boxes drawn in real-time

        **2. Frame Chunking:**
        - Video frames grouped into 1-second chunks (30 frames)
        - Chunks stored in memory (last 100) and the vector database

        **3. Vector Database (ChromaDB):**
        - Semantic embeddings of video events
        - Similarity search across video history

        **4. OpenAI Integration:**
        - GPT-4o-mini for intelligent query answering
        - text-embedding-3-small for semantic search
        - Context-aware responses based on video history

        ### 🔧 Tech Stack:
        - **YOLOv8**: Real-time object detection
        - **Gradio Live Video**: Smooth webcam streaming
        - **OpenAI GPT**: Natural language understanding
        - **ChromaDB**: Vector similarity search
        - **Hugging Face Spaces**: Free deployment with TURN servers

        ### 💰 Costs:
        - **Hugging Face Spaces**: Free (or $9/month PRO for better resources)
        - **OpenAI API**: Pay-as-you-go (minimal for this use case)
        - **TURN Servers**: Free 10GB/month via Cloudflare FastRTC
        """)
    # Event handlers
    setup_btn.click(
        fn=setup_api_key,
        inputs=[api_key_input],
        outputs=[api_status, stats_display]
    )
    query_btn.click(
        fn=query_with_ai,
        inputs=[query_input],
        outputs=[query_output]
    )
    refresh_btn.click(
        fn=lambda: [get_stats(), get_current_detections(), get_recent_chunks(), get_event_log()],
        outputs=[stats_display, detections_display, chunks_display, log_display]
    )
# Launch the app
if __name__ == "__main__":
    demo.launch()
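# NOTE: on Spaces, demo.launch() works as-is; outside Spaces (e.g., a local
# Docker run) you would typically pass server_name="0.0.0.0" so the app binds
# to all interfaces.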