Spaces:
Sleeping
Sleeping
| """ | |
| Database schema for Dynamic Highscores system. | |
| This module defines the SQLite database schema for the Dynamic Highscores system, | |
| which integrates benchmark selection, model evaluation, and leaderboard functionality. | |
| """ | |
| import sqlite3 | |
| import os | |
| import json | |
| import threading | |
| from datetime import datetime, timedelta | |
| import pandas as pd | |
| class ThreadLocalDB: | |
| """Thread-local database connection manager.""" | |
| _thread_local = threading.local() | |
| def __init__(self, db_path): | |
| """Initialize with database path.""" | |
| self.db_path = db_path | |
| def get_connection(self): | |
| """Get a thread-local database connection.""" | |
| if not hasattr(self._thread_local, 'conn') or self._thread_local.conn is None: | |
| self._thread_local.conn = sqlite3.connect(self.db_path) | |
| self._thread_local.conn.row_factory = sqlite3.Row | |
| return self._thread_local.conn | |
| def get_cursor(self): | |
| """Get a cursor from the thread-local connection.""" | |
| conn = self.get_connection() | |
| if not hasattr(self._thread_local, 'cursor') or self._thread_local.cursor is None: | |
| self._thread_local.cursor = conn.cursor() | |
| return self._thread_local.cursor | |
| def close(self): | |
| """Close the thread-local connection if it exists.""" | |
| if hasattr(self._thread_local, 'conn') and self._thread_local.conn is not None: | |
| if hasattr(self._thread_local, 'cursor') and self._thread_local.cursor is not None: | |
| self._thread_local.cursor.close() | |
| self._thread_local.cursor = None | |
| self._thread_local.conn.close() | |
| self._thread_local.conn = None | |
| class DynamicHighscoresDB: | |
| """Database manager for the Dynamic Highscores system.""" | |
| def __init__(self, db_path="dynamic_highscores.db"): | |
| """Initialize the database connection and create tables if they don't exist.""" | |
| self.db_path = db_path | |
| self.thread_local_db = ThreadLocalDB(db_path) | |
| self.create_tables() | |
| def get_conn(self): | |
| """Get the thread-local database connection.""" | |
| return self.thread_local_db.get_connection() | |
| def get_cursor(self): | |
| """Get the thread-local database cursor.""" | |
| return self.thread_local_db.get_cursor() | |
| def close(self): | |
| """Close the thread-local database connection.""" | |
| self.thread_local_db.close() | |
| def create_tables(self): | |
| """Create all necessary tables if they don't exist.""" | |
| cursor = self.get_cursor() | |
| conn = self.get_conn() | |
| # Users table - stores user information | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS users ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| username TEXT UNIQUE NOT NULL, | |
| hf_user_id TEXT UNIQUE NOT NULL, | |
| is_admin BOOLEAN DEFAULT 0, | |
| last_submission_date TEXT, | |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| # Benchmarks table - stores information about available benchmarks | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS benchmarks ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT NOT NULL, | |
| dataset_id TEXT NOT NULL, | |
| description TEXT, | |
| metrics TEXT, -- JSON string of metrics | |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| # Models table - stores information about submitted models | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS models ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT NOT NULL, | |
| hf_model_id TEXT NOT NULL, | |
| user_id INTEGER NOT NULL, | |
| tag TEXT NOT NULL, -- One of: Merge, Agent, Reasoning, Coding, etc. | |
| parameters TEXT, -- Number of parameters (can be NULL) | |
| description TEXT, | |
| created_at TEXT DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (user_id) REFERENCES users (id), | |
| UNIQUE (hf_model_id, user_id) | |
| ) | |
| ''') | |
| # Evaluations table - stores evaluation results | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS evaluations ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| model_id INTEGER NOT NULL, | |
| benchmark_id INTEGER NOT NULL, | |
| status TEXT NOT NULL, -- pending, running, completed, failed | |
| results TEXT, -- JSON string of results | |
| score REAL, -- Overall score (can be NULL) | |
| submitted_at TEXT DEFAULT CURRENT_TIMESTAMP, | |
| started_at TEXT, | |
| completed_at TEXT, | |
| FOREIGN KEY (model_id) REFERENCES models (id), | |
| FOREIGN KEY (benchmark_id) REFERENCES benchmarks (id) | |
| ) | |
| ''') | |
| # Queue table - stores evaluation queue | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS queue ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| evaluation_id INTEGER NOT NULL, | |
| priority INTEGER DEFAULT 0, -- Higher number = higher priority | |
| added_at TEXT DEFAULT CURRENT_TIMESTAMP, | |
| FOREIGN KEY (evaluation_id) REFERENCES evaluations (id) | |
| ) | |
| ''') | |
| conn.commit() | |
| # User management methods | |
| def add_user(self, username, hf_user_id, is_admin=False): | |
| """Add a new user to the database.""" | |
| cursor = self.get_cursor() | |
| conn = self.get_conn() | |
| try: | |
| cursor.execute( | |
| "INSERT INTO users (username, hf_user_id, is_admin) VALUES (?, ?, ?)", | |
| (username, hf_user_id, is_admin) | |
| ) | |
| conn.commit() | |
| return cursor.lastrowid | |
| except sqlite3.IntegrityError: | |
| # User already exists | |
| cursor.execute( | |
| "SELECT id FROM users WHERE hf_user_id = ?", | |
| (hf_user_id,) | |
| ) | |
| row = cursor.fetchone() | |
| return row[0] if row else None | |
| def get_user(self, hf_user_id): | |
| """Get user information by HuggingFace user ID.""" | |
| cursor = self.get_cursor() | |
| cursor.execute( | |
| "SELECT * FROM users WHERE hf_user_id = ?", | |
| (hf_user_id,) | |
| ) | |
| row = cursor.fetchone() | |
| return dict(row) if row else None | |
| def get_user_by_username(self, username): | |
| """Get user information by username.""" | |
| cursor = self.get_cursor() | |
| cursor.execute( | |
| "SELECT * FROM users WHERE username = ?", | |
| (username,) | |
| ) | |
| row = cursor.fetchone() | |
| return dict(row) if row else None | |
| def can_submit_today(self, user_id): | |
| """Check if a user can submit a benchmark evaluation today.""" | |
| cursor = self.get_cursor() | |
| cursor.execute( | |
| "SELECT is_admin, last_submission_date FROM users WHERE id = ?", | |
| (user_id,) | |
| ) | |
| result = cursor.fetchone() | |
| if not result: | |
| return False | |
| user_data = dict(result) | |
| # Admin can always submit | |
| if user_data['is_admin']: | |
| return True | |
| # If no previous submission, user can submit | |
| if not user_data['last_submission_date']: | |
| return True | |
| # Check if last submission was before today | |
| last_date = datetime.fromisoformat(user_data['last_submission_date']) | |
| today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) | |
| return last_date < today | |
| def update_submission_date(self, user_id): | |
| """Update the last submission date for a user.""" | |
| cursor = self.get_cursor() | |
| conn = self.get_conn() | |
| current_time = datetime.now().isoformat() | |
| cursor.execute( | |
| "UPDATE users SET last_submission_date = ? WHERE id = ?", | |
| (current_time, user_id) | |
| ) | |
| conn.commit() | |
| # Benchmark management methods | |
| def add_benchmark(self, name, dataset_id, description="", metrics=None): | |
| """Add a new benchmark to the database.""" | |
| cursor = self.get_cursor() | |
| conn = self.get_conn() | |
| if metrics is None: | |
| metrics = {} | |
| metrics_json = json.dumps(metrics) | |
| try: | |
| cursor.execute( | |
| "INSERT INTO benchmarks (name, dataset_id, description, metrics) VALUES (?, ?, ?, ?)", | |
| (name, dataset_id, description, metrics_json) | |
| ) | |
| conn.commit() | |
| return cursor.lastrowid | |
| except sqlite3.IntegrityError: | |
| # Benchmark already exists with this dataset_id | |
| cursor.execute( | |
| "SELECT id FROM benchmarks WHERE dataset_id = ?", | |
| (dataset_id,) | |
| ) | |
| row = cursor.fetchone() | |
| return row[0] if row else None | |
| def get_benchmarks(self): | |
| """Get all available benchmarks.""" | |
| cursor = self.get_cursor() | |
| cursor.execute("SELECT * FROM benchmarks") | |
| benchmarks = [dict(row) for row in cursor.fetchall()] | |
| # Parse metrics JSON | |
| for benchmark in benchmarks: | |
| if benchmark['metrics']: | |
| benchmark['metrics'] = json.loads(benchmark['metrics']) | |
| else: | |
| benchmark['metrics'] = {} | |
| return benchmarks | |
| def get_benchmark(self, benchmark_id): | |
| """Get benchmark information by ID.""" | |
| cursor = self.get_cursor() | |
| cursor.execute( | |
| "SELECT * FROM benchmarks WHERE id = ?", | |
| (benchmark_id,) | |
| ) | |
| row = cursor.fetchone() | |
| benchmark = dict(row) if row else None | |
| if benchmark and benchmark['metrics']: | |
| benchmark['metrics'] = json.loads(benchmark['metrics']) | |
| return benchmark | |
| # Model management methods | |
| def add_model(self, name, hf_model_id, user_id, tag, parameters=None, description=""): | |
| """Add a new model to the database.""" | |
| cursor = self.get_cursor() | |
| conn = self.get_conn() | |
| try: | |
| cursor.execute( | |
| "INSERT INTO models (name, hf_model_id, user_id, tag, parameters, description) VALUES (?, ?, ?, ?, ?, ?)", | |
| (name, hf_model_id, user_id, tag, parameters, description) | |
| ) | |
| conn.commit() | |
| return cursor.lastrowid | |
| except sqlite3.IntegrityError: | |
| # Model already exists for this user | |
| cursor.execute( | |
| "SELECT id FROM models WHERE hf_model_id = ? AND user_id = ?", | |
| (hf_model_id, user_id) | |
| ) | |
| row = cursor.fetchone() | |
| return row[0] if row else None | |
| def get_models(self, tag=None): | |
| """Get all models, optionally filtered by tag.""" | |
| cursor = self.get_cursor() | |
| if tag and tag.lower() != "all": | |
| cursor.execute( | |
| "SELECT * FROM models WHERE tag = ?", | |
| (tag,) | |
| ) | |
| else: | |
| cursor.execute("SELECT * FROM models") | |
| return [dict(row) for row in cursor.fetchall()] | |
| def get_model(self, model_id): | |
| """Get model information by ID.""" | |
| cursor = self.get_cursor() | |
| cursor.execute( | |
| "SELECT * FROM models WHERE id = ?", | |
| (model_id,) | |
| ) | |
| row = cursor.fetchone() | |
| return dict(row) if row else None | |
| # Evaluation management methods | |
| def add_evaluation(self, model_id, benchmark_id, priority=0): | |
| """Add a new evaluation to the database and queue.""" | |
| cursor = self.get_cursor() | |
| conn = self.get_conn() | |
| # First, add the evaluation | |
| cursor.execute( | |
| "INSERT INTO evaluations (model_id, benchmark_id, status) VALUES (?, ?, 'pending')", | |
| (model_id, benchmark_id) | |
| ) | |
| evaluation_id = cursor.lastrowid | |
| # Then, add it to the queue | |
| cursor.execute( | |
| "INSERT INTO queue (evaluation_id, priority) VALUES (?, ?)", | |
| (evaluation_id, priority) | |
| ) | |
| conn.commit() | |
| return evaluation_id | |
| def update_evaluation_status(self, evaluation_id, status, results=None, score=None): | |
| """Update the status of an evaluation.""" | |
| cursor = self.get_cursor() | |
| conn = self.get_conn() | |
| params = [status, evaluation_id] | |
| sql = "UPDATE evaluations SET status = ?" | |
| if results is not None: | |
| sql += ", results = ?" | |
| params.insert(1, json.dumps(results)) | |
| if score is not None: | |
| sql += ", score = ?" | |
| params.insert(1 if results is None else 2, score) | |
| if status in ['completed', 'failed']: | |
| sql += ", completed_at = datetime('now')" | |
| elif status == 'running': | |
| sql += ", started_at = datetime('now')" | |
| sql += " WHERE id = ?" | |
| cursor.execute(sql, params) | |
| conn.commit() | |
| def get_next_in_queue(self): | |
| """Get the next evaluation in the queue.""" | |
| cursor = self.get_cursor() | |
| cursor.execute(""" | |
| SELECT q.*, e.id as evaluation_id, e.model_id, e.benchmark_id, e.status | |
| FROM queue q | |
| JOIN evaluations e ON q.evaluation_id = e.id | |
| WHERE e.status = 'pending' | |
| ORDER BY q.priority DESC, q.added_at ASC | |
| LIMIT 1 | |
| """) | |
| row = cursor.fetchone() | |
| return dict(row) if row else None | |
| def get_evaluation_results(self, model_id=None, benchmark_id=None, tag=None, status=None, limit=None): | |
| """Get evaluation results, optionally filtered by model, benchmark, tag, or status.""" | |
| cursor = self.get_cursor() | |
| sql = """ | |
| SELECT e.id, e.model_id, e.benchmark_id, e.status, e.results, e.score, | |
| e.submitted_at, e.started_at, e.completed_at, m.name as model_name, m.tag, | |
| b.name as benchmark_name | |
| FROM evaluations e | |
| JOIN models m ON e.model_id = m.id | |
| JOIN benchmarks b ON e.benchmark_id = b.id | |
| WHERE 1=1 | |
| """ | |
| params = [] | |
| if status: | |
| sql += " AND e.status = ?" | |
| params.append(status) | |
| if model_id: | |
| sql += " AND e.model_id = ?" | |
| params.append(model_id) | |
| if benchmark_id and benchmark_id != "all" and benchmark_id.lower() != "all": | |
| sql += " AND e.benchmark_id = ?" | |
| params.append(benchmark_id) | |
| if tag and tag.lower() != "all": | |
| sql += " AND m.tag = ?" | |
| params.append(tag) | |
| sql += " ORDER BY e.submitted_at DESC" | |
| if limit: | |
| sql += " LIMIT ?" | |
| params.append(limit) | |
| cursor.execute(sql, params) | |
| results = [dict(row) for row in cursor.fetchall()] | |
| # Parse results JSON | |
| for result in results: | |
| if result['results']: | |
| try: | |
| result['results'] = json.loads(result['results']) | |
| except: | |
| result['results'] = {} | |
| return results | |
| def get_leaderboard_df(self, tag=None, benchmark_id=None): | |
| """Get a pandas DataFrame of the leaderboard, optionally filtered by tag and benchmark.""" | |
| results = self.get_evaluation_results(tag=tag, benchmark_id=benchmark_id, status="completed") | |
| if not results: | |
| return pd.DataFrame() | |
| # Create a list of dictionaries for the DataFrame | |
| leaderboard_data = [] | |
| for result in results: | |
| entry = { | |
| 'model_name': result['model_name'], | |
| 'tag': result['tag'], | |
| 'benchmark_name': result['benchmark_name'], | |
| 'score': result['score'], | |
| 'completed_at': result['completed_at'] | |
| } | |
| # Add any additional metrics from results | |
| if result['results'] and isinstance(result['results'], dict): | |
| for key, value in result['results'].items(): | |
| if isinstance(value, (int, float)) and key not in entry: | |
| entry[key] = value | |
| leaderboard_data.append(entry) | |
| # Convert to DataFrame | |
| df = pd.DataFrame(leaderboard_data) | |
| # Sort by score (descending) | |
| if not df.empty and 'score' in df.columns: | |
| df = df.sort_values('score', ascending=False) | |
| return df |