Spaces:
Build error
Build error
| import gradio as gr | |
| import pdfplumber | |
| import docx | |
| from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM | |
| from huggingface_hub import login | |
| from PIL import Image | |
| import pytesseract | |
| import torch | |
| import os | |
| import spaces | |
# Authenticate with the Hugging Face Hub using the token stored in the
# "token" environment variable.
_hf_token = os.environ.get("token")
if _hf_token:
    login(token=_hf_token)
else:
    # login(token=None) falls back to an interactive prompt, which cannot be
    # answered in a headless deployment; skip it so that a missing secret
    # surfaces later as a clear download/authorization error instead.
    print("No Hugging Face token found in the 'token' environment variable; skipping login.")

# Fail fast at import time when no CUDA device is visible.
if not torch.cuda.is_available():
    raise RuntimeError("β GPU not detected! Please enable GPU in Space settings.")
print(f"β Using GPU: {torch.cuda.get_device_name(0)}")

# Checkpoint used for the question-answering pass.
model_id = "mistralai/Mistral-7B-Instruct-v0.2"
# Document extractors
def extract_text_from_pdf(file):
    """Return the text of every page of a PDF, each page followed by a newline.

    Pages for which ``extract_text()`` yields nothing are rendered to an
    image (``resolution=300``) and run through Tesseract OCR instead.
    """
    page_texts = []
    with pdfplumber.open(file) as document:
        for page in document.pages:
            extracted = page.extract_text()
            if not extracted:
                # No extractable text on this page: rasterize it and OCR it.
                rendered = page.to_image(resolution=300).original
                extracted = pytesseract.image_to_string(rendered)
            page_texts.append(extracted + "\n")
    return "".join(page_texts)
def extract_text_from_docx(file):
    """Return the non-blank paragraphs of a DOCX document joined by newlines."""
    document = docx.Document(file)
    kept = []
    for paragraph in document.paragraphs:
        content = paragraph.text
        # Skip paragraphs that are empty or whitespace-only.
        if content.strip():
            kept.append(content)
    return "\n".join(kept)
def chunk_text(text, max_chars=6000):
    """Split *text* into newline-terminated chunks of roughly *max_chars*.

    Paragraphs (lines separated by ``"\\n"``) are never split: they are
    packed greedily, and a new chunk is started when adding the next
    paragraph would reach ``max_chars``. A single paragraph longer than
    ``max_chars`` therefore becomes a chunk of its own.

    Args:
        text: The full document text.
        max_chars: Soft upper bound on the size of each chunk.

    Returns:
        A list of non-empty chunk strings, in document order.
    """
    chunks = []
    current = []
    current_len = 0
    for para in text.split("\n"):
        # Only flush a chunk that already holds something; otherwise an
        # oversized first paragraph would produce an empty leading chunk.
        if current and current_len + len(para) >= max_chars:
            chunks.append("".join(current))
            current = []
            current_len = 0
        current.append(para + "\n")
        current_len += len(para) + 1
    if current:
        chunks.append("".join(current))
    return chunks
# Q&A Prompt Template
def create_prompt(text_chunk):
    """Build the 20-question tender-analysis prompt around *text_chunk*.

    The result starts and ends with a newline and holds, in order: the
    instructions, the ``CONTENT:`` section with the chunk, the numbered
    questions, and the expected answer format.
    """
    instructions = (
        "You are an expert in analyzing U.S. government tender documents. "
        "Based only on the content provided below, answer the following 20 "
        "standard questions in Q&A format. If something is not mentioned, "
        'write "Not mentioned in the provided document."'
    )
    questions = (
        "Q1: What is the general scope of the tender?",
        "Q2: Are certifications like SAM, CMMI, ISO, SBA, 8(a), or GSA required?",
        "Q3: Is there a Set-aside status (e.g., 8a, SDVOSB)?",
        "Q4: Are U.S. citizens or security-cleared staff required?",
        "Q5: What is the expected team size or key qualifications?",
        "Q6: Are offshore resources allowed?",
        "Q7: What is the mode of working (On-site/Remote/Hybrid)?",
        "Q8: Is presence in specific regions/states required?",
        "Q9: Is the delivery location defined?",
        "Q10: Is remote or offshore delivery allowed?",
        "Q11: Is a U.S. office presence required?",
        "Q12: Are travel/lodging expenses reimbursable?",
        "Q13: Are cybersecurity frameworks (FedRAMP, NIST, HIPAA) required?",
        "Q14: Are background checks or security clearance needed?",
        "Q15: Is past experience required?",
        "Q16: How many references are required?",
        "Q17: Are only U.S. references accepted?",
        "Q18: Is private sector experience allowed?",
        "Q19: Do references need to be identified?",
        "Q20: Is subcontracting permitted?",
    )
    answer_format = ("Q1: ...", "A1: ...", "Q2: ...", "A2: ...", "...")
    # The empty first and last entries yield the leading and trailing newline.
    parts = [
        "",
        instructions,
        "CONTENT:",
        f"{text_chunk}",
        "Now provide answers for:",
        *questions,
        "Answer clearly and in the same format:",
        *answer_format,
        "",
    ]
    return "\n".join(parts)
