Spaces:
Running
Running
| import os | |
| from reportlab.lib.pagesizes import A4 | |
| from reportlab.lib.units import inch | |
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak | |
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
| from datetime import datetime | |
| import markdown2 | |
| from mistralai import Mistral | |
| from pathlib import Path | |
| from urllib.parse import urlparse | |
| import convertapi | |
| import requests | |
| from dotenv import load_dotenv | |
| import re | |
# Pull settings from a local .env file (if any) into the process environment.
load_dotenv()

# ConvertAPI cannot be used without a token, so fail at import time when the
# CONVERTAPI_TOKEN variable is missing or empty.
convertapi.api_credentials = os.environ.get("CONVERTAPI_TOKEN")
if not convertapi.api_credentials:
    raise ValueError("CONVERTAPI_TOKEN environment variable is required")

# Output formats accepted by convert_from_url.
SUPPORTED_FORMATS = ["pdf", "docx", "txt"]

# Size limit in bytes, overridable through MAX_FILE_SIZE (default: 100 MiB).
MAX_FILE_SIZE = int(os.environ.get("MAX_FILE_SIZE", 100 * 1024 * 1024))

# Directory that receives converted files, overridable through TEMP_DIR.
TEMP_DIR = os.environ.get("TEMP_DIR", "/tmp/scraper_temp")

# Create the temp directory up front so later writes do not have to.
os.makedirs(TEMP_DIR, exist_ok=True)
def upload_to_service(file_path: str) -> str:
    """
    Pretend to upload a file to a cloud service (mock implementation).

    Nothing is transferred: the function only checks that the file exists and
    builds a URL on the mock host from the file's base name.

    Args:
        file_path (str): Path to the file to "upload".

    Returns:
        str: The mock public URL on success, otherwise a message starting
            with "File not found:" or "Error uploading file:".
    """
    try:
        if os.path.exists(file_path):
            base_name = os.path.basename(file_path)
            return f"https://mock-cloud-service.com/{base_name}"
        return f"File not found: {file_path}"
    except Exception as exc:
        # Failures are reported as text, never raised to the caller.
        return f"Error uploading file: {str(exc)}"
def convert_from_url(document_url: str, output_format: str) -> str:
    """
    Convert a remote document to another format through ConvertAPI.

    The converted file is saved in TEMP_DIR under the stem of the URL's path
    (or "converted_file" when the path has no stem) plus the new extension.

    Args:
        document_url (str): http:// or https:// URL of the input file.
        output_format (str): Target format; must be in SUPPORTED_FORMATS.

    Returns:
        str: Path of the converted file on success, otherwise a
            human-readable error message. Exceptions are never propagated.
    """
    try:
        # Only non-empty http(s) URLs are accepted.
        is_http_url = bool(document_url) and document_url.lower().startswith(
            ("http://", "https://")
        )
        if not is_http_url:
            return "Invalid or unsupported URL format."

        if output_format not in SUPPORTED_FORMATS:
            return f"Unsupported output format: {output_format}"

        conversion = convertapi.convert(output_format, {"File": document_url})

        # Name the output after the last component of the URL path.
        stem = Path(urlparse(document_url).path).stem
        if not stem:
            stem = "converted_file"
        target = Path(TEMP_DIR) / f"{stem}.{output_format}"
        target.parent.mkdir(exist_ok=True)

        conversion.file.save(str(target))
        return str(target)
    except Exception as exc:
        return f"Error converting file from URL: {str(exc)}"
def _merge_failure(message):
    """Build the failure result dict returned by merge_md_to_pdf."""
    return {
        "success": False,
        "error": message,
        "output_pdf": None,
        "pages_merged": 0,
    }


