#!/usr/bin/env python
# app.py
import asyncio
import base64
import glob
import html
import io
import json
import logging
import os
import random
import re
import shutil
import time
import zipfile
from datetime import datetime
from pathlib import Path
from typing import Any, List, Dict, Optional

import aiofiles
import fitz  # PyMuPDF
import pandas as pd
import pytz
import requests
import streamlit as st
from huggingface_hub import InferenceClient
from huggingface_hub.utils import RepositoryNotFoundError, GatedRepoError
from PIL import Image, ImageDraw, UnidentifiedImageError
from reportlab.lib.pagesizes import letter
from reportlab.lib.utils import ImageReader
from reportlab.pdfgen import canvas

# Optional AI/ML imports
try:
    import torch
    from transformers import (
        AutoModelForCausalLM,
        AutoTokenizer,
        AutoProcessor,
        AutoModelForVision2Seq,
        pipeline
    )
    _transformers_available = True
except ImportError:
    _transformers_available = False

try:
    from diffusers import StableDiffusionPipeline
    _diffusers_available = True
except ImportError:
    _diffusers_available = False
# --- Page Configuration ---
# Configure the Streamlit page: title, icon, wide layout, sidebar expanded on
# load, and custom 'Get Help' / 'About' entries for the app menu.
# NOTE: Streamlit expects set_page_config to run before other st.* UI calls,
# so keep this ahead of any rendering code.
st.set_page_config(
    page_title="Vision & Layout Titans (HF) 🚀🖼️",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items={
        'Get Help': 'https://huggingface.co/docs',
        'About': "Combined App: Image→PDF Layout + HF AI Tools 🌌"
    }
)
# --- Logging Setup ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
# In-memory list of every record emitted through `logger` during this run.
log_records: List[logging.LogRecord] = []

# Name used to recognise the capture handler across script re-executions.
_LOG_CAPTURE_HANDLER_NAME = "app-log-capture"


class LogCaptureHandler(logging.Handler):
    """Logging handler that appends each record to the module-level ``log_records``."""

    def emit(self, record: logging.LogRecord) -> None:
        log_records.append(record)


def _install_log_capture() -> None:
    """Attach exactly one capture handler to ``logger``.

    ``logging.getLogger`` returns the same process-wide logger every time the
    script is executed, while this module body (and therefore ``log_records``
    and ``LogCaptureHandler``) is re-created on each execution. Blindly calling
    ``addHandler`` would therefore stack one more handler per execution, each
    still writing to a stale list. Handlers left by earlier executions are
    removed first; they are matched by handler name because their class object
    differs from the freshly defined one.
    """
    stale = [h for h in logger.handlers if h.get_name() == _LOG_CAPTURE_HANDLER_NAME]
    for handler in stale:
        logger.removeHandler(handler)
    capture = LogCaptureHandler()
    capture.set_name(_LOG_CAPTURE_HANDLER_NAME)
    logger.addHandler(capture)


_install_log_capture()
# --- Constants & Defaults ---
# Access token read from the HF_TOKEN environment variable; None when unset.
HF_TOKEN = os.getenv("HF_TOKEN")
# Provider name used as the initial value of the 'hf_provider' session key.
DEFAULT_PROVIDER = "hf-inference"
# Hugging Face model ids. The first entry is the initial value of the
# 'hf_selected_api_model' session key; how the rest of the list is presented
# is not visible in this part of the file.
FEATURED_MODELS_LIST = [
    "meta-llama/Meta-Llama-3.1-8B-Instruct",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "google/gemma-2-9b-it",
    "Qwen/Qwen2-7B-Instruct",
    "microsoft/Phi-3-mini-4k-instruct",
    "HuggingFaceH4/zephyr-7b-beta",
    "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO",
    "HuggingFaceTB/SmolLM-1.7B-Instruct"
]
# --- Session State Initialization ---
def _init_state(key: str, default: Any):
    """Store ``default`` under ``key`` in ``st.session_state`` unless the key is already set."""
    if key in st.session_state:
        return  # keep the value the session already holds
    st.session_state[key] = default
