| | |
| | from fastapi import FastAPI, HTTPException |
| | from fastapi.middleware.cors import CORSMiddleware |
| | import asyncio |
| | from datetime import datetime, timedelta |
| | from typing import Dict, Optional |
| | from pydantic import BaseModel |
| | from dataclasses import dataclass |
| | import logging |
| | import requests |
| | from bs4 import BeautifulSoup |
| | from difflib import get_close_matches |
| | from model.analyzer import analyze_content |
| |
|
# Module-wide logging: INFO and above goes to the root handler.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastAPI application instance; routes are registered below via decorators.
app = FastAPI()

# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# rejected by browsers (the CORS spec requires an explicit origin when
# credentials are allowed) — confirm whether credentialed requests are
# actually needed, and if so list concrete origins.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
| |
|
@dataclass
class ProgressState:
    """Server-side record of one analysis task's progress in the tracker."""

    progress: float      # fraction complete, 0.0 .. 1.0
    status: str          # human-readable status line shown to the client
    timestamp: datetime  # time of last update; drives the 1-hour expiry
    task_id: str         # key under which this state is stored
    is_complete: bool = False      # set True once progress reaches 1.0
    result: Optional[dict] = None  # analysis result, present when finished
    error: Optional[str] = None    # error message, present when the task failed
| |
|
class ProgressResponse(BaseModel):
    """Wire format returned by GET /api/progress/{task_id}."""

    progress: float
    status: str
    is_complete: bool
    result: Optional[dict] = None
    error: Optional[str] = None
| |
|
# In-memory progress store. Keys are task ids (task-based endpoints) or the
# movie name (legacy /api/progress endpoint fed by fetch_script). Per-process
# only — not shared across workers.
progress_tracker: Dict[str, ProgressState] = {}

# IMSDb script database endpoints.
BASE_URL = "https://imsdb.com"
ALL_SCRIPTS_URL = f"{BASE_URL}/all-scripts.html"
| |
|
def create_task_id(movie_name: str) -> str:
    """Build a unique task identifier from the movie name and the current time."""
    started_at = datetime.now().timestamp()
    return f"{movie_name}-{started_at}"
| |
|
async def cleanup_old_tasks():
    """Periodically purge tracker entries that are more than one hour old."""
    while True:
        cutoff = datetime.now() - timedelta(hours=1)
        # Collect first, then delete — never mutate the dict while iterating.
        stale = [tid for tid, state in progress_tracker.items()
                 if state.timestamp < cutoff]
        for tid in stale:
            progress_tracker.pop(tid, None)
        # Re-scan every five minutes.
        await asyncio.sleep(300)
| |
|
@app.on_event("startup")
async def startup_event():
    """Initialize the server and start cleanup task"""
    # NOTE(review): @app.on_event is deprecated in recent FastAPI versions in
    # favor of lifespan handlers — confirm the pinned FastAPI version before
    # migrating.
    progress_tracker.clear()
    # Fire-and-forget background task; runs for the life of the event loop.
    asyncio.create_task(cleanup_old_tasks())
    logger.info("Server started, progress tracker initialized")
| |
|
def update_progress(task_id: str, progress: float, status: str, result: Optional[dict] = None, error: Optional[str] = None):
    """Store the latest progress snapshot for *task_id* in the tracker."""
    snapshot = ProgressState(
        progress=progress,
        status=status,
        timestamp=datetime.now(),
        task_id=task_id,
        # A task is considered finished once it reports full progress.
        is_complete=progress >= 1.0,
        result=result,
        error=error,
    )
    progress_tracker[task_id] = snapshot
    logger.info(f"Task {task_id}: {status} (Progress: {progress * 100:.0f}%)")
| |
|
@app.get("/api/start_analysis")
async def start_analysis(movie_name: str):
    """Kick off a background analysis and return the id to poll for progress."""
    task_id = create_task_id(movie_name)
    update_progress(task_id, 0.0, "Starting analysis...")
    # Schedule the work without awaiting it; the client polls /api/progress/{task_id}.
    asyncio.create_task(run_analysis(task_id, movie_name))
    return {"task_id": task_id}
| |
|
@app.get("/api/progress/{task_id}")
async def get_progress(task_id: str) -> ProgressResponse:
    """Return the latest progress snapshot for a known task."""
    state = progress_tracker.get(task_id)
    if state is None:
        raise HTTPException(status_code=404, detail="Task not found")
    return ProgressResponse(
        progress=state.progress,
        status=state.status,
        is_complete=state.is_complete,
        result=state.result,
        error=state.error,
    )
| |
|
def find_movie_link(movie_name: str, soup: BeautifulSoup) -> str | None:
    """Find the closest matching movie link from the script database."""
    # Index every anchor on the page by its lowercased link text.
    links_by_title = {
        anchor.text.strip().lower(): anchor['href']
        for anchor in soup.find_all('a', href=True)
    }
    matches = get_close_matches(movie_name.lower(), links_by_title.keys(), n=1, cutoff=0.6)
    if not matches:
        logger.info("No close match found.")
        return None
    best = matches[0]
    logger.info(f"Close match found: {best}")
    return BASE_URL + links_by_title[best]
| |
|
def find_script_link(soup: BeautifulSoup, movie_name: str) -> str | None:
    """Find the script download link for a given movie.

    Scans every anchor on the movie page; returns the first href whose text
    looks like a ``Read "<title>" Script`` link for *movie_name*, falling
    back to any link containing the words "Read", "Script" and the title.
    Returns None when no candidate link exists.
    """
    # Fix: the original built four case variants of this pattern
    # (raw/title/upper/lower) but compared case-insensitively, so all four
    # lowercased to the same string — one lowercase pattern is equivalent.
    pattern = f'read "{movie_name.lower()}" script'

    for link in soup.find_all('a', href=True):
        link_text = link.text.strip().lower()
        # Exact 'Read "<title>" Script' form.
        if pattern in link_text:
            return link['href']
        # Fallback: all key words present in any order.
        if all(word.lower() in link_text for word in ("Read", "Script", movie_name)):
            return link['href']
    return None
