from __future__ import annotations

import os
import time
import queue
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Optional, Dict, Tuple, Callable

import pandas as pd
import gradio as gr

from config import Config
from src.submission_tracker import get_submission_tracker, SubmissionTracker
from src.quick_csv_loader import quick_load_csv
from src.leaderboard_manager import append_to_leaderboard_data
from src.utils import get_current_datetime_str
from freshqa.fresheval_parallel import evaluate_dataframe_parallel
from freshqa.freshqa_acc import process_freshqa_dataframe, calculate_accuracy
from freshqa.merge_csv_with_model_response import merge_dataframe_with_model_response_df


# -------------------------
# Common return type (Result)
# -------------------------
@dataclass
class Result:
    """Uniform return value of the internal pipeline helpers."""

    # True when the step succeeded.
    ok: bool
    # Step payload on success (a DataFrame, a tuple, ...); None otherwise.
    data: Optional[Any] = None
    # User-facing error message on failure; None on success.
    error: Optional[str] = None
    # Optional free-form metadata; not populated anywhere in this module.
    meta: Optional[Dict] = None


# -------------------------
# Core handler
# -------------------------
class SubmissionHandler:
    """Orchestrates submission-file processing and the FreshQA evaluation.

    - The tracker and the config object are injected as dependencies.
    - Internal helpers return ``Result`` or another explicit type.
    - Persistence, quota checks and user identity are delegated to the
      tracker; this handler only calls it.
    """

    # (leaderboard column, key in the accuracy dict). Only test-split
    # accuracies are published to the leaderboard.
    _LEADERBOARD_METRICS: Tuple[Tuple[str, str], ...] = (
        ('accuracy', 'acc_test'),
        ('fast_changing_accuracy', 'acc_test_fast_changing'),
        ('slow_changing_accuracy', 'acc_test_slow_changing'),
        ('never_changing_accuracy', 'acc_test_never_changing'),
        ('acc_vp', 'acc_test_vp'),
        ('acc_fp', 'acc_test_fp'),
        ('acc_vp_one_hop', 'acc_test_vp_one_hop'),
        ('acc_vp_two_hop', 'acc_test_vp_two_hop'),
        ('acc_fp_one_hop', 'acc_test_fp_one_hop'),
        ('acc_fp_two_hop', 'acc_test_fp_two_hop'),
        ('acc_vp_old', 'acc_test_vp_old'),
        ('acc_vp_new', 'acc_test_vp_new'),
        ('acc_fp_old', 'acc_test_fp_old'),
        ('acc_fp_new', 'acc_test_fp_new'),
        # Per-domain accuracies (test split only).
        ('acc_politics', 'acc_test_politics'),
        ('acc_sports', 'acc_test_sports'),
        ('acc_entertainment', 'acc_test_entertainment'),
        ('acc_weather', 'acc_test_weather'),
        ('acc_world', 'acc_test_world'),
        ('acc_economy', 'acc_test_economy'),
        ('acc_society', 'acc_test_society'),
        ('acc_it_science', 'acc_test_it_science'),
        ('acc_life_culture', 'acc_test_life_culture'),
        ('acc_unknown', 'acc_test_unknown'),
    )

    def __init__(self, tracker: Optional[SubmissionTracker] = None, cfg: Optional[type] = None):
        """Store the injected dependencies and validate mandatory settings.

        Args:
            tracker: Submission tracker; when None it is resolved lazily via
                ``get_submission_tracker()`` if the submission limit is enabled.
            cfg: Configuration object/class; defaults to ``Config``.

        Raises:
            ValueError: If the repo id, the data filename or the HF token is
                missing from the configuration.
        """
        # Dependency injection
        self.tracker = tracker
        self.cfg = cfg or Config
        self.enable_limit = getattr(self.cfg, "ENABLE_SUBMISSION_LIMIT", False)
        self.repo_id = getattr(self.cfg, "FRESHQA_DATA_REPO_ID", None)
        self.filename = getattr(self.cfg, "FRESHQA_DATA_FILENAME", None)
        self.hf_token = getattr(self.cfg, "HF_TOKEN", None)

        # Mandatory settings check
        if not self.repo_id:
            raise ValueError("❌ FRESHQA_DATA_REPO_ID 환경 변수가 설정되지 않았습니다.")
        if not self.filename:
            raise ValueError("❌ FRESHQA_DATA_FILENAME 환경 변수가 설정되지 않았습니다.")
        if not self.hf_token:
            raise ValueError("❌ HF_TOKEN 환경 변수가 설정되지 않았습니다.")

    # --------- 1) Submission file validation ----------
    def _validate_submission_file(self, file) -> Result:
        """Check that the uploaded CSV exists, parses and has usable content.

        Args:
            file: Uploaded file object exposing a ``name`` path attribute,
                or None when nothing was uploaded.

        Returns:
            ``Result(ok=True)`` when the file has non-empty, non-null
            ``question`` and ``model_response`` columns; otherwise a failed
            ``Result`` carrying a user-facing message.
        """
        if file is None:
            return Result(ok=False, error="❌ CSV 파일을 업로드해주세요.")

        try:
            df = pd.read_csv(file.name)
        except Exception as e:
            return Result(ok=False, error=f"❌ CSV 로딩 실패: {e}")

        required_columns = ["question", "model_response"]
        for col in required_columns:
            if col not in df.columns:
                return Result(ok=False, error=f"❌ CSV 파일의 컬럼에 '{col}'이(가) 없습니다.")

        if len(df) == 0:
            return Result(ok=False, error="❌ CSV 파일에 데이터가 없습니다.")

        if df["question"].isnull().any() or df["model_response"].isnull().any():
            return Result(ok=False, error="❌ 'question' 또는 'model_response' 컬럼에 누락된 값이 있습니다.")

        return Result(ok=True)

    # --------- 2) Quick load ----------
    def _load_submission_df(self, file) -> Result:
        """Load a DataFrame through ``quick_load_csv`` using the configured repo.

        NOTE(review): despite the method name, ``file`` is not read here; the
        data comes from ``repo_id``/``filename`` (presumably the reference
        FreshQA dataset -- confirm against ``quick_load_csv``).
        """
        try:
            df = quick_load_csv(self.repo_id, self.filename, self.hf_token)
        except Exception as e:
            return Result(ok=False, error=f"❌ CSV 로딩 실패: {e}")
        return Result(ok=True, data=df)

    # --------- 3) Merge ----------
    def _merge_with_base(self, submission_df: pd.DataFrame, file_name: str) -> Result:
        """Merge the loaded DataFrame with the uploaded file's model responses.

        Args:
            submission_df: DataFrame returned by ``_load_submission_df``.
            file_name: Path of the uploaded CSV.

        Returns:
            ``Result`` whose ``data`` is the merged DataFrame, or a failed
            ``Result`` when the merge helper raises.
        """
        try:
            merged_df = merge_dataframe_with_model_response_df(submission_df, file_name)
            return Result(ok=True, data=merged_df)
        except Exception as e:
            return Result(ok=False, error=f"❌ 기준 데이터와 병합 실패: {e}")

    # --------- 4) Evaluation ----------
    def _evaluate_freshqa(
        self,
        merged_df: pd.DataFrame,
        on_progress: Optional[Callable[[int, int, str], None]] = None,
    ) -> Result:
        """Run the Relaxed and Strict evaluations concurrently.

        Progress is reported through a queue shared with both workers and
        forwarded to ``on_progress`` from the calling thread.

        Args:
            merged_df: DataFrame to evaluate.
            on_progress: Optional callback ``(done, total, description)``.

        Returns:
            ``Result`` whose ``data`` is ``(relaxed_result, strict_result)``,
            or a failed ``Result`` when evaluation (or the callback) raises.
        """
        progress_q: "queue.Queue[Any]" = queue.Queue()

        # Both modes process every row, so the total unit count is doubled.
        total_items = len(merged_df) * 2
        done_count = 0

        def _drain_queue(block: bool = False) -> None:
            """Forward every queued progress item until the queue is empty."""
            nonlocal done_count
            while True:
                try:
                    item = progress_q.get(block=block, timeout=0.05 if block else 0)
                except queue.Empty:
                    # Only "nothing left to read" ends the drain; any other
                    # error is a real failure and must not be hidden.
                    break
                try:
                    # Current format: workers push integer increments.
                    if isinstance(item, int):
                        done_count += item
                        if on_progress:
                            desc_text = f"평가 중... {done_count}/{total_items}"
                            on_progress(done_count, total_items, desc_text)
                    # Defensive handling of the legacy (done, total, desc) tuple.
                    elif isinstance(item, tuple) and len(item) == 3 and on_progress:
                        on_progress(item[0], item[1], item[2])
                finally:
                    progress_q.task_done()

        try:
            with ThreadPoolExecutor(max_workers=2) as ex:
                relaxed_f = ex.submit(
                    evaluate_dataframe_parallel,
                    df=merged_df,
                    mode="Relaxed",
                    on_item_done=None,
                    progress_queue=progress_q,
                )
                strict_f = ex.submit(
                    evaluate_dataframe_parallel,
                    df=merged_df,
                    mode="Strict",
                    on_item_done=None,
                    progress_queue=progress_q,
                )

                # Poll so progress callbacks run on this thread while the
                # workers are busy.
                while True:
                    _drain_queue(block=False)
                    if relaxed_f.done() and strict_f.done():
                        break
                    time.sleep(0.05)

                # Pick up anything enqueued just before the workers finished.
                _drain_queue(block=True)

                relaxed = relaxed_f.result()
                strict = strict_f.result()
                return Result(ok=True, data=(relaxed, strict))
        except Exception as e:
            return Result(ok=False, error=f"❌ 평가 중 오류 발생: {e}")

    # --------- 5) Accuracy calculation ----------
    def _calculate_accuracy(self, fresheval_df: pd.DataFrame) -> Result:
        """Post-process an evaluation DataFrame and compute accuracies.

        Returns:
            ``Result`` whose ``data`` is ``(processed_df, accs, counts)``, or
            a failed ``Result`` when either helper raises.
        """
        try:
            processed = process_freshqa_dataframe(fresheval_df)
            accs, counts = calculate_accuracy(processed)
            return Result(ok=True, data=(processed, accs, counts))
        except Exception as e:
            return Result(ok=False, error=f"❌ 결과 집계 중 오류가 발생했습니다: {e}")

    # --------- 6) Summary ----------
    def _build_summary(self, name: str, relaxed_accs: dict, strict_accs: dict) -> str:
        """Build a short Markdown summary of test-split accuracies.

        Missing accuracy keys are rendered as ``0.0%``.
        """
        submitter = name if name else "(이름 미입력)"
        lines = [
            f"**제출자**: {submitter}",
            "",
            "**정확도 (테스트셋 기준)**",
            f"- Relaxed: {relaxed_accs.get('acc_test', 0):.1f}%",
            f"- Strict: {strict_accs.get('acc_test', 0):.1f}%",
            "",
            "**세부 지표 (테스트셋)**",
        ]
        detail_metrics = (
            ("Fast Changing", "acc_test_fast_changing"),
            ("Two-hop", "acc_test_two_hop"),
            ("Old", "acc_test_old"),
            ("New", "acc_test_new"),
            ("VP", "acc_test_vp"),
            ("FP", "acc_test_fp"),
        )
        for label, key in detail_metrics:
            lines.append(
                f"- {label}: R {relaxed_accs.get(key, 0):.1f}% / "
                f"S {strict_accs.get(key, 0):.1f}%"
            )
        return "\n".join(lines)

    def _get_result_summary(
        self,
        file_name: str,
        name: str,
        relaxed_accs: dict,
        strict_accs: dict,
        relaxed_table: pd.DataFrame,
        strict_table: pd.DataFrame,
    ) -> str:
        """Build the plain-text report returned to the UI after a successful run.

        Args:
            file_name: Path of the uploaded file; only its basename is shown.
            name: Submitter name; rendered as ``Unknown`` when empty.
            relaxed_accs: Accuracy dict of the Relaxed evaluation.
            strict_accs: Accuracy dict of the Strict evaluation.
            relaxed_table: Detailed table for the Relaxed evaluation.
            strict_table: Detailed table for the Strict evaluation.

        Returns:
            The multi-line report text.
        """
        display_file = os.path.basename(file_name) if file_name else ""

        def mode_section(title: str, accs: dict) -> list[str]:
            """Return the summary lines of one evaluation mode."""
            return [
                f"- {title} 모드",
                f" · 전체 정확도: {float(accs.get('acc', 0)):.1f}%",
                f" · Fast-changing: {float(accs.get('acc_fast_changing', 0)):.1f}% | "
                f"Slow-changing: {float(accs.get('acc_slow_changing', 0)):.1f}% | "
                f"Never-changing: {float(accs.get('acc_never_changing', 0)):.1f}%",
                f" · False premise: {float(accs.get('acc_fp', 0)):.1f}%",
                "",
            ]

        sep = "-" * 60
        lines: list[str] = [
            "✅ 제출 및 평가 완료",
            "",
            "[기본 정보]",
            f"- 제출 파일: {display_file}",
            "- 평가 시스템: Solar Pro API",
            "",
            "[결과 요약]",
        ]
        lines.extend(mode_section("Relaxed", relaxed_accs))
        lines.extend(mode_section("Strict", strict_accs))
        lines.extend([
            "[제출 메타]",
            f"- 제출자: {name if name else 'Unknown'}",
            f"- 평가 일시: {get_current_datetime_str()}",
            "- 비고: Relaxed/Strict 결과가 리더보드에 반영되었습니다.",
            "",
            sep,
            "상세 결과 테이블 (Relaxed)",
            sep,
            relaxed_table.to_string(index=False),
            "",
            sep,
            "상세 결과 테이블 (Strict)",
            sep,
            strict_table.to_string(index=False),
        ])
        return "\n".join(lines)

    # --------- 7) Accuracy table ----------
    def _create_detailed_results_table(self, accs: dict, counts: dict) -> pd.DataFrame:
        """Build the per-category accuracy table (overall / test / dev).

        Each cell is rendered as ``"<accuracy>% (<count>개)"``; keys missing
        from ``accs`` or ``counts`` are rendered as 0.
        """

        def cell(key: str) -> str:
            """Format one accuracy/count pair."""
            return f"{accs.get(key, 0):.1f}% ({counts.get(key, 0)}개)"

        def row(label: str, suffix: str = "") -> dict:
            """Build one row for the metric family ``acc[_test|_dev][_<suffix>]``."""
            tail = f"_{suffix}" if suffix else ""
            return {
                '카테고리': label,
                '전체': cell(f"acc{tail}"),
                '테스트': cell(f"acc_test{tail}"),
                '개발': cell(f"acc_dev{tail}"),
            }

        # Overall accuracy
        table_data = [row('전체 정확도')]

        # Accuracy per fact type
        fact_types = {
            'fast_changing': '빠르게 변하는 사실',
            'slow_changing': '천천히 변하는 사실',
            'never_changing': '변하지 않는 사실',
        }
        for key, name in fact_types.items():
            table_data.append(row(name, key))

        # Accuracy per question type, each followed by its breakdown rows
        question_types = {
            'vp': '유효한 전제 (Valid Premise)',
            'fp': '잘못된 전제 (False Premise)',
        }
        for key, name in question_types.items():
            table_data.append(row(name, key))
            # Breakdown by hop count
            table_data.append(row(f" └ {name} (단일 홉)", f"{key}_one_hop"))
            table_data.append(row(f" └ {name} (다중 홉)", f"{key}_two_hop"))
            # Breakdown by data age
            table_data.append(row(f" └ {name} (오래된 데이터)", f"{key}_old"))
            table_data.append(row(f" └ {name} (최신 데이터)", f"{key}_new"))

        return pd.DataFrame(table_data)

    # --------- 8) Leaderboard row construction ----------
    def _build_leaderboard_rows(
        self,
        name: str,
        submit_model: str,
        submit_description: Optional[str],
        mode: str,
        accs: dict,
        counts: Optional[dict] = None,
    ):
        """Build one leaderboard row (a dict) for a single evaluation mode.

        Args:
            name: Submitter name; used as the row id (``Unknown`` when blank).
            submit_model: Model name.
            submit_description: Optional free-text description.
            mode: Evaluation mode label stored in ``evaluation_mode``.
            accs: Accuracy dict; only ``acc_test*`` keys are read.
            counts: Optional count dict. When given, ``total_questions`` is
                taken from ``counts['acc_test']``. When omitted, the legacy
                value (the truncated test accuracy) is kept so existing
                callers see no change.

        Returns:
            The leaderboard row as a dict.
        """
        submitter_id = f"{name}".strip()
        row = {
            'id': submitter_id if submitter_id else "Unknown",
            'model': submit_model,
            'description': submit_description,
        }
        for column, source_key in self._LEADERBOARD_METRICS:
            row[column] = float(accs.get(source_key, 0))

        if counts is not None:
            # A question count must come from the counts dict, not from an
            # accuracy percentage.
            row['total_questions'] = int(counts.get('acc_test', 0))
        else:
            row['total_questions'] = int(accs.get('acc_test', 0))
        row['evaluation_date'] = get_current_datetime_str()
        row['evaluation_mode'] = mode
        return row

    def _save_leaderboard(
        self,
        name: str,
        submit_model: str,
        submit_description: Optional[str],
        relaxed_accs: dict,
        strict_accs: dict,
        relaxed_counts: Optional[dict] = None,
        strict_counts: Optional[dict] = None,
    ):
        """Append the Relaxed and Strict rows to the leaderboard (best effort).

        A storage failure is printed as a warning and never raised, so a
        finished evaluation is still reported to the user.
        """
        rows = [
            self._build_leaderboard_rows(
                name, submit_model, submit_description, 'Relaxed', relaxed_accs, relaxed_counts
            ),
            self._build_leaderboard_rows(
                name, submit_model, submit_description, 'Strict', strict_accs, strict_counts
            ),
        ]
        try:
            append_to_leaderboard_data(rows)
        except Exception as e:
            print(f"⚠️ 리더보드 저장 실패: {e}")

    def _record_failure(
        self,
        tracker: Optional[SubmissionTracker],
        user_id: Optional[str],
        name: str,
        file,
        error_message: str,
        submit_model: str,
        submit_description: Optional[str],
    ) -> None:
        """Record a failed submission with the tracker (best effort).

        Does nothing unless the submission limit is enabled and both a
        tracker and a user id are available. Tracker errors are printed and
        swallowed so they cannot mask the original failure.
        """
        if not (self.enable_limit and tracker and user_id):
            return
        try:
            tracker.record_submission(
                user_id=user_id,
                submitter_name=name,
                file_name=os.path.basename(file.name),
                success=False,
                error_message=error_message,
                submit_model=submit_model,
                submit_description=submit_description,
            )
        except Exception as e:
            print(f"⚠️ 제출 실패 기록 저장 실패: {e}")

    # --------- 9) Public endpoint (core) ----------
    def process_submission(
        self,
        file,
        name: str,
        submit_model: str,
        submit_description: str,
        user_id: Optional[str] = None,
        progress: gr.Progress = gr.Progress(),
    ) -> str:
        """Validate, evaluate and publish one submission.

        Internal helpers return ``Result``; the final Gradio output is a
        plain string for compatibility with the existing UI.

        Args:
            file: Uploaded CSV file object (exposes ``name``), or None.
            name: Submitter display name.
            submit_model: Model name; blank becomes ``Anonymous Model``.
            submit_description: Optional description; blank becomes None.
            user_id: Logged-in user id; required when the limit is enabled.
            progress: Gradio progress reporter.

        Returns:
            The result report on success, or a user-facing error message.
        """
        normalized_model = (submit_model or "").strip() or "Anonymous Model"
        normalized_description = (submit_description or "").strip() or None

        # 1) Submission limit check
        tracker: Optional[SubmissionTracker] = None
        if self.enable_limit:
            # Login-based quota: without a user id, fail immediately.
            if not user_id:
                return "❌ HuggingFace 로그인 상태에서만 제출 가능합니다. 먼저 로그인 후 다시 시도해주세요."
            tracker = self.tracker or get_submission_tracker()
            if tracker is not None:
                self.tracker = tracker
            if tracker:
                try:
                    can_submit, message, _remaining = tracker.can_submit(user_id=user_id)
                    if not can_submit:
                        return f"❌ 제출 제한: {message}"
                except Exception as e:
                    return f"❌ 제출 제한 확인 실패: {e}"

        # 2) File validation
        progress(0.05, desc="제출 파일 검증 중...")
        v = self._validate_submission_file(file)
        if not v.ok:
            return v.error or "❌ 제출 파일 검증 실패"

        # 3) Load
        progress(0.1, desc="기준 데이터 로드 중...")
        loaded = self._load_submission_df(file)
        if not loaded.ok:
            return loaded.error or "❌ CSV 로딩 실패"
        submission_df: pd.DataFrame = loaded.data

        # 4) Merge
        progress(0.15, desc="기준 데이터와 병합 중...")
        mg = self._merge_with_base(submission_df, file.name)
        if not mg.ok:
            return mg.error or "❌ 기준 데이터 병합 실패"
        merged_df: pd.DataFrame = mg.data

        # 5) Evaluation, mapped onto the 0.15-0.8 progress range so the
        #    later steps (0.8, 0.85, 0.9) never move the bar backwards.
        progress(0.15, desc="FreshQA 평가 준비 중...")

        def on_inner_progress(done: int, total: int, desc: str):
            frac = 0.15 + 0.65 * (done / max(total, 1))
            progress(frac, desc=desc)

        ev = self._evaluate_freshqa(merged_df, on_progress=on_inner_progress)
        if not ev.ok:
            self._record_failure(
                tracker, user_id, name, file, ev.error or "평가 실패",
                normalized_model, normalized_description,
            )
            return ev.error or "❌ 평가 중 오류가 발생했습니다"
        relaxed_df, strict_df = ev.data  # type: ignore[assignment]

        # 6) Aggregate results
        progress(0.8, desc="평가 결과 분석 중...")
        r = self._calculate_accuracy(relaxed_df)
        if not r.ok:
            self._record_failure(
                tracker, user_id, name, file, r.error or "집계 실패",
                normalized_model, normalized_description,
            )
            return r.error or "❌ 결과 집계 실패"

        s = self._calculate_accuracy(strict_df)
        if not s.ok:
            self._record_failure(
                tracker, user_id, name, file, s.error or "집계 실패",
                normalized_model, normalized_description,
            )
            return s.error or "❌ 결과 집계 실패"

        _, relaxed_accs, relaxed_counts = r.data  # type: ignore[misc]
        _, strict_accs, strict_counts = s.data  # type: ignore[misc]

        # 7) Summary and tables
        relaxed_table = self._create_detailed_results_table(relaxed_accs, relaxed_counts)
        strict_table = self._create_detailed_results_table(strict_accs, strict_counts)
        result_summary = self._get_result_summary(
            file_name=file.name if file else "",
            name=name,
            relaxed_accs=relaxed_accs,
            strict_accs=strict_accs,
            relaxed_table=relaxed_table,
            strict_table=strict_table,
        )

        # 8) Record the successful submission and save to the leaderboard
        if self.enable_limit and tracker and user_id:
            progress(0.85, desc="제출 내역 저장 중...")
            try:
                tracker.record_submission(
                    user_id=user_id,
                    submitter_name=name,
                    file_name=os.path.basename(file.name),
                    success=True,
                    submit_model=normalized_model,
                    submit_description=normalized_description,
                )
            except Exception as e:
                # The leaderboard must be updated even when the submission
                # record cannot be stored.
                print(f"⚠️ 제출 내역 저장 실패: {e}")
            progress(0.9, desc="리더보드 업데이트 중...")

        self._save_leaderboard(
            name, normalized_model, normalized_description,
            relaxed_accs, strict_accs, relaxed_counts, strict_counts,
        )

        # 9) Done
        progress(1.0, desc="완료")
        return result_summary


