Spaces:
Running
Running
| """ | |
| YOUTUBE PROCESSING TOOL | |
| Enhanced with cookies support for bot detection bypass | |
| """ | |
| import os | |
| import re | |
| import json | |
| import tempfile | |
| from typing import Dict, Any, Optional, List | |
| from urllib.parse import urlparse, parse_qs | |
| try: | |
| from pytube import YouTube | |
| from youtube_transcript_api import YouTubeTranscriptApi | |
| import yt_dlp | |
| except ImportError as e: | |
| print(f"⚠️ YouTube dependencies missing: {e}") | |
| from .state_manager import get_agent_state | |
class YouTubeTool:
    """Fetch metadata and a transcript for a YouTube video.

    Extraction is attempted with yt-dlp first (using a cookies file when one
    exists at ``YOUTUBE_COOKIES_PATH``) and falls back to pytube.  The
    transcript is fetched separately through youtube-transcript-api.  Every
    public entry point returns a plain ``dict`` and reports failures through
    the ``success``/``error`` keys instead of raising.
    """

    # A bare YouTube video ID: exactly 11 characters from the URL-safe alphabet.
    _VIDEO_ID_RE = re.compile(r'[a-zA-Z0-9_-]{11}')

    # URL shapes a video ID can be pulled from.  IGNORECASE only affects the
    # host/path part; the captured ID keeps the case it had in the input.
    _URL_ID_PATTERNS = (
        re.compile(
            r'(?:youtube\.com/watch\?v=|youtu\.be/|youtube\.com/embed/'
            r'|youtube\.com/shorts/|youtube\.com/live/)([a-zA-Z0-9_-]{11})',
            re.IGNORECASE,
        ),
        re.compile(r'youtube\.com/.*[?&]v=([a-zA-Z0-9_-]{11})', re.IGNORECASE),
    )

    # Cheap "does this text mention a YouTube link" probes; kept in step with
    # the URL shapes accepted by _URL_ID_PATTERNS.
    _URL_HINT_PATTERNS = (
        re.compile(r'youtube\.com/watch\?v=', re.IGNORECASE),
        re.compile(r'youtu\.be/', re.IGNORECASE),
        re.compile(r'youtube\.com/embed/', re.IGNORECASE),
        re.compile(r'youtube\.com/shorts/', re.IGNORECASE),
        re.compile(r'youtube\.com/live/', re.IGNORECASE),
        re.compile(r'youtube\.com/.*[?&]v=', re.IGNORECASE),
    )

    # Transcript languages tried one by one, in this order.
    # NOTE(review): 'auto' does not look like a real caption language code;
    # kept to preserve the original lookup order -- confirm and drop if unused.
    _TRANSCRIPT_LANGUAGES = ('en', 'en-US', 'auto', 'vi')

    def __init__(self):
        """Read the cookies file location from the environment.

        Raises:
            ValueError: if ``YOUTUBE_COOKIES_PATH`` is unset or empty.  The
                file itself is not checked here; its existence is tested at
                extraction time.
        """
        # Get the cookie path from the environment variable
        self.cookies_path = os.environ.get("YOUTUBE_COOKIES_PATH")
        if not self.cookies_path:
            raise ValueError("YOUTUBE_COOKIES_PATH environment variable is not set. Please set it to the path of your cookies.txt file.")
        print(f"🎬 YouTube Tool with cookies support initialized. Cookie path: {self.cookies_path}")

    def process_youtube(self, youtube_input: str, **kwargs) -> Dict[str, Any]:
        """Process a YouTube URL or video ID and cache the result.

        Args:
            youtube_input: A YouTube URL or a bare 11-character video ID.
            **kwargs: Accepted for call-site compatibility; not used.

        Returns:
            On success ``{"success": True, "data": <video dict>, "summary": str}``;
            otherwise the dict produced by ``_error_result``.  On success the
            video dict is also stored in the agent state under
            ``cached_data["youtube_analysis"]``.
        """
        try:
            # Extract video ID from URL or use as-is
            video_id = self._extract_video_id(youtube_input)
            if not video_id:
                return self._error_result("Invalid YouTube URL or video ID")
            print(f"🎬 Processing YouTube video: {video_id}")

            # yt-dlp first; pytube only when yt-dlp yields nothing.
            video_data = self._extract_with_cookies(video_id) or self._extract_with_pytube(video_id)
            if not video_data:
                return self._error_result("Could not extract video data")

            # Update agent state
            state = get_agent_state()
            state.cached_data["youtube_analysis"] = video_data

            return {
                "success": True,
                "data": video_data,
                "summary": self._summary_for(video_data),
            }
        except Exception as e:
            # Tool boundary: callers expect an error dict, never an exception.
            error_msg = f"YouTube processing failed: {str(e)}"
            print(f"❌ {error_msg}")
            return self._error_result(error_msg)

    @staticmethod
    def _summary_for(video_data: Dict[str, Any]) -> str:
        """Build the one-line success summary from the first 50 title characters.

        A missing, ``None`` or empty title is reported as ``Unknown`` so that
        slicing can never fail on a non-string value.
        """
        title = video_data.get('title') or 'Unknown'
        return f"YouTube video processed: {str(title)[:50]}..."

    def _extract_video_id(self, url_or_id: str) -> Optional[str]:
        """Return the 11-character video ID found in ``url_or_id``.

        Accepts a bare ID (letters, digits, ``-`` and ``_``) or any URL shape
        listed in ``_URL_ID_PATTERNS``.  Surrounding whitespace is ignored.

        Returns:
            The video ID, or ``None`` when the input is not a string or no ID
            can be found.
        """
        if not isinstance(url_or_id, str):
            return None
        candidate = url_or_id.strip()

        # Bare ID: must be the full ID alphabet, not just str.isalnum(), which
        # rejects '-' / '_' and accepts non-ASCII letters.
        if self._VIDEO_ID_RE.fullmatch(candidate):
            return candidate

        # Extract from various YouTube URL formats
        for pattern in self._URL_ID_PATTERNS:
            match = pattern.search(candidate)
            if match:
                return match.group(1)
        return None

    @staticmethod
    def _normalize_ydl_info(info: Dict[str, Any], video_id: str, url: str, transcript: str) -> Dict[str, Any]:
        """Map a yt-dlp info dict onto this tool's result schema.

        ``or`` is used instead of ``dict.get`` defaults because a key can be
        present with an explicit ``None`` value, which ``get(key, default)``
        would pass through unchanged.
        """
        return {
            "video_id": video_id,
            "title": info.get('title') or '',
            "description": info.get('description') or '',
            "channel": info.get('uploader') or '',
            "duration": info.get('duration') or 0,
            "view_count": info.get('view_count') or 0,
            "transcript": transcript,
            "thumbnail_url": info.get('thumbnail') or '',
            "upload_date": info.get('upload_date') or '',
            "url": url,
            "extraction_method": "yt-dlp_with_cookies"
        }

    def _extract_with_cookies(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Extract video metadata with yt-dlp, passing cookies when available.

        Returns:
            The normalized video dict, or ``None`` if anything fails (the
            failure is printed, not raised, so the caller can fall back).
        """
        try:
            ydl_opts = {
                'quiet': True,
                'no_warnings': True,
                'extractaudio': False,
                'extract_flat': False,
            }
            # Add cookies if file exists
            if os.path.exists(self.cookies_path):
                ydl_opts['cookiefile'] = self.cookies_path
                print(f"🍪 Using cookies from: {self.cookies_path}")

            url = f"https://www.youtube.com/watch?v={video_id}"
            with yt_dlp.YoutubeDL(ydl_opts) as ydl:
                info = ydl.extract_info(url, download=False)

            # Extract transcript using youtube-transcript-api
            transcript = self._get_transcript(video_id)
            return self._normalize_ydl_info(info, video_id, url, transcript)
        except Exception as e:
            print(f"⚠️ yt-dlp extraction failed: {str(e)}")
            return None

    def _extract_with_pytube(self, video_id: str) -> Optional[Dict[str, Any]]:
        """Fallback metadata extraction using pytube.

        Returns:
            A video dict with the same keys as the yt-dlp path, or ``None``
            if anything fails (the failure is printed, not raised).
        """
        try:
            url = f"https://www.youtube.com/watch?v={video_id}"
            yt = YouTube(url)
            transcript = self._get_transcript(video_id)
            return {
                "video_id": video_id,
                "title": yt.title or '',
                "description": yt.description or '',
                "channel": yt.author or '',
                "duration": yt.length or 0,
                "view_count": yt.views or 0,
                "transcript": transcript,
                "thumbnail_url": yt.thumbnail_url or '',
                "upload_date": str(yt.publish_date) if yt.publish_date else '',
                "url": url,
                "extraction_method": "pytube_fallback"
            }
        except Exception as e:
            print(f"⚠️ PyTube extraction failed: {str(e)}")
            return None

    def _get_transcript(self, video_id: str) -> str:
        """Return the video transcript as one space-joined string.

        Each language in ``_TRANSCRIPT_LANGUAGES`` is tried in order and the
        first non-blank transcript wins.  If none succeeds, one last request
        is made with the library's default language selection.

        Returns:
            The transcript text, or ``"No transcript available"`` when every
            attempt fails.
        """
        for lang in self._TRANSCRIPT_LANGUAGES:
            try:
                entries = YouTubeTranscriptApi.get_transcript(video_id, languages=[lang])
                text = ' '.join(entry['text'] for entry in entries)
            except Exception:
                # Best effort per language; `Exception` (not a bare except)
                # so KeyboardInterrupt / SystemExit still propagate.
                continue
            if text.strip():
                return text

        # Last resort: let the library pick using its default languages.
        try:
            entries = YouTubeTranscriptApi.get_transcript(video_id)
            return ' '.join(entry['text'] for entry in entries)
        except Exception:
            return "No transcript available"

    def is_youtube_url(self, text: str) -> bool:
        """Return True when ``text`` contains a recognizable YouTube URL.

        Matching is case-insensitive.  Non-string input yields ``False``.
        """
        if not isinstance(text, str):
            return False
        return any(pattern.search(text) for pattern in self._URL_HINT_PATTERNS)

    def _error_result(self, error_msg: str) -> Dict[str, Any]:
        """Build the standard failure payload for ``error_msg``."""
        return {
            "success": False,
            "error": error_msg,
            "data": None,
            "summary": f"YouTube processing failed: {error_msg}"
        }