"""CLI for processing PDF presentations with an async vision-model pipeline."""
| import asyncio | |
| import logging | |
| from enum import Enum | |
| from pathlib import Path | |
| from typing import Any, List, Optional, Union | |
| import fire | |
| from src.chains.chains import FindPdfChain | |
| from src.chains.pipelines import PresentationPipeline | |
| from src.chains.prompts import BasePrompt, JsonH1AndGDPrompt | |
| from src.config import Config, Provider | |
| from src.config.logging import setup_logging | |
| logger = logging.getLogger(__name__) | |
async def process_presentation(
    pdf_paths: List[Union[str, Path]],
    provider: Provider = Provider.VSEGPT,
    model_name: Optional[str] = None,
    temperature: float = 0.2,
    vision_prompt: Optional[BasePrompt] = None,
    max_concurrent_slides: int = 3,
    dpi: int = 72,
    base_path: Optional[Path] = None,
    fresh_start: bool = True,
    save_steps: bool = True,
) -> None:
    """Process presentations with async pipeline

    Args:
        pdf_paths: List of PDF paths or substrings
        provider: Type of model to use (vsegpt or openai)
        model_name: Optional specific model name
        temperature: Temperature for model
        vision_prompt: Prompt to use (if None, will use JsonH1AndGDPrompt)
        max_concurrent_slides: Maximum number of slides to process concurrently
        dpi: DPI for PDF rendering
        base_path: Base path for storing results
        fresh_start: Whether to ignore existing results
        save_steps: Whether to save intermediate results
    """
    logger.debug("Initializing presentation processing pipeline")

    # Initialize the LLM for the selected provider/model.
    llm = Config().model_config.get_llm(provider, model_name, temperature=temperature)

    if vision_prompt is None:
        vision_prompt = JsonH1AndGDPrompt()
        logger.debug("Using default JsonH1AndGDPrompt")

    # Compose the pipeline: resolve the PDF location first, then run the
    # slide-analysis pipeline on the resolved file.
    pipeline = FindPdfChain() | PresentationPipeline(
        llm=llm,
        vision_prompt=vision_prompt,
        max_concurrent_slides=max_concurrent_slides,
        dpi=dpi,
        base_path=base_path,
        fresh_start=fresh_start,
        save_steps=save_steps,
    )

    # Lazy %-style args: the message is only formatted if DEBUG is enabled.
    logger.debug(
        "Pipeline configured with: model_type=%s, model_name=%s, "
        "max_concurrent=%s, dpi=%s, base_path=%s, fresh_start=%s, "
        "save_steps=%s",
        provider,
        model_name,
        max_concurrent_slides,
        dpi,
        base_path,
        fresh_start,
        save_steps,
    )

    # Process each presentation independently so that one failure does not
    # abort the remaining paths.
    for pdf_path in pdf_paths:
        try:
            logger.info("Processing: %s", pdf_path)
            result = await pipeline.ainvoke({"pdf_path": pdf_path})
            presentation = result["presentation"]
            logger.info(
                "Completed %s (%d slides)",
                presentation.name,
                len(presentation.slides),
            )
            logger.debug("Full presentation metadata: %s", presentation.metadata)
        except Exception as e:
            # Log with traceback and continue with the next presentation.
            logger.error("Failed to process %s: %s", pdf_path, e, exc_info=True)
class PipelineCLI:
    """CLI for processing PDF presentations with vision model analysis"""

    def __init__(self):
        """Initialize CLI with logging setup"""
        setup_logging(logger, Path("logs"))

    def process(
        self,
        *pdf_paths: str,
        provider: str = "vsegpt",
        model_name: Optional[str] = None,
        max_concurrent: int = 3,
        dpi: int = 72,
        output_dir: Optional[str] = None,
        fresh_start: bool = True,
        save_steps: bool = True,
    ) -> None:
        """Process PDF presentations with vision model

        Args:
            *pdf_paths: One or more paths to PDF files or substrings to search
            provider: Model type to use ('vsegpt' or 'openai')
            model_name: Specific model name (optional)
            max_concurrent: Maximum number of slides to process concurrently
            dpi: DPI for PDF rendering
            output_dir: Base directory for output files
            fresh_start: Ignore existing analysis results
            save_steps: Save intermediate results
        """
        if not pdf_paths:
            logger.error("No PDF paths provided")
            return

        # Keep the str parameter and the parsed enum as separate names
        # instead of rebinding `provider` with a different type.
        try:
            provider_enum = Provider(provider.lower())
        except ValueError:
            logger.error("Invalid provider: %s. Use 'vsegpt' or 'openai'", provider)
            return

        output_path = Path(output_dir) if output_dir else None

        # Existing files are passed as Path objects; anything else is kept as
        # a plain string (treated as a search substring downstream).
        # Build Path(p) once per entry instead of twice.
        paths: List[Union[str, Path]] = []
        for p in pdf_paths:
            candidate = Path(p)
            paths.append(candidate if candidate.exists() else p)

        logger.info("Starting presentation processing")
        logger.debug("Processing PDF paths: %s", paths)

        try:
            asyncio.run(
                process_presentation(
                    pdf_paths=paths,
                    provider=provider_enum,
                    model_name=model_name,
                    max_concurrent_slides=max_concurrent,
                    dpi=dpi,
                    base_path=output_path,
                    fresh_start=fresh_start,
                    save_steps=save_steps,
                )
            )
            logger.info("Processing completed successfully")
        except KeyboardInterrupt:
            logger.warning("Processing interrupted by user")
        except Exception:
            # Exception details are captured by exc_info; no need to bind it.
            logger.error("Processing failed with error", exc_info=True)
def main():
    """Entry point for Fire CLI"""
    # Expose PipelineCLI's public methods (e.g. `process`) as subcommands.
    fire.Fire(PipelineCLI)


if __name__ == "__main__":
    main()