# NOTE: the next line is page-status residue captured with the source, not program text.
# Spaces: Sleeping / Sleeping
# app.py -- Backend v6 (Enhanced: Tables, Images, Emojis, Icons)
# Universal Document Enhancer - Works for ANY document type
import base64
import io
import json
import os
import re
import traceback
import unicodedata
from typing import Any, Dict, List, Optional

import fitz  # pymupdf
import requests
from docx import Document
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml.shared import OxmlElement, qn
from docx.shared import Inches, Pt, RGBColor
from fastapi import FastAPI, File, Form, UploadFile
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from PIL import Image
# Application object plus a fully permissive CORS policy.
# NOTE(review): wildcard origins together with allow_credentials=True is the
# most permissive combination available; confirm this is intended before
# exposing the service beyond a demo.
app = FastAPI(title="Document Enhancer v6")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Gemini credentials are read from the environment. Up to six variables may be
# set; each one that is unset (or empty) is simply left out of the rotation.
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
GEMINI_API_KEY1 = os.getenv("GEMINI_API_KEY1")
GEMINI_API_KEY2 = os.getenv("GEMINI_API_KEY2")
GEMINI_API_KEY3 = os.getenv("GEMINI_API_KEY3")
GEMINI_API_KEY4 = os.getenv("GEMINI_API_KEY4")
GEMINI_API_KEY5 = os.getenv("GEMINI_API_KEY5")

# Build list of available API keys, preserving the order above.
GEMINI_API_KEYS = [
    key
    for key in (
        GEMINI_API_KEY,
        GEMINI_API_KEY1,
        GEMINI_API_KEY2,
        GEMINI_API_KEY3,
        GEMINI_API_KEY4,
        GEMINI_API_KEY5,
    )
    if key
]

GEMINI_URL = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.0-flash-exp:generateContent"
# -------------------------
# Utility: Sanitize filename for HTTP headers
# -------------------------
| def sanitize_filename(filename: str) -> str: | |
| """ | |
| Sanitize filename to be safe for HTTP Content-Disposition header. | |
| Only allows ASCII characters safe for latin-1 encoding. | |
| """ | |
| if not filename: | |
| return "document.docx" | |
| name_without_ext = filename.rsplit('.', 1)[0] if '.' in filename else filename | |
| try: | |
| ascii_name = name_without_ext.encode('ascii', 'ignore').decode('ascii') | |
| except: | |
| ascii_name = "document" | |
| if not ascii_name or not ascii_name.strip(): | |
| ascii_name = "document" | |
| safe_name = re.sub(r'[^\w\s\-]', '', ascii_name) | |
| safe_name = re.sub(r'[\s\-]+', '_', safe_name) | |
| safe_name = safe_name.strip('_') or "document" | |
| return f"{safe_name}.docx" | |
# -------------------------
# Utility: Repair truncated JSON
# -------------------------
def repair_truncated_json(json_str: str) -> str:
    """
    Attempt to repair truncated JSON by closing open structures.

    The text is scanned once, tracking string state and the stack of open
    objects/arrays so that closers are appended in the correct nesting order
    and braces inside string values are ignored. Candidates are validated
    with ``json.loads``:

    1. the input unchanged (already valid);
    2. the whole text with a dangling string closed and all open containers
       closed;
    3. the text cut back to one of the most recent commas or closing
       brackets, then closed - this drops a half-written trailing element.

    Args:
        json_str: Possibly truncated JSON text.

    Returns:
        The first candidate that parses. If none parses, the candidate from
        step 2 is returned as a best effort (it may still be invalid JSON).
    """
    from collections import deque

    if not json_str:
        return json_str

    def _parses(candidate: str) -> bool:
        try:
            json.loads(candidate)
        except (ValueError, RecursionError):
            return False
        return True

    if _parses(json_str):
        return json_str

    closer_for = {"{": "}", "[": "]"}
    open_closers = []  # closers still owed, innermost last
    # Truncation happens at the end, so only the most recent cut points matter.
    checkpoints = deque(maxlen=64)
    in_string = False
    escaped = False

    for idx, ch in enumerate(json_str):
        if in_string:
            if escaped:
                escaped = False
            elif ch == "\\":
                escaped = True
            elif ch == '"':
                in_string = False
            continue
        if ch == '"':
            in_string = True
        elif ch in closer_for:
            open_closers.append(closer_for[ch])
        elif ch in "}]":
            if open_closers and open_closers[-1] == ch:
                open_closers.pop()
            # Cutting just after a closer keeps everything up to a complete value.
            checkpoints.append((idx + 1, "".join(reversed(open_closers))))
        elif ch == ",":
            # Cutting just before a comma drops the unfinished element after it.
            checkpoints.append((idx, "".join(reversed(open_closers))))

    tail = json_str.rstrip()
    if in_string:
        if escaped:
            # A lone trailing backslash would escape the quote we are adding.
            tail = tail[:-1]
        tail += '"'
    best_effort = tail + "".join(reversed(open_closers))
    if _parses(best_effort):
        return best_effort

    for cut, closers in reversed(checkpoints):
        candidate = json_str[:cut] + closers
        if _parses(candidate):
            return candidate

    return best_effort
