"""
Export utilities for research reports.
Supports: DOCX, PDF, Markdown, BibTeX, ZIP (full workspace).
Ported from the Next.js original.
"""

import os
import json
import zipfile
import tempfile
import re
from datetime import datetime, timezone
from typing import Optional, List, Dict, Any

import pandas as pd

# Characters that must be backslash-escaped inside a BibTeX field so LaTeX can
# typeset it. The look-behind skips characters that are already escaped, which
# keeps the escaping idempotent.
# NOTE(review): a literal "\\" directly followed by one of these characters is
# also treated as "already escaped"; that input is assumed not to occur.
_BIBTEX_SPECIALS = re.compile(r'(?<!\\)([&%_#])')

# Inline Markdown emphasis: **bold** or *italic* (non-greedy, single line).
_INLINE_EMPHASIS = re.compile(r'(\*\*.*?\*\*|\*.*?\*)')

# Ordered-list marker such as "1. " at the start of a line.
_ORDERED_ITEM = re.compile(r'^\d+\.\s')

# Markdown heading prefixes mapped to DOCX heading levels, longest prefix first
# so that "####" is not mistaken for "#".
_DOCX_HEADINGS = (('#### ', 4), ('### ', 3), ('## ', 2), ('# ', 1))


def _project_root() -> str:
    """Return the absolute path two directory levels above this module's directory."""
    return os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))


def _sanitize_key(value: str, fallback: str) -> str:
    """Reduce *value* to ``[a-zA-Z0-9_]`` with single underscores and no
    leading/trailing underscore; return *fallback* when nothing is left."""
    key = re.sub(r'[^a-zA-Z0-9_]', '_', value or "")
    key = re.sub(r'_+', '_', key).strip('_')
    return key or fallback


def _escape_bibtex(value: Any) -> str:
    """Return *value* as text that is safe inside a braced BibTeX field.

    ``None`` becomes an empty string, Unicode line/paragraph separators become
    spaces, and ``& % _ #`` are backslash-escaped unless already escaped.
    """
    text = "" if value is None else str(value)
    text = text.replace('\u2028', ' ').replace('\u2029', ' ')
    return _BIBTEX_SPECIALS.sub(r'\\\1', text)


def _doc_authors(doc: Dict[str, Any]) -> str:
    """Return the document's authors joined with ``" and "`` (BibTeX style).

    A list/tuple is joined (falsy items skipped); any other value is
    stringified. Missing or empty authors yield ``"Unknown"``.
    """
    authors = doc.get("authors", [])
    if isinstance(authors, (list, tuple)):
        return " and ".join(str(a) for a in authors if a) or "Unknown"
    return str(authors or "Unknown")


