Spaces:
Sleeping
Sleeping
| from fastapi import FastAPI, File, UploadFile, HTTPException, Form | |
| from fastapi.responses import FileResponse | |
| from fastapi.staticfiles import StaticFiles | |
| import shutil | |
| from pathlib import Path | |
| from transformers import ( | |
| pipeline, | |
| M2M100Tokenizer, | |
| M2M100ForConditionalGeneration, | |
| BartTokenizer, | |
| BlipProcessor, BlipForConditionalGeneration, | |
| AutoModelForCausalLM, AutoTokenizer | |
| ) | |
| from utils import extract_text, save_file, verify_summary, ensure_complete_sentences | |
| from langdetect import detect, DetectorFactory | |
| from langcodes import Language | |
| import torch | |
| from PIL import Image | |
| import os | |
| import pytesseract | |
| import pandas as pd | |
| import matplotlib.pyplot as plt | |
| import seaborn as sns | |
| import hashlib | |
| import re | |
| from concurrent.futures import ThreadPoolExecutor | |
app = FastAPI()

# Hugging Face models
summarizer = pipeline("summarization", model="facebook/bart-large-cnn")
summary_tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
processor = BlipProcessor.from_pretrained("Salesforce/blip-image-captioning-large", use_fast=True)
interpretation_model = BlipForConditionalGeneration.from_pretrained("Salesforce/blip-image-captioning-large")
translation_tokenizer = M2M100Tokenizer.from_pretrained("facebook/m2m100_1.2B")
translation_model = M2M100ForConditionalGeneration.from_pretrained("facebook/m2m100_1.2B")
question_answering = pipeline("question-answering", model="bert-large-uncased-whole-word-masking-finetuned-squad")
visual_question_answering = pipeline("visual-question-answering", model="dandelin/vilt-b32-finetuned-vqa")

# Make langdetect deterministic across runs.
DetectorFactory.seed = 0

# Directory to store uploaded and processed files
UPLOAD_DIR = Path("uploads")
PROCESSED_DIR = Path("processed")
UPLOAD_DIR.mkdir(exist_ok=True)
PROCESSED_DIR.mkdir(exist_ok=True)

device = "cuda" if torch.cuda.is_available() else "cpu"

# Hugging Face token, read from the HF_TOKEN environment variable.
API_TOKEN = os.environ.get("HF_TOKEN")
if not API_TOKEN:
    # BUG FIX: the message previously referred to HUGGINGFACE_API_TOKEN while
    # the code reads HF_TOKEN — keep the message consistent with the lookup.
    raise ValueError("HF_TOKEN environment variable not set.")

code_generation_tokenizer = AutoTokenizer.from_pretrained("Qwen/Qwen2.5-Coder-1.5B-Instruct", token=API_TOKEN)
code_generation_model = AutoModelForCausalLM.from_pretrained("Qwen/Qwen2.5-Coder-1.5B-Instruct", token=API_TOKEN)
# Qwen has no pad token by default; reuse EOS so batched generation works.
code_generation_tokenizer.pad_token_id = code_generation_tokenizer.eos_token_id
code_generation_generator = pipeline("text-generation", model="Qwen/Qwen2.5-Coder-1.5B-Instruct", tokenizer=code_generation_tokenizer, device=-1)

# Serve the static frontend assets and the processed-output directory.
app.mount("/assets", StaticFiles(directory="frontend/assets", html=True), name="assets")
app.mount("/images", StaticFiles(directory="frontend/images", html=True), name="images")
app.mount("/processed", StaticFiles(directory="processed"), name="processed")
# NOTE(review): no route decorator is visible here — presumably @app.get("/")
# was intended so the frontend is actually reachable; confirm upstream.
async def serve_frontend():
    """Serve the single-page frontend entry point."""
    index_page = "frontend/index.html"
    return FileResponse(index_page)
# List processed files
async def list_processed_files():
    """Return the names of all regular files in the processed directory."""
    return {"files": [entry.name for entry in PROCESSED_DIR.iterdir() if entry.is_file()]}
# Download a processed file
async def download_file(filename: str):
    """Stream one processed file back to the client; 404 when it is absent."""
    target = PROCESSED_DIR / filename
    if not target.exists():
        raise HTTPException(status_code=404, detail="File not found")
    return FileResponse(target, filename=filename)
def split_text(text, max_words=1000):
    """Split *text* into chunks of at most *max_words* whitespace-separated words.

    Returns an empty list for empty/whitespace-only input.
    """
    words = text.split()
    return [
        " ".join(words[start:start + max_words])
        for start in range(0, len(words), max_words)
    ]