# -------------------------
# Utility: Clean Gemini JSON
# -------------------------
def clean_gemini_json(raw_text: str) -> str:
    """
    Removes markdown code fences and wrappers from Gemini output.

    Steps, in order:
      1. strip surrounding whitespace;
      2. drop an opening fence line (```` ``` ```` or ```` ```json ````);
      3. drop any preamble before the first "{";
      4. drop a closing fence and anything after it;
      5. strip again.

    Args:
        raw_text: Model output; may be empty or None.

    Returns:
        The cleaned text. Falsy input is returned unchanged.
    """
    if not raw_text:
        return raw_text

    cleaned = raw_text.strip()

    if cleaned.startswith("```"):
        first_line_end = cleaned.find("\n")
        if first_line_end != -1:
            cleaned = cleaned[first_line_end + 1:]

    first_brace = cleaned.find("{")
    if first_brace > 0:
        cleaned = cleaned[first_brace:]

    if first_brace != -1:
        # JSON strings cannot contain a raw newline, so a fence at the start of
        # a line after the opening brace is a closing fence, never payload.
        # Cutting there also removes commentary the model added after it.
        closing_fence = cleaned.rfind("\n```")
        if closing_fence != -1:
            cleaned = cleaned[:closing_fence]

    cleaned = cleaned.rstrip()
    if cleaned.endswith("```"):
        cleaned = cleaned[:-3]

    return cleaned.strip()
# -------------------------
# File type detection
# -------------------------
def detect_file_type(filename: str) -> str:
    """
    Classify an upload by its file-name extension (case-insensitive).

    Args:
        filename: Client-supplied file name; may be empty or None.

    Returns:
        "pdf", "docx" or "txt" when the name ends with that extension,
        otherwise "unknown". The file content is not inspected.
    """
    name = (filename or "").lower()
    for kind in ("pdf", "docx", "txt"):
        if name.endswith(f".{kind}"):
            return kind
    return "unknown"
# -------------------------
# Extract text, tables, and images (ENHANCED)
# -------------------------
def _extract_pdf_content(file_bytes: bytes, extracted_data: dict) -> str:
    """Read page text from a PDF and append its tables/images to extracted_data."""
    doc = fitz.open(stream=file_bytes, filetype="pdf")
    try:
        texts = []
        for page_num, page in enumerate(doc, start=1):
            texts.append(page.get_text())

            # Tables and images are best-effort: a failure on one item is
            # reported and skipped so the page text is still returned.
            try:
                tables = page.find_tables()
            except Exception as exc:
                print(f"Table detection failed on page {page_num}: {exc}")
                tables = []
            for table_idx, table in enumerate(tables, start=1):
                try:
                    table_data = table.extract()
                except Exception as exc:
                    print(f"Skipping table {table_idx} on page {page_num}: {exc}")
                    continue
                if table_data:
                    extracted_data["tables"].append({
                        "page": page_num,
                        "data": table_data,
                        "position": f"page_{page_num}_table_{table_idx}",
                    })

            for img_idx, img in enumerate(page.get_images(), start=1):
                try:
                    base_image = doc.extract_image(img[0])  # img[0] is the xref
                    extracted_data["images"].append({
                        "page": page_num,
                        "data": base64.b64encode(base_image["image"]).decode(),
                        "ext": base_image["ext"],
                        "position": f"page_{page_num}_img_{img_idx}",
                    })
                except Exception as exc:
                    print(f"Skipping image {img_idx} on page {page_num}: {exc}")
        return "\n\n".join(texts)
    finally:
        # Release the document handle even when a page raises.
        doc.close()


def _extract_docx_content(file_bytes: bytes, extracted_data: dict) -> str:
    """Read paragraph text from a DOCX and append its tables/images to extracted_data."""
    doc = Document(io.BytesIO(file_bytes))

    texts = [para.text for para in doc.paragraphs if para.text.strip()]

    for table_idx, table in enumerate(doc.tables, start=1):
        table_data = [[cell.text for cell in row.cells] for row in table.rows]
        if table_data:
            extracted_data["tables"].append({
                "data": table_data,
                "position": f"table_{table_idx}",
            })

    # Images are found through the document part's relationships.
    for rel in doc.part.rels.values():
        if "image" not in rel.target_ref:
            continue
        try:
            image_base64 = base64.b64encode(rel.target_part.blob).decode()
        except Exception as exc:
            print(f"Skipping image relationship {rel.target_ref}: {exc}")
            continue
        extracted_data["images"].append({
            "data": image_base64,
            "position": f"image_{len(extracted_data['images']) + 1}",
        })

    return "\n\n".join(texts)


