Spaces:
Running
Running
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import FileResponse | |
| import tempfile, os, json, shutil, math, re | |
| from extractor import PDFExtractor | |
| from agree_calculator import AGREECalculator | |
| from nqs_calculator import NQSCalculator | |
| from complexmogapi_calculator import ComplexMoGAPICalculator, get_param_definitions | |
| from eco_scale_calculator import EcoScaleCalculator | |
| from cafri_calculator import CaFRICalculator | |
# ASGI application object; route handlers in this module attach to it.
app = FastAPI(title="AGREE API")

# CORS is fully open (any origin, method and header) but credentials are
# disabled, so browsers will not attach cookies to cross-origin requests.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Root scratch directory for per-session files (uploaded PDF, extracted
# text, extracted JSON). Created once when the module is imported.
UPLOAD_DIR = tempfile.mkdtemp(prefix="agree_")
| # ───────────────────────────────────────────────────────────── | |
| # Academic PDF boilerplate patterns — compiled once at startup. | |
| # Used by clean_paper_text() to strip noise before LLM calls. | |
| _BOILERPLATE_PATTERNS = [ | |
| # ─ Page numbers (standalone or decorated) ───────────────────────── | |
| re.compile(r'^\s*\d{1,4}\s*$', re.MULTILINE), # bare page numbers | |
| re.compile(r'^\s*[-–—]\s*\d{1,4}\s*[-–—]\s*$', re.MULTILINE), # – 12 – | |
| re.compile(r'^\s*Page\s+\d+\s*(of\s+\d+)?\s*$', re.MULTILINE | re.IGNORECASE), | |
| # ─ DOI / URL lines ───────────────────────────────────── | |
| re.compile(r'^\s*https?://\S+\s*$', re.MULTILINE), | |
| re.compile(r'^\s*doi\s*:\s*\S+\s*$', re.MULTILINE | re.IGNORECASE), | |
| re.compile(r'^\s*DOI:\s*10\.\d{4,}/\S+\s*$', re.MULTILINE | re.IGNORECASE), | |
| # ─ Journal / volume / issue / received-accepted metadata ──────── | |
| re.compile( | |
| r'^.{0,120}(Volume|Vol\.?|Issue|No\.?|pp\.?|Pages?)\s+\d.{0,60}$', | |
| re.MULTILINE | re.IGNORECASE | |
| ), | |
| re.compile( | |
| r'^.{0,120}(Received|Accepted|Revised|Published|Available online).{0,80}$', | |
| re.MULTILINE | re.IGNORECASE | |
| ), | |
| re.compile( | |
| r'^.{0,120}(Journal of|Analytica|Talanta|Analyst|Chemosphere|Chromatogr|Spectrochim|Microchem|TrAC|Molecules|Int\.? J\.?).{0,120}$', | |
| re.MULTILINE | re.IGNORECASE | |
| ), | |
| # ─ Copyright / licence / publisher notices ─────────────────── | |
| re.compile( | |
| r'^.{0,200}(\u00a9|Copyright|All rights reserved|Elsevier|Springer|Wiley|ACS Publications|Royal Society|Taylor & Francis|MDPI|BMC|Creative Commons|CC BY).{0,200}$', | |
| re.MULTILINE | re.IGNORECASE | |
| ), | |
| # ─ Author affiliation blocks (lines starting with superscript-like patterns) ─ | |
| re.compile(r'^\s*[1-9a-z,;\*\u2020\u2021\u00a7]{1,4}\s+(Department|School|Faculty|Institute|University|College|Laboratory|Center|Centre|Division).{0,200}$', | |
| re.MULTILINE | re.IGNORECASE), | |
| re.compile(r'^\s*(E-?mail|Correspondence|\*\s*Corresponding author).{0,200}$', | |
| re.MULTILINE | re.IGNORECASE), | |
| # ─ ORCID / CRediT / funding statement lines ───────────────── | |
| re.compile(r'^.{0,200}(ORCID|CRediT|Author contribution|Funding|Acknowledgem|Declaration of competing interest|Conflict of interest).{0,200}$', | |
| re.MULTILINE | re.IGNORECASE), | |
| re.compile(r'^\s*\d{4}-\d{4}-\d{4}-\d{4}\s*$', re.MULTILINE), # raw ORCID numbers | |
| # ─ Running headers / footers (short lines repeated near page boundaries) ─ | |
| # Catch lines that are pure UPPERCASE and very short (typical running headers) | |
| re.compile(r'^\s*[A-Z][A-Z \-&:]{5,60}\s*$', re.MULTILINE), | |
| # ─ Reference list entries (numbered citations at end of paper) ───── | |
| # Lines starting with [n] or n. followed by author initials pattern | |
| re.compile(r'^\s*\[\d{1,3}\]\s+[A-Z][a-z]*.{0,300}$', re.MULTILINE), | |
| re.compile(r'^\s*\d{1,3}\.\s+[A-Z][a-z]{0,20},\s+[A-Z]\.?.{0,300}$', re.MULTILINE), | |
| ] | |
def clean_paper_text(text: str) -> str:
    """
    Remove academic-PDF boilerplate from extracted text ahead of an LLM call.

    The text is passed through every regex in ``_BOILERPLATE_PATTERNS``
    (each match is deleted), very short leftover fragments are dropped, and
    runs of three or more newlines are squeezed to exactly two. The result is
    returned with leading/trailing whitespace stripped.
    """
    # Pass 1: delete everything the boilerplate regexes match.
    for pattern in _BOILERPLATE_PATTERNS:
        text = pattern.sub('', text)

    # Pass 2: squeeze the blank-line runs the deletions left behind.
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Pass 3: discard fragments under 8 characters unless they look like a
    # list item / numbered heading (leading digits or a bullet/dash).
    list_marker = re.compile(r'^(\d+\.?|[-•–])')
    survivors = []
    for raw_line in text.split('\n'):
        body = raw_line.strip()
        if body == '':
            # Whitespace-only lines are normalised to empty paragraph breaks.
            survivors.append('')
        elif len(body) >= 8 or list_marker.match(body):
            survivors.append(raw_line)
    text = '\n'.join(survivors)

    # Pass 4: dropping lines can create new blank runs; squeeze once more.
    return re.sub(r'\n{3,}', '\n\n', text).strip()
