Spaces:
Running
Running
| from typing import Any | |
| from fastapi import FastAPI, Request | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from fastapi.responses import HTMLResponse, RedirectResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from app.api import router as api_router | |
| from app.services.video_service import get_video_by_id | |
| from jinja2 import pass_context | |
| from starlette.datastructures import URL | |
| app = FastAPI(title="In-Video Search", docs_url=None, redoc_url=None, openapi_url=None) | |
| # Enable CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], # Adjust this in production | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Mount static files | |
| app.mount("/static", StaticFiles(directory="app/static"), name="static") | |
| # Templates | |
| templates = Jinja2Templates(directory="app/templates") | |
| def https_url_for(context: dict, name: str, **path_params: Any) -> URL: | |
| request: Request = context["request"] | |
| url: URL = request.url_for(name, **path_params) | |
| return url.replace(scheme="https") | |
| templates.env.globals["https_url_for"] = https_url_for | |
| async def index(request: Request): | |
| return templates.TemplateResponse( | |
| "index.html", {"request": request, "title": "In-Video Search"} | |
| ) | |
| async def video_page(request: Request, video_id: str): | |
| # Try to get video info from database | |
| video = get_video_by_id(video_id) | |
| title = "Video Player" | |
| # If video exists and has a title, use it | |
| if video and video.title: | |
| title = video.title | |
| # Get the start time from query parameters if available | |
| start_time = 0 | |
| if "t" in request.query_params: | |
| try: | |
| # Try to parse the t parameter as seconds | |
| t_param = request.query_params.get("t") | |
| # First try as a float (seconds) | |
| start_time = int(float(t_param)) | |
| except (ValueError, TypeError): | |
| # If that fails, try parsing as MM:SS or HH:MM:SS format | |
| try: | |
| if ":" in t_param: | |
| parts = t_param.split(":") | |
| if len(parts) == 2: # MM:SS | |
| start_time = int(parts[0]) * 60 + int(parts[1]) | |
| elif len(parts) == 3: # HH:MM:SS | |
| start_time = ( | |
| int(parts[0]) * 3600 + int(parts[1]) * 60 + int(parts[2]) | |
| ) | |
| except (ValueError, TypeError, IndexError): | |
| # If all parsing fails, default to 0 | |
| start_time = 0 | |
| return templates.TemplateResponse( | |
| "video.html", | |
| { | |
| "request": request, | |
| "title": title, | |
| "video_id": video_id, | |
| "start_time": start_time, | |
| }, | |
| ) | |
| async def watch_redirect(request: Request, v: str): | |
| # Redirect YouTube-style URLs to our video page | |
| # If there's a t parameter, include it in the redirect | |
| if "t" in request.query_params: | |
| return RedirectResponse(url=f"/video/{v}?t={request.query_params.get('t')}") | |
| else: | |
| return RedirectResponse(url=f"/video/{v}") | |
| # Include API routers | |
| app.include_router(api_router.router, prefix="/api") | |
| if __name__ == "__main__": | |
| import uvicorn | |
| uvicorn.run("app.main:app", host="0.0.0.0", port=8000, reload=True) | |