def extract_text_and_layout(file_bytes: bytes, kind: str):
    """
    Extract text plus table/image side data from an uploaded document.

    Args:
        file_bytes: Raw file content.
        kind: "pdf", "docx" or "txt"; any other value yields empty text.

    Returns:
        (text, extracted_data) where extracted_data contains:
        - tables: list of table data
        - images: list of base64 encoded images with positions

    Raises:
        RuntimeError: if the document cannot be opened or read; the original
            exception is chained as the cause.
    """
    extracted_data = {"tables": [], "images": []}
    try:
        if kind == "pdf":
            return _extract_pdf_content(file_bytes, extracted_data), extracted_data
        if kind == "docx":
            return _extract_docx_content(file_bytes, extracted_data), extracted_data
        if kind == "txt":
            return file_bytes.decode("utf-8", errors="ignore"), extracted_data
        return "", extracted_data
    except Exception as e:
        raise RuntimeError(f"Text extraction failed: {e}") from e
# -------------------------
# Gemini formatting -> JSON layout (ENHANCED with tables & images)
# -------------------------
def enhance_text_with_gemini(text: str, doc_type: str = "auto", user_prompt: str = "",
                             extracted_data: Optional[Dict] = None) -> str:
    """
    Ask Gemini to turn raw document text into the JSON layout used by the
    DOCX builder.

    Each configured API key is tried in order; the first successful response
    wins. Rate-limit (429), forbidden (403), timeout and other request errors
    move on to the next key.

    Args:
        text: Extracted document text, embedded in the prompt.
        doc_type: Document type hint, embedded in the prompt.
        user_prompt: Optional user instructions; when non-blank they replace
            the default "improve grammar" task line.
        extracted_data: Optional dict with "tables" and "images" lists; only
            their counts are mentioned in the prompt.

    Returns:
        The model's layout JSON as text (code fences removed, truncated output
        repaired on a best-effort basis), or a JSON string of the form
        {"error": "..."} when no key is configured or every key fails.
    """
    if not GEMINI_API_KEYS:
        return json.dumps({"error": "No GEMINI_API_KEY configured"})

    user_prompt = (user_prompt or "").strip()
    is_long_doc = len(text) > 10000

    # Build user instructions
    user_instructions = ""
    if user_prompt:
        user_instructions = f"""
USER INSTRUCTIONS:
{user_prompt}
IMPORTANT: Follow the user's instructions while maintaining the JSON format and document structure.
"""

    # Add information about extracted tables and images
    extracted_info = ""
    if extracted_data:
        if extracted_data.get("tables"):
            extracted_info += f"\n\nDOCUMENT CONTAINS {len(extracted_data['tables'])} TABLES. Preserve and format them appropriately."
        if extracted_data.get("images"):
            extracted_info += f"\nDOCUMENT CONTAINS {len(extracted_data['images'])} IMAGES. Note their positions for reference."

    prompt = f"""You are a professional document formatter and editor. Analyze the INPUT TEXT and enhance it.
INPUT TEXT:
{text}
{extracted_info}
DOCUMENT TYPE: {doc_type}
{user_instructions}
TASK:
1) Analyze the document type (resume, cover letter, report, article, essay, notes, etc.)
2) {"Apply the user's specific instructions" if user_prompt else "Improve grammar, clarity, and professional tone while preserving all original information"}
3) Organize content with appropriate structure (headings, paragraphs, lists, tables)
4) Preserve emojis, special characters, and Unicode symbols
5) Output ONLY valid JSON following the schema below - NO markdown, NO commentary
JSON SCHEMA:
{{
"document": [
{{ "type": "heading", "level": 1, "text": "Main Title ✨" }},
{{ "type": "heading", "level": 2, "text": "Section Title" }},
{{ "type": "paragraph", "text": "Regular text with emojis 😊", "align": "left" }},
{{ "type": "bullet_list", "items": ["Item 1 ✓", "Item 2 ★"] }},
{{ "type": "number_list", "items": ["Step 1", "Step 2"] }},
{{ "type": "table", "rows": [["Header1", "Header2"], ["Value1", "Value2"]], "has_header": true }},
{{ "type": "image_placeholder", "position": "center", "caption": "Figure 1: Description" }}
]
}}
FORMATTING RULES:
- Use level 1 heading for document title only
- Use level 2 headings for major sections
- PRESERVE all emojis, Unicode symbols (★, ✓, →, •, etc.), and special characters
- {"Keep paragraphs BRIEF - combine similar content" if is_long_doc else "Keep paragraphs concise and well-structured"}
- Use bullet_list for unordered items, number_list for sequences
- Use "table" type for tabular data with "has_header": true/false
- Use "image_placeholder" to mark where images should be inserted
- {"IMPORTANT: For long documents, be concise - summarize repetitive sections" if is_long_doc else "Preserve ALL original content - do not omit information"}
TABLE FORMATTING:
- First row is typically headers (set "has_header": true)
- Include all rows and columns from source
- Preserve cell content including numbers, symbols, emojis
EMOJI & SYMBOL SUPPORT:
- Keep ALL emojis exactly as they appear (😊, 🎉, ❤️, etc.)
- Preserve Unicode symbols (★, ✓, →, •, ©, ®, ™, etc.)
- Maintain special characters (€, £, ¥, °, ±, etc.)
DOCUMENT-SPECIFIC GUIDELINES:
- **Resume/CV**: Name (h1), Contact (center paragraph), Summary, Skills, Experience, Education, Certifications
- **Cover Letter**: Your Info, Date, Recipient Info, Salutation, Body, Closing
- **Report/Article**: Title (h1), Abstract, Introduction, Body Sections (h2), Tables, Figures, Conclusion
- **Essay**: Title (h1, center), Author, Body paragraphs
- **Notes/General**: Logical headings, preserve lists, tables, and special formatting
IMPORTANT:
- Return COMPLETE, VALID JSON only
- Ensure all strings are properly closed with quotes
- Escape special JSON characters properly
- Ensure all brackets and braces are balanced
- No trailing commas
- No markdown fences
- {"CRITICAL: Keep output under 30KB - be concise, combine similar sections" if is_long_doc else ""}
"""

    payload = {
        "contents": [{"parts": [{"text": prompt}]}],
        "generationConfig": {
            "temperature": 0.3,
            "topP": 0.8,
            "topK": 40,
        },
    }

    # Try each API key in sequence
    last_error = None
    total_keys = len(GEMINI_API_KEYS)
    for idx, api_key in enumerate(GEMINI_API_KEYS, start=1):
        try:
            print(f"Trying Gemini API key #{idx}/{total_keys}...")
            # The key travels in a header, not the query string: requests puts
            # the full URL into its exception messages, and those messages are
            # printed below and returned to the client via last_error.
            res = requests.post(
                GEMINI_URL,
                headers={"x-goog-api-key": api_key},
                json=payload,
                timeout=240,
            )
            if res.status_code == 429:
                print(f"API key #{idx} rate limited, trying next...")
                last_error = "Rate limit exceeded"
                continue
            if res.status_code == 403:
                print(f"API key #{idx} quota exceeded or invalid, trying next...")
                last_error = "Quota exceeded or invalid API key"
                continue
            res.raise_for_status()

            resp_json = res.json()
            content = resp_json["candidates"][0]["content"]["parts"][0]["text"]

            # Strip fences first; otherwise a trailing ``` would make complete
            # JSON look truncated.
            content = clean_gemini_json(content)
            if not content.rstrip().endswith(("}", "]")):
                content = repair_truncated_json(content)

            print(f"✓ API key #{idx} succeeded!")
            return content
        except requests.exceptions.Timeout:
            print(f"API key #{idx} timed out, trying next...")
            last_error = "Request timed out"
        except requests.exceptions.RequestException as e:
            print(f"API key #{idx} failed: {str(e)}, trying next...")
            last_error = str(e)
        except Exception as e:
            # Covers malformed responses (missing candidates/parts, bad JSON).
            print(f"API key #{idx} error: {str(e)}, trying next...")
            last_error = str(e)

    return json.dumps({
        "error": f"All {total_keys} API keys failed. Last error: {last_error}"
    })
