File size: 8,214 Bytes
15de73a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
#!/usr/bin/env python3
"""

Test script to validate the Congressional Bioguide database and search functionality.

"""

import sqlite3
import json
from pathlib import Path


def test_database():
    """Check that ``congress.db`` exists and run read-only sanity queries on it.

    A short report is printed to stdout. The database is looked up relative
    to the current working directory.

    Returns:
        bool: True when every query ran. False when the database file is
        missing or when any query raised ``sqlite3.Error`` (for example
        because an expected table or column does not exist).
    """
    print("Testing Database...")
    print("=" * 60)

    if not Path("congress.db").exists():
        print("❌ Database not found. Run ingest_data.py first.")
        return False

    conn = sqlite3.connect("congress.db")
    try:
        _run_database_checks(conn.cursor())
    except sqlite3.Error as e:
        # Report schema/query problems the same way test_faiss_index reports
        # its failures: a message plus False, rather than a traceback.
        print(f"❌ Error testing database: {e}")
        return False
    finally:
        # Always release the connection, including when a query failed.
        conn.close()
    return True


def _run_database_checks(cursor):
    """Run the individual report queries on *cursor* and print their results.

    Args:
        cursor: An open ``sqlite3`` cursor on the Bioguide database.

    Raises:
        sqlite3.Error: Propagated unchanged from any failing query.
    """
    # Test 1: Count members
    cursor.execute("SELECT COUNT(*) FROM members")
    member_count = cursor.fetchone()[0]
    print(f"✓ Members in database: {member_count}")

    # Test 2: Count job positions
    cursor.execute("SELECT COUNT(*) FROM job_positions")
    job_count = cursor.fetchone()[0]
    print(f"✓ Job positions recorded: {job_count}")

    # Test 3: Search by name
    cursor.execute("""

        SELECT bio_id, family_name, given_name, birth_date, death_date

        FROM members

        WHERE unaccented_family_name = 'Lincoln'

        ORDER BY birth_date

    """)
    lincolns = cursor.fetchall()
    print(f"\n✓ Found {len(lincolns)} member(s) with family name 'Lincoln':")
    for bio_id, family, given, birth, death in lincolns:
        # A NULL/empty death_date is shown as 'present'.
        print(f"  - {given} {family} ({bio_id}): {birth} - {death or 'present'}")

    # Test 4: Party breakdown
    cursor.execute("""

        SELECT party, COUNT(DISTINCT bio_id) as count

        FROM job_positions

        WHERE party IS NOT NULL

        GROUP BY party

        ORDER BY count DESC

        LIMIT 10

    """)
    parties = cursor.fetchall()
    print("\n✓ Top parties by member count:")
    for party, count in parties:
        print(f"  - {party}: {count} members")

    # Test 5: State representation
    cursor.execute("""

        SELECT region_code, COUNT(DISTINCT bio_id) as count

        FROM job_positions

        WHERE region_code IS NOT NULL AND region_type = 'StateRegion'

        GROUP BY region_code

        ORDER BY count DESC

        LIMIT 10

    """)
    states = cursor.fetchall()
    print("\n✓ Top states by member count:")
    for state, count in states:
        print(f"  - {state}: {count} members")

    # Test 6: Relationships
    cursor.execute("SELECT COUNT(*) FROM relationships")
    rel_count = cursor.fetchone()[0]
    print(f"\n✓ Family relationships recorded: {rel_count}")

    if rel_count > 0:
        # Resolve both ends of up to five relationships to member names.
        cursor.execute("""

            SELECT m1.given_name, m1.family_name, r.relationship_type,

                   m2.given_name, m2.family_name

            FROM relationships r

            JOIN members m1 ON r.bio_id = m1.bio_id

            JOIN members m2 ON r.related_bio_id = m2.bio_id

            LIMIT 5

        """)
        relationships = cursor.fetchall()
        print("  Sample relationships:")
        for given1, family1, rel_type, given2, family2 in relationships:
            print(f"  - {given1} {family1} is {rel_type} of {given2} {family2}")

    # Test 7: Profile text
    cursor.execute("""

        SELECT bio_id, given_name, family_name, LENGTH(profile_text) as text_len

        FROM members

        WHERE profile_text IS NOT NULL

        ORDER BY text_len DESC

        LIMIT 5

    """)
    longest_profiles = cursor.fetchall()
    print("\n✓ Longest biography profiles:")
    for bio_id, given, family, length in longest_profiles:
        print(f"  - {given} {family} ({bio_id}): {length} characters")