# Seed every session key listed below. _init_state only writes keys that are
# missing, so values already stored in the session are left untouched.
for k, v in {
    # layout_* keys — presumably state of the image→PDF layout feature (usage not visible here)
    'layout_snapshots': [],
    'layout_new_uploads': [],
    'layout_last_capture': None,
    'history': [],
    'processing': {},
    # maps an asset path to its "Select" checkbox state (see update_gallery)
    'asset_checkboxes': {},
    'downloaded_pdfs': {},
    'unique_counter': 0,
    'cam0_file': None,
    'cam1_file': None,
    'characters': [],
    'char_form_reset_key': 0,
    # maximum number of assets listed by update_gallery
    'gallery_size': 10,
    # hf_* keys are read by get_hf_client / process_text_hf
    'hf_inference_client': None,
    'hf_provider': DEFAULT_PROVIDER,
    'hf_custom_key': "",
    'hf_selected_api_model': FEATURED_MODELS_LIST[0],
    'hf_custom_api_model': "",
    'local_models': {},
    'selected_local_model_path': None,
    # gen_* keys — generation parameters; gen_max_tokens is passed to chat_completion
    'gen_max_tokens': 512,
    'gen_temperature': 0.7,
    'gen_top_p': 0.95,
    'gen_frequency_penalty': 0.0,
    # NOTE(review): -1 presumably means "no fixed seed" — confirm against the caller
    'gen_seed': -1
}.items():
    _init_state(k, v)
# --- Utility Functions ---
def generate_filename(seq: str, ext: str = "png") -> str:
    """Build a timestamped file name from ``seq``.

    Every run of characters other than word characters and ``-`` in ``seq`` is
    collapsed to a single ``_``; the local time (``YYYYMMDD_HHMMSS``) and the
    extension ``ext`` are then appended.
    """
    slug = re.sub(r'[^\w\-]+', '_', seq)
    stamp = time.strftime('%Y%m%d_%H%M%S')
    return "{}_{}.{}".format(slug, stamp, ext)
def clean_stem(fn: str) -> str:
    """Return a title-cased label for ``fn``: base name without its last extension, ``-``/``_`` turned into spaces."""
    stem, _extension = os.path.splitext(os.path.basename(fn))
    spaced = stem.translate(str.maketrans("-_", "  "))
    return spaced.title()
def get_download_link(path: str, mime: str, label: str = "Download") -> str:
    """Return an HTML ``<a>`` tag that embeds the file at ``path`` as a base64 data URI.

    Args:
        path: File to embed.
        mime: MIME type written into the data URI.
        label: Link text; inserted as-is, so callers may pass markup.

    Returns:
        The anchor markup, or ``"<label> (not found)"`` when ``path`` does not exist.
    """
    if not os.path.exists(path):
        return f"{label} (not found)"
    # Read through a context manager so the handle is closed deterministically.
    with open(path, 'rb') as fh:
        b64 = base64.b64encode(fh.read()).decode()
    # The file name lands inside a double-quoted attribute; escape it so quotes
    # or ampersands in the name cannot break out of the attribute.
    filename = html.escape(os.path.basename(path), quote=True)
    return f'<a href="data:{mime};base64,{b64}" download="{filename}">{label}</a>'
def get_gallery_files(types: Optional[List[str]] = None) -> List[str]:
    """List files in the current working directory whose extension is in ``types``.

    Each extension is matched both as given and upper-cased (``*.png`` and
    ``*.PNG``); other casings are matched only where the platform's globbing is
    case-insensitive.

    Args:
        types: Extensions without the leading dot. Defaults to
            png, jpg, jpeg, pdf, md and txt.

    Returns:
        The matching file names, de-duplicated and sorted.
    """
    # A None sentinel replaces the former list default so no mutable object is
    # shared between calls.
    if types is None:
        types = ['png', 'jpg', 'jpeg', 'pdf', 'md', 'txt']
    files = set()
    for ext in types:
        files.update(glob.glob(f"*.{ext}"))
        files.update(glob.glob(f"*.{ext.upper()}"))
    return sorted(files)
# Delete with rerun
def delete_asset(path: str):
    """Delete the file at ``path`` and drop it from the gallery session state.

    On success the path is removed from the 'asset_checkboxes' mapping and from
    'layout_snapshots' (when present) and a toast is shown; an ``OSError`` is
    reported with ``st.error`` instead. ``st.rerun()`` is requested afterwards.
    """
    # NOTE(review): the source's indentation was lost; the rerun is assumed to
    # follow both the success and the failure path.
    # NOTE(review): update_gallery registers this function as a button
    # on_click callback, and Streamlit documents st.rerun() inside a callback
    # as a no-op — confirm whether the call is still needed.
    state = st.session_state
    try:
        os.remove(path)
        state['asset_checkboxes'].pop(path, None)
        snapshots = state['layout_snapshots']
        if path in snapshots:
            snapshots.remove(path)
        st.toast(f"Deleted {os.path.basename(path)}", icon="✅")
    except OSError as err:
        st.error(f"Delete failed: {err}")
    st.rerun()
