import json
import logging
import os
import uuid
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

import lancedb
import pandas as pd
from langchain_huggingface import HuggingFaceEmbeddings

from config import EMBEDDING_MODEL_NAME, LANCEDB_PATH

logger = logging.getLogger("voicebot")

# Shared embedding model, created on the first call to get_embedding_model().
embedding_model = None


def get_embedding_model():
    """Return the shared HuggingFace embedding model, creating it on first use.

    The model runs on CPU with ``trust_remote_code=True`` and produces
    normalized embeddings. Later calls return the cached instance.

    Note: ``LanceDBService.__init__`` calls this function, and a
    ``LanceDBService`` is instantiated at the bottom of this module. In practice
    the model is therefore loaded when this module is first imported.
    """
    global embedding_model
    if embedding_model is None:
        logger.info(f"Loading embedding model: {EMBEDDING_MODEL_NAME}")
        embedding_model = HuggingFaceEmbeddings(
            model_name=EMBEDDING_MODEL_NAME,
            model_kwargs={
                "device": "cpu",
                "trust_remote_code": True,
            },
            encode_kwargs={
                "normalize_embeddings": True,
            },
        )
    return embedding_model


def _sql_str(value: Any) -> str:
    """Render *value* as a single-quoted SQL string literal for LanceDB filters.

    Embedded single quotes are doubled (standard SQL escaping). A value such as
    ``"John's notes.pdf"`` therefore neither breaks the filter nor changes its
    meaning.
    """
    # NOTE(review): backslashes are passed through unchanged; confirm that
    # LanceDB's filter dialect does not treat them as escape characters.
    return "'" + str(value).replace("'", "''") + "'"


class RetrievedDocument:
    """Lightweight search result exposing ``page_content`` and ``metadata``."""

    def __init__(self, page_content: str, metadata: Any):
        self.page_content = page_content
        self.metadata = metadata

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(page_content={self.page_content!r}, "
            f"metadata={self.metadata!r})"
        )