# -------------------------
# Advanced DOCX builder (ENHANCED with tables, images, emojis)
# -------------------------
def _parse_layout_json(layout_json_text: str) -> dict:
    """Clean, repair and parse layout JSON; raise ValueError when unusable."""
    cleaned = clean_gemini_json(layout_json_text) or ""
    if not cleaned.rstrip().endswith("}"):
        cleaned = repair_truncated_json(cleaned)

    try:
        data = json.loads(cleaned)
    except ValueError as first_error:
        invalid = ValueError(
            f"Invalid layout JSON: {first_error}\nContent preview: {cleaned[:500]}..."
        )
        last_valid = cleaned.rfind("}")
        if last_valid == -1:
            raise invalid from first_error
        # Last resort: keep everything up to the final "}" and close the
        # "document" array and the root object.
        try:
            data = json.loads(cleaned[:last_valid + 1] + "]}")
        except ValueError:
            raise invalid from first_error

    if isinstance(data, list):
        # Accept a bare list of blocks as the document body.
        data = {"document": data}
    if not isinstance(data, dict):
        raise ValueError('Invalid layout JSON: expected an object with a "document" list')
    if "error" in data and "document" not in data:
        # An upstream error payload must not become an empty DOCX.
        raise ValueError(f"Enhancement failed: {data['error']}")
    return data