| # ───────────────────────────────────────────────────────────── | |
| # STEP 1: Upload PDF & extract raw text | |
| # ───────────────────────────────────────────────────────────── | |
async def step1_upload_pdf(file: UploadFile = File(...)):
    """Store an uploaded PDF under a new session and extract its plain text.

    A fresh session directory is created inside ``UPLOAD_DIR``; the upload is
    saved there as ``document.pdf`` and the extracted text as
    ``extracted.txt``.

    Returns:
        dict with ``session_id``, ``filename``, ``page_count``,
        ``char_count`` and the extracted ``text``.

    Raises:
        HTTPException(500): if the PDF cannot be parsed. The session
        directory is removed in that case because the caller never learns
        its id.
    """
    # Function-scope import, as before, so the module itself loads even when
    # PyPDF2 is only needed by this endpoint.
    import PyPDF2

    session_id = os.urandom(8).hex()
    session_dir = os.path.join(UPLOAD_DIR, session_id)
    os.makedirs(session_dir, exist_ok=True)

    pdf_path = os.path.join(session_dir, "document.pdf")
    content = await file.read()
    with open(pdf_path, "wb") as f:
        f.write(content)

    # Parse the PDF exactly once: the page count and the text come from the
    # same reader, and the file handle is closed by the ``with`` block.
    page_chunks = []
    try:
        with open(pdf_path, "rb") as f:
            reader = PyPDF2.PdfReader(f)
            page_count = len(reader.pages)
            for page in reader.pages:
                page_text = page.extract_text()
                if page_text:
                    page_chunks.append(page_text + "\n")
    except Exception as e:
        shutil.rmtree(session_dir, ignore_errors=True)
        raise HTTPException(status_code=500, detail=f"PDF text extraction failed: {str(e)}") from e
    text = "".join(page_chunks)

    # Persist the text so later steps can work from the session id alone.
    text_path = os.path.join(session_dir, "extracted.txt")
    with open(text_path, "w", encoding="utf-8") as f:
        f.write(text)

    return {
        "session_id": session_id,
        "filename": file.filename,
        "page_count": page_count,
        "char_count": len(text),
        "text": text
    }
| # ───────────────────────────────────────────────────────────── | |
| # STEP 1b: Serve the stored PDF for the viewer | |
| # ───────────────────────────────────────────────────────────── | |
async def serve_pdf(session_id: str):
    """Return the PDF stored for ``session_id`` as an ``application/pdf`` response.

    Raises:
        HTTPException(404): if the id is malformed or no PDF is stored for it.
    """
    # The upload step in this file mints ids with os.urandom(8).hex(), i.e.
    # exactly 16 lowercase hex characters. Rejecting anything else keeps
    # caller-supplied values such as ".." from escaping UPLOAD_DIR.
    if not re.fullmatch(r"[0-9a-f]{16}", session_id):
        raise HTTPException(status_code=404, detail="PDF not found")

    pdf_path = os.path.join(UPLOAD_DIR, session_id, "document.pdf")
    if not os.path.isfile(pdf_path):
        raise HTTPException(status_code=404, detail="PDF not found")
    return FileResponse(pdf_path, media_type="application/pdf")