def _doc_metadata(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Return ``doc["metadata"]`` when it is a dict, otherwise an empty dict."""
    metadata = doc.get("metadata")
    return metadata if isinstance(metadata, dict) else {}


def _dedupe_key(key: str, seen: set, hint: int) -> str:
    """Return a citation key not yet in *seen* and record it there.

    The first collision yields ``f"{key}_{hint}"``; the numeric suffix is then
    incremented until the candidate is unused.
    """
    candidate = key
    suffix = hint
    while candidate in seen:
        candidate = f"{key}_{suffix}"
        suffix += 1
    seen.add(candidate)
    return candidate


def _year_as_int(value: Any) -> Optional[int]:
    """Return *value* as an int when its string form is all digits, else ``None``."""
    text = str(value)
    if not text.isdigit():
        return None
    try:
        return int(text)
    except ValueError:
        # str.isdigit() accepts characters such as superscript digits that
        # int() rejects.
        return None


def _safe_stem(query: Optional[str], fallback: str) -> str:
    """Build a filename stem from the first 40 characters of *query*.

    Characters other than word characters, whitespace and ``-`` are dropped;
    every remaining whitespace character (including tabs/newlines) becomes
    ``_``. Returns *fallback* when the result is empty.
    """
    cleaned = re.sub(r'[^\w\s-]', '', (query or "")[:40]).strip()
    return re.sub(r'\s', '_', cleaned) or fallback


def _yaml_quoted(text: Optional[str]) -> str:
    """Return *text* as a double-quoted scalar with quotes/newlines escaped.

    JSON string syntax is used because it is also valid YAML double-quoted
    syntax; plain text comes out as ``"text"`` unchanged.
    """
    return json.dumps(text or "", ensure_ascii=False)


def _cell_text(value: Any) -> str:
    """Return a DataFrame cell as stripped text.

    Missing values (``None``/NaN/NA) become ``""`` instead of ``"nan"`` and
    integral floats are rendered without the trailing ``.0``.
    """
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    if isinstance(value, float) and value.is_integer():
        return str(int(value))
    return str(value).strip()


def _markdown_to_latex_body(report_md: str) -> str:
    """Convert Markdown headings to LaTeX sectioning commands.

    ``#`` and ``##`` become ``\\section``, ``###`` becomes ``\\subsection`` and
    ``####`` becomes ``\\subsubsection``. Bold markers (``**``) are removed.
    No other Markdown is translated and no LaTeX escaping is performed.
    """
    body = report_md or ""
    # Deepest level first; converted lines start with "\" so they cannot be
    # matched again by the shallower patterns.
    body = re.sub(r'^####\s+(.+)$', r'\\subsubsection{\1}', body, flags=re.MULTILINE)
    body = re.sub(r'^###\s+(.+)$', r'\\subsection{\1}', body, flags=re.MULTILINE)
    body = re.sub(r'^##\s+(.+)$', r'\\section{\1}', body, flags=re.MULTILINE)
    body = re.sub(r'^#\s+(.+)$', r'\\section{\1}', body, flags=re.MULTILINE)
    body = body.replace('**', '')
    return body


def generate_bibtex_from_docs(docs: List[Dict[str, Any]]) -> str:
    """Generate BibTeX entries from pipeline documents, preserving original GRADE evidence.

    Args:
        docs: Document dicts. Keys read: ``title``, ``id``, ``doi``,
            ``authors``, ``year``, ``source``, ``url``/``pdfUrl``/``handleUrl``,
            ``evidenceLevel``/``grade_label``/``grade_level``, ``type`` and an
            optional ``metadata`` dict (``doi``, ``journal``). ``None`` is
            treated as an empty list.

    Returns:
        The entries joined by blank lines (empty string for no documents).
    """
    entries = []
    seen: set = set()
    for idx, doc in enumerate(docs or [], 1):
        metadata = _doc_metadata(doc)
        title = doc.get("title") or "Untitled"
        raw_id = doc.get("id") or doc.get("doi") or title
        cite_key = _dedupe_key(_sanitize_key(str(raw_id), f"ref{idx}"), seen, idx)

        authors = _doc_authors(doc)
        year = doc.get("year") or "n.d."
        doi = doc.get("doi") or metadata.get("doi") or ""
        source = doc.get("source") or metadata.get("journal") or "Repository"
        url = doc.get("url") or doc.get("pdfUrl") or doc.get("handleUrl") or ""
        evidence = (
            doc.get("evidenceLevel")
            or doc.get("grade_label")
            or doc.get("grade_level")
            or "PENDIENTE"
        )

        type_text = str(doc.get("type") or "").lower()
        title_text = str(title).lower()
        source_text = str(source).lower()
        is_thesis = any(k in f"{type_text} {title_text}" for k in [
            "tesis", "thesis", "dissertation", "grado", "maestria", "doctorado",
            "licenciatura", "bachelor", "master", "phd",
        ])
        has_journal_hint = any(k in source_text for k in [
            "journal", "revista", "review", "proceedings", "conference", "transactions",
        ])
        # A thesis keyword only wins when nothing suggests a journal
        # publication (no journal-like source and no DOI).
        bib_type = "mastersthesis" if is_thesis and not has_journal_hint and not doi else "article"
        venue_field = "school" if bib_type == "mastersthesis" else "journal"
        url_field = f" url = {{{url}}},\n" if url else ""

        entry = (
            f"@{bib_type}{{{cite_key},\n"
            f" author = {{{_escape_bibtex(authors)}}},\n"
            f" title = {{{_escape_bibtex(title)}}},\n"
            f" {venue_field} = {{{_escape_bibtex(source)}}},\n"
            f" year = {{{_escape_bibtex(year)}}},\n"
            f"{url_field}"
            f" doi = {{{_escape_bibtex(doi)}}},\n"
            f" note = {{Calidad de evidencia GRADE: {_escape_bibtex(evidence)}}}\n"
            f"}}"
        )
        entries.append(entry)
    return "\n\n".join(entries)


def persist_research_output(
    report_md: str,
    docs: List[Dict[str, Any]],
    query: str,
    agent_role: str = "general",
    model: str = "unknown",
    output_root: Optional[str] = None,
) -> Dict[str, str]:
    """Persist final pipeline artifacts following the original beta data-mining layout.

    Writes ``<role>.tex``, ``<role>.md`` and ``referencias.bib`` into the
    output root, plus ``data/json1_scraping/scraping_data.json`` and
    ``data/json2_outputs/llm_outputs.json`` beneath it. Existing files are
    overwritten.

    Args:
        report_md: Report in Markdown (``None`` is written as empty text).
        docs: Source documents (``None`` is treated as an empty list).
        query: Query recorded in the JSON artifacts.
        agent_role: Used (sanitized, lower-cased) as the .tex/.md file stem.
        model: Model name recorded in the outputs JSON.
        output_root: Target directory; defaults to ``<project root>/latex_output``.

    Returns:
        Mapping with keys ``tex``, ``markdown``, ``bib``, ``scraping_json`` and
        ``outputs_json`` pointing at the written files.
    """
    docs = docs or []
    root = output_root or os.path.join(_project_root(), "latex_output")
    scraping_dir = os.path.join(root, "data", "json1_scraping")
    outputs_dir = os.path.join(root, "data", "json2_outputs")
    os.makedirs(scraping_dir, exist_ok=True)
    os.makedirs(outputs_dir, exist_ok=True)
    os.makedirs(root, exist_ok=True)

    # Timezone-aware "now": datetime.utcnow() is deprecated and its naive
    # result makes .timestamp() interpret the value as local time.
    now = datetime.now(timezone.utc)
    timestamp = now.isoformat().replace("+00:00", "Z")
    role_name = _sanitize_key(
        (agent_role or "consolidado_investigacion").lower(),
        "consolidado_investigacion",
    )
    tex_path = os.path.join(root, f"{role_name}.tex")
    md_path = os.path.join(root, f"{role_name}.md")
    bib_path = os.path.join(root, "referencias.bib")
    scraping_path = os.path.join(scraping_dir, "scraping_data.json")
    outputs_path = os.path.join(outputs_dir, "llm_outputs.json")

    bib = generate_bibtex_from_docs(docs)
    tex = _markdown_to_latex_body(report_md)

    with open(tex_path, "w", encoding="utf-8") as f:
        f.write(tex)
    with open(md_path, "w", encoding="utf-8") as f:
        f.write(report_md or "")
    with open(bib_path, "w", encoding="utf-8") as f:
        f.write(bib)

    scraping_data = {
        "version": "1.0.0",
        "createdAt": timestamp,
        "lastModifiedAt": timestamp,
        "projectId": "LETXIPU-GRADIO",
        "totalRecords": len(docs),
        "records": [
            {
                "id": doc.get("id") or f"doc_{i}",
                "url": doc.get("url") or doc.get("pdfUrl") or doc.get("handleUrl") or "",
                "title": doc.get("title") or "Sin titulo",
                "snippet": doc.get("snippet") or doc.get("abstract") or "",
                "source": doc.get("source") or "Desconocido",
                "scrapedAt": timestamp,
                "metadata": {
                    "authors": doc.get("authors") or [],
                    "year": _year_as_int(doc.get("year")),
                    "abstract": doc.get("abstract"),
                    "doi": doc.get("doi"),
                    "pdfUrl": doc.get("pdfUrl"),
                    "university": doc.get("university") or doc.get("institution"),
                    "queries": [query],
                    "evidenceLevel": (
                        doc.get("evidenceLevel")
                        or doc.get("grade_label")
                        or doc.get("grade_level")
                    ),
                },
            }
            for i, doc in enumerate(docs, 1)
        ],
        "changelog": [
            {
                "timestamp": timestamp,
                "action": "added",
                "recordCount": len(docs),
                "description": "Generado automaticamente por el pipeline Python Gradio.",
            }
        ],
        "metadata": {
            "queryUsed": query,
            "sourcesEnabled": [],
            "iterationsCompleted": 1,
            "totalIterationsPlanned": 1,
        },
    }

    output_record = {
        "id": f"out_{int(now.timestamp())}",
        "timestamp": timestamp,
        "promptUsed": query,
        "modelUsed": model or "unknown",
        "agentRole": agent_role,
        "inputRecordCount": len(docs),
        "output": {"plainText": report_md or "", "latex": tex},
        "sourceScrapingVersion": "1.0.0",
    }
    outputs_data = {
        "version": "1.0.0",
        "createdAt": timestamp,
        "lastModifiedAt": timestamp,
        "projectId": "LETXIPU-GRADIO",
        "outputs": [output_record],
    }

    with open(scraping_path, "w", encoding="utf-8") as f:
        json.dump(scraping_data, f, ensure_ascii=False, indent=2)
    with open(outputs_path, "w", encoding="utf-8") as f:
        json.dump(outputs_data, f, ensure_ascii=False, indent=2)

    return {
        "tex": tex_path,
        "markdown": md_path,
        "bib": bib_path,
        "scraping_json": scraping_path,
        "outputs_json": outputs_path,
    }


def export_markdown(report_md: str, query: str = "") -> str:
    """Export report as clean Markdown file.

    The file is written to the system temp directory with a YAML front-matter
    header (title, date, generator) followed by the report text.

    Args:
        report_md: Report body (``None`` is written as empty text).
        query: Used for the front-matter title and the filename stem.

    Returns:
        Path of the written ``.md`` file.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_name = _safe_stem(query, "research")
    filename = f"{safe_name}_{timestamp}.md"
    path = os.path.join(tempfile.gettempdir(), filename)

    header = (
        "---\n"
        f"title: {_yaml_quoted(query)}\n"
        f"date: \"{datetime.now().isoformat()}\"\n"
        "generator: \"LETXIPU Research Platform\"\n"
        "---\n\n"
    )

    with open(path, 'w', encoding='utf-8') as f:
        f.write(header + (report_md or ""))
    return path


def _bibtex_from_frame(docs_df: Optional[pd.DataFrame]) -> str:
    """Render the rows of *docs_df* as ``@article`` BibTeX entries.

    Columns read: ``Título``, ``Autores``, ``Año``, ``DOI``, ``Fuente``.
    Missing cells become empty fields; title, author and journal are
    BibTeX-escaped; citation keys are ``<FirstAuthorSurname><Year>`` and are
    made unique within the output.
    """
    if docs_df is None:
        return ""

    entries = []
    seen: set = set()
    for position, (idx, row) in enumerate(docs_df.iterrows(), 1):
        title = _cell_text(row.get("Título", "N/A")) or "N/A"
        authors = _cell_text(row.get("Autores", "N/A"))
        year = _cell_text(row.get("Año", ""))
        doi = _cell_text(row.get("DOI", ""))
        source = _cell_text(row.get("Fuente", ""))

        # Citation key: last word of the first comma-separated author + year.
        name_tokens = authors.split(",")[0].split()
        first_author = name_tokens[-1] if name_tokens else "unknown"
        base_key = re.sub(r'[^a-zA-Z0-9]', '', f"{first_author}{year}") or f"ref{idx}"
        cite_key = _dedupe_key(base_key, seen, position)

        entries.append(
            f"@article{{{cite_key},\n"
            f"  title = {{{_escape_bibtex(title)}}},\n"
            f"  author = {{{_escape_bibtex(authors)}}},\n"
            f"  year = {{{year}}},\n"
            f"  doi = {{{doi}}},\n"
            f"  journal = {{{_escape_bibtex(source)}}},\n"
            f"}}"
        )
    return "\n\n".join(entries)


def export_bibtex(docs_df: pd.DataFrame, query: str = "") -> str:
    """Export documents as BibTeX references.

    Args:
        docs_df: Documents table (columns ``Título``, ``Autores``, ``Año``,
            ``DOI``, ``Fuente``).
        query: Used only for the filename stem.

    Returns:
        Path of the written ``.bib`` file in the system temp directory.
    """
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_name = _safe_stem(query, "references")
    filename = f"{safe_name}_{timestamp}.bib"
    path = os.path.join(tempfile.gettempdir(), filename)

    with open(path, 'w', encoding='utf-8') as f:
        f.write(_bibtex_from_frame(docs_df))
    return path


def export_zip(
    report_md: str,
    docs_df: pd.DataFrame,
    query: str = "",
    settings: Optional[dict] = None,
) -> str:
    """Export full workspace as ZIP.

    Archive members: ``report.md``, ``references.bib``, ``documents.csv``,
    ``documents.json`` and ``metadata.json``.

    Args:
        report_md: Report body (``None`` is written as empty text).
        docs_df: Documents table (``None`` is treated as an empty table).
        query: Recorded in the report header/metadata; also the filename stem.
        settings: Optional settings stored under ``metadata.json["settings"]``.

    Returns:
        Path of the written ``.zip`` file in the system temp directory.
    """
    if docs_df is None:
        docs_df = pd.DataFrame()

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_name = _safe_stem(query, "research")
    filename = f"{safe_name}_workspace_{timestamp}.zip"
    path = os.path.join(tempfile.gettempdir(), filename)

    with zipfile.ZipFile(path, 'w', zipfile.ZIP_DEFLATED) as zf:
        # 1. Report markdown
        header = (
            f"---\ntitle: {_yaml_quoted(query)}\n"
            f"date: \"{datetime.now().isoformat()}\"\n---\n\n"
        )
        zf.writestr("report.md", header + (report_md or ""))

        # 2. BibTeX, rendered in memory so no stray .bib file is left in the
        #    temp directory.
        zf.writestr("references.bib", _bibtex_from_frame(docs_df))

        # 3. Documents CSV
        csv_content = docs_df.to_csv(index=False, encoding='utf-8')
        zf.writestr("documents.csv", csv_content)

        # 4. Documents JSON (machine-readable)
        docs_json = docs_df.to_json(orient='records', force_ascii=False, indent=2)
        zf.writestr("documents.json", docs_json)

        # 5. Settings/metadata
        meta = {
            "query": query,
            "timestamp": datetime.now().isoformat(),
            "total_documents": len(docs_df),
            "platform": "LETXIPU Research Platform",
            "settings": settings or {},
        }
        zf.writestr("metadata.json", json.dumps(meta, indent=2, ensure_ascii=False))

    return path


def _add_inline_runs(paragraph: Any, text: str) -> None:
    """Append *text* to a python-docx paragraph, honouring ``**bold**`` and
    ``*italic*`` spans; everything else is added as plain runs."""
    for part in _INLINE_EMPHASIS.split(text):
        if not part:
            continue
        if len(part) >= 4 and part.startswith('**') and part.endswith('**'):
            paragraph.add_run(part[2:-2]).bold = True
        elif len(part) >= 2 and part.startswith('*') and part.endswith('*'):
            paragraph.add_run(part[1:-1]).italic = True
        else:
            # Includes a lone "*", which must stay visible as literal text.
            paragraph.add_run(part)


def export_docx(report_md: str, query: str = "") -> Optional[str]:
    """Export report as DOCX using python-docx if available.

    Markdown is mapped line by line: ``#``..``####`` headings, ``- ``/``* ``
    bullets, ``N. `` numbered items, ``> `` quotes, and regular paragraphs with
    inline bold/italic. Blank lines become empty paragraphs.

    Args:
        report_md: Report body (``None`` is treated as empty text).
        query: Document title (falls back to a default) and filename stem.

    Returns:
        Path of the written ``.docx`` file in the system temp directory, or
        ``None`` when python-docx is not installed.
    """
    try:
        from docx import Document
        from docx.enum.text import WD_ALIGN_PARAGRAPH
    except ImportError:
        return None  # python-docx not installed

    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_name = _safe_stem(query, "research")
    filename = f"{safe_name}_{timestamp}.docx"
    path = os.path.join(tempfile.gettempdir(), filename)

    doc = Document()

    # Title
    title_para = doc.add_heading(query or "Informe de Investigación", level=0)
    title_para.alignment = WD_ALIGN_PARAGRAPH.CENTER
    doc.add_paragraph(
        f"Generado: {datetime.now().strftime('%d/%m/%Y %H:%M')} | LETXIPU Research Platform",
        style='Subtitle'
    )
    doc.add_paragraph("")  # spacer

    # Parse markdown sections
    for line in (report_md or "").split('\n'):
        stripped = line.strip()
        if not stripped:
            doc.add_paragraph("")
            continue

        heading = next(
            ((prefix, level) for prefix, level in _DOCX_HEADINGS if stripped.startswith(prefix)),
            None,
        )
        if heading is not None:
            prefix, level = heading
            doc.add_heading(stripped[len(prefix):], level=level)
        elif stripped.startswith(('- ', '* ')):
            doc.add_paragraph(stripped[2:], style='List Bullet')
        elif _ORDERED_ITEM.match(stripped):
            doc.add_paragraph(_ORDERED_ITEM.sub('', stripped), style='List Number')
        elif stripped.startswith('> '):
            quote = doc.add_paragraph(stripped[2:])
            quote.style = 'Intense Quote'
        else:
            # Regular text: simple bold/italic parsing.
            _add_inline_runs(doc.add_paragraph(), stripped)

    doc.save(path)
    return path