|
|
|
|
|
import io
import os
import shutil
import subprocess
import sys
import tempfile
import zipfile
from pathlib import Path

import gradio as gr
import pandas as pd

import barzooka as bz
|
|
|
|
|
|
|
|
def _check_pdftocairo() -> str:
    """Report whether the ``pdftocairo`` command-line tool can be run.

    Runs ``pdftocairo -v`` with stderr merged into stdout and condenses the
    outcome into one human-readable string.

    Returns:
        The last line printed by ``pdftocairo -v`` when the call looks
        successful (exit code 0, or the output mentions ``pdftocairo``);
        otherwise a message explaining that the tool is missing or that it
        printed nothing.
    """
    try:
        out = subprocess.run(
            ["pdftocairo", "-v"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
        )
    except FileNotFoundError:
        return "pdftocairo not found (FileNotFoundError). Install poppler-utils."

    if out.returncode == 0 or "pdftocairo" in out.stdout:
        lines = out.stdout.strip().splitlines()
        # A zero exit code with empty output leaves nothing to index;
        # ``lines[-1]`` on an empty list would raise IndexError.
        if not lines:
            return "pdftocairo ran successfully but printed no version information."
        return lines[-1]
    return f"pdftocairo not found (returncode {out.returncode}). Output:\n{out.stdout}"
|
|
|
|
|
|
|
|
def _maybe_download_model(target_path: Path) -> bool:
    """If BARZOOKA_MODEL_URL is set, try to download the model to target_path.

    The response is streamed to a sibling ``<name>.part`` file and renamed
    onto ``target_path`` only after the transfer finishes. A failed or
    interrupted download therefore never leaves a truncated file at
    ``target_path``, and the payload is not held in memory as a whole.

    Args:
        target_path: Destination path for the downloaded model file.

    Returns:
        True when the file was downloaded and moved into place; False when
        the environment variable is unset/blank or the download failed
        (the failure is printed, not raised).
    """
    url = os.getenv("BARZOOKA_MODEL_URL", "").strip()
    if not url:
        return False

    partial_path = target_path.with_name(target_path.name + ".part")
    try:
        import urllib.request

        print(f"Downloading Barzooka model from {url} ...")
        # The timeout applies to individual blocking socket operations, so a
        # stalled server cannot hang the caller indefinitely.
        with urllib.request.urlopen(url, timeout=60) as resp, open(partial_path, "wb") as sink:
            shutil.copyfileobj(resp, sink)
        # Rename only once the transfer is complete.
        os.replace(partial_path, target_path)
        print(f"Saved model to {target_path}")
        return True
    except Exception as e:
        # Deliberately broad: downloading is best-effort and the caller
        # decides what to do when no model file is available.
        print(f"Model download failed: {e}")
        try:
            partial_path.unlink()
        except OSError:
            pass  # No partial file to remove, or it cannot be removed.
        return False
|
|
|
|
|
|
|
|
def _construct_barzooka() -> bz.Barzooka:
    """Build the Barzooka classifier, falling back to a local model file.

    First tries ``bz.Barzooka(cpu=True)``. If that raises, looks for
    ``barzooka.pkl`` next to this file, asks ``_maybe_download_model`` to
    fetch it when it is absent, and then loads the model from that file.

    Returns:
        A ready-to-use ``bz.Barzooka`` instance.

    Raises:
        RuntimeError: If the default construction failed and either no local
            model file is available or loading it failed as well. The
            underlying exception is attached as the cause.
    """
    try:
        return bz.Barzooka(cpu=True)
    except Exception as e_default:
        local_pkl = Path(__file__).with_name("barzooka.pkl")
        if not local_pkl.exists():
            # Best effort: the return value is ignored because the existence
            # check below covers both the success and the failure case.
            _maybe_download_model(local_pkl)

        if not local_pkl.exists():
            raise RuntimeError(
                "Barzooka model file not found. Upload 'barzooka.pkl' next to app.py, or set BARZOOKA_MODEL_URL to a downloadable .pkl."
            ) from e_default

        try:
            return bz.Barzooka(model_file=str(local_pkl), cpu=True)
        except Exception as e_local:
            raise RuntimeError(
                "Failed to load Barzooka model from both package resources and local 'barzooka.pkl'.\n"
                f"Package error: {e_default}\nLocal file error: {e_local}"
            ) from e_local
|
|
|
|
|
|
|
|
# Shared classifier instance: built once when this module is imported and
# used by the prediction and diagnostics functions in this file.
BARZOOKA: bz.Barzooka = _construct_barzooka()
|
|
|
|
|
|
|
|
def _predict_single_pdf(pdf_path: str, pagewise: bool = False) -> pd.DataFrame:
    """Run the shared model on one PDF and return the result as a table.

    Args:
        pdf_path: Path of the PDF handed to ``BARZOOKA.predict_from_file``.
        pagewise: Forwarded to the model; also selects the table layout.

    Returns:
        With ``pagewise`` set, one row per page with a 1-based ``page``
        column and a ``classes`` column. Otherwise a single-row frame built
        from whatever ``predict_from_file`` returned.
    """
    prediction = BARZOOKA.predict_from_file(pdf_path, tmp_folder="./tmp/", pagewise=pagewise)
    if not pagewise:
        return pd.DataFrame([prediction])

    # In page-wise mode the result unpacks into two values; only the second
    # one (the per-page predictions) ends up in the table.
    _images, per_page = prediction
    records = []
    for page_number, labels in enumerate(per_page, start=1):
        records.append({"page": page_number, "classes": labels})
    return pd.DataFrame(records)
|
|
|
|
|
|
|
|
def screen_pdf(pdf_file, pagewise: bool):
    """Screen one uploaded PDF and return the predictions as a DataFrame.

    Args:
        pdf_file: The uploaded file as handed over by the UI, or None when
            nothing was uploaded. It is passed on unchanged as the PDF path.
        pagewise: Whether to return one row per page.

    Returns:
        The prediction table, or a one-row frame with an ``error`` column
        when no file was given or the prediction raised.
    """
    if pdf_file is None:
        return pd.DataFrame([{"error": "Please upload a PDF."}])

    try:
        outcome = _predict_single_pdf(pdf_file, pagewise)
    except Exception as exc:
        # Surface the failure in the results table instead of raising.
        outcome = pd.DataFrame([{"error": str(exc)}])
    return outcome
|
|
|
|
|
|
|
|
def screen_images(img_files):
    """Screen uploaded images and return one row of predictions per image.

    Args:
        img_files: Sequence of image paths as handed over by the UI; may be
            None or empty when nothing was uploaded.

    Returns:
        A DataFrame with ``image`` (file base name) and ``classes`` columns,
        or a one-row frame with an ``error`` column when no images were
        given or the prediction raised.
    """
    if not img_files:
        return pd.DataFrame([{"error": "Please upload at least one image."}])

    try:
        predictions = BARZOOKA.predict_from_img(img_files)
        records = []
        for path, labels in zip(img_files, predictions):
            records.append({"image": os.path.basename(path), "classes": labels})
        table = pd.DataFrame(records)
    except Exception as exc:
        # Surface the failure in the results table instead of raising.
        table = pd.DataFrame([{"error": str(exc)}])
    return table
|
|
|
|
|
|
|
|
def screen_zip_of_pdfs(zip_file, pagewise: bool):
    """Screen every PDF inside an uploaded ZIP archive.

    The archive is extracted into a temporary directory, each PDF found
    (searched recursively, extension matched case-insensitively) is run
    through ``_predict_single_pdf``, and the per-file tables are
    concatenated. A PDF that fails to process contributes a row with its
    ``paper_id`` and an ``error`` message instead of aborting the batch.

    Args:
        zip_file: Path (or file object) of the uploaded archive, or None
            when nothing was uploaded.
        pagewise: Whether to return one row per page; in that mode a
            ``paper_id`` column (the PDF's file stem) is prepended.

    Returns:
        A ``(DataFrame, csv_path)`` tuple. ``csv_path`` is the path of a CSV
        copy of the table, or None when there is nothing to download (no
        upload, unreadable archive, no PDFs) or the CSV could not be written.
    """
    if zip_file is None:
        return pd.DataFrame([{"error": "Please upload a .zip containing PDFs."}]), None

    with tempfile.TemporaryDirectory() as tdir:
        try:
            with zipfile.ZipFile(zip_file, "r") as zf:
                zf.extractall(tdir)
        except (zipfile.BadZipFile, OSError, RuntimeError) as e:
            # Corrupt, unreadable, encrypted or unsupported archives are
            # reported in the table, like errors in the other handlers.
            return pd.DataFrame([{"error": f"Could not read the ZIP file: {e}"}]), None

        # Match the extension case-insensitively so "report.PDF" is found too.
        pdf_paths = sorted(
            str(p)
            for p in Path(tdir).rglob("*")
            if p.is_file() and p.suffix.lower() == ".pdf"
        )
        if not pdf_paths:
            return pd.DataFrame([{"error": "No PDFs found in the ZIP."}]), None

        out_rows = []
        for pdf in pdf_paths:
            try:
                df = _predict_single_pdf(pdf, pagewise=pagewise)
                if pagewise:
                    df.insert(0, "paper_id", Path(pdf).stem)
            except Exception as e:
                # Keep going: one bad PDF must not sink the whole batch.
                df = pd.DataFrame([{"paper_id": Path(pdf).stem, "error": str(e)}])
            out_rows.append(df)

    result_df = pd.concat(out_rows, ignore_index=True)

    # Write the CSV into a fresh directory per call so concurrent requests
    # cannot overwrite each other's download; the file name stays the same.
    # NOTE(review): assumes the UI framework may serve files from the system
    # temp directory - confirm for the deployed Gradio version. The
    # directory is not cleaned up here because the file must outlive this
    # call.
    try:
        out_csv = Path(tempfile.mkdtemp(prefix="barzooka_")) / "barzooka_results.csv"
        result_df.to_csv(out_csv, index=False)
    except OSError as e:
        # The download is optional; still return the on-screen table.
        print(f"Could not write results CSV: {e}")
        return result_df, None

    return result_df, str(out_csv)
|
|
|
|
|
|
|
|
def diagnostics():
    """Collect a one-row environment report.

    Returns:
        A single-row DataFrame with the ``pdftocairo`` status string, the
        model's ``class_names`` (an empty list when the attribute is
        missing) and a ``model_loaded`` flag.
    """
    report = {
        "pdftocairo": _check_pdftocairo(),
        "classes": getattr(BARZOOKA, "class_names", []),
        # Hard-coded: this value is not derived from any runtime check.
        "model_loaded": True,
    }
    return pd.DataFrame([report])
|
|
|
|
|
|
|
|
# Gradio layout: a diagnostics row followed by one tab per screening mode.
# Each button is wired to the handler function of the same purpose.
with gr.Blocks(title="Barzooka: Graph-Type Screening") as demo:
    gr.Markdown(
        """
        # Barzooka
        Screen PDFs or images to detect graph types (bar, bardot, box, dot, violin, hist, pie, flow, text, other).

        **Notes:** PDF screening requires the system utility `pdftocairo` (from Poppler). This Space installs
        `poppler-utils` so conversion should work out-of-the-box.
        """
    )

    # Environment check: shows the pdftocairo status and the model classes.
    with gr.Row():
        diag_btn = gr.Button("Environment diagnostics")
        diag_out = gr.Dataframe(label="Diagnostics", wrap=True)
        diag_btn.click(fn=diagnostics, outputs=diag_out)

    # One PDF in, one results table out.
    with gr.Tab("Single PDF"):
        pdf_input = gr.File(label="Upload a PDF", file_types=[".pdf"])
        pagewise_chk = gr.Checkbox(label="Return page-wise predictions", value=False)
        pdf_btn = gr.Button("Screen PDF")
        pdf_out = gr.Dataframe(label="Results", wrap=True)
        pdf_btn.click(
            fn=screen_pdf,
            inputs=[pdf_input, pagewise_chk],
            outputs=pdf_out,
        )

    # Batch mode: a results preview plus a downloadable CSV.
    with gr.Tab("ZIP of PDFs (batch)"):
        zip_input = gr.File(label="Upload a ZIP containing PDFs", file_types=[".zip"])
        pw_chk = gr.Checkbox(label="Return page-wise predictions", value=False)
        zip_btn = gr.Button("Screen ZIP")
        zip_out = gr.Dataframe(label="Results (preview)", wrap=True)
        zip_csv = gr.File(label="Download all results (CSV)")
        zip_btn.click(
            fn=screen_zip_of_pdfs,
            inputs=[zip_input, pw_chk],
            outputs=[zip_out, zip_csv],
        )

    # Several images in, one row per image out.
    with gr.Tab("Images"):
        imgs_input = gr.Files(label="Upload images", file_types=[".jpg", ".jpeg", ".png"])
        imgs_btn = gr.Button("Screen Images")
        imgs_out = gr.Dataframe(label="Results", wrap=True)
        imgs_btn.click(fn=screen_images, inputs=imgs_input, outputs=imgs_out)

# Configure the request queue with api_open=True when the module is imported.
demo.queue(api_open=True)
|
|
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: configure the queue, then launch whatever
    # queue() returned.
    app = demo.queue(api_open=True)
    app.launch()
|
|
|