def _enhance_markdown(client, model, md_file, md_content):
    """Ask Mistral to polish one file's text; return the original text on any error."""
    prompt = (
        "\n"
        "You are an expert content editor. Below is the content of a Markdown file. Please enhance the content by making it more detailed, well-structured, and polished while preserving the original meaning. Ensure the output is in plain text suitable for inclusion in a PDF. Avoid adding Markdown or HTML formatting in the response.\n"
        "If there are HTML tags like <p><strong>Agents-MCP-Hackathon (Agents-MCP-Hackathon)</strong></p>, convert them to plain text like Agents-MCP-Hackathon (Agents-MCP-Hackathon).\n"
        "Original content:\n"
        f"{md_content}\n"
        "Enhanced content:\n"
    )
    try:
        response = client.chat.complete(
            model=model,
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        # Best effort: a failed AI call must not abort the whole merge.
        print(f"Warning: Failed to process {md_file} with Mistral AI: {str(e)}. Using original content.")
        return md_content


def _plain_text_lines(content):
    """Render Markdown to HTML, strip the tags and return the non-empty lines."""
    html_content = markdown2.markdown(content, extras=['fenced-code-blocks', 'tables'])
    text_content = re.sub(r'<[^>]+>', '', html_content)
    # Normalise whitespace per line. Collapsing it across the whole text would
    # also remove the newlines and leave a single giant paragraph.
    cleaned = (' '.join(raw_line.split()) for raw_line in text_content.split('\n'))
    return [line for line in cleaned if line]


def merge_md_to_pdf(output_dir, site_name, site_description="", site_category="General"):
    """
    Merge all Markdown files in the output directory into a single PDF using reportlab after processing with Mistral AI.

    The PDF is laid out as: title page, table of contents, then one section
    per Markdown file (in sorted file-name order), each ending with a page
    break. 'scraping_summary.md' is skipped. Each section title is the first
    line of the file with '#' characters stripped, or "Page <n>" when that
    line is empty.

    Args:
        output_dir (str): Directory containing Markdown files.
        site_name (str): Name of the site for the PDF title and file name.
        site_description (str): Description of the site.
        site_category (str): Category of the site.

    Returns:
        dict: On success: "success" (True), "output_pdf", "pages_merged" and
            "message". On failure: "success" (False), "error", "output_pdf"
            (None) and "pages_merged" (0). Exceptions are never propagated.
    """
    # Function-scope import: only this function needs it. Text given to a
    # reportlab Paragraph is parsed as XML-like markup, so raw '<' and '&'
    # in titles or site metadata must be escaped first.
    from xml.sax.saxutils import escape

    try:
        api_key = os.getenv("MISTRAL_API_KEY")
        if not api_key:
            return _merge_failure("MISTRAL_API_KEY environment variable not set")
        client = Mistral(api_key=api_key)
        model = "mistral-large-latest"

        if not os.path.exists(output_dir):
            return _merge_failure(f"Output directory {output_dir} does not exist")

        md_files = sorted(
            f for f in os.listdir(output_dir)
            if f.endswith('.md') and f != 'scraping_summary.md'
        )
        if not md_files:
            return _merge_failure("No Markdown files found in the output directory")

        pdf_output_path = os.path.join(output_dir, f"{site_name}_merged.pdf")
        doc = SimpleDocTemplate(
            pdf_output_path,
            pagesize=A4,
            rightMargin=inch,
            leftMargin=inch,
            topMargin=inch,
            bottomMargin=inch,
        )
        title_style = ParagraphStyle(name='Title', fontSize=24, leading=28, alignment=1, spaceAfter=20)
        heading_style = ParagraphStyle(name='Heading2', fontSize=18, leading=22, spaceAfter=15)
        body_style = ParagraphStyle(name='Body', fontSize=12, leading=14, spaceAfter=10)

        title_page = [
            Paragraph(escape(f"{site_name}"), title_style),
            Spacer(1, 0.2 * inch),
            Paragraph(f"Description: {escape(f'{site_description}')}", body_style),
            Paragraph(f"Category: {escape(f'{site_category}')}", body_style),
            Paragraph(f"Generated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", body_style),
            PageBreak(),
        ]

        toc_entries = []
        sections = []
        for idx, md_file in enumerate(md_files, 1):
            file_path = os.path.join(output_dir, md_file)
            with open(file_path, 'r', encoding='utf-8') as f:
                md_content = f.read()

            raw_title = md_content.split('\n')[0].strip('#').strip() or f"Page {idx}"
            title = escape(raw_title)
            enhanced_content = _enhance_markdown(client, model, md_file, md_content)

            toc_entries.append(Paragraph(f"{idx}. {title}", body_style))
            sections.append(Paragraph(title, heading_style))
            sections.append(Spacer(1, 0.1 * inch))
            # Body lines keep the entity escaping produced by the Markdown
            # renderer, so they are not escaped a second time.
            sections.extend(
                Paragraph(line, body_style)
                for line in _plain_text_lines(enhanced_content)
            )
            sections.append(PageBreak())

        # The TOC heading comes first, then its entries, then the content.
        story = (
            title_page
            + [Paragraph("Table of Contents", heading_style), Spacer(1, 0.2 * inch)]
            + toc_entries
            + [PageBreak()]
            + sections
        )
        doc.build(story)

        return {
            "success": True,
            "output_pdf": pdf_output_path,
            "pages_merged": len(md_files),
            "message": f"Successfully merged {len(md_files)} Markdown files into {pdf_output_path} after processing with Mistral AI"
        }
    except Exception as e:
        return _merge_failure(f"Failed to merge Markdown files into PDF: {str(e)}")
def _url_failure(message, output_url=None):
    """Build the failure result dict returned by merge_md_to_pdf_and_convert_to_url."""
    return {
        "success": False,
        "error": message,
        "output_url": output_url,
        "converted_path": None,
    }


def merge_md_to_pdf_and_convert_to_url(output_dir, site_name, site_description="", site_category="General", output_format="pdf"):
    """
    Merge Markdown files into a PDF, upload it to a service, and optionally convert to another format.

    Args:
        output_dir (str): Directory containing Markdown files.
        site_name (str): Name of the site for the PDF title.
        site_description (str): Description of the site.
        site_category (str): Category of the site.
        output_format (str): Optional format to convert the PDF to (e.g., 'docx', 'txt').
            The default "pdf" skips the conversion step.

    Returns:
        dict: On success: "success" (True), "output_url", "converted_path"
            (the PDF path itself when no conversion was requested),
            "message" and "pages_merged". On failure: "success" (False),
            "error", "output_url" (set only when the upload succeeded but
            the conversion failed) and "converted_path" (None). Exceptions
            are never propagated.
    """
    try:
        merge_result = merge_md_to_pdf(output_dir, site_name, site_description, site_category)
        if not merge_result["success"]:
            return _url_failure(merge_result["error"])

        pdf_path = merge_result["output_pdf"]
        if not pdf_path or not os.path.exists(pdf_path):
            return _url_failure("Generated PDF not found")

        # upload_to_service returns an error message instead of a URL on failure.
        pdf_url = upload_to_service(pdf_path)
        if not pdf_url.startswith("http"):
            return _url_failure(f"Failed to obtain URL: {pdf_url}")

        converted_path = pdf_path
        if output_format != "pdf":
            converted_path = convert_from_url(pdf_url, output_format)
            # convert_from_url returns either the saved file's path or an
            # error message. Check that the file exists rather than comparing
            # path prefixes, which fails when the configured temp directory
            # is normalised (e.g. "./temp" becomes "temp").
            if not os.path.isfile(converted_path):
                return _url_failure(f"Conversion failed: {converted_path}", output_url=pdf_url)

        return {
            "success": True,
            "output_url": pdf_url,
            "converted_path": converted_path,
            "message": f"Successfully merged {merge_result['pages_merged']} Markdown files into PDF and uploaded to {pdf_url}",
            "pages_merged": merge_result["pages_merged"]
        }
    except Exception as e:
        return _url_failure(f"Error in merging or uploading: {str(e)}")