# Sidebar gallery updater
def update_gallery():
    """Render the asset gallery in the sidebar.

    Lists the files returned by ``get_gallery_files()``, limited to the first
    ``st.session_state['gallery_size']`` entries. Each asset gets a collapsed
    preview (image, first PDF page at half scale, or the first 200 characters
    of text), a "Select" checkbox mirrored into
    ``st.session_state['asset_checkboxes']``, a download button and a delete
    button wired to ``delete_asset``.
    """
    st.sidebar.markdown("### Asset Gallery 📸📖")
    files = get_gallery_files()
    if not files:
        st.sidebar.info("No assets.")
        return
    st.sidebar.caption(f"Found {len(files)} assets.")
    # Built once instead of on every loop iteration.
    mime_by_ext = {
        'png': 'image/png',
        'jpg': 'image/jpeg',
        'jpeg': 'image/jpeg',
        'pdf': 'application/pdf',
        'md': 'text/markdown',
        'txt': 'text/plain',
    }
    for f in files[:st.session_state['gallery_size']]:
        name = os.path.basename(f)
        ext = os.path.splitext(f)[1].lower()
        st.sidebar.markdown(f"**{name}**")
        with st.sidebar.expander("Preview", expanded=False):
            try:
                if ext in ('.png', '.jpg', '.jpeg'):
                    # Context manager releases the image's file handle after rendering.
                    with Image.open(f) as img:
                        st.image(img, use_container_width=True)
                elif ext == '.pdf':
                    doc = fitz.open(f)
                    try:
                        if doc.page_count:
                            pix = doc[0].get_pixmap(matrix=fitz.Matrix(0.5, 0.5))
                            img = Image.frombytes('RGB', (pix.width, pix.height), pix.samples)
                            st.image(img, use_container_width=True)
                    finally:
                        # Close the document even when rendering the page fails.
                        doc.close()
                else:
                    txt = Path(f).read_text(errors='ignore')
                    st.code(txt[:200] + '…')
            except Exception:
                # Best-effort preview. `except Exception` (not a bare except)
                # lets BaseException-derived signals such as KeyboardInterrupt
                # propagate instead of being reported as a preview error.
                st.warning("Preview error")
        c1, c2, c3 = st.sidebar.columns(3)
        sel = st.session_state['asset_checkboxes'].get(f, False)
        c1.checkbox("Select", value=sel, key=f"cb_{f}")
        # Mirror the widget's value into the app-level selection mapping.
        st.session_state['asset_checkboxes'][f] = st.session_state.get(f"cb_{f}")
        mime = mime_by_ext.get(ext[1:], 'application/octet-stream')
        with open(f, 'rb') as fp:
            c2.download_button("📥", data=fp, file_name=name, mime=mime, key=f"dl_{f}")
        c3.button("🗑️", key=f"del_{f}", on_click=delete_asset, args=(f,))
        st.sidebar.markdown("---")
# --- PDF Snapshot & Generation ---
async def process_pdf_snapshot(path: str, mode: str='single', resF: float=2.0) -> List[str]:
    """Render pages of the PDF at ``path`` to PNG files in the working directory.

    Args:
        path: PDF file to open.
        mode: ``'single'`` renders the first page, ``'twopage'`` the first two,
            ``'allpages'`` every page; any other value renders one page.
        resF: Scale factor passed as both components of ``fitz.Matrix``.

    Returns:
        The names of the PNG files written. On any error the files written so
        far are deleted and an empty list is returned; progress and errors are
        reported through a Streamlit placeholder.
    """
    status = st.empty()
    status.text("Snapshot start...")
    out_files: List[str] = []
    doc = None
    try:
        doc = fitz.open(path)
        mat = fitz.Matrix(resF, resF)
        cnt = {'single': 1, 'twopage': 2, 'allpages': len(doc)}.get(mode, 1)
        # Loop-invariant: the stem of the source PDF, used in every output name.
        base = os.path.splitext(os.path.basename(path))[0]
        for i in range(min(cnt, len(doc))):
            s = time.time()
            pix = doc[i].get_pixmap(matrix=mat)
            fname = generate_filename(f"{base}_pg{i+1}_{mode}", "png")
            # Write the PNG on a worker thread so the event loop is not blocked.
            await asyncio.to_thread(pix.save, fname)
            out_files.append(fname)
            status.text(f"Saved {fname} ({int(time.time()-s)}s)")
        status.success(f"Snapshot done: {len(out_files)} files")
    except Exception as e:
        status.error(f"Snapshot error: {e}")
        # Roll back partial output so callers never see an incomplete set.
        for f in out_files:
            if os.path.exists(f):
                os.remove(f)
        out_files = []
    finally:
        # Previously the document was closed only on the success path and
        # leaked whenever rendering or saving raised.
        if doc is not None:
            doc.close()
    return out_files