class LanceDBService:
    """LanceDB-backed storage for document chunks, personas, MCP servers and sessions.

    Tables: ``documents`` (chunks with embedding vectors), ``personas``,
    ``mcp_servers`` and ``sessions``.

    NOTE(review): the public methods are declared ``async`` but call LanceDB and
    the embedding model synchronously (there is no ``await`` inside them). They
    therefore block the event loop while they run.
    """

    def __init__(self):
        self.db_path = LANCEDB_PATH
        self.db = None
        self.embedding_model = get_embedding_model()
        self._initialize_db()

    # ------------------------------------------------------------------ setup

    def _initialize_db(self):
        """Connect to LanceDB at ``self.db_path`` and create any missing tables.

        Raises:
            Exception: any error from creating the directory or connecting is
                logged and re-raised. Per-table creation errors are logged and
                swallowed by ``_ensure_table``.
        """
        try:
            os.makedirs(self.db_path, exist_ok=True)
            self.db = lancedb.connect(self.db_path)

            self._init_documents_table()
            self._init_personas_table()
            self._init_mcp_servers_table()
            self._init_sessions_table()
            logger.info("✅ LanceDB initialized successfully")
        except Exception as e:
            logger.error(f"❌ Error initializing LanceDB: {e}")
            raise

    def _ensure_table(self, table_name: str, make_sample_row: Callable[[], Dict[str, Any]]) -> None:
        """Create *table_name* if it does not exist yet (best effort).

        LanceDB infers a table's schema from the data passed to
        ``create_table``. One placeholder row from *make_sample_row* is
        inserted and then deleted, which leaves an empty table with the right
        columns. The factory is only called when the table is missing, so the
        documents table does not embed text unnecessarily. Errors are logged,
        not raised.
        """
        try:
            if table_name in self.db.table_names():
                return
            sample_row = make_sample_row()
            tbl = self.db.create_table(table_name, pd.DataFrame([sample_row]))
            # Delete by the placeholder's real (uuid) id so the row is actually removed.
            # NOTE(review): tables created while the placeholder was deleted via
            # "id = 'sample'" (which never matched) may still contain a row with
            # user_id 'sample'; consider a one-off cleanup.
            tbl.delete(self._where(id=sample_row["id"]))
        except Exception as e:
            logger.error(f"❌ Error initializing {table_name} table: {e}")

    def _init_documents_table(self):
        """Create the ``documents`` table (chunk text, JSON metadata, vector)."""
        def sample_row():
            return {
                "id": str(uuid.uuid4()),
                "content": "sample",
                "metadata": json.dumps({}),
                "user_id": "sample",
                "knowledge_base": "sample",
                "filename": "sample",
                "upload_date": datetime.now().isoformat(),
                # Embedding a dummy string fixes the vector column's dimension.
                "vector": get_embedding_model().embed_query("sample"),
            }

        self._ensure_table("documents", sample_row)

    def _init_personas_table(self):
        """Create the ``personas`` table."""
        def sample_row():
            now = datetime.now().isoformat()
            return {
                "id": str(uuid.uuid4()),
                "user_id": "sample",
                "name": "sample",
                "description": "sample",
                "icon": "sample",
                "custom_prompt": "sample",
                "knowledge_base": "none",
                "language": "en",
                "created_at": now,
                "updated_at": now,
            }

        self._ensure_table("personas", sample_row)

    def _init_mcp_servers_table(self):
        """Create the ``mcp_servers`` table."""
        def sample_row():
            return {
                "id": str(uuid.uuid4()),
                "user_id": "sample",
                "name": "sample",
                "url": "sample",
                "bearer_token": "sample",
                "created_at": datetime.now().isoformat(),
            }

        self._ensure_table("mcp_servers", sample_row)

    def _init_sessions_table(self):
        """Create the ``sessions`` table."""
        def sample_row():
            now = datetime.now().isoformat()
            return {
                "id": str(uuid.uuid4()),
                "user_id": "sample",
                "persona_id": "sample",
                "persona_source": "sample",
                "session_summary": "sample",
                "created_at": now,
                "updated_at": now,
            }

        self._ensure_table("sessions", sample_row)

    # ---------------------------------------------------------------- helpers

    @staticmethod
    def _where(**conditions: Any) -> str:
        """Build an AND-ed equality filter, e.g. ``user_id = 'u1' AND id = 'x'``.

        Values are escaped with ``_sql_str``. Column order follows argument order.
        """
        return " AND ".join(f"{column} = {_sql_str(value)}" for column, value in conditions.items())

    def _query_rows(
        self,
        table_name: str,
        where: Optional[str] = None,
        columns: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """Return every row of *table_name* that matches *where*, as a DataFrame.

        Args:
            table_name: table to scan.
            where: optional SQL filter (build it with ``_where``).
            columns: optional subset of columns to read. Use it to avoid loading
                the ``vector`` column when it is not needed.
        """
        tbl = self.db.open_table(table_name)
        query = tbl.search()
        if where:
            query = query.where(where)
        if columns:
            query = query.select(columns)
        # LanceDB query builders can apply a default result limit even to plain,
        # non-vector queries. Bound the limit by the table size so no matching
        # row is silently dropped.
        return query.limit(max(tbl.count_rows(), 1)).to_pandas()

    @staticmethod
    def _parse_metadata(raw: Any) -> Any:
        """Decode the JSON ``metadata`` column; empty/missing values become ``{}``."""
        return json.loads(raw) if raw else {}

    @classmethod
    def _to_document(cls, row: Dict[str, Any]) -> RetrievedDocument:
        """Convert one LanceDB result row into a ``RetrievedDocument``."""
        return RetrievedDocument(row["content"], cls._parse_metadata(row["metadata"]))

    @staticmethod
    def _distinct_knowledge_bases(df: pd.DataFrame) -> List[str]:
        """Return the unique, non-empty ``knowledge_base`` values except ``"none"``."""
        if df.empty:
            return []
        return [kb for kb in df["knowledge_base"].unique().tolist() if kb and kb != "none"]

    # -------------------------------------------------------------- documents

    async def add_documents(self, docs, user_id: str, knowledge_base: str, filename: str):
        """Embed *docs* and append them to the ``documents`` table.

        Args:
            docs: iterable of objects with ``page_content`` (str) and ``metadata``
                (JSON-serializable) attributes, such as LangChain documents.
            user_id: owner of the chunks.
            knowledge_base: knowledge base the chunks belong to.
            filename: source file name, stored on every chunk.

        Returns:
            The number of chunks inserted (0 for empty input).

        Raises:
            Exception: embedding or LanceDB write errors are logged and re-raised.
        """
        try:
            docs = list(docs)  # allow generators; len() is needed below
            if not docs:
                return 0

            # One batched call instead of one model call per chunk.
            embeddings = self.embedding_model.embed_documents([doc.page_content for doc in docs])
            upload_date = datetime.now().isoformat()  # one timestamp per uploaded file
            rows = [
                {
                    "id": str(uuid.uuid4()),
                    "content": doc.page_content,
                    "metadata": json.dumps(doc.metadata),
                    "user_id": user_id,
                    "knowledge_base": knowledge_base,
                    "filename": filename,
                    "upload_date": upload_date,
                    "vector": embedding,
                }
                for doc, embedding in zip(docs, embeddings)
            ]

            tbl = self.db.open_table("documents")
            tbl.add(pd.DataFrame(rows))
            logger.info(f"✅ Added {len(docs)} documents to LanceDB")
            return len(docs)
        except Exception as e:
            logger.error(f"❌ Error adding documents to LanceDB: {e}")
            raise

    async def similarity_search(self, query: str, user_id: str, knowledge_base: str, k: int = 4):
        """Return the *k* chunks of one user's knowledge base nearest to *query*.

        The user/KB filter is applied before the nearest-neighbour search
        (``prefilter=True``). Without that, other users' chunks could fill the
        top-k and leave fewer than *k* (or zero) results.

        Returns:
            A list of ``RetrievedDocument``, or ``[]`` on error (the error is logged).
        """
        try:
            query_embedding = self.embedding_model.embed_query(query)
            tbl = self.db.open_table("documents")
            results = (
                tbl.search(query_embedding)
                .where(self._where(user_id=user_id, knowledge_base=knowledge_base), prefilter=True)
                .limit(k)
                .to_list()
            )
            return [self._to_document(row) for row in results]
        except Exception as e:
            logger.error(f"❌ Error searching LanceDB: {e}")
            return []

    async def get_user_knowledge_bases(self, user_id: str) -> List[str]:
        """Return the names of *user_id*'s knowledge bases (excluding ``"none"``)."""
        try:
            df = self._query_rows("documents", self._where(user_id=user_id), columns=["knowledge_base"])
            return self._distinct_knowledge_bases(df)
        except Exception as e:
            logger.error(f"❌ Error fetching knowledge bases: {e}")
            return []

    async def get_kb_documents(self, user_id: str, kb_name: str):
        """List the chunks of one user's knowledge base.

        Returns:
            One dict per chunk (not per file) with ``id``, ``filename``,
            ``knowledge_base`` and ``upload_date``. Returns ``[]`` on error.
        """
        fields = ["id", "filename", "knowledge_base", "upload_date"]
        try:
            df = self._query_rows(
                "documents",
                self._where(user_id=user_id, knowledge_base=kb_name),
                columns=fields,
            )
            if df.empty:
                return []
            return df[fields].to_dict("records")
        except Exception as e:
            logger.error(f"❌ Error fetching documents: {e}")
            return []

    async def delete_document_from_kb(self, user_id: str, kb_name: str, filename: str):
        """Delete all chunks of *filename* from one user's knowledge base.

        Returns:
            ``True`` on success, ``False`` on error (the error is logged).
        """
        try:
            tbl = self.db.open_table("documents")
            tbl.delete(self._where(user_id=user_id, knowledge_base=kb_name, filename=filename))
            return True
        except Exception as e:
            logger.error(f"❌ Error deleting document: {e}")
            return False

    # --------------------------------------------------------------- personas

    async def insert_persona(self, name: str, description: str, icon: str, custom_prompt: str, user_id: str):
        """Insert a new persona with ``knowledge_base="none"`` and ``language="en"``.

        Returns:
            The stored row as a dict.

        Raises:
            Exception: LanceDB errors are logged and re-raised.
        """
        try:
            now = datetime.now().isoformat()
            persona_data = {
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "name": name,
                "description": description,
                "icon": icon,
                "custom_prompt": custom_prompt,
                "knowledge_base": "none",
                "language": "en",
                "created_at": now,
                "updated_at": now,
            }
            tbl = self.db.open_table("personas")
            tbl.add(pd.DataFrame([persona_data]))
            return persona_data
        except Exception as e:
            logger.error(f"❌ Error inserting persona: {e}")
            raise

    async def get_user_personas(self, user_id: str):
        """Return all personas owned by *user_id* as dicts (``[]`` on error)."""
        try:
            return self._query_rows("personas", self._where(user_id=user_id)).to_dict("records")
        except Exception as e:
            logger.error(f"❌ Error fetching personas: {e}")
            return []

    # ------------------------------------------------------------ MCP servers

    async def create_mcp_server(self, user_id: str, name: str, url: str, bearer_token: str = None):
        """Store an MCP server entry for *user_id*.

        Returns:
            The stored row as a dict.

        Raises:
            Exception: LanceDB errors are logged and re-raised.
        """
        try:
            server_data = {
                "id": str(uuid.uuid4()),
                "user_id": user_id,
                "name": name,
                "url": url,
                # NOTE(review): the bearer token is stored as plain text.
                "bearer_token": bearer_token,
                "created_at": datetime.now().isoformat(),
            }
            tbl = self.db.open_table("mcp_servers")
            tbl.add(pd.DataFrame([server_data]))
            return server_data
        except Exception as e:
            logger.error(f"❌ Error creating MCP server: {e}")
            raise

    async def get_mcp_servers_for_user(self, user_id: str):
        """Return all MCP server entries of *user_id* as dicts (``[]`` on error)."""
        try:
            return self._query_rows("mcp_servers", self._where(user_id=user_id)).to_dict("records")
        except Exception as e:
            logger.error(f"❌ Error fetching MCP servers: {e}")
            return []

    async def delete_mcp_server(self, user_id: str, server_id: str):
        """Delete one of *user_id*'s MCP servers; ``True`` on success, ``False`` on error."""
        try:
            tbl = self.db.open_table("mcp_servers")
            tbl.delete(self._where(user_id=user_id, id=server_id))
            return True
        except Exception as e:
            logger.error(f"❌ Error deleting MCP server: {e}")
            return False

    # --------------------------------------------------------------- sessions

    async def upsert_session_summary(self, user_id: str, persona_id: str, persona_source: str, summary: str):
        """Create or update the session summary for (user, persona, persona source).

        At most one row is kept per ``(user_id, persona_id, persona_source)``.
        If a row exists, it keeps its ``id`` and ``created_at`` and receives
        the new summary and ``updated_at``. Otherwise a new row is created.

        Returns:
            The stored row as a dict, or ``None`` if the write failed (the
            error is logged).
        """
        try:
            key_filter = self._where(user_id=user_id, persona_id=persona_id, persona_source=persona_source)
            existing = self._query_rows("sessions", key_filter, columns=["id", "created_at"])
            now = datetime.now().isoformat()
            if existing.empty:
                session_id, created_at = str(uuid.uuid4()), now
            else:
                session_id = str(existing["id"].iloc[0])
                created_at = existing["created_at"].iloc[0]

            session_data = {
                "id": session_id,
                "user_id": user_id,
                "persona_id": persona_id,
                "persona_source": persona_source,
                "session_summary": summary,
                "created_at": created_at,
                "updated_at": now,
            }
            tbl = self.db.open_table("sessions")
            if not existing.empty:
                # Replace instead of append so repeated calls do not pile up rows.
                # Not atomic: if the add below fails, the previous summary is gone.
                tbl.delete(key_filter)
            tbl.add(pd.DataFrame([session_data]))
            return session_data
        except Exception as e:
            logger.error(f"❌ Error upserting session: {e}")
            return None

    # ------------------------------------------------- system-wide operations

    async def get_knowledge_bases(self) -> List[str]:
        """Return every knowledge-base name across all users (excluding ``"none"``)."""
        try:
            df = self._query_rows("documents", columns=["knowledge_base"])
            return self._distinct_knowledge_bases(df)
        except Exception as e:
            logger.error(f"❌ Error getting knowledge bases: {e}")
            return []

    async def get_documents_by_knowledge_base(self, knowledge_base: str) -> List[dict]:
        """Summarize the files in *knowledge_base* (across all users).

        Returns:
            One dict per filename, in first-seen order, with ``filename``,
            ``knowledge_base``, ``chunks`` (chunk count) and ``upload_date``
            (taken from the file's first chunk). Returns ``[]`` on error.
        """
        try:
            df = self._query_rows(
                "documents",
                self._where(knowledge_base=knowledge_base),
                columns=["filename", "upload_date"],
            )
            if df.empty:
                return []
            return [
                {
                    "filename": filename,
                    "knowledge_base": knowledge_base,
                    "chunks": len(group),
                    "upload_date": group["upload_date"].iloc[0],
                }
                for filename, group in df.groupby("filename", sort=False)
            ]
        except Exception as e:
            logger.error(f"❌ Error getting documents by knowledge base: {e}")
            return []

    async def delete_document(self, filename: str, knowledge_base: str, user_id: str = None):
        """Delete every chunk of *filename* in *knowledge_base*.

        If *user_id* is given, only that user's chunks are deleted; otherwise
        matching chunks of all users are deleted.

        Returns:
            ``True`` on success, ``False`` on error (the error is logged).
        """
        try:
            tbl = self.db.open_table("documents")
            where_clause = self._where(filename=filename, knowledge_base=knowledge_base)
            if user_id:
                where_clause += f" AND user_id = {_sql_str(user_id)}"
            tbl.delete(where_clause)
            logger.info(f"✅ Deleted document {filename} from knowledge base {knowledge_base}")
            return True
        except Exception as e:
            logger.error(f"❌ Error deleting document: {e}")
            return False

    async def search_documents(self, query: str, limit: int = 5, knowledge_base: str = None):
        """Vector-search all users' chunks, optionally restricted to one knowledge base.

        Returns:
            Up to *limit* dicts with ``content``, ``metadata``, ``score`` and
            ``knowledge_base``. ``score`` is LanceDB's ``_distance``, so lower
            means more similar. Returns ``[]`` on error.
        """
        try:
            query_embedding = self.embedding_model.embed_query(query)
            tbl = self.db.open_table("documents")

            search_query = tbl.search(query_embedding).limit(limit)
            if knowledge_base:
                # Filter before the nearest-neighbour search so results are not lost.
                search_query = search_query.where(self._where(knowledge_base=knowledge_base), prefilter=True)

            return [
                {
                    "content": row["content"],
                    "metadata": self._parse_metadata(row["metadata"]),
                    "score": row.get("_distance", 0.0),
                    "knowledge_base": row.get("knowledge_base", "unknown"),
                }
                for row in search_query.to_list()
            ]
        except Exception as e:
            logger.error(f"❌ Error searching documents: {e}")
            return []

    async def search_all_knowledge_bases(self, query: str, k: int = 4):
        """Return the *k* nearest chunks across every user and knowledge base.

        Returns:
            A list of ``RetrievedDocument``, or ``[]`` on error.
        """
        try:
            query_embedding = self.embedding_model.embed_query(query)
            tbl = self.db.open_table("documents")
            # Deliberately unfiltered: system-wide search.
            results = tbl.search(query_embedding).limit(k).to_list()
            return [self._to_document(row) for row in results]
        except Exception as e:
            logger.error(f"❌ Error searching all knowledge bases: {e}")
            return []


# Global instance. Created at import time: it connects to LanceDB and loads
# the embedding model.
lancedb_service = LanceDBService()