# -------------------------
# Module-level entry point (kept for UI compatibility)
# -------------------------
def process_submission(
    file,
    name: str,
    submit_model: str,
    submit_description: str,
    user_id: Optional[str] = None,
    progress: gr.Progress = gr.Progress(),
) -> str:
    """Entry point called directly by Gradio.

    Creates a ``SubmissionHandler`` with injected dependencies and delegates
    to it. Any unexpected exception raised by the handler is converted into
    a user-facing error string, and a failure record is attempted.

    Returns:
        The handler's result string, or an error report with the elapsed time.
    """
    started = time.monotonic()
    tracker = get_submission_tracker() if Config.ENABLE_SUBMISSION_LIMIT else None
    handler = SubmissionHandler(tracker=tracker, cfg=Config)
    try:
        return handler.process_submission(
            file=file,
            name=name,
            submit_model=submit_model,
            submit_description=submit_description,
            user_id=user_id,
            progress=progress,
        )
    except Exception as e:
        # Top-level safety net: report unexpected errors in a friendly form.
        try:
            if handler.enable_limit and handler.tracker and user_id:
                handler.tracker.record_submission(
                    user_id=user_id,
                    submitter_name=name,
                    file_name=os.path.basename(file.name) if file else "(unknown)",
                    success=False,
                    error_message=str(e),
                    submit_model=(submit_model or "").strip() or "Anonymous Model",
                    submit_description=(submit_description or "").strip() or None,
                )
        except Exception as record_error:
            # Recording is best effort; never let it mask the original error.
            print(f"⚠️ 제출 실패 기록 저장 실패: {record_error}")

        total_time = time.monotonic() - started
        error_message = str(e)
        return (
            "❌ 평가 실패\n\n"
            "오류 내용:\n"
            f"{error_message}\n\n"
            f"소요 시간: {total_time:.2f}초 ({total_time/60:.2f}분)\n\n"
            "제출은 정상적으로 처리되었지만, 평가 과정에서 오류가 발생했습니다.\n"
            "제출 기록은 저장되었습니다."
        )