# Document & Image Analysis (Summarization & Interpretation)
async def docsum_imginter(file: UploadFile = File(...), task: str = Form(...)):
    """Summarize a text document or caption an image, depending on *task*.

    task == "summarize": extract the text, chunk it, run BART per chunk,
    verify and merge the partial summaries, save and return the result file.
    task == "interpret": run BLIP image captioning and return the caption.
    Any other task value is rejected with a 400.
    """
    file_type = file.filename.split(".")[-1].lower()
    file_path = UPLOAD_DIR / file.filename
    output_filename = f"summarized_{file.filename}"
    output_path = PROCESSED_DIR / output_filename
    with open(file_path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    try:
        if task.lower() == "summarize":
            text = extract_text(file_path, file_type)
            if not text or not text.strip():
                raise HTTPException(400, "No text found in document")
            if len(text.strip().split()) < 150:
                raise HTTPException(400, "WallD thinks the file is too small for summarization - minimum 150 words",)
            # Strip non-ASCII characters that the summarizer handles poorly.
            text = text.encode("ascii", "ignore").decode("ascii")
            chunks = split_text(text)
            summaries = []
            prompt = (
                "Generate a concise, factual summary covering ALL key sections of the text. "
                "Include: main objectives, critical details, and outcomes if mentioned. "
                "Never include: contact information, website links, or promotional content. "
                "\n"
                "Text to summarize:\n{chunk}"
            )
            for chunk in chunks:
                if not chunk.strip():
                    continue
                word_count = len(chunk.split())
                # Target ~40% of the chunk length, clamped to [150, 512].
                max_length = min(max(int(word_count * 0.4), 150), 512)
                summary_result = summarizer(
                    prompt.format(chunk=chunk),
                    max_length=max_length,
                    min_length=max(150, int(max_length * 0.6)),
                    do_sample=False,
                    truncation=True,
                    repetition_penalty=1.5,
                    no_repeat_ngram_size=3,
                    num_beams=4,
                    length_penalty=1.0,
                )
                if summary_result:
                    raw_summary = summary_result[0]["summary_text"]
                    # Drop summaries that fail verification against the source.
                    verified = verify_summary(raw_summary, chunk)
                    if verified:
                        summaries.append(ensure_complete_sentences(verified))
            if not summaries:
                raise HTTPException(500, "Summary verification failed - no valid content extracted")
            full_summary = "\n".join(filter(None, summaries))
            if len(summaries) > 1:
                # Second pass: merge per-chunk summaries into one paragraph.
                full_summary = summarizer(
                    f"Combine these partial summaries into one coherent paragraph:\n{full_summary}",
                    max_length=512,
                )[0]["summary_text"]
            if not full_summary.strip():
                # Fallback: first three sentences (or first 500 chars) of source.
                sentences = [s.strip() for s in text.split(".") if s.strip()]
                full_summary = (". ".join(sentences[:3]) + "." if sentences else text[:500])
            save_file(full_summary, file_type, output_path)
            return FileResponse(output_path, filename=output_filename)
        elif task.lower() == "interpret":
            try:
                with Image.open(file_path) as image:
                    if image.mode != "RGB":
                        image = image.convert("RGB")
                    inputs = processor(images=image, return_tensors="pt")
                    if inputs is None or "pixel_values" not in inputs:
                        raise ValueError("Image processing failed: No valid inputs generated.")
                    outputs = interpretation_model.generate(**inputs, repetition_penalty=1.2)
                    if outputs is None:
                        raise ValueError("Model generation failed: No outputs produced.")
                    caption = processor.decode(outputs[0], skip_special_tokens=True)
                    return {"caption": caption if caption else "No caption generated"}
            except Exception as e:
                raise HTTPException(status_code=500, detail=f"Inference failed: {str(e)}")
        else:
            # BUG FIX: unknown tasks previously fell through and returned None
            # (an empty 200 response); reject them explicitly.
            raise HTTPException(400, f"Unknown task: {task}")
    finally:
        # BUG FIX: the uploaded file was only deleted on the "interpret" path,
        # leaking uploads on "summarize"; clean up on every outcome.
        if file_path.exists():
            file_path.unlink()
# Intelligent Question Answering
def is_visual_question(question: str) -> bool:
    """Heuristically decide whether *question* needs visual (VQA) handling.

    Matches case-insensitively against a fixed list of visual phrases.
    """
    lowered = question.lower()
    for keyword in (
        "color", "describe", "what do you see", "how many",
        "is there", "are there", "what is in", "can you see",
    ):
        if keyword in lowered:
            return True
    return False
async def ask(file: UploadFile = File(...), question: str = Form(...)):
    """Answer *question* about an uploaded document or image.

    Documents (docx/xlsx/pptx/pdf/txt) go through extractive QA over the
    extracted text. Images are routed to VQA for visual questions; otherwise
    the image is OCR'd and the text fed to extractive QA.
    """
    # BUG FIX: compute file_path before entering the try block so the
    # finally clause cannot hit a NameError if an early exception fires.
    file_type = file.filename.split(".")[-1].lower()
    file_path = UPLOAD_DIR / file.filename
    try:
        with open(file_path, "wb") as f:
            shutil.copyfileobj(file.file, f)
        if file_type in ["docx", "xlsx", "pptx", "pdf", "txt"]:
            text = extract_text(file_path, file_type)
        elif file_type in ["png", "jpg", "jpeg", 'webp']:
            with Image.open(file_path) as image:
                if image.mode != 'RGB':
                    image = image.convert('RGB')
                if is_visual_question(question):
                    vqa_result = visual_question_answering(image, question, top_k=1)[0]
                    return {"answer": vqa_result["answer"]}
                # Non-visual question about an image: OCR the text out of it.
                text = pytesseract.image_to_string(image)
        else:
            raise HTTPException(status_code=400, detail="Unsupported file type.")
        if not text:
            raise HTTPException(status_code=400,detail="The File doesn't contain any text.",)
        result = question_answering(question=question, context=text)
        return {"answer": result["answer"]}
    except HTTPException:
        # BUG FIX: deliberate 400 responses were previously swallowed by the
        # generic handler below and re-reported as 500s; re-raise unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error processing request. {str(e)}")
    finally:
        if file_path.exists():
            file_path.unlink()
# Data Visualization Code Generation
async def visualization(file: UploadFile = File(...), request: str = Form(...)):
    """Generate, sanitize and execute plotting code for an uploaded Excel file.

    The user's *request* is turned into a code-generation prompt for the Qwen
    coder model; the first fenced Python block in the reply is filtered and
    executed against the already-loaded DataFrame, and the resulting
    matplotlib figure is saved under PROCESSED_DIR.
    """
    file_path = UPLOAD_DIR / file.filename
    with open(file_path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    try:
        df = pd.read_excel(file_path)
        if df.empty:
            raise ValueError("Excel file is empty.")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error reading Excel file: {str(e)}")
    # Prompt includes the column names and a head() preview so the model
    # writes code against the real schema.
    input_text = f"""
Given the DataFrame 'df' with columns {', '.join(df.columns)} and preview:
{df.head().to_string()}
Write Python code to: create {request}
- Use ONLY 'df = pd.read_excel({file.filename})' (no external data loading like pd.read_csv or creating a new DataFrame).
- Use pandas (pd), matplotlib.pyplot (plt), or seaborn (sns).
- Include axis labels and a title.
- Output ONLY executable Python code. Do NOT include triple quotes, prose, Markdown, or text like 'Hint', 'Solution', or 'Here is the code'.
"""
    try:
        generated = code_generation_generator(input_text, max_new_tokens=500, num_return_sequences=1)
        # The text-generation pipeline echoes the prompt; strip it so only
        # the newly generated code remains.
        generated_code = generated[0]["generated_text"].replace(input_text, "").strip()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error querying model: {str(e)}")
    if not generated_code.strip():
        raise HTTPException(status_code=500, detail="No code generated by the AI model.")
    # Extract the first fenced ```python block from the model output.
    code_block_pattern = r"```python\n(.*?)(\n```|\Z)"
    matches = list(re.finditer(code_block_pattern, generated_code, re.DOTALL))
    if matches:
        raw_code_block = matches[0].group(1).strip()
        executable_code = raw_code_block
    else:
        raise HTTPException(status_code=500, detail="No valid Python code block found in generated output.")
    # Drop lines that re-load data, redefine df, or would block on plt.show().
    executable_code = "\n".join(
        line.strip() for line in executable_code.splitlines()
        if line.strip() and
        not any(kw in line for kw in ["pd.read_csv", "pd.read_excel", "plt.show", "df ="])
    ).strip()
    # Unfiltered version of the code block, returned for display to the user.
    display_code = "\n".join(
        line.strip() for line in raw_code_block.splitlines()
        if line.strip()
    ).strip()
    if not executable_code:
        raise HTTPException(status_code=500, detail="Generated code was invalid (e.g., included data loading, df redefinition, or was empty).")
    # Deterministic plot name per (file, request) pair so repeats overwrite.
    plot_hash = hashlib.md5(f"{file.filename}_{request}".encode()).hexdigest()[:8]
    plot_filename = f"plot_{plot_hash}.png"
    plot_path = PROCESSED_DIR / plot_filename
    try:
        # SECURITY: exec() runs model-generated code with full interpreter
        # privileges; the keyword filter above is NOT a sandbox. Consider
        # executing in a restricted subprocess instead.
        exec_globals = {"pd": pd, "plt": plt, "sns": sns, "df": df}
        exec(executable_code, exec_globals)
        plt.savefig(plot_path, bbox_inches="tight")
        plt.close()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error executing code: {str(e)}")
    finally:
        # Always remove the uploaded spreadsheet, even when execution fails.
        if file_path.exists():
            file_path.unlink()
    if not plot_path.exists():
        raise HTTPException(status_code=500, detail="Plot file was not created.")
    # Return the file response
    return {"code": display_code, "image_path": plot_path}
# Text Translation
def split_tran_text_trans(text, max_chunk_size=800):
    """Group blank-line-separated paragraphs into chunks of <= max_chunk_size words.

    A single paragraph longer than the limit still becomes its own chunk;
    empty paragraphs are skipped.
    """
    chunks = []
    pending = []
    pending_words = 0
    for paragraph in text.split("\n\n"):
        paragraph = paragraph.strip()
        if not paragraph:
            continue
        size = len(paragraph.split())
        if pending_words + size > max_chunk_size:
            # Current chunk is full: flush it and start a new one.
            if pending:
                chunks.append("\n\n".join(pending))
            pending = [paragraph]
            pending_words = size
        else:
            pending.append(paragraph)
            pending_words += size
    if pending:
        chunks.append("\n\n".join(pending))
    return chunks
async def translate_document(file: UploadFile = File(...), target_language: str = Form(...)):
    """Translate an uploaded document into *target_language* with M2M100.

    Detects the source language from a text sample, validates both languages
    against the tokenizer's supported set, translates the text chunk-by-chunk
    in a thread pool, and returns the saved translated file.
    """
    file_type = file.filename.split(".")[-1].lower()
    file_path = UPLOAD_DIR / file.filename
    output_filename = f"translated_to_{target_language}_{file.filename}"
    output_path = PROCESSED_DIR / output_filename
    with open(file_path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    try:
        text = extract_text(file_path, file_type)
        # A 1000-char sample is enough for langdetect and much faster.
        source_language = detect(text[:1000])
        tr_language = Language.find(target_language).language
        source_language = {
            "en": "en", "fr": "fr", "es": "es", "de": "de",
            "ar": "ar", "zh": "zh", "ja": "ja", "ru": "ru",
        }.get(source_language, source_language)
        supported_languages = translation_tokenizer.lang_code_to_id.keys()
        if source_language not in supported_languages:
            raise HTTPException(400, f"Unsupported source language: {Language.get(source_language).display_name()}")
        if tr_language not in supported_languages:
            raise HTTPException(400, f"Unsupported target language: {target_language}")
        chunks = split_tran_text_trans(text)
        translated_chunks = []
        translation_tokenizer.src_lang = source_language

        def translate_chunk(chunk):
            # Best-effort: on failure, return the original chunk untranslated.
            try:
                inputs = translation_tokenizer(chunk, return_tensors="pt", truncation=True, max_length=800)
                generated_tokens = translation_model.generate(
                    **inputs,
                    forced_bos_token_id=translation_tokenizer.get_lang_id(tr_language),
                    max_length=1000
                )
                return translation_tokenizer.decode(generated_tokens[0], skip_special_tokens=True)
            except Exception as e:
                print(f"Error translating chunk: {str(e)}")
                return chunk

        with ThreadPoolExecutor() as executor:
            translated_chunks = list(executor.map(translate_chunk, chunks))
        translated_text = "\n\n".join(translated_chunks)
        save_file(translated_text, file_type, output_path)
        return FileResponse(output_path, filename=output_filename)
    except HTTPException:
        # BUG FIX: the generic handler below previously converted deliberate
        # 400 responses into opaque 500s; re-raise them unchanged.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        if file_path.exists():
            file_path.unlink()