#!/usr/bin/env python3
"""
Test script to validate the Congressional Bioguide database and search functionality.

Every check reads artifacts from the current working directory and reports
its findings on stdout. The artifacts are expected to be produced by
``ingest_data.py`` (the script's own error messages point there).
"""

import sqlite3
import json
from contextlib import closing
from pathlib import Path

# Artifact locations, relative to the current working directory.
DB_PATH = "congress.db"
FAISS_INDEX_PATH = "congress_faiss.index"
BIO_IDS_PATH = "congress_bio_ids.pkl"


def test_database():
    """Test database structure and basic queries.

    Runs a series of read-only queries against the ``members``,
    ``job_positions`` and ``relationships`` tables and prints a summary of
    each.

    Returns:
        bool: True when every query ran, False when the database file is
        missing or any query raised ``sqlite3.Error`` (for example because a
        table or column does not exist).
    """
    print("Testing Database...")
    print("=" * 60)

    # Check first: sqlite3.connect() would silently create an empty file.
    if not Path(DB_PATH).exists():
        print("❌ Database not found. Run ingest_data.py first.")
        return False

    try:
        # closing() guarantees the connection is released on every path.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            cursor = conn.cursor()

            # Test 1: Count members
            cursor.execute("SELECT COUNT(*) FROM members")
            member_count = cursor.fetchone()[0]
            print(f"✓ Members in database: {member_count}")

            # Test 2: Count job positions
            cursor.execute("SELECT COUNT(*) FROM job_positions")
            job_count = cursor.fetchone()[0]
            print(f"✓ Job positions recorded: {job_count}")

            # Test 3: Search by name
            cursor.execute("""
                SELECT bio_id, family_name, given_name, birth_date, death_date
                FROM members
                WHERE unaccented_family_name = 'Lincoln'
                ORDER BY birth_date
            """)
            lincolns = cursor.fetchall()
            print(f"\n✓ Found {len(lincolns)} member(s) with family name 'Lincoln':")
            for bio_id, family, given, birth, death in lincolns:
                print(f" - {given} {family} ({bio_id}): {birth} - {death or 'present'}")

            # Test 4: Party breakdown
            cursor.execute("""
                SELECT party, COUNT(DISTINCT bio_id) as count
                FROM job_positions
                WHERE party IS NOT NULL
                GROUP BY party
                ORDER BY count DESC
                LIMIT 10
            """)
            parties = cursor.fetchall()
            print("\n✓ Top parties by member count:")
            for party, count in parties:
                print(f" - {party}: {count} members")

            # Test 5: State representation
            cursor.execute("""
                SELECT region_code, COUNT(DISTINCT bio_id) as count
                FROM job_positions
                WHERE region_code IS NOT NULL AND region_type = 'StateRegion'
                GROUP BY region_code
                ORDER BY count DESC
                LIMIT 10
            """)
            states = cursor.fetchall()
            print("\n✓ Top states by member count:")
            for state, count in states:
                print(f" - {state}: {count} members")

            # Test 6: Relationships
            cursor.execute("SELECT COUNT(*) FROM relationships")
            rel_count = cursor.fetchone()[0]
            print(f"\n✓ Family relationships recorded: {rel_count}")

            if rel_count > 0:
                cursor.execute("""
                    SELECT m1.given_name, m1.family_name, r.relationship_type,
                           m2.given_name, m2.family_name
                    FROM relationships r
                    JOIN members m1 ON r.bio_id = m1.bio_id
                    JOIN members m2 ON r.related_bio_id = m2.bio_id
                    LIMIT 5
                """)
                relationships = cursor.fetchall()
                print(" Sample relationships:")
                for given1, family1, rel_type, given2, family2 in relationships:
                    print(f" - {given1} {family1} is {rel_type} of {given2} {family2}")

            # Test 7: Profile text
            cursor.execute("""
                SELECT bio_id, given_name, family_name, LENGTH(profile_text) as text_len
                FROM members
                WHERE profile_text IS NOT NULL
                ORDER BY text_len DESC
                LIMIT 5
            """)
            longest_profiles = cursor.fetchall()
            print("\n✓ Longest biography profiles:")
            for bio_id, given, family, length in longest_profiles:
                print(f" - {given} {family} ({bio_id}): {length} characters")
    except sqlite3.Error as e:
        # Report schema/IO problems the same way test_faiss_index() reports
        # its failures, instead of aborting the whole suite with a traceback.
        print(f"❌ Database error: {e}")
        return False

    return True