def test_faiss_index():
    """Load the FAISS index and run a few sample semantic searches against it.

    Requires ``congress_faiss.index``, ``congress_bio_ids.pkl`` and
    ``congress.db`` in the current working directory, plus the ``faiss`` and
    ``sentence_transformers`` packages. For each sample query the top three
    hits are resolved to member names through the database and printed.

    Returns:
        bool: True when every sample query ran. False when a required file is
        missing, a dependency cannot be imported, or any other error occurs
        (the error is printed, not raised).
    """
    print("\n\nTesting FAISS Index...")
    print("=" * 60)

    if not Path("congress_faiss.index").exists():
        print("❌ FAISS index not found. Run ingest_data.py first.")
        return False

    if not Path("congress_bio_ids.pkl").exists():
        print("❌ Bio ID mapping not found. Run ingest_data.py first.")
        return False

    # sqlite3.connect() silently creates an empty database when the file is
    # missing; check first so a failed run never leaves a bogus congress.db.
    if not Path("congress.db").exists():
        print("❌ Database not found. Run ingest_data.py first.")
        return False

    try:
        # Imported lazily so a missing optional dependency is reported as a
        # test failure instead of breaking import of this script.
        import faiss
        import pickle
        from sentence_transformers import SentenceTransformer

        # Load index
        index = faiss.read_index("congress_faiss.index")
        # SECURITY NOTE: pickle.load can execute arbitrary code; only run this
        # against a mapping file produced by a trusted ingest step.
        with open("congress_bio_ids.pkl", "rb") as f:
            bio_ids = pickle.load(f)

        print(f"✓ FAISS index loaded: {index.ntotal} vectors")
        print(f"✓ Dimension: {index.d}")

        # Load model
        model = SentenceTransformer('all-MiniLM-L6-v2')
        print("✓ Sentence transformer model loaded")

        # Test search
        test_queries = [
            "lawyers who became judges",
            "Civil War veterans",
            "served in the military",
            "teachers and educators"
        ]

        # One connection for all queries, closed even if a query fails.
        conn = sqlite3.connect("congress.db")
        try:
            cursor = conn.cursor()

            for query in test_queries:
                print(f"\n✓ Testing query: '{query}'")
                query_embedding = model.encode([query])[0].reshape(1, -1).astype('float32')
                # Normalises the query vector in place.
                # NOTE(review): presumably the index was built from normalised
                # vectors as well — confirm against ingest_data.py.
                faiss.normalize_L2(query_embedding)

                scores, indices = index.search(query_embedding, 3)

                print("  Top 3 results:")
                for idx, score in zip(indices[0], scores[0]):
                    # FAISS pads missing neighbours with label -1; without the
                    # lower bound, bio_ids[-1] would report the last member.
                    if not 0 <= idx < len(bio_ids):
                        continue
                    bio_id = bio_ids[idx]
                    cursor.execute(
                        "SELECT given_name, family_name FROM members WHERE bio_id = ?",
                        (bio_id,)
                    )
                    result = cursor.fetchone()
                    if result:
                        given, family = result
                        print(f"    - {given} {family} ({bio_id}): score={score:.4f}")
        finally:
            conn.close()

        return True

    except ImportError as e:
        print(f"❌ Missing dependency: {e}")
        print("   Run: pip install -r requirements.txt")
        return False
    except Exception as e:
        # Top-level boundary of this check: report and signal failure.
        print(f"❌ Error testing FAISS: {e}")
        return False


def test_sample_profile():
    """Display a sample profile."""
    print("\n\nSample Profile...")
    print("=" * 60)

    conn = sqlite3.connect("congress.db")
    conn.row_factory = sqlite3.Row
    cursor = conn.cursor()

    # Get a well-known member
    cursor.execute("""

        SELECT * FROM members

        WHERE unaccented_family_name = 'Lincoln' AND unaccented_given_name = 'Abraham'

        LIMIT 1

    """)
    member = cursor.fetchone()

    if member:
        bio_id = member['bio_id']
        print(f"Profile: {member['given_name']} {member['family_name']} ({bio_id})")
        print(f"Birth: {member['birth_date']}")
        print(f"Death: {member['death_date']}")
        print(f"\nBiography excerpt:")
        profile_text = member['profile_text'] or ""
        print(f"  {profile_text[:300]}...")

        # Get positions
        cursor.execute("""

            SELECT job_name, party, congress_number, region_code, start_date, end_date

            FROM job_positions

            WHERE bio_id = ?

            ORDER BY start_date

        """, (bio_id,))
        positions = cursor.fetchall()

        if positions:
            print(f"\nPositions held ({len(positions)}):")
            for pos in positions:
                print(f"  - {pos['job_name']} ({pos['party']}), {pos['region_code']}")
                print(f"    Congress {pos['congress_number']}: {pos['start_date']} - {pos['end_date']}")

    conn.close()


def main():
    """Run all checks in order and print an overall verdict.

    The database check and the FAISS check always run; the sample profile is
    printed only when the database check passed.

    Returns:
        bool: True when both the database and the FAISS checks passed.
    """
    print("Congressional Bioguide Database Test Suite")
    print("=" * 60)
    print()

    db_ok = test_database()
    faiss_ok = test_faiss_index()

    if db_ok:
        test_sample_profile()

    all_ok = bool(db_ok and faiss_ok)

    print("\n" + "=" * 60)
    if all_ok:
        print("✓ All tests passed!")
        print("\nThe system is ready to use. Start the MCP server with:")
        print("  python3 server.py")
    else:
        print("❌ Some tests failed. Please check the errors above.")
        if not db_ok:
            print("  Run: python3 ingest_data.py")

    return all_ok


if __name__ == "__main__":
    # Exit non-zero on failure so shells and CI jobs can detect it.
    raise SystemExit(0 if main() else 1)