| # ───────────────────────────────────────────────────────────── | |
| # STEP 2: Run LLM analysis on extracted text | |
| # ───────────────────────────────────────────────────────────── | |
async def step2_llm_analyze(
    session_id: str = Form(...),
    api_key: str = Form(...),
    provider: str = Form("gemini"),
    analysis_type: str = Form("agree"),
    target_technique: str = Form("")
):
    """Run the LLM over a session's stored text and return parameters + evidence.

    The text saved for ``session_id`` is cleaned with ``clean_paper_text`` and
    passed to ``PDFExtractor.analyze_document``. The ``_evidence`` entry of
    the result is split off and turned into
    ``{param_key: [display strings]}``; the remaining data is written to
    ``extracted_data.json`` in the session directory.

    Returns:
        dict with ``extracted_data`` and ``highlights``.

    Raises:
        HTTPException: 404 for an unknown/malformed session, 401 for a missing
        or rejected API key, 400 if the provider cannot be constructed, 429 on
        provider rate limiting, 500 for any other analysis failure.
    """
    # Ids are minted as os.urandom(8).hex() by the upload step in this file.
    # Validating the shape stops a crafted form value ("../..") from making
    # this endpoint read from or write to paths outside UPLOAD_DIR.
    if not re.fullmatch(r"[0-9a-f]{16}", session_id):
        raise HTTPException(status_code=404, detail="Extracted text not found. Run Step 1 first.")

    session_dir = os.path.join(UPLOAD_DIR, session_id)
    text_path = os.path.join(session_dir, "extracted.txt")
    if not os.path.exists(text_path):
        raise HTTPException(status_code=404, detail="Extracted text not found. Run Step 1 first.")
    with open(text_path, "r", encoding="utf-8") as f:
        text = f.read()

    # Check the key BEFORE building the provider client, so an empty key is
    # always reported as 401 rather than as whatever the client constructor
    # happens to raise. Local providers need no key.
    if not api_key.strip() and provider not in ("lmstudio", "ollama", "local"):
        raise HTTPException(status_code=401, detail="API key is missing. Please paste your API key in the sidebar and try again.")

    # Strip boilerplate (page numbers, headers, author info, references, ...)
    # to reduce the token load sent to the LLM.
    cleaned_text = clean_paper_text(text)

    try:
        extractor = PDFExtractor(provider=provider, api_key=api_key)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to init LLM provider '{provider}': {str(e)}") from e

    try:
        raw = extractor.analyze_document(cleaned_text, analysis_type=analysis_type, target_technique=target_technique)
    except Exception as e:
        err_str = str(e)
        err_lower = err_str.lower()
        # Map provider auth failures onto HTTP 401.
        if '401' in err_str or any(marker in err_lower for marker in ('authentication', 'invalid api key', 'unauthorized')):
            raise HTTPException(status_code=401, detail=f"Invalid or missing API key for provider '{provider}'. Please check your key and try again. (Details: {err_str[:200]})") from e
        # Map provider throttling onto HTTP 429.
        if '429' in err_str or any(marker in err_lower for marker in ('rate limit', 'rate-limit', 'too many requests')):
            raise HTTPException(status_code=429, detail=f"Rate limit reached for provider '{provider}'. Please wait a moment and try again, or switch to a different provider. (Details: {err_str[:300]})") from e
        raise HTTPException(status_code=500, detail=f"LLM analysis failed: {err_str}") from e

    # Everything below treats the result as a mapping; fail clearly otherwise.
    if not isinstance(raw, dict):
        raise HTTPException(status_code=500, detail="LLM analysis failed: provider returned a non-object result")

    # Separate _evidence from the main extracted_data.
    evidence = raw.pop("_evidence", {})

    # Build highlights compatible with the frontend: {param_key: [strings]}.
    highlights: dict = {}
    if isinstance(evidence, dict):
        for param, ev in evidence.items():
            if isinstance(ev, dict):
                # LLM output may carry null / non-string fields; coerce
                # before stripping instead of crashing on them.
                quote = str(ev.get("quote") or "").strip()
                reasoning = str(ev.get("reasoning") or "").strip()
                parts = []
                if quote:
                    parts.append(f'📖 "{quote}"')
                if reasoning:
                    parts.append(f'💡 {reasoning}')
                if parts:
                    highlights[param] = parts
            elif isinstance(ev, str) and ev.strip():
                highlights[param] = [ev.strip()]

    # For AGREE, top up sparse evidence with the keyword search over the
    # original (uncleaned) text; LLM-provided entries take precedence.
    if analysis_type == "agree" and len(highlights) < 5:
        for key, snippets in find_highlights(text, raw).items():
            highlights.setdefault(key, snippets)

    # Save results alongside the session's other files.
    result_path = os.path.join(session_dir, "extracted_data.json")
    with open(result_path, "w", encoding="utf-8") as f:
        json.dump(raw, f, indent=2)

    return {
        "extracted_data": raw,
        "highlights": highlights
    }
