File size: 5,321 Bytes
c413127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
365d87f
c413127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import asyncio
import logging
from enum import Enum
from pathlib import Path
from typing import Any, List, Optional, Union

import fire

from src.chains.chains import FindPdfChain
from src.chains.pipelines import PresentationPipeline
from src.chains.prompts import BasePrompt, JsonH1AndGDPrompt
from src.config import Config, Provider
from src.config.logging import setup_logging

logger = logging.getLogger(__name__)


async def process_presentation(
    pdf_paths: List[Union[str, Path]],
    provider: Provider = Provider.VSEGPT,
    model_name: Optional[str] = None,
    temperature: float = 0.2,
    vision_prompt: Optional[BasePrompt] = None,
    max_concurrent_slides: int = 3,
    dpi: int = 72,
    base_path: Optional[Path] = None,
    fresh_start: bool = True,
    save_steps: bool = True,
) -> None:
    """Process presentations with async pipeline.

    Builds a FindPdfChain -> PresentationPipeline chain once, then runs it
    sequentially over each entry in ``pdf_paths``. A failure on one
    presentation is logged and does not stop processing of the rest
    (deliberate best-effort behavior).

    Args:
        pdf_paths: List of PDF paths or substrings
        provider: Type of model to use (vsegpt or openai)
        model_name: Optional specific model name
        temperature: Temperature for model
        vision_prompt: Prompt to use (if None, will use JsonH1AndGDPrompt)
        max_concurrent_slides: Maximum number of slides to process concurrently
        dpi: DPI for PDF rendering
        base_path: Base path for storing results
        fresh_start: Whether to ignore existing results
        save_steps: Whether to save intermediate results
    """
    logger.debug("Initializing presentation processing pipeline")

    # Initialize LLM from global config for the chosen provider/model.
    llm = Config().model_config.get_llm(provider, model_name, temperature=temperature)

    if vision_prompt is None:
        vision_prompt = JsonH1AndGDPrompt()
        logger.debug("Using default JsonH1AndGDPrompt")

    # Create pipeline: resolve the PDF path first, then analyze slides.
    pipeline = FindPdfChain() | PresentationPipeline(
        llm=llm,
        vision_prompt=vision_prompt,
        max_concurrent_slides=max_concurrent_slides,
        dpi=dpi,
        base_path=base_path,
        fresh_start=fresh_start,
        save_steps=save_steps,
    )

    # Lazy %-style args: the message is only formatted if DEBUG is enabled.
    logger.debug(
        "Pipeline configured with: model_type=%s, model_name=%s, "
        "max_concurrent=%s, dpi=%s, base_path=%s, fresh_start=%s, save_steps=%s",
        provider,
        model_name,
        max_concurrent_slides,
        dpi,
        base_path,
        fresh_start,
        save_steps,
    )

    # Process each presentation; errors are logged per-item, not raised.
    for pdf_path in pdf_paths:
        try:
            logger.info("Processing: %s", pdf_path)
            result = await pipeline.ainvoke({"pdf_path": pdf_path})
            presentation = result["presentation"]
            logger.info(
                "Completed %s (%d slides)",
                presentation.name,
                len(presentation.slides),
            )
            logger.debug("Full presentation metadata: %s", presentation.metadata)
        except Exception:
            # logger.exception records the traceback automatically.
            logger.exception("Failed to process %s", pdf_path)


class PipelineCLI:
    """CLI for processing PDF presentations with vision model analysis."""

    def __init__(self):
        """Initialize CLI with logging setup (logs written under ./logs)."""
        setup_logging(logger, Path("logs"))

    def process(
        self,
        *pdf_paths: str,
        provider: str = "vsegpt",
        model_name: Optional[str] = None,
        max_concurrent: int = 3,
        dpi: int = 72,
        output_dir: Optional[str] = None,
        fresh_start: bool = True,
        save_steps: bool = True,
    ) -> None:
        """Process PDF presentations with vision model.

        Args:
            *pdf_paths: One or more paths to PDF files or substrings to search
            provider: Model type to use ('vsegpt' or 'openai')
            model_name: Specific model name (optional)
            max_concurrent: Maximum number of slides to process concurrently
            dpi: DPI for PDF rendering
            output_dir: Base directory for output files
            fresh_start: Ignore existing analysis results
            save_steps: Save intermediate results
        """
        if not pdf_paths:
            logger.error("No PDF paths provided")
            return

        # Use a fresh local instead of rebinding the str parameter to an
        # enum, which would shadow its declared type.
        try:
            provider_enum = Provider(provider.lower())
        except ValueError:
            logger.error("Invalid provider: %s. Use 'vsegpt' or 'openai'", provider)
            return

        output_path = Path(output_dir) if output_dir else None
        # Existing files become Path objects; anything else is passed
        # through as a search substring for the pipeline to resolve.
        paths = [Path(p) if Path(p).exists() else p for p in pdf_paths]

        logger.info("Starting presentation processing")
        logger.debug("Processing PDF paths: %s", paths)

        try:
            asyncio.run(
                process_presentation(
                    pdf_paths=paths,
                    provider=provider_enum,
                    model_name=model_name,
                    max_concurrent_slides=max_concurrent,
                    dpi=dpi,
                    base_path=output_path,
                    fresh_start=fresh_start,
                    save_steps=save_steps,
                )
            )
            logger.info("Processing completed successfully")
        except KeyboardInterrupt:
            logger.warning("Processing interrupted by user")
        except Exception:
            # Top-level boundary: log with traceback, do not re-raise so the
            # CLI exits cleanly. logger.exception captures exc_info for us.
            logger.exception("Processing failed with error")


def main() -> None:
    """Entry point: expose PipelineCLI methods as Fire subcommands."""
    fire.Fire(PipelineCLI)


if __name__ == "__main__":
    main()