def build_docx_from_design(layout_json_text: str, extracted_data: Optional[Dict] = None) -> bytes:
    """
    Render layout JSON into a DOCX file.

    Supported block types: heading, paragraph, bullet_list / number_list (also
    "list" with kind "bulleted"/"numbered"), table, image_placeholder / image,
    page_break and section_break. Any other block is written out as a plain
    paragraph containing its repr.

    Args:
        layout_json_text: Layout JSON text, possibly fenced or truncated.
        extracted_data: Optional dict whose "images" list (base64 "data"
            entries) is consumed in order, one per image block.

    Returns:
        The DOCX file content as bytes.

    Raises:
        ValueError: if the layout cannot be parsed, is not an object with a
            "document" list, or is an error payload.
    """
    data = _parse_layout_json(layout_json_text)
    blocks = data.get("document") or []
    if not isinstance(blocks, list):
        raise ValueError('Invalid layout JSON: "document" must be a list')
    images = (extracted_data or {}).get("images") or []

    doc = Document()

    # Set page margins
    sec = doc.sections[0]
    sec.top_margin = Inches(0.6)
    sec.bottom_margin = Inches(0.6)
    sec.left_margin = Inches(0.7)
    sec.right_margin = Inches(0.7)

    # Default font; styling is cosmetic, so a template without a "Normal"
    # style must not abort the build.
    try:
        style = doc.styles["Normal"]
        style.font.name = "Calibri"
        style.font.size = Pt(11)
    except Exception:
        pass

    paragraph_alignments = {
        "center": WD_ALIGN_PARAGRAPH.CENTER,
        "right": WD_ALIGN_PARAGRAPH.RIGHT,
        "justify": WD_ALIGN_PARAGRAPH.JUSTIFY,
    }

    def as_text(value) -> str:
        return "" if value is None else str(value)

    def add_heading_text(text: str, level: int = 1):
        h = doc.add_heading(level=level)
        run = h.add_run(as_text(text))
        run.bold = True
        run.font.size = Pt(18 if level == 1 else 14 if level == 2 else 12)
        h.alignment = WD_ALIGN_PARAGRAPH.LEFT

    def add_paragraph_text(text: str, bold=False, italic=False, align="left"):
        p = doc.add_paragraph()
        r = p.add_run(as_text(text))
        r.bold = bool(bold)
        r.italic = bool(italic)
        p.alignment = paragraph_alignments.get(str(align).lower(), WD_ALIGN_PARAGRAPH.LEFT)
        return p

    def add_list(items, style_name: str):
        if isinstance(items, str):
            items = [items]  # a lone string is one item, not one item per character
        for item in items or []:
            text = item.get("text") if isinstance(item, dict) else item
            p = doc.add_paragraph(as_text(text), style=style_name)
            p.paragraph_format.left_indent = Inches(0.25)
            p.paragraph_format.space_after = Pt(6)

    def add_table(block: dict):
        raw_rows = block.get("rows")
        if not isinstance(raw_rows, list):
            return
        rows = [row if isinstance(row, list) else [row] for row in raw_rows]
        # Size by the widest row so ragged rows cannot index past the grid.
        cols = max((len(row) for row in rows), default=0)
        if cols == 0:
            return
        table = doc.add_table(rows=len(rows), cols=cols)
        table.style = "Table Grid"
        has_header = block.get("has_header", True)
        for r_idx, row in enumerate(rows):
            row_cells = table.rows[r_idx].cells
            for c_idx, cell in enumerate(row):
                value = cell.get("text", "") if isinstance(cell, dict) else cell
                cell_obj = row_cells[c_idx]
                cell_obj.text = as_text(value)
                # Bold header row
                if has_header and r_idx == 0:
                    for p in cell_obj.paragraphs:
                        for run in p.runs:
                            run.bold = True

    def add_image_placeholder(block: dict):
        doc.add_paragraph(f"[Image: {block.get('caption') or 'Figure'}]")

    # Index of the next extracted image to place.
    image_counter = 0

    for block in blocks:
        if not isinstance(block, dict):
            add_paragraph_text(str(block))
            continue

        btype = str(block.get("type") or "").lower()

        if btype == "heading":
            try:
                lvl = int(block.get("level", 1))
            except (TypeError, ValueError):
                lvl = 1
            add_heading_text(block.get("text", ""), level=min(max(lvl, 1), 9))

        elif btype == "paragraph":
            add_paragraph_text(
                block.get("text", ""),
                bold=block.get("bold", False),
                italic=block.get("italic", False),
                align=block.get("align", "left"),
            )

        elif btype == "bullet_list" or (btype == "list" and block.get("kind") == "bulleted"):
            add_list(block.get("items"), "List Bullet")

        elif btype == "number_list" or (btype == "list" and block.get("kind") == "numbered"):
            add_list(block.get("items"), "List Number")

        elif btype == "table":
            add_table(block)

        elif btype == "image_placeholder" or btype == "image":
            if image_counter >= len(images):
                # No (more) image data available, add placeholder
                add_image_placeholder(block)
                continue

            img_data = images[image_counter]
            # Consume the image even if insertion fails, so one unsupported
            # image is not retried for every later placeholder.
            image_counter += 1
            try:
                image_stream = io.BytesIO(base64.b64decode(img_data["data"]))
                p = doc.add_paragraph()
                position = block.get("position", "center")
                if position == "center":
                    p.alignment = WD_ALIGN_PARAGRAPH.CENTER
                elif position == "right":
                    p.alignment = WD_ALIGN_PARAGRAPH.RIGHT
                else:
                    p.alignment = WD_ALIGN_PARAGRAPH.LEFT
                p.add_run().add_picture(image_stream, width=Inches(4))
            except Exception as e:
                print(f"Failed to insert image: {e}")
                add_image_placeholder(block)
                continue

            caption = block.get("caption", "")
            if caption:
                cap_para = doc.add_paragraph(as_text(caption))
                cap_para.alignment = p.alignment
                for run in cap_para.runs:
                    run.font.size = Pt(9)
                    run.font.italic = True

        elif btype == "page_break":
            doc.add_page_break()

        elif btype == "section_break":
            new_sec = doc.add_section()
            margins = block.get("margins_pt", {})
            # Malformed margin values are ignored; the section keeps its defaults.
            try:
                if margins:
                    top = margins.get("top")
                    bottom = margins.get("bottom")
                    left = margins.get("left")
                    right = margins.get("right")
                    if top:
                        new_sec.top_margin = Pt(float(top))
                    if bottom:
                        new_sec.bottom_margin = Pt(float(bottom))
                    if left:
                        new_sec.left_margin = Pt(float(left))
                    if right:
                        new_sec.right_margin = Pt(float(right))
            except (AttributeError, TypeError, ValueError):
                pass

        else:
            # Unknown block: add as paragraph
            add_paragraph_text(str(block))

    # Finalize to bytes
    out = io.BytesIO()
    doc.save(out)
    return out.getvalue()