def find_highlights(text: str, data: dict) -> dict:
    """Collect up to three text lines per AGREE parameter that mention its keywords.

    For every parameter key a list of search terms is assembled (fixed
    keywords plus, for some keys, the extracted value itself). A line counts
    as a hit when it contains any usable term case-insensitively; terms of
    two characters or fewer are ignored. Lines whose stripped length is under
    five characters are never considered. Each hit contributes its stripped
    text truncated to 200 characters, without duplicates, in document order.
    """
    keyword_table = {
        "p1_option": [data.get("p1_option", ""), "sample", "treatment", "sampling", "remote sensing", "in-field"],
        "p2_amount": ["sample amount", "sample weight", "sample volume", str(data.get("p2_amount", "")), "g ", "mL", "mg"],
        "p3_option": [data.get("p3_option", ""), "in-line", "on-line", "at-line", "off-line", "inline", "online"],
        "p4_steps": ["steps", "procedure", "filtration", "extraction", "centrifugation", "evaporation", "derivatization"],
        "p5_automation": [data.get("p5_automation", ""), "automatic", "semi-automatic", "manual", "miniatur"],
        "p6_has_derivatization": ["derivat", "reagent", "CAS"],
        "p7_waste_amount": ["waste", "solvent", "reagent volume", str(data.get("p7_waste_amount", ""))],
        "p8_analytes_per_run": ["analyte", "parameter", "compound", "simultaneous", "per hour", "throughput"],
        "p9_technique": [data.get("p9_technique", ""), "HPLC", "GC", "LC-MS", "FTIR", "UV", "NMR", "SPE"],
        "p10_biobased_status": ["bio-based", "renewable", "bioethanol", "green solvent", data.get("p10_biobased_status", "")],
        "p11_has_toxic_reagents": ["toxic", "hazardous", "acetonitrile", "methanol", "chloroform", "benzene"],
        "p12_threats_count": ["flammable", "corrosive", "explosive", "oxidis", "aquatic", "bioaccumul", "persistent"],
    }

    # Pre-compute, once, the searchable form and the display snippet of every
    # line long enough to be considered.
    candidates = [
        (raw_line.lower(), raw_line.strip()[:200])
        for raw_line in text.split("\n")
        if len(raw_line.strip()) >= 5
    ]

    result = {}
    for key, raw_terms in keyword_table.items():
        # Empty/falsy terms and terms of <= 2 characters are not searched.
        needles = [str(t).lower() for t in raw_terms if t and len(str(t)) > 2]
        snippets = []
        for haystack, snippet in candidates:
            if snippet in snippets:
                continue
            if any(needle in haystack for needle in needles):
                snippets.append(snippet)
                if len(snippets) == 3:
                    # Only the first three unique hits are ever returned.
                    break
        result[key] = snippets[:3]
    return result
| # ───────────────────────────────────────────────────────────── | |
| # STEP 3: Calculate the 12 principles | |
| # ───────────────────────────────────────────────────────────── | |
async def step3_calculate(
    extracted_data: str = Form(...)
):
    """Compute scores for the 12 AGREE principles from ``extracted_data`` JSON.

    Returns:
        dict with ``scores`` (from ``AGREECalculator.calculate_all``) and a
        per-principle ``breakdown`` (from ``build_breakdown``).

    Raises:
        HTTPException(400): if ``extracted_data`` is not valid JSON or does
        not decode to a JSON object.
    """
    try:
        data = json.loads(extracted_data)
    except (ValueError, TypeError, RecursionError):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data")
    # "[]" or "5" are valid JSON but would crash on ``data.get`` further
    # down; report them as a client error instead of an internal one.
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data: expected a JSON object")

    calculator = AGREECalculator()
    scores = calculator.calculate_all(data)
    # Build the detailed per-principle breakdown shown next to the scores.
    breakdown = build_breakdown(data, scores)
    return {
        "scores": scores,
        "breakdown": breakdown
    }
async def step3_calculate_nqs(
    extracted_data: str = Form(...)
):
    """Compute the NQS metric from ``extracted_data`` JSON.

    Reads ``need_tier`` (int, default 1), the twelve values ``r1..r4``,
    ``g1..g4``, ``b1..b4`` (float, default 0.0) and the seventeen flags
    ``sdg_1..sdg_17`` (truthiness, default False), then delegates to
    ``NQSCalculator.calculate_nqs``.

    Raises:
        HTTPException(400): if the payload is not a JSON object or a numeric
        field cannot be converted.
    """
    try:
        data = json.loads(extracted_data)
    except (ValueError, TypeError, RecursionError):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data")
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data: expected a JSON object")

    # Bad client values (null, "abc", nested objects) are a 400, not a 500.
    try:
        need_tier = int(data.get("need_tier", 1))
        rgb_data = {
            f"{letter}{i}": float(data.get(f"{letter}{i}", 0.0))
            for letter in ('r', 'g', 'b')
            for i in range(1, 5)
        }
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data: need_tier and r/g/b values must be numeric")

    sdg_agreements = [bool(data.get(f"sdg_{i}", False)) for i in range(1, 18)]

    calculator = NQSCalculator()
    return calculator.calculate_nqs(need_tier, rgb_data, sdg_agreements)
