File size: 5,586 Bytes
91b23d7
 
1a1604b
 
c99a3fd
 
1a1604b
 
 
c99a3fd
1a1604b
 
c99a3fd
1a1604b
91b23d7
152fbf2
 
 
 
 
 
 
 
 
 
 
 
 
 
91b23d7
 
1a1604b
 
 
 
 
91b23d7
1a1604b
 
 
91b23d7
 
 
 
 
 
 
 
 
 
 
c99a3fd
91b23d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1a1604b
 
91b23d7
1a1604b
 
 
91b23d7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1a1604b
 
 
 
 
 
 
 
91b23d7
 
1a1604b
 
91b23d7
c99a3fd
 
 
 
 
 
 
 
 
 
 
91b23d7
 
 
 
 
 
 
 
 
 
 
1a1604b
d0b9734
 
 
 
91b23d7
d0b9734
 
 
91b23d7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
# app.py (with logging and debug improvements)
import io
import json
import os
import shutil
import subprocess
import traceback
from types import SimpleNamespace
from typing import Any, Dict, List

import gradio as gr
import pdfplumber
import pytesseract
import torch
import uvicorn
from fastapi import FastAPI, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from PIL import Image
from pptx import Presentation
from transformers import BlipProcessor, BlipForConditionalGeneration

# ----------- Tesseract Debugging -----------
# Startup sanity check: report whether the `tesseract` binary is available so
# later OCR failures (pytesseract on PDF pages) can be diagnosed from the logs.
try:
    print("\n--- DEBUG INFO ---")
    tesseract_path = shutil.which("tesseract")
    print("Tesseract path:", tesseract_path)

    if tesseract_path:
        # Run the exact binary we resolved above (not a second PATH lookup of
        # the bare name "tesseract"), so the reported path and version agree.
        result = subprocess.run(
            [tesseract_path, "--version"], capture_output=True, text=True
        )
        print("Tesseract version output:\n", result.stdout)
    else:
        print("Tesseract is NOT found in PATH")
    print("--- END DEBUG INFO ---\n")
except Exception as e:
    # Best-effort diagnostics only -- never block app startup on this check.
    print("Error during Tesseract check:", e)

# ----------- BLIP Image Caption Model -----------
# Load the BLIP captioning model once at import time: half precision when a
# GPU is present, full precision on CPU.
print("πŸ”„ Loading BLIP model...")
_BLIP_CHECKPOINT = "Salesforce/blip-image-captioning-base"
_blip_dtype = torch.float16 if torch.cuda.is_available() else torch.float32
processor = BlipProcessor.from_pretrained(_BLIP_CHECKPOINT)
blip_model = BlipForConditionalGeneration.from_pretrained(
    _BLIP_CHECKPOINT, torch_dtype=_blip_dtype
).eval()
print("βœ… BLIP model loaded")

def _caption_image(img: Image.Image) -> str:
    """Run BLIP to caption a PIL image.

    Args:
        img: Any PIL image; converted to RGB before processing.

    Returns:
        The decoded caption string, or the sentinel "[CAPTION_ERROR]" on any
        failure, so per-slide processing can continue best-effort.
    """
    try:
        inputs = processor(img.convert("RGB"), return_tensors="pt")
        # Move tensors to the model's device AND cast floating tensors to the
        # model's dtype: on CUDA the model is float16 while the processor
        # always emits float32 pixel values -- without the cast, generate()
        # raises a dtype mismatch and every caption degrades to the sentinel.
        model_inputs = {
            k: (
                v.to(device=blip_model.device, dtype=blip_model.dtype)
                if v.is_floating_point()
                else v.to(blip_model.device)
            )
            for k, v in inputs.items()
        }
        with torch.no_grad():
            out = blip_model.generate(**model_inputs)
        return processor.decode(out[0], skip_special_tokens=True)
    except Exception as e:
        print(f"[ERROR] Captioning image failed: {e}")
        traceback.print_exc()
        return "[CAPTION_ERROR]"