# -------------------------
# FastAPI routes
# -------------------------
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app` (the root listing names "/extract").
async def route_extract(file: UploadFile = File(...)):
    """
    Extract plain text, tables, and images from uploaded document.

    Returns a dict with the extracted "text" plus "tables_count" and
    "images_count", or a 500 JSON error when extraction fails.
    """
    try:
        fb = await file.read()
        kind = detect_file_type(file.filename)
        # Parsing is synchronous and can be slow; run it off the event loop.
        text, extracted_data = await run_in_threadpool(extract_text_and_layout, fb, kind)
        return {
            "text": text,
            "tables_count": len(extracted_data.get("tables", [])),
            "images_count": len(extracted_data.get("images", [])),
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse({"error": str(e)}, status_code=500)
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app` (the root listing names "/enhance").
async def route_enhance(
    file: UploadFile = File(...),
    doc_type: str = "auto",
    prompt: str = ""
):
    """
    Enhanced document processor with table, image, and emoji support.

    Pipeline: extract text/tables/images -> Gemini layout JSON -> DOCX.

    Responses:
        200: the generated DOCX as an attachment.
        400: no text could be extracted, or the layout JSON is unusable.
        500: the enhancement step reported an error, or any other failure.
    """
    try:
        fb = await file.read()
        kind = detect_file_type(file.filename)
        # The pipeline steps are synchronous (file parsing, a long HTTP call,
        # DOCX rendering); run them in the threadpool so the event loop stays
        # free for other requests.
        text, extracted_data = await run_in_threadpool(extract_text_and_layout, fb, kind)
        if not text.strip():
            return JSONResponse({"error": "No text extracted from document"}, status_code=400)

        # Enhanced: Pass extracted data to Gemini
        raw = await run_in_threadpool(enhance_text_with_gemini, text, doc_type, prompt, extracted_data)
        cleaned = clean_gemini_json(raw)

        # Surface an upstream error payload instead of rendering it.
        try:
            parsed = json.loads(cleaned)
        except ValueError:
            parsed = None  # not parseable yet; the builder repairs or rejects it
        if isinstance(parsed, dict) and "error" in parsed:
            return JSONResponse({"error": parsed["error"]}, status_code=500)

        # Enhanced: Pass extracted data to DOCX builder
        docx_bytes = await run_in_threadpool(build_docx_from_design, cleaned, extracted_data)
        safe_filename = sanitize_filename(f"Enhanced_{file.filename or 'document.docx'}")
        return StreamingResponse(
            io.BytesIO(docx_bytes),
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            headers={
                "Content-Disposition": f'attachment; filename="{safe_filename}"'
            },
        )
    except ValueError as ve:
        traceback.print_exc()
        return JSONResponse({"error": f"JSON parsing error: {str(ve)}"}, status_code=400)
    except Exception as e:
        traceback.print_exc()
        return JSONResponse({"error": str(e)}, status_code=500)
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app` (the root listing names "/preview").
async def route_preview(
    file: UploadFile = File(...),
    doc_type: str = "auto",
    prompt: str = ""
):
    """
    Preview with table and image information.

    Returns the parsed layout JSON plus table/image counts without building
    a DOCX.

    Responses:
        200: {"layout_json": ..., "extracted_data": {...counts...}}.
        400: no text could be extracted, or the layout JSON is unusable.
        500: the enhancement step reported an error, or any other failure.
    """
    try:
        fb = await file.read()
        kind = detect_file_type(file.filename)
        # Synchronous, potentially slow steps run off the event loop.
        text, extracted_data = await run_in_threadpool(extract_text_and_layout, fb, kind)
        if not text.strip():
            return JSONResponse({"error": "No text extracted"}, status_code=400)

        raw = await run_in_threadpool(enhance_text_with_gemini, text, doc_type, prompt, extracted_data)
        cleaned = clean_gemini_json(raw)
        try:
            layout = json.loads(cleaned)
        except ValueError:
            # Give truncated model output one repair attempt before failing.
            layout = json.loads(repair_truncated_json(cleaned))

        if isinstance(layout, dict) and "error" in layout and "document" not in layout:
            return JSONResponse({"error": layout["error"]}, status_code=500)

        return {
            "layout_json": layout,
            "extracted_data": {
                "tables_count": len(extracted_data.get("tables", [])),
                "images_count": len(extracted_data.get("images", [])),
            },
        }
    except ValueError as ve:
        traceback.print_exc()
        return JSONResponse({"error": f"JSON parsing error: {str(ve)}"}, status_code=400)
    except Exception as e:
        traceback.print_exc()
        return JSONResponse({"error": str(e)}, status_code=500)
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app` (the root listing names "/design").
async def route_design(
    file: UploadFile = File(...),
    doc_type: str = "auto",
    prompt: str = ""
):
    """
    Legacy endpoint with enhanced features.

    Same pipeline as the enhance handler; the download is named
    "Professional_<name>.docx".

    Responses:
        200: the generated DOCX as an attachment.
        400: no text could be extracted, or the layout JSON is unusable.
        500: the enhancement step reported an error, or any other failure.
    """
    try:
        fb = await file.read()
        kind = detect_file_type(file.filename)
        # Synchronous, potentially slow steps run off the event loop.
        text, extracted_data = await run_in_threadpool(extract_text_and_layout, fb, kind)
        if not text.strip():
            return JSONResponse({"error": "No text extracted"}, status_code=400)

        raw = await run_in_threadpool(enhance_text_with_gemini, text, doc_type, prompt, extracted_data)
        cleaned = clean_gemini_json(raw)

        # An upstream error payload has no "document" list; rendering it would
        # return an empty DOCX with status 200.
        try:
            parsed = json.loads(cleaned)
        except ValueError:
            parsed = None  # not parseable yet; the builder repairs or rejects it
        if isinstance(parsed, dict) and "error" in parsed:
            return JSONResponse({"error": parsed["error"]}, status_code=500)

        docx_bytes = await run_in_threadpool(build_docx_from_design, cleaned, extracted_data)
        safe_filename = sanitize_filename(f"Professional_{file.filename or 'document.docx'}")
        return StreamingResponse(
            io.BytesIO(docx_bytes),
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            headers={
                "Content-Disposition": f'attachment; filename="{safe_filename}"'
            },
        )
    except ValueError as ve:
        return JSONResponse({"error": str(ve)}, status_code=400)
    except Exception as e:
        traceback.print_exc()
        return JSONResponse({"error": str(e)}, status_code=500)
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app` (the root listing names "/full").
async def route_full(
    file: UploadFile = File(...),
    doc_type: str = "auto",
    prompt: str = ""
):
    """
    Full pipeline with enhanced features.

    Extracts the upload, asks Gemini for a layout and returns the rendered
    DOCX named "Enhanced_<name>.docx".

    Responses:
        200: the generated DOCX as an attachment.
        400: no text could be extracted, or the layout JSON is unusable.
        500: the enhancement step reported an error, or any other failure.
    """
    try:
        fb = await file.read()
        kind = detect_file_type(file.filename)
        # Synchronous, potentially slow steps run off the event loop.
        text, extracted_data = await run_in_threadpool(extract_text_and_layout, fb, kind)
        if not text.strip():
            return JSONResponse({"error": "No text extracted"}, status_code=400)

        raw = await run_in_threadpool(enhance_text_with_gemini, text, doc_type, prompt, extracted_data)
        cleaned = clean_gemini_json(raw)

        # An upstream error payload has no "document" list; rendering it would
        # return an empty DOCX with status 200.
        try:
            parsed = json.loads(cleaned)
        except ValueError:
            parsed = None  # not parseable yet; the builder repairs or rejects it
        if isinstance(parsed, dict) and "error" in parsed:
            return JSONResponse({"error": parsed["error"]}, status_code=500)

        docx_bytes = await run_in_threadpool(build_docx_from_design, cleaned, extracted_data)
        safe_filename = sanitize_filename(f"Enhanced_{file.filename or 'document.docx'}")
        return StreamingResponse(
            io.BytesIO(docx_bytes),
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            headers={
                "Content-Disposition": f'attachment; filename="{safe_filename}"'
            },
        )
    except ValueError as ve:
        return JSONResponse({"error": str(ve)}, status_code=400)
    except Exception as e:
        traceback.print_exc()
        return JSONResponse({"error": str(e)}, status_code=500)
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app` (the root listing names "/add-signature").
async def route_add_signature(
    file: UploadFile = File(...),
    signature: str = Form(...),
    position: str = Form("bottom-right"),
    signer_name: Optional[str] = Form(None)
):
    """
    Add a signature image to an existing DOCX file.

    The signature is appended at the end of the document body (after one
    blank paragraph); "position" only controls horizontal alignment.

    Parameters:
    - file: DOCX file to sign
    - signature: Base64 encoded signature image (PNG/JPEG), optionally as a
      data URI
    - position: Where to place signature (bottom-right, bottom-center, bottom-left)
    - signer_name: Optional name to display under signature

    Responses:
        200: the signed DOCX as an attachment.
        400: the upload is not a readable DOCX, or the signature is not a
             valid image.
        500: any other failure.
    """
    try:
        # Read the DOCX file
        docx_bytes = await file.read()
        try:
            doc = Document(io.BytesIO(docx_bytes))
        except Exception as e:
            # A bad upload is a client error, not a server fault.
            return JSONResponse({"error": f"Invalid DOCX file: {str(e)}"}, status_code=400)

        # Decode signature image from base64
        try:
            # Remove data URI prefix if present (data:image/png;base64,...)
            if "," in signature:
                signature = signature.split(",", 1)[1]
            signature_bytes = base64.b64decode(signature)
            # Validate it's a valid image; verify() leaves the image object
            # unusable, so a fresh stream is created for insertion below.
            with Image.open(io.BytesIO(signature_bytes)) as img:
                img.verify()
            signature_image = io.BytesIO(signature_bytes)
        except Exception as e:
            return JSONResponse({"error": f"Invalid signature image: {str(e)}"}, status_code=400)

        # Add signature at the end of document, separated by a blank paragraph
        doc.add_paragraph()
        sig_paragraph = doc.add_paragraph()

        # Set alignment based on position
        if position == "bottom-center":
            sig_paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
        elif position == "bottom-left":
            sig_paragraph.alignment = WD_ALIGN_PARAGRAPH.LEFT
        else:  # bottom-right (default)
            sig_paragraph.alignment = WD_ALIGN_PARAGRAPH.RIGHT

        # Add signature image (resize to reasonable size)
        sig_paragraph.add_run().add_picture(signature_image, width=Inches(1.5))

        # Add signer name if provided
        if signer_name and signer_name.strip():
            name_paragraph = doc.add_paragraph()
            name_paragraph.alignment = sig_paragraph.alignment
            name_run = name_paragraph.add_run(signer_name.strip())
            name_run.font.size = Pt(10)
            name_run.font.italic = True

        # Save to bytes
        output = io.BytesIO()
        doc.save(output)
        output.seek(0)

        safe_filename = sanitize_filename(f"Signed_{file.filename or 'document.docx'}")
        return StreamingResponse(
            output,
            media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
            headers={
                "Content-Disposition": f'attachment; filename="{safe_filename}"'
            },
        )
    except Exception as e:
        traceback.print_exc()
        return JSONResponse({"error": f"Failed to add signature: {str(e)}"}, status_code=500)