async def step3_calculate_bagi(
    extracted_data: str = Form(...)
):
    """Compute BAGI scores from ``extracted_data`` JSON (selections ``{p1: 0, p2: 1, ...}``).

    Each of ``p1..p10`` is an option index; the score for an index is
    ``10.0 - index * 2.5``. A missing key defaults to index 3 (score 2.5).

    Returns:
        ``{"scores": {...}}`` with one entry per ``p1..p10`` plus ``Total``.

    Raises:
        HTTPException(400): if the payload is not a JSON object or an index
        is not convertible to int.
    """
    try:
        data = json.loads(extracted_data)
    except (ValueError, TypeError, RecursionError):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data")
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data: expected a JSON object")

    scores = {}
    total = 0.0
    for i in range(1, 11):
        key = f"p{i}"
        try:
            idx = int(data.get(key, 3))  # default to the worst option (2.5)
        except (TypeError, ValueError):
            raise HTTPException(status_code=400, detail=f"Invalid JSON for extracted_data: {key} must be an integer option index")
        score = 10.0 - (idx * 2.5)
        scores[key] = score
        total += score
    scores["Total"] = total
    return {"scores": scores}
async def step3_calculate_rapi(
    extracted_data: str = Form(...)
):
    """Compute RAPI scores from ``extracted_data`` JSON (selections ``{p1: ..., p2: ...}``).

    Unlike BAGI, each ``p1..p10`` value is taken as the point value itself
    (the original comment states the frontend sends 0, 2.5, 5, 7.5 or 10);
    missing keys count as 0.0.

    Returns:
        ``{"scores": {...}}`` with one entry per ``p1..p10`` plus ``Total``.

    Raises:
        HTTPException(400): if the payload is not a JSON object or a value is
        not numeric.
    """
    try:
        data = json.loads(extracted_data)
    except (ValueError, TypeError, RecursionError):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data")
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data: expected a JSON object")

    scores = {}
    total = 0.0
    for i in range(1, 11):
        key = f"p{i}"
        try:
            score = float(data.get(key, 0.0))
        except (TypeError, ValueError):
            raise HTTPException(status_code=400, detail=f"Invalid JSON for extracted_data: {key} must be numeric")
        scores[key] = score
        total += score
    scores["Total"] = total
    return {"scores": scores}
async def step3_calculate_complexmogapi(
    extracted_data: str = Form(...)
):
    """Compute the ComplexMoGAPI result from ``extracted_data`` JSON.

    Every key other than ``has_quantification``, ``e_factor`` and
    ``_tgs_preview`` is treated as a parameter selection and kept only when
    its value is an integer (an int, an integral float, or a string of
    digits with an optional leading minus). Anything else is ignored.

    Raises:
        HTTPException(400): if the payload is not a JSON object or
        ``e_factor`` is not numeric.
    """
    try:
        data = json.loads(extracted_data)
    except (ValueError, TypeError, RecursionError):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data")
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data: expected a JSON object")

    reserved = ('has_quantification', 'e_factor', '_tgs_preview')
    selections = {}
    for key, value in data.items():
        if key in reserved or isinstance(value, bool):
            # bool is an int subclass; True/False are not option indices.
            continue
        if isinstance(value, int):
            selections[key] = value
        elif isinstance(value, float):
            if value.is_integer():
                selections[key] = int(value)
        elif isinstance(value, str) and re.fullmatch(r"-?\d+", value):
            # fullmatch (rather than lstrip('-').isdigit()) rejects inputs
            # such as "--5" or "²" that int() cannot parse.
            selections[key] = int(value)

    has_quantification = bool(data.get('has_quantification', True))
    try:
        e_factor = float(data.get('e_factor', 0.0))
    except (TypeError, ValueError):
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data: e_factor must be numeric")

    calculator = ComplexMoGAPICalculator()
    return calculator.calculate(selections, has_quantification=has_quantification, e_factor=e_factor)
async def step3_calculate_ecoscale(
    extracted_data: str = Form(...)
):
    """Decode ``extracted_data`` and return ``EcoScaleCalculator().calculate`` of it.

    Raises:
        HTTPException(400): if ``extracted_data`` cannot be decoded as JSON.
    """
    try:
        payload = json.loads(extracted_data)
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data")
    # The decoded payload is handed to the calculator untouched.
    return EcoScaleCalculator().calculate(payload)
async def step3_calculate_cafri(
    extracted_data: str = Form(...)
):
    """Decode ``extracted_data`` and return ``CaFRICalculator().calculate`` of it.

    Raises:
        HTTPException(400): if ``extracted_data`` cannot be decoded as JSON.
    """
    try:
        payload = json.loads(extracted_data)
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid JSON for extracted_data")
    # The decoded payload is handed to the calculator untouched.
    return CaFRICalculator().calculate(payload)