# ----------- Slidepack Processing -----------
def analyze_slidepack(file: Any) -> Dict[str, Any]:
    """Extract every text fragment and image caption from a slide deck.

    Args:
        file: Any object exposing a ``.name`` attribute holding a filesystem
            path to a ``.pptx`` or ``.pdf`` (a Gradio upload wrapper, or the
            shim built by the REST endpoint).

    Returns:
        ``{"file_name": ..., "slides": [...]}`` where each slide entry holds
        ``slide_index`` (1-based), ``textBlocks`` (list of non-empty strings)
        and ``imageCaptions`` (BLIP captions); or ``{"error": ...}`` when an
        unexpected exception occurs.

    Raises:
        gr.Error: For unsupported file extensions, so the Gradio UI can
            surface the message to the user.
    """
    try:
        fname = os.path.basename(file.name)
        print(f"πŸ“‚ Analyzing file: {fname}")
        slides_out: List[Dict[str, Any]] = []

        # PPTX: walk shapes for text frames and embedded pictures.
        if fname.lower().endswith(".pptx"):
            pres = Presentation(file.name)
            for idx, slide in enumerate(pres.slides, start=1):
                texts, caps = [], []
                for shape in slide.shapes:
                    if hasattr(shape, "text"):
                        text = shape.text.strip()
                        if text:
                            texts.append(text)
                    # 13 == MSO_SHAPE_TYPE.PICTURE (embedded image shape).
                    if shape.shape_type == 13:
                        img_blob = shape.image.blob
                        img = Image.open(io.BytesIO(img_blob))
                        caps.append(_caption_image(img))
                slides_out.append({
                    "slide_index": idx,
                    "textBlocks": texts,
                    "imageCaptions": caps
                })

        # PDF: extract text per page, plus a caption + OCR pass on the
        # rendered page image (best-effort -- skipped on per-page errors).
        elif fname.lower().endswith(".pdf"):
            with pdfplumber.open(file.name) as pdf:
                for idx, page in enumerate(pdf.pages, start=1):
                    texts = [page.extract_text() or ""]
                    caps = []
                    try:
                        img = page.to_image(resolution=200).original
                        caps.append(_caption_image(img))
                        ocr_text = pytesseract.image_to_string(img)
                        if ocr_text.strip():
                            texts.append(ocr_text)
                    except Exception as e:
                        print(f"[WARN] Skipping image/OCR on page {idx} due to error: {e}")
                    slides_out.append({
                        "slide_index": idx,
                        "textBlocks": [t for t in texts if t.strip()],
                        "imageCaptions": caps
                    })

        else:
            raise gr.Error("Unsupported file type. Upload a .pptx or .pdf.")

        print("βœ… Slidepack analysis completed")
        return {"file_name": fname, "slides": slides_out}

    except gr.Error:
        # Deliberate user-facing error: re-raise so Gradio displays it,
        # instead of swallowing it into the {"error": ...} payload below.
        raise
    except Exception as e:
        print(f"[ERROR] Exception during slidepack analysis: {e}")
        traceback.print_exc()
        return {"error": str(e)}

# ----------- Gradio UI -----------
# Simple one-shot interface: upload a deck, receive the full JSON extraction.
_DESCRIPTION = (
    "Returns **every** text fragment and BLIP-generated image caption in JSON. "
    "No summarisation – perfect for downstream quiz agents."
)
demo = gr.Interface(
    fn=analyze_slidepack,
    inputs=gr.File(label="Upload PPTX or PDF"),
    outputs=gr.JSON(),
    title="Slide-Pack Full Extractor",
    description=_DESCRIPTION,
    live=True,
)

# ----------- FastAPI REST Endpoint -----------
api = FastAPI()

# Wide-open CORS so the endpoint can be called from any origin.
_cors_settings = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
api.add_middleware(CORSMiddleware, **_cors_settings)

@api.post("/extract_slidepack")
async def extract_slidepack(file: UploadFile):
    """REST wrapper: persist the upload to /tmp and reuse analyze_slidepack.

    Args:
        file: Multipart file upload from the client.

    Returns:
        The analysis dict from analyze_slidepack, or ``{"error": ...}`` on
        failure.
    """
    try:
        # basename() strips any directory components from the client-supplied
        # filename, preventing path traversal (e.g. "../../etc/passwd").
        safe_name = os.path.basename(file.filename or "upload")
        path = os.path.join("/tmp", safe_name)
        with open(path, "wb") as f:
            f.write(await file.read())
        # analyze_slidepack only needs an object with a `.name` attribute,
        # mirroring Gradio's upload wrapper.
        return analyze_slidepack(SimpleNamespace(name=path))
    except Exception as e:
        print(f"[ERROR] extract_slidepack endpoint failed: {e}")
        traceback.print_exc()
        return {"error": str(e)}

# ----------- Main Entry -----------
if __name__ == "__main__":
    import asyncio

    async def _launch_after_delay() -> None:
        """Pause briefly, then start the Gradio app with MCP enabled."""
        print("⏳ Waiting before MCP launch to avoid race condition...")
        await asyncio.sleep(3)
        print("πŸš€ Launching with MCP support now.")
        demo.launch(mcp_server=True)

    asyncio.run(_launch_after_delay())