""" AI AGENT WITH LANGGRAPH + HUGGINGFACE INTEGRATION Clean architecture with LangChain HuggingFace Pipeline """ import os import json import time from typing import Dict, Any, List, Optional, Annotated from dotenv import load_dotenv from langchain_core.messages import HumanMessage, AIMessage, SystemMessage from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser, StrOutputParser from langgraph.graph import StateGraph, END from langgraph.graph.message import add_messages from typing_extensions import TypedDict from pydantic import BaseModel, Field # LangChain HuggingFace Integration from huggingface_hub import InferenceClient from utils import ( process_question_with_tools, get_agent_state, reset_agent_state, ToolOrchestrator, get_system_prompt, get_response_prompt, build_context_summary, analyze_question_type ) load_dotenv() class AgentState(TypedDict): messages: Annotated[List, add_messages] question: str task_id: str ai_analysis: Dict[str, Any] should_use_tools: bool tool_processing_result: Dict[str, Any] final_answer: str processing_complete: bool class QuestionAnalysis(BaseModel): question_type: str = Field(description="Type: youtube|image|audio|wiki|file|text|math") needs_tools: bool = Field(description="Whether tools are needed") reasoning: str = Field(description="AI reasoning for the decision") confidence: str = Field(description="Confidence level: high|medium|low") class AIBrain: def __init__(self): self.model_name = "Qwen/Qwen3-8B" print("🧠 Initializing Qwen3-8B với huggingface_hub InferenceClient...") self.client = InferenceClient( provider="auto", api_key=os.environ["HF_TOKEN"], ) print("✅ Qwen3 AI Brain với huggingface_hub InferenceClient đã sẵn sàng") def _generate_with_qwen3(self, prompt: str, max_tokens: int = 2048) -> str: """Sinh text với Qwen3 bằng huggingface_hub InferenceClient""" try: messages = [ {"role": "user", "content": prompt + "\n/no_thinking"} ] completion = self.client.chat.completions.create( model=self.model_name, messages=messages, max_tokens=max_tokens, ) return completion.choices[0].message.content except Exception as e: print(f"⚠️ Qwen3 InferenceClient error: {str(e)}") return f"AI generation failed: {str(e)}" def analyze_question(self, question: str, task_id: str = "") -> Dict[str, Any]: """Analyze question type using Qwen3 with strict JSON output""" prompt = f""" Analyze this question and determine the correct tool approach. Return ONLY valid JSON. - If the question is about a historical event, a specific person, place, object, or something that requires searching the internet (e.g., Wikipedia), you MUST choose "wiki". - If the question is about an event in the past or future (e.g., "when was", "in what year", "has ever", "will happen", "history", "prediction"), choose "wiki". - If the question asks about a specific topic, person, place, object, or event (e.g., "who is", "what is", "where is", "when is", "why", "how"), choose "wiki". - If the data source is unclear or you are not sure, prefer "wiki". - Other types: youtube (if there is a link/video), image (if there is an image), audio (if there is an audio file), file (if there is an attachment), text (if it is text analysis or math). {question} {task_id} - YouTube URLs (youtube.com, youtu.be): "youtube" - Images, photos, chess positions, visual content: "image" - Audio files, voice, sound, mp3: "audio" - Excel, CSV, documents, file uploads: "file" - Wikipedia searches, historical facts, people info, events, future/past, specific topics: "wiki" - Math calculations, logic, text analysis: "text" Return this exact JSON format: {{ "question_type": "youtube|image|audio|wiki|file|text", "needs_tools": true, "reasoning": "Brief explanation of classification", "confidence": "high" }}""" try: response = self._generate_with_qwen3(prompt, 512) # Extract JSON from response import re json_pattern = r'\{[^{}]*\}' json_match = re.search(json_pattern, response) if json_match: result = json.loads(json_match.group()) # Validate required fields required_fields = ["question_type", "needs_tools", "reasoning", "confidence"] if all(field in result for field in required_fields): return result raise ValueError("Invalid JSON structure in response") except Exception as e: print(f"⚠️ Qwen3 analysis failed: {str(e)[:100]}...") # Fallback analysis question_type = analyze_question_type(question) return { "question_type": question_type, "needs_tools": question_type in ["wiki", "youtube", "image", "audio", "file"], "reasoning": f"Fallback classification: detected {question_type}", "confidence": "medium" } def generate_answer(self, question: str, tool_results: Dict[str, Any]) -> str: """Generate final answer using Qwen3 with context""" if tool_results and tool_results.get("tool_results"): context = build_context_summary( tool_results.get("tool_results", []), tool_results.get("cached_data", {}) ) else: context = "" prompt = f""" Answer the following question with only the answer. Do not explain, do not add any extra text, do not repeat the question, do not add punctuation or any prefix/suffix. Just output the answer as short and direct as possible. If the answer is not available, reply with 'No data'. Context (if any): {context} Question: {question} """ response = self._generate_with_qwen3(prompt, 2048) # Dùng LangChain StrOutputParser để lấy phần text cuối cùng parser = StrOutputParser() answer = parser.parse(response) answer = answer.strip() # Remove common prefixes for prefix in ["Answer:", "The answer is", "FINAL ANSWER:", "Final answer:", "final answer:"]: if answer.lower().startswith(prefix.lower()): answer = answer[len(prefix):].strip() # Remove trailing period if only one word/number if answer.endswith(".") and answer.count(" ") < 2: answer = answer[:-1].strip() return answer # Initialize AI Brain globally ai_brain = AIBrain() def analyze_question_node(state: AgentState) -> AgentState: """Analyze question using Qwen3 AI Brain""" question = state["question"] task_id = state.get("task_id", "") print("🔍 Analyzing question with Qwen3...") analysis = ai_brain.analyze_question(question, task_id) state["ai_analysis"] = analysis state["should_use_tools"] = analysis.get("needs_tools", True) print(f"📊 Type: {analysis.get('question_type')} | Tools: {analysis.get('needs_tools')} | Confidence: {analysis.get('confidence')}") return state def process_with_tools_node(state: AgentState) -> AgentState: """Process question with appropriate tools""" question = state["question"] task_id = state.get("task_id", "") print("🔧 Processing with specialized tools...") tool_results = process_question_with_tools(question, task_id) state["tool_processing_result"] = tool_results # Log response từ wiki nếu có for result in tool_results.get("tool_results", []): if result.tool_name == "wiki_search": print(f"[DEBUG] Wiki tool response: {result.result}") if result.tool_name == "audio_transcript": print(f"[DEBUG] Audio transcript: {result.result}") successful_tools = [result.tool_name for result in tool_results.get("tool_results", []) if result.success] if successful_tools: print(f"✅ Successful tools: {successful_tools}") else: print("⚠️ No tools succeeded") return state def answer_directly_node(state: AgentState) -> AgentState: """Answer directly without tools using Qwen3""" question = state["question"] print("💭 Generating direct answer with Qwen3...") answer = ai_brain.generate_answer(question, {}) state["final_answer"] = answer state["processing_complete"] = True return state def generate_final_answer_node(state: AgentState) -> AgentState: """Generate final answer combining tool results and AI analysis""" question = state["question"] tool_results = state.get("tool_processing_result", {}) print("🎯 Generating final answer with context...") answer = ai_brain.generate_answer(question, tool_results) state["final_answer"] = answer state["processing_complete"] = True return state def create_agent_workflow(): """Create LangGraph workflow for question processing""" workflow = StateGraph(AgentState) # Add nodes workflow.add_node("analyze_question", analyze_question_node) workflow.add_node("process_with_tools", process_with_tools_node) workflow.add_node("answer_directly", answer_directly_node) workflow.add_node("generate_final_answer", generate_final_answer_node) # Define routing logic def should_use_tools(state: AgentState) -> str: return "process_with_tools" if state.get("should_use_tools", True) else "answer_directly" # Set up the flow workflow.set_entry_point("analyze_question") workflow.add_conditional_edges("analyze_question", should_use_tools) workflow.add_edge("process_with_tools", "generate_final_answer") workflow.add_edge("answer_directly", END) workflow.add_edge("generate_final_answer", END) return workflow.compile() class LangGraphUtilsAgent: def __init__(self): self.app = create_agent_workflow() print("🚀 LangGraph Agent with Qwen3 + Utils System ready") def process_question(self, question: str, task_id: str = "") -> str: """Process question through the workflow""" try: print(f"\n🎯 Processing: {question[:100]}...") # Initialize state initial_state = { "messages": [HumanMessage(content=question)], "question": question, "task_id": task_id, "ai_analysis": {}, "should_use_tools": True, "tool_processing_result": {}, "final_answer": "", "processing_complete": False } # Run workflow start_time = time.time() result = self.app.invoke(initial_state) elapsed_time = time.time() - start_time final_answer = result.get("final_answer", "No answer generated") print(f"✅ Completed in {elapsed_time:.2f}s") return final_answer except Exception as e: print(f"❌ Agent error: {str(e)}") return f"I apologize, but I encountered an error processing your question: {str(e)}" # Global agent instance agent = LangGraphUtilsAgent() def process_question(question: str, task_id: str = "") -> str: """Main entry point for question processing""" if not question or not question.strip(): return "Please provide a valid question." return agent.process_question(question.strip(), task_id) # ============================================================================= # TESTING # ============================================================================= if __name__ == "__main__": print("🧪 Testing LangGraph Utils Agent\n") test_cases = [ { "question": "Who was Marie Curie?", "task_id": "", "description": "Wikipedia factual question" }, { "question": "What is 25 + 17 * 3?", "task_id": "", "description": "Math calculation" }, { "question": ".rewsna eht sa \"tfel\" drow eht fo etisoppo eht etirw ,ecnetnes siht dnatsrednu uoy fI", "task_id": "", "description": "Reversed text question" }, { "question": "How many continents are there?", "task_id": "", "description": "General knowledge" } ] for i, test_case in enumerate(test_cases, 1): print(f"\n{'='*60}") print(f"TEST {i}: {test_case['description']}") print(f"{'='*60}") print(f"Question: {test_case['question']}") try: answer = process_question(test_case["question"], test_case["task_id"]) print(f"\nAnswer: {answer}") except Exception as e: print(f"\nTest failed: {str(e)}") print(f"\n{'-'*60}") print("\n✅ All tests completed!")