async def get_complexmogapi_params():
    """Expose the ComplexMoGAPI parameter definitions under the ``params`` key."""
    definitions = get_param_definitions()
    return {"params": definitions}
# Display metadata for the 12 AGREE principles, keyed by principle number.
PRINCIPLE_INFO = {
    1: {"name": "Sample Treatment", "goal": "Avoid sample treatment", "icon": "🔬"},
    2: {"name": "Sample Amount", "goal": "Minimize sample size", "icon": "⚖️"},
    3: {"name": "Device Positioning", "goal": "Perform in situ measurements", "icon": "📡"},
    4: {"name": "Integration of Steps", "goal": "Reduce distinct procedural steps", "icon": "🔗"},
    5: {"name": "Automation & Miniaturization", "goal": "Select automated & miniaturized methods", "icon": "🤖"},
    6: {"name": "Derivatization", "goal": "Avoid chemical modification", "icon": "⚗️"},
    7: {"name": "Analytical Waste", "goal": "Prevent large volumes of waste", "icon": "♻️"},
    8: {"name": "Throughput", "goal": "Multi-analyte methods are preferred", "icon": "⚡"},
    9: {"name": "Energy Consumption", "goal": "Minimize energy use", "icon": "🔋"},
    10: {"name": "Source of Reagents", "goal": "Use bio-based/renewable reagents", "icon": "🌿"},
    11: {"name": "Toxicity", "goal": "Eliminate toxic reagents", "icon": "☠️"},
    12: {"name": "Operator Safety", "goal": "Increase safety", "icon": "🛡️"},
}


def _as_number(value, default=0):
    """Return ``value`` unchanged if it is an int/float, else ``float(value)``, else ``default``."""
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError):
        return default


def build_breakdown(data: dict, scores: dict) -> list:
    """Describe, per AGREE principle, the input used, the formula applied and the score.

    Args:
        data: extracted method parameters (``p1_option`` ... ``p12_threats_count``).
        scores: mapping of principle number (1-12) to its score; missing
            entries are reported as 0.

    Returns:
        A list of 12 dicts in principle order, each holding ``principle``,
        the ``PRINCIPLE_INFO`` fields, ``input``, ``formula`` and ``score``.

    Numeric inputs that arrive as ``None`` or as strings (the data may come
    from LLM-produced JSON) are coerced with ``_as_number`` rather than
    raising; genuine ints/floats are used and displayed unchanged.
    """
    def score_of(principle):
        return scores.get(principle, 0)

    def fmt_score(principle, spec):
        # Formatting must not fail on a null/non-numeric score.
        return format(_as_number(score_of(principle)), spec)

    def entry(principle, input_text, formula):
        return {
            "principle": principle,
            **PRINCIPLE_INFO[principle],
            "input": input_text,
            "formula": formula,
            "score": score_of(principle),
        }

    breakdown = []

    # P1
    breakdown.append(entry(1, f"Selected: {data.get('p1_option', 'N/A')}", "Direct lookup table"))

    # P2
    amt2 = _as_number(data.get("p2_amount", 0))
    if amt2 < 0.1:
        formula2 = "amount < 0.1 → Score = 1.00"
    elif amt2 > 100:
        formula2 = "amount > 100 → Score = 0.00"
    else:
        formula2 = f"Score = |−0.142 × ln({amt2}) + 0.65| = {fmt_score(2, '.3f')}"
    breakdown.append(entry(2, f"Sample amount: {amt2} g/mL", formula2))

    # P3
    breakdown.append(entry(3, f"Positioning: {data.get('p3_option', 'N/A')}", "Direct lookup table"))

    # P4
    steps4 = data.get("p4_steps", 0)
    breakdown.append(entry(4, f"Steps: {steps4}", f"{steps4} steps → Score = {fmt_score(4, '.1f')}"))

    # P5
    auto5 = data.get("p5_automation", "manual")
    mini5 = data.get("p5_miniaturized", False)
    breakdown.append(entry(5, f"Automation: {auto5}, Miniaturized: {mini5}", "2D matrix lookup"))

    # P6
    deriv6 = data.get("p6_has_derivatization", False)
    cas6 = data.get("p6_agents_cas", [])
    if not deriv6:
        formula6 = "No derivatization → Score = 1.0"
    else:
        # A bare string is one agent (joining it directly would split it
        # into characters); list items may be non-strings; None is "none".
        if isinstance(cas6, str):
            agents = [cas6]
        else:
            try:
                agents = [str(agent) for agent in cas6]
            except TypeError:
                agents = []
        formula6 = f"Score = (∏ DA_i) − 0.2 [agents: {', '.join(agents) if agents else 'none listed'}]"
    breakdown.append(entry(6, f"Derivatization: {deriv6}, CAS agents: {cas6}", formula6))

    # P7
    amt7 = _as_number(data.get("p7_waste_amount", 0))
    if amt7 < 0.1:
        formula7 = "amount < 0.1 → Score = 1.00"
    elif amt7 > 150:
        formula7 = "amount > 150 → Score = 0.00"
    else:
        formula7 = f"Score = |−0.134 × ln({amt7}) + 0.6946| = {fmt_score(7, '.3f')}"
    breakdown.append(entry(7, f"Waste: {amt7} g/mL", formula7))

    # P8
    a8 = _as_number(data.get("p8_analytes_per_run", 0))
    r8 = _as_number(data.get("p8_runs_per_hour", 0))
    total8 = a8 * r8
    if total8 > 70:
        formula8 = f"{a8} × {r8} = {total8} > 70 → Score = 1.00"
    elif total8 < 1:
        formula8 = f"{a8} × {r8} = {total8} < 1 → Score = 0.00"
    else:
        formula8 = f"Score = |0.2429 × ln({total8:.2f}) − 0.0517| = {fmt_score(8, '.3f')}"
    breakdown.append(entry(8, f"Analytes/run: {a8}, Runs/hr: {r8}", formula8))

    # P9
    breakdown.append(entry(9, f"Technique: {data.get('p9_technique', 'N/A')}", "Lookup in high/medium/low technique list"))

    # P10
    breakdown.append(entry(10, f"Reagent source: {data.get('p10_biobased_status', 'N/A')}", "No reagents/All bio-based=1.0, Partial=0.5, None=0.0"))

    # P11
    toxic11 = data.get("p11_has_toxic_reagents", False)
    amt11 = _as_number(data.get("p11_toxic_amount", 0))
    if not toxic11:
        formula11 = "No toxic reagents → Score = 1.0"
    elif amt11 < 0.1:
        formula11 = "amount < 0.1 g/mL → Score = 0.8"
    elif amt11 > 50:
        formula11 = "amount > 50 → Score = 0.00"
    else:
        formula11 = f"Score = |−0.129 × ln({amt11}) + 0.5012| = {fmt_score(11, '.3f')}"
    breakdown.append(entry(11, f"Toxic reagents: {toxic11}, Amount: {amt11} g/mL", formula11))

    # P12
    threats12 = data.get("p12_threats_count", 0)
    breakdown.append(entry(12, f"GHS threats count: {threats12}", f"{threats12} threats → Score = {fmt_score(12, '.1f')}"))

    return breakdown