def test_faiss_index():
    """Test FAISS index.

    Loads the FAISS index, the pickled bio-ID list and the sentence
    transformer model, then runs a few sample queries and prints the top
    three matches for each, resolving names through the SQLite database.

    Returns:
        bool: True when all sample queries ran; False when a required file
        is missing, a dependency cannot be imported, or any step raised.
    """
    print("\n\nTesting FAISS Index...")
    print("=" * 60)

    if not Path(FAISS_INDEX_PATH).exists():
        print("❌ FAISS index not found. Run ingest_data.py first.")
        return False

    if not Path(BIO_IDS_PATH).exists():
        print("❌ Bio ID mapping not found. Run ingest_data.py first.")
        return False

    # Names are looked up in the database below; connecting to a missing
    # file would create an empty database as a side effect.
    if not Path(DB_PATH).exists():
        print("❌ Database not found. Run ingest_data.py first.")
        return False

    try:
        import faiss
        import pickle
        from sentence_transformers import SentenceTransformer

        # Load index
        index = faiss.read_index(FAISS_INDEX_PATH)
        # NOTE: pickle.load executes arbitrary code embedded in the file.
        # Only load a mapping file that was generated locally and is trusted.
        with open(BIO_IDS_PATH, "rb") as f:
            bio_ids = pickle.load(f)

        print(f"✓ FAISS index loaded: {index.ntotal} vectors")
        print(f"✓ Dimension: {index.d}")

        # Load model
        model = SentenceTransformer('all-MiniLM-L6-v2')
        print("✓ Sentence transformer model loaded")

        # Test search
        test_queries = [
            "lawyers who became judges",
            "Civil War veterans",
            "served in the military",
            "teachers and educators",
        ]

        # One connection serves every query and is closed even if a search
        # or lookup raises part-way through.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            cursor = conn.cursor()

            for query in test_queries:
                print(f"\n✓ Testing query: '{query}'")
                query_embedding = model.encode([query])[0].reshape(1, -1).astype('float32')
                faiss.normalize_L2(query_embedding)

                scores, indices = index.search(query_embedding, 3)

                print(" Top 3 results:")
                for idx, score in zip(indices[0], scores[0]):
                    idx = int(idx)
                    # FAISS pads missing neighbours with label -1; without the
                    # lower bound, bio_ids[-1] would report the last member
                    # as a bogus hit.
                    if not 0 <= idx < len(bio_ids):
                        continue
                    bio_id = bio_ids[idx]
                    cursor.execute(
                        "SELECT given_name, family_name FROM members WHERE bio_id = ?",
                        (bio_id,),
                    )
                    result = cursor.fetchone()
                    if result:
                        given, family = result
                        print(f" - {given} {family} ({bio_id}): score={score:.4f}")

        return True

    except ImportError as e:
        print(f"❌ Missing dependency: {e}")
        print(" Run: pip install -r requirements.txt")
        return False
    except Exception as e:
        # Top-level boundary of this check: report any failure and signal it
        # through the return value rather than crashing the suite.
        print(f"❌ Error testing FAISS: {e}")
        return False


def test_sample_profile():
    """Display a sample profile.

    Looks up the member whose unaccented name is Abraham Lincoln and prints
    the birth/death dates, the first 300 characters of the biography and the
    positions held. Prints an error and returns early when the database file
    is missing or a query fails. Returns None.
    """
    print("\n\nSample Profile...")
    print("=" * 60)

    # Guard against creating an empty database file when called standalone.
    if not Path(DB_PATH).exists():
        print("❌ Database not found. Run ingest_data.py first.")
        return

    try:
        with closing(sqlite3.connect(DB_PATH)) as conn:
            # Row factory allows access to columns by name below.
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()

            # Get a well-known member
            cursor.execute("""
                SELECT * FROM members
                WHERE unaccented_family_name = 'Lincoln' AND unaccented_given_name = 'Abraham'
                LIMIT 1
            """)
            member = cursor.fetchone()

            if not member:
                return

            bio_id = member['bio_id']
            print(f"Profile: {member['given_name']} {member['family_name']} ({bio_id})")
            print(f"Birth: {member['birth_date']}")
            print(f"Death: {member['death_date']}")
            print("\nBiography excerpt:")
            # profile_text may be NULL; fall back to an empty excerpt.
            profile_text = member['profile_text'] or ""
            print(f" {profile_text[:300]}...")

            # Get positions
            cursor.execute("""
                SELECT job_name, party, congress_number, region_code, start_date, end_date
                FROM job_positions
                WHERE bio_id = ?
                ORDER BY start_date
            """, (bio_id,))
            positions = cursor.fetchall()

            if positions:
                print(f"\nPositions held ({len(positions)}):")
                for pos in positions:
                    print(f" - {pos['job_name']} ({pos['party']}), {pos['region_code']}")
                    print(f" Congress {pos['congress_number']}: {pos['start_date']} - {pos['end_date']}")
    except sqlite3.Error as e:
        print(f"❌ Database error: {e}")


def main():
    """Run all tests.

    Runs the database and FAISS checks, shows the sample profile only when
    the database check passed, and prints an overall verdict.
    """
    print("Congressional Bioguide Database Test Suite")
    print("=" * 60)
    print()

    db_ok = test_database()
    faiss_ok = test_faiss_index()

    if db_ok:
        test_sample_profile()

    print("\n" + "=" * 60)
    if db_ok and faiss_ok:
        print("✓ All tests passed!")
        print("\nThe system is ready to use. Start the MCP server with:")
        print(" python3 server.py")
    else:
        print("❌ Some tests failed. Please check the errors above.")
        if not db_ok:
            print(" Run: python3 ingest_data.py")


if __name__ == "__main__":
    main()