| |
|
def _fetch_soup(url: str, error_label: str) -> BeautifulSoup | None:
    """GET *url* and return its parsed HTML, or None (logged) on failure.

    *error_label* names the page in the log message so callers keep the
    original, distinct error texts.
    """
    try:
        # Fix: bound the request — the original had no timeout, so a stalled
        # server could hang the worker indefinitely.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Failed to load the {error_label}: {str(e)}")
        return None
    return BeautifulSoup(response.text, 'html.parser')


def fetch_script(movie_name: str) -> str | None:
    """Fetch and extract the script content for a given movie.

    Progress updates are published under *movie_name* (consumed by the legacy
    /api/progress endpoint). Returns the raw script text, or None when any
    step fails; failures are logged with the step that broke.
    """
    update_progress(movie_name, 0.1, "Fetching the script database...")
    soup = _fetch_soup(ALL_SCRIPTS_URL, "main page")
    if soup is None:
        return None

    update_progress(movie_name, 0.2, "Searching for the movie...")
    movie_link = find_movie_link(movie_name, soup)
    if not movie_link:
        logger.error(f"Script for '{movie_name}' not found.")
        return None

    update_progress(movie_name, 0.3, "Loading movie details...")
    soup = _fetch_soup(movie_link, "movie page")
    if soup is None:
        return None

    update_progress(movie_name, 0.4, "Locating script download...")
    script_link = find_script_link(soup, movie_name)
    if not script_link:
        logger.error(f"Unable to find script link for '{movie_name}'.")
        return None

    update_progress(movie_name, 0.5, "Downloading script content...")
    soup = _fetch_soup(BASE_URL + script_link, "script")
    if soup is None:
        return None

    update_progress(movie_name, 0.6, "Extracting script text...")
    # IMSDb serves the script body inside a <pre> element.
    script_content = soup.find('pre')
    if not script_content:
        logger.error("Failed to extract script content.")
        return None

    update_progress(movie_name, 0.7, "Script extracted successfully")
    return script_content.get_text()
| |
|
async def run_analysis(task_id: str, movie_name: str):
    """Background worker: fetch the script, analyze it, record the outcome.

    Progress, the final result, and any error are all published under
    *task_id* in the tracker; exceptions are captured there rather than
    propagated (this runs as a detached asyncio task with no awaiting caller).
    """
    try:
        update_progress(task_id, 0.2, "Fetching script...")
        # Fix: fetch_script does blocking HTTP via `requests`; run it in a
        # worker thread so the event loop stays responsive (previously it ran
        # directly on the loop and froze every other request while fetching).
        script_text = await asyncio.to_thread(fetch_script, movie_name)
        if not script_text:
            raise Exception("Script not found")

        update_progress(task_id, 0.6, "Analyzing content...")
        result = await analyze_content(script_text)

        update_progress(task_id, 1.0, "Analysis complete", result=result)

    except Exception as e:
        logger.error(f"Error in analysis: {str(e)}", exc_info=True)
        update_progress(task_id, 1.0, "Error occurred", error=str(e))
| |
|
@app.get("/api/fetch_and_analyze")
async def fetch_and_analyze(movie_name: str):
    """Fetch and analyze a movie script synchronously, with progress tracking.

    Returns the analysis result directly. Raises 404 when the script cannot
    be found and 500 on unexpected failures.
    """
    task_id = create_task_id(movie_name)
    try:
        update_progress(task_id, 0.0, "Starting script search...")

        # Fix: fetch_script blocks on `requests`; run it off the event loop.
        script_text = await asyncio.to_thread(fetch_script, movie_name)
        if not script_text:
            raise HTTPException(status_code=404, detail="Script not found or error occurred")

        update_progress(task_id, 0.8, "Analyzing script content...")
        result = await analyze_content(script_text)

        update_progress(task_id, 1.0, "Analysis complete!")
        return result

    except HTTPException:
        # Fix: the 404 above was previously swallowed by the generic handler
        # below and re-raised to the client as a 500.
        raise
    except Exception as e:
        logger.error(f"Error in fetch_and_analyze: {str(e)}", exc_info=True)
        # Drop partial progress for both keys touched during this request:
        # fetch_script tracks under movie_name, this handler under task_id.
        progress_tracker.pop(movie_name, None)
        progress_tracker.pop(task_id, None)
        raise HTTPException(status_code=500, detail=f"Internal Server Error: {str(e)}")
| |
|
@app.get("/api/progress")
def get_movie_progress(movie_name: str):
    """Legacy progress endpoint keyed by movie name (fed by fetch_script).

    Fix: this handler was previously also named ``get_progress``, silently
    shadowing the task-id based handler of the same name at module level
    (both HTTP routes still worked because FastAPI registers them at
    decoration time, but any direct import or test picked up the wrong
    function). The route path and response shape are unchanged.
    """
    state = progress_tracker.get(movie_name)
    if state is None:
        return {
            "progress": 0,
            "status": "Waiting to start..."
        }

    # Expire entries that have not been touched for an hour.
    if (datetime.now() - state.timestamp).total_seconds() > 3600:
        del progress_tracker[movie_name]
        return {
            "progress": 0,
            "status": "Session expired. Please try again."
        }

    return {
        "progress": state.progress,
        "status": state.status
    }
| |
|
if __name__ == "__main__":
    # Development entry point: serve on all interfaces, port 8000.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)