| # ───────────────────────────────────────────────────────────── | |
| # STEP 4: Generate AI discussion section | |
| # ───────────────────────────────────────────────────────────── | |
# Full display names of the 12 AGREE principles, keyed 1..12 in order.
PRINCIPLE_NAMES_FULL = dict(enumerate(
    (
        "Sample Treatment",
        "Sample Amount",
        "Device Positioning",
        "Integration of Steps",
        "Automation and Miniaturization",
        "Derivatization",
        "Analytical Waste",
        "Throughput",
        "Energy Consumption",
        "Source of Reagents",
        "Toxicity of Reagents",
        "Operator's Safety",
    ),
    start=1,
))
| def _build_discussion_prompt(scores: dict, data: dict, model_name: str, analysis_type: str = "agree") -> str: | |
| method_info = { | |
| "agree": ("AGREE (Analytical GREEnness) metric", "Pena-Pereira F, Wojnowski W, Tobiszewski M. AGREE—Analytical GREEnness Metric Approach and Software. Anal Chem. 2020;92(14):10076–10082"), | |
| "nqs": ("NQS (Need, Quality, and Sustainability) index", "Kiwfo K, et al. A new need, quality, and sustainability (NQS) index. Microchem J. 2023;193:109026"), | |
| "complexmogapi": ("ComplexMoGAPI", "Mansour FR, et al. A total scoring system and software for complex modified GAPI. Green Anal Chem. 2024;10:100126"), | |
| "ecoscale": ("Analytical Eco-Scale", "Gałuszka A, et al. Analytical Eco-Scale for assessing the greenness of analytical procedures. TrAC. 2012;37:61-72"), | |
| "cafri": ("CaFRI (Carbon Footprint Reduction Index)", "Mansour FR, Nowak PM. Introducing the carbon footprint reduction index (CaFRI). BMC Chemistry. 2025;19:10"), | |
| "bagi": ("BAGI (Blue Applicability Grade Index)", "Astolfi ML, et al. Blue Applicability Grade Index (BAGI) and tool: A new metric for evaluating the operational practicality of analytical methods. Green Chemistry, 2023") | |
| } | |
| method_name, reference = method_info.get(analysis_type, method_info["agree"]) | |
| return f"""You are an expert in Green Analytical Chemistry. Write a formal, academic-quality DISCUSSION SECTION suitable for publication in a peer-reviewed analytical chemistry journal. The discussion is about the greenness assessment of an analytical method evaluated using the {method_name}. | |
| Context: | |
| - AI model used for extraction: {model_name} | |
| - OVERALL SCORES & VERDICT: | |
| {json.dumps(scores, indent=2)} | |
| - EXTRACTED METHOD PARAMETERS: | |
| {json.dumps(data, indent=2)} | |
| WRITING REQUIREMENTS: | |
| 1. Write EXACTLY 4–5 paragraphs. | |
| 2. Paragraph 1: Introduce the {method_name} briefly (cite: {reference}). State the overall score and classification/verdict of the method. | |
| 3. Paragraph 2: Discuss the BEST-performing aspects of the procedure — what the method does well from a greenness perspective, referencing specific extracted parameters (e.g., low solvent use, energy efficiency, non-toxic reagents). | |
| 4. Paragraph 3: Discuss the WORST-performing aspects (penalties or low scores) — describe what these reveal about the method's environmental/safety weaknesses. | |
| 5. Paragraph 4: Discuss the overall greenness profile and general applicability. | |
| 6. Paragraph 5 (optional): Give 2–3 concrete, specific, actionable recommendations to improve the method's greenness score according to the {method_name}. | |
| STYLE: | |
| - Third person, passive voice, formal academic English. | |
| - Be highly specific, citing the exact numeric scores and extracted data values provided above. | |
| - Do NOT use markdown, bullet points, or headers inside the text. Write flowing paragraphs only. | |
| - Length: 350–550 words total. | |
| Produce ONLY the discussion text, starting directly with the first paragraph. | |
| """ | |
async def generate_discussion(
    provider: str = Form("gemini"),
    api_key: str = Form(...),
    scores_json: str = Form(...),
    extracted_data_json: str = Form(...),
    analysis_type: str = Form("agree")
):
    """Ask the selected LLM provider to write a scientific discussion section.

    Returns:
        dict with ``discussion`` (generated text), ``model`` (display name),
        ``provider`` and ``total_score`` (``scores["Total"]``, default 0.0).

    Raises:
        HTTPException: 400 for invalid JSON inputs, a provider that cannot be
        initialised, or an unknown provider; 500 if generation fails or the
        provider returns no text.
    """
    try:
        scores = json.loads(scores_json)
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid scores JSON")
    # ``scores.get("Total")`` is read for the response; reject a non-object
    # now instead of crashing after the (slow, billable) LLM call.
    if not isinstance(scores, dict):
        raise HTTPException(status_code=400, detail="Invalid scores JSON")
    try:
        extracted_data = json.loads(extracted_data_json)
    except Exception:
        raise HTTPException(status_code=400, detail="Invalid extracted_data JSON")

    try:
        extractor = PDFExtractor(provider=provider, api_key=api_key)
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed to init provider '{provider}': {str(e)}") from e

    provider_key = provider.lower()

    # Display name of the model, quoted in the prompt and in the response.
    model_map = {
        "gemini": "Gemini 2.0 Flash (Google)",
        "groq": "Llama 3.3 70B (Groq)",
        "openai": "GPT-4o mini (OpenAI)",
        "lmstudio": "Local LLM (LM Studio)",
        "ollama": "Local LLM (Ollama)",
        "local": "Local LLM",
    }
    model_name = model_map.get(provider_key, f"Local LLM ({provider})")
    prompt = _build_discussion_prompt(scores, extracted_data, model_name, analysis_type=analysis_type)

    # Hosted chat-completion providers use a fixed model id; local providers
    # use whatever model the extractor was configured with.
    hosted_chat_models = {
        "groq": "llama-3.3-70b-versatile",
        "openai": "gpt-4o-mini",
    }
    local_providers = ("lmstudio", "ollama", "local")

    try:
        if provider_key == "gemini":
            response = extractor.model.generate_content(prompt)
            generated = response.text
        elif provider_key in hosted_chat_models or provider_key in local_providers:
            if provider_key in hosted_chat_models:
                chat_model = hosted_chat_models[provider_key]
            else:
                chat_model = extractor.model_name
            response = extractor.client.chat.completions.create(
                model=chat_model,
                messages=[
                    {"role": "system", "content": "You are an expert academic writer in analytical chemistry."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.6,
                max_tokens=1200,
            )
            generated = response.choices[0].message.content
        else:
            raise HTTPException(status_code=400, detail=f"Unknown provider: {provider}")
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Discussion generation failed: {str(e)}") from e

    # Providers can return None/empty content (e.g. filtered output); report
    # that explicitly instead of an AttributeError message.
    discussion = (generated or "").strip()
    if not discussion:
        raise HTTPException(status_code=500, detail="Discussion generation failed: provider returned an empty response")

    return {
        "discussion": discussion,
        "model": model_name,
        "provider": provider,
        "total_score": scores.get("Total", 0.0),
    }