| from reportlab.lib.pagesizes import letter | |
def make_image_sized_pdf(sources: List[Any]) -> Optional[bytes]:
    """Build a PDF with one page per image, each page sized to its image.

    Every page is the image's pixel size plus a 30-unit strip at the bottom
    holding a caption (from ``clean_stem`` of the source name) and the page
    number.

    Args:
        sources: File paths (``str``) or file-like objects exposing ``name``.
            Duplicates (same path / same ``name``) and objects without a
            usable name are skipped.

    Returns:
        The PDF bytes, or ``None`` when there is nothing to render or no image
        could be added.
    """
    # dedupe
    seen, uniq = set(), []
    for s in sources:
        key = s if isinstance(s, str) else getattr(s, 'name', None)
        if key and key not in seen:
            seen.add(key)
            uniq.append(s)
    if not uniq:
        st.warning("No images for PDF")
        return None
    buf = io.BytesIO()
    # The initial page size is a placeholder; each page sets its own size below.
    c = canvas.Canvas(buf, pagesize=letter)
    status = st.empty()
    cap = 30  # height of the caption strip under each image
    pages_added = 0
    for idx, s in enumerate(uniq, 1):
        try:
            # Image.open accepts paths and file objects alike. Using it as a
            # context manager releases handles Pillow opened itself
            # (caller-owned streams are expected to stay open — see Pillow's
            # file-handling notes).
            with Image.open(s) as img:
                w, h = img.size
                c.setPageSize((w, h + cap))
                c.drawImage(ImageReader(img), 0, cap, w, h, mask='auto')
            cap_txt = clean_stem(s if isinstance(s, str) else s.name)
            c.setFont('Helvetica', 12)
            c.drawCentredString(w / 2, cap / 2, cap_txt)
            c.setFont('Helvetica', 8)
            c.drawRightString(w - 10, 10, str(idx))
            c.showPage()
            pages_added += 1
            status.text(f"Page {idx}/{len(uniq)} added")
        except Exception as e:
            status.error(f"Error page {idx}: {e}")
    if not pages_added:
        # Every image failed: report "no PDF" instead of handing back a
        # document without any rendered page.
        return None
    c.save()
    return buf.getvalue()
# --- HF Inference Client ---
def get_hf_client() -> Optional[InferenceClient]:
    """Return the session's ``InferenceClient``, creating or refreshing it as needed.

    The provider comes from ``st.session_state['hf_provider']``; the token is
    the stripped ``st.session_state['hf_custom_key']`` or, when that is empty,
    the ``HF_TOKEN`` environment value.

    Returns:
        The client cached in ``st.session_state['hf_inference_client']``, or
        ``None`` (after showing an error) when a provider other than
        ``'hf-inference'`` is selected without any token.
    """
    provider = st.session_state['hf_provider']
    token = st.session_state['hf_custom_key'].strip() or HF_TOKEN
    if provider != 'hf-inference' and not token:
        st.error(f"Provider {provider} needs token")
        return None
    # Remember which settings the cached client was built with. Previously the
    # first client was reused forever, so changing the provider or key in the
    # session had no effect on later requests.
    config = (provider, token)
    client = st.session_state['hf_inference_client']
    if not client or st.session_state.get('hf_inference_client_config') != config:
        client = InferenceClient(token=token, provider=provider)
        st.session_state['hf_inference_client'] = client
        st.session_state['hf_inference_client_config'] = config
    return client
| # --- HF Processing --- | |
| def process_text_hf(text: str, prompt: str, use_api: bool) -> str: | |
| stp = st.empty(); stp.text("Processing...") | |
| msgs = [{"role":"system","content":"You are an assistant."}, | |
| {"role":"user","content":f"{prompt}\n\n{text}"}] | |
| out = "" | |
| if use_api: | |
| client = get_hf_client() | |
| if not client: return "Client error" | |
| model = st.session_state['hf_custom_api_model'] or st.session_state['hf_selected_api_model'] | |
| try: | |
| resp = client.chat_completion( | |
| model=model, | |
| messages=msgs, | |
| max_tokens=st.session_state['gen_max_tokens'], | |
| temperature=st.session | |
| ]}]} | |