GitRecap / server /routes.py
github-actions[bot]
Deploy app/api to HF Space
f0d1569
from fastapi import APIRouter, HTTPException, Request, Query
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
from models.schemas import (
BranchListResponse,
ValidTargetBranchesRequest,
ValidTargetBranchesResponse,
CreatePullRequestRequest,
CreatePullRequestResponse,
GetPullRequestDiffRequest,
GetPullRequestDiffResponse,
GetAuthorsRequest,
GetAuthorsResponse,
AuthorInfo,
ActionsResponse,
GetCurrentAuthorResponse,
CloneRequest
)
from services.llm_service import set_llm, get_llm, trim_messages
from services.fetcher_service import store_fetcher, get_fetcher
from git_recap.utils import parse_entries_to_txt, parse_releases_to_txt
from aicore.llm.config import LlmConfig
from datetime import datetime, timezone
import requests
import os
router = APIRouter()
GITHUB_ACCESS_TOKEN_URL = 'https://github.com/login/oauth/access_token'
@router.post("/clone-repo")
async def clone_repository(request: CloneRequest):
"""
Endpoint for cloning a repository from a URL.
Args:
request: CloneRequest containing the repository URL
Returns:
dict: Contains session_id for subsequent operations
Raises:
HTTPException: 400 for invalid URL, 500 for cloning failure
"""
try:
response = await create_llm_session()
session_id = response.get("session_id")
store_fetcher(session_id, request.url, "URL")
return {"session_id": session_id}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to clone repository: {str(e)}")
@router.get("/external-signup")
async def external_signup(app: str, accessToken: str, provider: str):
"""
Handle external OAuth signup flow.
Args:
app: Application name
accessToken: OAuth access token or authorization code
provider: Provider name (e.g., "github")
Returns:
dict: Contains session_id, token, and provider information
Raises:
HTTPException: 400 for unsupported provider or token errors
"""
if provider.lower() != "github":
raise HTTPException(status_code=400, detail="Unsupported provider")
params = {
"client_id": os.getenv("VITE_GITHUB_CLIENT_ID"),
"client_secret": os.getenv("VITE_GITHUB_CLIENT_SECRET"),
"code": accessToken
}
headers = {
"Accept": "application/json",
"Accept-Encoding": "application/json"
}
response = requests.get(GITHUB_ACCESS_TOKEN_URL, params=params, headers=headers)
if response.status_code != 200:
raise HTTPException(status_code=response.status_code, detail="Error fetching token from GitHub")
githubUserData = response.json()
token = githubUserData.get("access_token")
if not token:
raise HTTPException(status_code=400, detail="Failed to retrieve access token")
response = await create_llm_session()
response["token"] = token
response["provider"] = provider
return await store_fetcher_endpoint(response)
@router.post("/pat")
async def store_fetcher_endpoint(request: Request):
"""
Endpoint to store the PAT associated with a session.
Args:
request: Contains JSON payload with 'session_id' and 'pat'
Returns:
dict: Contains session_id and username
Raises:
HTTPException: 400 if PAT is missing
"""
if isinstance(request, Request):
payload = await request.json()
else:
payload = request
provider = payload.get("provider", "GitHub")
token = payload.get("pat") or payload.get("token")
if not token:
raise HTTPException(status_code=400, detail="Missing required field: pat")
response = await create_llm_session()
session_id = response.get("session_id")
username = store_fetcher(session_id, token, provider)
return {"session_id": session_id, "username": username}
async def create_llm_session(request: Optional[LlmConfig] = None):
"""
Create a new LLM session with custom configuration.
Args:
request: Optional LLM configuration
Returns:
dict: Contains session_id and success message
Raises:
HTTPException: 500 if session creation fails
"""
try:
session_id = await set_llm(request)
return {
"session_id": session_id,
"message": "LLM session created successfully"
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/repos")
async def get_repos(session_id: str):
"""
Return a list of repositories for the given session_id.
Args:
session_id: The session identifier
Returns:
dict: Contains list of repository names
Raises:
HTTPException: 404 if session not found
"""
fetcher = get_fetcher(session_id)
return {"repos": fetcher.repos_names}
@router.get("/actions", response_model=ActionsResponse)
async def get_actions(
session_id: str,
start_date: Optional[str] = Query(None),
end_date: Optional[str] = Query(None),
repo_filter: Optional[List[str]] = Query(None),
authors: Optional[List[str]] = Query(None)
):
"""
Get actions for the specified session with optional filters.
Returns a structured response including the actions list, user-facing
informational message (if trimming occurred), and metadata about the
trimming operation.
Args:
session_id: The session identifier
start_date: Optional start date filter
end_date: Optional end date filter
repo_filter: Optional list of repositories to filter
authors: Optional list of authors to filter
Returns:
ActionsResponse: Structured response with actions, message, and metadata
Raises:
HTTPException: 404 if session not found
"""
if repo_filter is not None:
repo_filter = sum([repo.split(",") for repo in repo_filter], [])
if authors is not None:
authors = sum([author.split(",") for author in authors], [])
fetcher = get_fetcher(session_id)
start_dt = datetime.fromisoformat(start_date).replace(tzinfo=timezone.utc) if start_date else None
end_dt = datetime.fromisoformat(end_date).replace(tzinfo=timezone.utc) if end_date else None
if start_dt:
fetcher.start_date = start_dt
if end_dt:
fetcher.end_dt = end_dt
if repo_filter is not None:
fetcher.repo_filter = repo_filter
if authors is not None:
fetcher.authors = authors
llm = get_llm(session_id)
actions = fetcher.get_authored_messages()
# Store original count before trimming
original_count = len(actions)
# Apply token limit trimming
trimmed_actions = trim_messages(actions, llm.tokenizer)
# Calculate how many items were removed
trimmed_count = original_count - len(trimmed_actions)
# Generate user-facing message if trimming occurred
message = None
if trimmed_count > 0:
message = (
f"We're running the free version with a maximum token limit for contextual input. "
f"To stay within this limit, we automatically trimmed {trimmed_count} older Git "
f"actionable{'s' if trimmed_count != 1 else ''} from the context. "
f"We hope you understand!"
)
# Parse actions to text format
actions_txt = parse_entries_to_txt(trimmed_actions)
# Return structured response
return ActionsResponse(
actions=actions_txt,
message=message,
trimmed_count=trimmed_count,
total_count=original_count
)
@router.get("/release_notes")
async def get_release_notes(
session_id: str,
repo_filter: Optional[List[str]] = Query(None),
num_old_releases: int = Query(..., ge=1)
):
"""
Generate release notes for the latest release of a single repository.
Args:
session_id: The session identifier
repo_filter: Must contain exactly one repository name
num_old_releases: Number of previous releases to include for context
Returns:
dict: Contains actions and release notes text
Raises:
HTTPException: 400 for invalid input, 404 for session not found, 500 for errors
"""
if repo_filter is None or len(repo_filter) != 1:
raise HTTPException(status_code=400, detail="repo_filter must be a list containing exactly one repository name.")
repo = repo_filter[0]
try:
fetcher = get_fetcher(session_id)
except HTTPException:
raise
try:
releases = fetcher.fetch_releases()
except NotImplementedError:
raise HTTPException(status_code=400, detail="Release fetching is not supported for this provider.")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching releases: {str(e)}")
releases_txt = parse_releases_to_txt(releases[:num_old_releases])
repo_releases = [r for r in releases if r.get("repo") == repo]
n_releases = len(repo_releases)
if n_releases < 1:
raise HTTPException(status_code=400, detail="Not enough releases found for the specified repository (need at least 1).")
if num_old_releases < 1 or num_old_releases >= n_releases:
raise HTTPException(
status_code=400,
detail=f"num_old_releases must be at least 1 and less than the number of releases available ({n_releases}) for this repository."
)
try:
repo_releases.sort(key=lambda r: r.get("published_at") or r.get("created_at"), reverse=True)
except Exception:
raise HTTPException(status_code=500, detail="Failed to sort releases by date.")
latest_release = repo_releases[0]
release_date = latest_release.get("published_at") or latest_release.get("created_at")
if not release_date:
raise HTTPException(status_code=500, detail="Latest release does not have a valid date.")
if isinstance(release_date, datetime):
start_date_iso = release_date.astimezone(timezone.utc).isoformat()
else:
try:
dt = datetime.fromisoformat(release_date)
start_date_iso = dt.astimezone(timezone.utc).isoformat()
except Exception:
raise HTTPException(status_code=500, detail="Release date is not a valid ISO format.")
fetcher.start_date = datetime.fromisoformat(start_date_iso)
fetcher.end_dt = None
fetcher.repo_filter = [repo]
llm = get_llm(session_id)
actions = fetcher.get_authored_messages()
actions = trim_messages(actions, llm.tokenizer)
actions_txt = parse_entries_to_txt(actions)
return {"actions": "\n\n".join([actions_txt, releases_txt])}
@router.get("/branches", response_model=BranchListResponse)
async def get_branches(session_id: str, repo: str):
"""
Get all branches for a given repository in the current session.
Args:
session_id: The session identifier
repo: Repository name
Returns:
BranchListResponse: Contains list of branch names
Raises:
HTTPException: 400 if not supported, 404 if session not found, 500 for errors
"""
fetcher = get_fetcher(session_id)
try:
fetcher.repo_filter = [repo]
branches = fetcher.get_branches()
except NotImplementedError:
raise HTTPException(status_code=400, detail="Branch listing is not supported for this provider.")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch branches: {str(e)}")
return BranchListResponse(branches=branches)
@router.post("/valid-target-branches", response_model=ValidTargetBranchesResponse)
async def get_valid_target_branches(req: ValidTargetBranchesRequest):
"""
Get all valid target branches for a given source branch in a repository.
Args:
req: ValidTargetBranchesRequest containing session_id, repo, and source_branch
Returns:
ValidTargetBranchesResponse: Contains list of valid target branch names
Raises:
HTTPException: 400 for validation errors, 404 if session not found, 500 for errors
"""
fetcher = get_fetcher(req.session_id)
try:
fetcher.repo_filter = [req.repo]
valid_targets = fetcher.get_valid_target_branches(req.source_branch)
except NotImplementedError:
raise HTTPException(status_code=400, detail="Target branch validation is not supported for this provider.")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to validate target branches: {str(e)}")
return ValidTargetBranchesResponse(valid_target_branches=valid_targets)
@router.post("/create-pull-request", response_model=CreatePullRequestResponse)
async def create_pull_request(req: CreatePullRequestRequest):
"""
Create a pull request between two branches with optional metadata.
Args:
req: CreatePullRequestRequest containing all PR details
Returns:
CreatePullRequestResponse: Contains PR URL, number, state, and success status
Raises:
HTTPException: 400 for validation errors, 404 if session not found, 500 for errors
"""
fetcher = get_fetcher(req.session_id)
fetcher.repo_filter = [req.repo]
if not req.description or not req.description.strip():
raise HTTPException(status_code=400, detail="Description is required for pull request creation.")
try:
result = fetcher.create_pull_request(
head_branch=req.source_branch,
base_branch=req.target_branch,
title=req.title or f"Merge {req.source_branch} into {req.target_branch}",
body=req.description,
draft=req.draft or False,
reviewers=req.reviewers,
assignees=req.assignees,
labels=req.labels,
)
except NotImplementedError:
raise HTTPException(status_code=400, detail="Pull request creation is not supported for this provider.")
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to create pull request: {str(e)}")
return CreatePullRequestResponse(
url=result.get("url"),
number=result.get("number"),
state=result.get("state"),
success=result.get("success", False),
generated_description=None
)
@router.post("/get-pull-request-diff")
async def get_pull_request_diff(req: GetPullRequestDiffRequest):
"""
Get the diff between two branches for pull request preview.
Args:
req: GetPullRequestDiffRequest containing session_id, repo, source_branch, and target_branch
Returns:
dict: Contains formatted commit actions between branches
Raises:
HTTPException: 400 if not supported or GitHub only, 404 if session not found, 500 for errors
"""
fetcher = get_fetcher(req.session_id)
fetcher.repo_filter = [req.repo]
provider = type(fetcher).__name__.lower()
if "github" not in provider:
raise HTTPException(status_code=400, detail="Pull request diff is only supported for GitHub provider.")
try:
commits = fetcher.fetch_branch_diff_commits(req.source_branch, req.target_branch)
except NotImplementedError:
raise HTTPException(status_code=400, detail="Branch diff is not supported for this provider.")
except Exception as e:
raise HTTPException(status_code=500, detail=f"Failed to fetch pull request diff: {str(e)}")
return {"actions": parse_entries_to_txt(commits)}
@router.post("/authors", response_model=GetAuthorsResponse)
async def get_authors(request: GetAuthorsRequest):
"""
Retrieve list of unique authors from specified repositories.
Args:
request: GetAuthorsRequest containing session_id and optional repo_names
Returns:
GetAuthorsResponse with list of authors and metadata
Raises:
HTTPException: 404 if session not found, 500 for fetcher errors
"""
try:
fetcher = get_fetcher(request.session_id)
if not fetcher:
raise HTTPException(
status_code=404,
detail=f"Session {request.session_id} not found or expired"
)
authors_data = fetcher.get_authors(request.repo_names or [])
authors = [
AuthorInfo(name=author["name"], email=author["email"])
for author in authors_data
]
response = GetAuthorsResponse(
authors=authors,
total_count=len(authors),
repo_count=len(request.repo_names) if request.repo_names else 0
)
return response
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error fetching authors: {str(e)}"
)
@router.get("/current-author", response_model=GetCurrentAuthorResponse)
async def get_current_author(session_id: str = Query(..., description="Session identifier")):
"""
Retrieve the current authenticated user's information from the fetcher.
Args:
session_id: The session identifier
Returns:
GetCurrentAuthorResponse: Contains optional author information (name and email)
Raises:
HTTPException: 404 if session not found, 500 for errors
"""
try:
fetcher = get_fetcher(session_id)
if not fetcher:
raise HTTPException(
status_code=404,
detail=f"Session {session_id} not found or expired"
)
try:
author_info = fetcher.get_current_author()
except NotImplementedError:
author_info = None
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error retrieving current author: {str(e)}"
)
return GetCurrentAuthorResponse(author=author_info)
except HTTPException:
raise
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Error fetching current author: {str(e)}"
)