Spaces:
Runtime error
Runtime error
| import io | |
| import json | |
| import os | |
| from typing import BinaryIO, List | |
| from uuid import UUID | |
| from fastapi import UploadFile | |
| import markdown | |
| from docx import Document | |
| from docx.shared import Inches | |
| from bs4 import BeautifulSoup | |
| import PyPDF2 | |
| from PIL import Image | |
| import pytesseract | |
| import aiofiles | |
| import asyncio | |
| from src.config import logger | |
| import httpx | |
| import shutil | |
| from src.repositories import SOWRepository | |
class SOWClient:
    """Generate Statement-of-Work (SOW) documents and convert related files.

    The client loads a stored SOW record through ``SOWRepository``, optionally
    replaces its ``requirement`` with text extracted from files uploaded under
    ``<cwd>/temp/<sow_id>``, posts the record to a generation webhook, and
    persists the result. It also offers text extraction for images, PDF, DOCX
    and plain-text files, plus a Markdown -> DOCX converter.
    """

    def __init__(self) -> None:
        """Store the repository factory and configure the Tesseract binary path."""
        # Kept as a factory (not an instance): every operation opens its own
        # ``async with self._repository()`` context.
        self._repository = SOWRepository

        tesseract_path = os.getenv("TESSERACT_PATH", "/usr/bin/tesseract")
        if os.path.exists(tesseract_path):
            pytesseract.pytesseract.tesseract_cmd = tesseract_path
            logger.info(f"Tesseract path configured: {tesseract_path}")
        else:
            logger.warning(
                f"Tesseract not found at {tesseract_path}. OCR functionality may not work."
            )

    async def __aenter__(self):
        """Return the client itself; no resources are acquired on entry."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions propagate unchanged."""
        return None

    @staticmethod
    def _upload_dir(id) -> str:
        """Return the folder that holds the uploads for *id*: ``<cwd>/temp/<id>``."""
        return os.path.join(os.getcwd(), "temp", str(id))

    @staticmethod
    def _safe_filename(filename) -> str:
        """Reduce a client-supplied filename to a bare, non-empty file name.

        Directory components (``/`` or ``\\`` separated) are stripped so an
        upload can never be written outside its own folder.

        Raises:
            ValueError: if nothing usable remains (``None``, ``""``, ``"."``,
                ``".."`` or a name ending in a separator).
        """
        name = os.path.basename((filename or "").replace("\\", "/"))
        if name in ("", ".", ".."):
            raise ValueError(f"Invalid upload filename: {filename!r}")
        return name

    @staticmethod
    def _merge_questions(existing, generated):
        """Return the question mapping to store after a generation round.

        Args:
            existing: mapping decoded from the record's ``additional_info``,
                or ``None`` when the record had none.
            generated: the generator's reply; ``is_required_questions`` and
                ``questions`` are read from it.

        Returns:
            *existing* unchanged when no questions are required; otherwise a
            new dict holding the existing entries with every requested
            question set to ``None`` (i.e. awaiting an answer).
        """
        if not generated["is_required_questions"]:
            return existing
        merged = dict(existing) if existing else {}
        for question in generated["questions"]:
            merged[question] = None
        return merged

    async def generate_sow(self, sow_id: UUID) -> dict:
        """Generate the SOW text for *sow_id* and persist the outcome.

        Steps: load the record, swap in text from uploaded files when any was
        extracted, post the record (without ``id``/``created_at``/
        ``updated_at``) to the webhook named by ``NN_SOW_URL``, store the
        generated text and any follow-up questions, then delete the uploads.

        Returns:
            Whatever ``repository.update_sow`` returns for the updated record.

        Raises:
            ValueError: if the repository returns no record for *sow_id*, or
                an uploaded file cannot be converted to text.
            httpx.HTTPStatusError: if the webhook answers with a 4xx/5xx status.
        """
        async with self._repository() as repository:
            sows = await repository.get_sows(sow_id=sow_id)
        if not sows:
            raise ValueError(f"SOW not found: {sow_id}")
        sow = sows[0]

        content = None
        if os.path.exists(self._upload_dir(sow_id)):
            content = await self._read_uploaded_files(id=sow_id)

        question_json = (
            json.loads(sow["additional_info"]) if sow["additional_info"] else None
        )
        if content:
            # Uploaded documents take precedence over the stored requirement.
            sow["requirement"] = content
        # Bookkeeping fields are not part of the generation payload.
        for key in ("id", "created_at", "updated_at"):
            sow.pop(key, None)

        url = os.getenv(
            "NN_SOW_URL", "https://n8n.srv763317.hstgr.cloud/webhook/sow-generate"
        )
        async with httpx.AsyncClient() as client:
            # NOTE(review): httpx timeouts are in seconds, so this allows up to
            # 50 minutes -- confirm it was not meant as milliseconds.
            response = await client.post(url, json=sow, timeout=3000)
            # Fail loudly on an error status instead of tripping over a
            # missing key while parsing an error body below.
            response.raise_for_status()
            generated_sow_json = response.json()["message"]["content"]

        question_json = self._merge_questions(question_json, generated_sow_json)
        generated_sow = generated_sow_json["generated_sow"] or None

        async with self._repository() as repository:
            updated_sow = await repository.update_sow(
                sow_id=sow_id,
                sow={
                    "sow_generated_text": generated_sow,
                    "additional_info": (
                        json.dumps(question_json)
                        if isinstance(question_json, dict)
                        else question_json
                    ),
                    "requirement": sow["requirement"],
                },
            )
        # Uploads are removed only after the update succeeded.
        await self._remove_uploaded_files(id=sow_id)
        return updated_sow

    async def save_uploaded_files(self, id: UUID, files: List["UploadFile"]) -> str:
        """Write *files* into the upload folder for *id* and return that folder.

        Raises:
            ValueError: if any upload has a missing or unusable filename; the
                names are validated before anything is written.
        """
        folder_path = self._upload_dir(id)
        # Filenames come from the client: strip directory parts so a name such
        # as "../../x" cannot escape the upload folder.
        safe_names = [self._safe_filename(file.filename) for file in files]
        os.makedirs(folder_path, exist_ok=True)
        for file, safe_name in zip(files, safe_names):
            file_path = os.path.join(folder_path, safe_name)
            async with aiofiles.open(file_path, "wb") as temp_file:
                await temp_file.write(await file.read())
        return folder_path

    async def _remove_uploaded_files(self, id: UUID):
        """Delete the upload folder for *id* if it exists."""
        folder_path = self._upload_dir(id)
        if os.path.exists(folder_path):
            shutil.rmtree(folder_path)

    async def _read_uploaded_files(self, id: UUID) -> str:
        """Return the concatenated text of every file uploaded for *id*.

        Files are processed in sorted name order so the result does not depend
        on the directory listing order. Returns ``""`` when the folder does
        not exist. Sub-directories are ignored.
        """
        folder_path = self._upload_dir(id)
        if not os.path.isdir(folder_path):
            return ""
        parts = []
        for name in sorted(os.listdir(folder_path)):
            file_path = os.path.join(folder_path, name)
            if os.path.isfile(file_path):
                parts.append(await self._extract_text_from_file(file_path=file_path))
        return "".join(parts)

    async def _extract_text_from_file(self, file_path: str) -> str:
        """Extract text from various file formats.

        Supported extensions (case-insensitive): ``.jpg``/``.jpeg``/``.png``
        (OCR), ``.pdf``, ``.docx``, ``.txt`` and ``.md``.

        Raises:
            ValueError: for an unsupported extension, a failed extraction, or
                text that is not valid UTF-8 (``UnicodeDecodeError`` is a
                ``ValueError`` subclass).
        """
        file_extension = os.path.splitext(file_path)[1].lower()
        image_extensions = (".jpg", ".jpeg", ".png")
        text_extensions = (".txt", ".md")
        supported = image_extensions + text_extensions + (".pdf", ".docx")
        # Reject unsupported files before loading them into memory.
        if file_extension not in supported:
            raise ValueError(f"Unsupported file format: {file_extension}")

        async with aiofiles.open(file_path, "rb") as file:
            content = await file.read()

        if file_extension in image_extensions:
            return await self._extract_text_from_image(content)
        if file_extension == ".pdf":
            return await self._extract_text_from_pdf(content)
        if file_extension == ".docx":
            return await self._extract_text_from_docx(content)
        # "utf-8-sig" decodes plain UTF-8 identically and drops a leading BOM.
        return content.decode("utf-8-sig")

    async def _extract_text_from_image(self, content: bytes) -> str:
        """Extract text from image using OCR.

        The image is decoded from memory, so no temporary file is created, and
        the OCR call runs in the default executor to keep the event loop free.

        Raises:
            ValueError: if the image cannot be opened or OCR fails.
        """
        try:
            loop = asyncio.get_running_loop()
            with Image.open(io.BytesIO(content)) as image:
                return await loop.run_in_executor(
                    None, pytesseract.image_to_string, image
                )
        except Exception as e:
            logger.error(f"Error extracting text from image: {str(e)}")
            raise ValueError(
                "Failed to extract text from image. Please ensure Tesseract is properly installed."
            ) from e

    async def _extract_text_from_pdf(self, content: bytes) -> str:
        """Extract text from PDF.

        Raises:
            ValueError: if the PDF cannot be parsed.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
            # Guard against a page yielding no text object at all.
            return "".join(page.extract_text() or "" for page in pdf_reader.pages)
        except Exception as e:
            logger.error(f"Error extracting text from PDF: {str(e)}")
            raise ValueError("Failed to extract text from PDF") from e

    async def _extract_text_from_docx(self, content: bytes) -> str:
        """Extract text from DOCX, one line per paragraph.

        Raises:
            ValueError: if the document cannot be parsed.
        """
        try:
            doc = Document(io.BytesIO(content))
            return "".join(paragraph.text + "\n" for paragraph in doc.paragraphs)
        except Exception as e:
            logger.error(f"Error extracting text from DOCX: {str(e)}")
            raise ValueError("Failed to extract text from DOCX") from e

    async def markdown_to_docx(self, markdown_text: str) -> bytes:
        """Convert markdown text to DOCX format.

        The Markdown is rendered to HTML and each top-level element is mapped
        to a DOCX construct: ``h1``-``h6`` to headings, ``ul``/``ol`` items to
        bulleted/numbered paragraphs, ``blockquote`` to an "Intense Quote"
        paragraph, and anything else to a plain paragraph.

        Returns:
            The serialized ``.docx`` file as bytes.

        Raises:
            ValueError: if the conversion fails.
        """
        heading_levels = {f"h{level}": level for level in range(1, 7)}
        list_styles = {"ul": "List Bullet", "ol": "List Number"}
        try:
            html = markdown.markdown(markdown_text)
            soup = BeautifulSoup(html, "html.parser")
            doc = Document()
            for element in soup.contents:
                tag = element.name
                # The newlines between block tags are text nodes (name is
                # None); skip them instead of emitting empty paragraphs.
                if tag is None and not str(element).strip():
                    continue
                if tag in heading_levels:
                    doc.add_heading(element.text, level=heading_levels[tag])
                elif tag in list_styles:
                    # NOTE(review): find_all also matches nested <li>, so the
                    # text of a nested list appears in its parent item too.
                    for li in element.find_all("li"):
                        doc.add_paragraph(li.text, style=list_styles[tag])
                elif tag == "blockquote":
                    doc.add_paragraph(element.text, style="Intense Quote")
                else:
                    doc.add_paragraph(element.text)
            docx_bytes = io.BytesIO()
            doc.save(docx_bytes)
            return docx_bytes.getvalue()
        except Exception as e:
            logger.error(f"Error converting markdown to DOCX: {str(e)}")
            raise ValueError("Failed to convert markdown to DOCX") from e