# -------------------------
# Root
# -------------------------
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
def root():
    """Return a static description of the service, its endpoints and usage."""
    return {
        "service": "Universal Document Enhancer v6",
        "status": "ok",
        "description": "AI-powered document formatter with table, image, emoji, and icon support",
        "supported_types": ["Resume/CV", "Cover Letter", "Report", "Article", "Essay", "Notes", "Any text document"],
        "new_features": [
            "✓ Table extraction and formatting",
            "✓ Image extraction and insertion",
            "✓ Emoji and Unicode symbol preservation (😊, ★, ✓, →)",
            "✓ Enhanced formatting with icons",
            "✓ Multi-column table support",
            "✓ Image captions and positioning",
        ],
        "endpoints": {
            "/extract": "Extract text, tables, and images from document",
            "/enhance": "Full pipeline: extract + AI enhancement + DOCX with tables/images (RECOMMENDED)",
            "/add-signature": "Add signature to existing DOCX file",
            "/preview": "Preview JSON layout with table/image counts",
            "/design": "Same as /enhance (legacy)",
            "/full": "Same as /enhance (legacy)",
        },
        "usage": {
            "basic": "POST /enhance with file upload",
            "with_prompt": "POST /enhance?prompt=your_instructions&doc_type=auto",
            "supported_content": [
                "Text with emojis (😊🎉❤️)",
                "Unicode symbols (★✓→•©®™)",
                "Special chars (€£¥°±)",
                "Tables (with headers)",
                "Images (inline)",
                "Lists (bullet/numbered)",
                "Headers and formatting",
            ],
        },
    }
# NOTE(review): no route decorator is visible on this handler in this view;
# confirm it is registered on `app`.
def health():
    """Return a health payload: version, feature flags and the number of configured Gemini keys."""
    configured_keys = len(GEMINI_API_KEYS)
    return {
        "status": "healthy",
        "api_keys_configured": configured_keys,
        "version": "6.0 - Enhanced with Tables, Images, Emojis & Icons",
        "features": {
            "tables": "✓ Supported",
            "images": "✓ Supported",
            "emojis": "✓ Supported",
            "unicode": "✓ Supported",
            "multi_api_keys": f"✓ {configured_keys} keys configured",
        },
    }
# Script entry point: serve the app with uvicorn on all interfaces, port 7860.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)