# Cleaner
def clean_output(raw_output):
    """Trim model output down to the Q&A section.

    Everything before the first line that starts with ``Q1:`` is dropped.
    From the sixth kept line onward, the first line containing a prompt
    marker (``CONTENT:`` or ``You are an expert``) and everything after it
    is dropped as well. Returns an empty string when no ``Q1:`` line exists.
    """
    all_lines = raw_output.splitlines()
    first_q = next(
        (pos for pos, text in enumerate(all_lines) if text.strip().startswith("Q1:")),
        None,
    )
    if first_q is None:
        return ""
    kept = all_lines[first_q:]

    markers = ("CONTENT:", "You are an expert")
    cut = len(kept)
    # The first five kept lines are never treated as an echoed prompt.
    for pos in range(5, len(kept)):
        if any(marker in kept[pos] for marker in markers):
            cut = pos
            break
    return "\n".join(kept[:cut]).strip()
# Main analysis function
# Lazily built text-generation pipeline, shared by all analyze_document calls.
_generator_cache = None


def _get_generator():
    """Build the text-generation pipeline on first use and reuse it afterwards."""
    global _generator_cache
    if _generator_cache is None:
        hf_token = os.environ.get("token")
        tokenizer = AutoTokenizer.from_pretrained(model_id, token=hf_token)
        model = AutoModelForCausalLM.from_pretrained(
            model_id,
            device_map="auto",
            torch_dtype=torch.float16,
            token=hf_token,
            # NOTE(review): trust_remote_code lets the checkpoint repository
            # run its own Python code; confirm it is really needed here.
            trust_remote_code=True,
        )
        _generator_cache = pipeline("text-generation", model=model, tokenizer=tokenizer)
    return _generator_cache


def analyze_document(file, cancel_flag):
    """Answer the standard tender questions for an uploaded PDF or DOCX.

    Args:
        file: The upload handed over by the file input: either a path string
            or an object exposing the path as ``.name``. ``None`` when
            nothing has been uploaded.
        cancel_flag: Truthy to abort the analysis. It is an ordinary argument
            captured when the call starts, so it reflects the flag's value
            at that moment only.

    Returns:
        A ``(text, status)`` tuple: the concatenated per-chunk answers (or an
        error message) and a short status line.
    """
    if file is None:
        return "No file uploaded. Please upload a PDF or DOCX.", "No file uploaded"

    # Accept both plain path strings and file-like wrappers carrying `.name`.
    path = getattr(file, "name", file)
    ext = os.path.splitext(path)[-1].lower()
    if ext == ".pdf":
        raw_text = extract_text_from_pdf(path)
    elif ext == ".docx":
        raw_text = extract_text_from_docx(path)
    else:
        return "β Unsupported file format. Please upload a PDF or DOCX.", "β Invalid format"

    if not raw_text.strip():
        return "β No text found in the document.", "β Empty document"

    chunks = chunk_text(raw_text)

    # Check before touching the model so a cancelled request never pays for
    # loading the weights.
    if cancel_flag:
        return "β Analysis cancelled by user.", "β Terminated by user"

    generator = _get_generator()

    sections = []
    for chunk in chunks:
        if cancel_flag:
            return "β Analysis cancelled by user.", "β Terminated by user"
        prompt = create_prompt(chunk)
        # return_full_text=False keeps the prompt (which itself contains
        # "Q1:" lines) out of the text handed to clean_output.
        result = generator(
            prompt,
            max_new_tokens=1024,
            do_sample=False,
            return_full_text=False,
        )[0]["generated_text"]
        # Fall back to the raw completion if the cleaner finds no "Q1:" line.
        cleaned = clean_output(result) or result.strip()
        sections.append(cleaned + "\n\n---\n\n")
    return "".join(sections).strip(), "β Completed"
# Gradio Interface
# Two-column layout: upload, buttons and status on the left; results on the right.
with gr.Blocks(title="Smart Tender Analyzer - US Edition") as demo:
    gr.Markdown("## π Document Analyzer β Extract important information using Transformer (GPU-Accelerated)")
    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(label="π Upload Tender Document (PDF/DOCX)")
            with gr.Row():
                analyze_button = gr.Button("π Analyze", variant="primary")
                terminate_button = gr.Button("β Terminate", variant="stop")
            status_box = gr.Textbox(label="π Status", value="β³ Waiting for input...", interactive=False)
        with gr.Column(scale=2):
            output_box = gr.Textbox(label="π§ Extracted Tender key information", lines=30, interactive=False)

    # Per-session cancellation flag, initially False.
    cancel_flag = gr.State(False)

    # NOTE(review): the flag's value is passed to analyze_document as an input
    # when the click fires, so pressing Terminate during a running analysis
    # does not reach that call. Nothing here resets the flag to False either.
    # Confirm the intended cancellation behaviour.
    analyze_button.click(
        fn=analyze_document,
        inputs=[file_input, cancel_flag],
        outputs=[output_box, status_box],
    )
    # A State output takes the returned value as-is, so return the plain
    # boolean rather than a component-update object.
    terminate_button.click(
        fn=lambda: True,
        inputs=[],
        outputs=[cancel_flag],
    )

demo.launch(server_name="0.0.0.0", server_port=7860, share=True)