import io import json import os from typing import BinaryIO, List from uuid import UUID from fastapi import UploadFile import markdown from docx import Document from docx.shared import Inches from bs4 import BeautifulSoup import PyPDF2 from PIL import Image import pytesseract import aiofiles import asyncio from src.config import logger import httpx import shutil from src.repositories import SOWRepository class SOWClient: def __init__(self): self._repository = SOWRepository tesseract_path = os.getenv("TESSERACT_PATH", "/usr/bin/tesseract") if os.path.exists(tesseract_path): pytesseract.pytesseract.tesseract_cmd = tesseract_path logger.info(f"Tesseract path configured: {tesseract_path}") else: logger.warning( f"Tesseract not found at {tesseract_path}. OCR functionality may not work." ) async def __aenter__(self): return self async def __aexit__(self, exc_type, exc_value, traceback): pass async def generate_sow(self, sow_id: UUID) -> dict: content = None question_json = None async with self._repository() as repository: sows = await repository.get_sows(sow_id=sow_id) folder_path = os.path.join(os.getcwd(), "temp", str(sow_id)) if os.path.exists(folder_path): content = await self._read_uploaded_files(id=sow_id) sow = sows[0] question_json = json.loads(sow["additional_info"]) if sow["additional_info"] else None sow["requirement"] = content if content else sow["requirement"] del sow["id"] del sow["created_at"] del sow["updated_at"] url = os.getenv( "NN_SOW_URL", "https://n8n.srv763317.hstgr.cloud/webhook/sow-generate" ) async with httpx.AsyncClient() as client: response = await client.post( url, json=sow, timeout=3000, ) response_json = response.json() generated_sow_json = response_json["message"]["content"] if generated_sow_json["is_required_questions"]: if not question_json: question_json = {} for question in generated_sow_json["questions"]: question_json[question] = None question_json = json.dumps(question_json) generated_sow = ( generated_sow_json["generated_sow"] if generated_sow_json["generated_sow"] else None ) async with self._repository() as repository: updated_sow = await repository.update_sow( sow_id=sow_id, sow={ "sow_generated_text": generated_sow, "additional_info": json.dumps(question_json) if isinstance( question_json, dict) else question_json, "requirement": content if content else sow["requirement"], }, ) await self._remove_uploaded_files(id=sow_id) return updated_sow async def save_uploaded_files(self, id: UUID, files: List[UploadFile]) -> str: temp_dir = os.path.join(os.getcwd(), "temp") folder_path = os.path.join(temp_dir, str(id)) os.makedirs(folder_path, exist_ok=True) for file in files: file_path = os.path.join(folder_path, file.filename) async with aiofiles.open(file_path, "wb") as temp_file: await temp_file.write(await file.read()) return folder_path async def _remove_uploaded_files(self, id: UUID): folder_path = os.path.join(os.getcwd(), "temp", str(id)) if os.path.exists(folder_path): shutil.rmtree(folder_path) async def _read_uploaded_files(self, id: UUID) -> str: folder_path = os.path.join(os.getcwd(), "temp", str(id)) final_content = "" if os.path.exists(folder_path): for file in os.listdir(folder_path): file_path = os.path.join(folder_path, file) if os.path.isfile(file_path): content = await self._extract_text_from_file(file_path=file_path) final_content += content return final_content async def _extract_text_from_file(self, file_path: str) -> str: """Extract text from various file formats.""" async with aiofiles.open(file_path, "rb") as file: content = await file.read() file_extension = os.path.splitext(file_path)[1].lower() if file_extension in [".jpg", ".jpeg", ".png"]: return await self._extract_text_from_image(content) elif file_extension == ".pdf": return await self._extract_text_from_pdf(content) elif file_extension == ".docx": return await self._extract_text_from_docx(content) elif file_extension in [".txt", ".md"]: return content.decode("utf-8") else: raise ValueError(f"Unsupported file format: {file_extension}") async def _extract_text_from_image(self, content: bytes) -> str: """Extract text from image using OCR.""" try: temp_dir = os.path.join(os.getcwd(), "temp") os.makedirs(temp_dir, exist_ok=True) temp_path = os.path.join( temp_dir, f"temp_{asyncio.get_event_loop().time()}.png" ) async with aiofiles.open(temp_path, "wb") as temp_file: await temp_file.write(content) loop = asyncio.get_event_loop() image = Image.open(temp_path) text = await loop.run_in_executor(None, pytesseract.image_to_string, image) os.remove(temp_path) return text except Exception as e: logger.error(f"Error extracting text from image: {str(e)}") raise ValueError( "Failed to extract text from image. Please ensure Tesseract is properly installed." ) async def _extract_text_from_pdf(self, content: bytes) -> str: """Extract text from PDF.""" try: pdf_file = io.BytesIO(content) pdf_reader = PyPDF2.PdfReader(pdf_file) text = "" for page in pdf_reader.pages: text += page.extract_text() return text except Exception as e: logger.error(f"Error extracting text from PDF: {str(e)}") raise ValueError("Failed to extract text from PDF") async def _extract_text_from_docx(self, content: bytes) -> str: """Extract text from DOCX.""" try: docx_file = io.BytesIO(content) doc = Document(docx_file) text = "" for paragraph in doc.paragraphs: text += paragraph.text + "\n" return text except Exception as e: logger.error(f"Error extracting text from DOCX: {str(e)}") raise ValueError("Failed to extract text from DOCX") async def markdown_to_docx(self, markdown_text: str) -> bytes: """Convert markdown text to DOCX format.""" try: html = markdown.markdown(markdown_text) soup = BeautifulSoup(html, "html.parser") doc = Document() for element in soup.contents: if element.name == "h1": doc.add_heading(element.text, level=1) elif element.name == "h2": doc.add_heading(element.text, level=2) elif element.name == "h3": doc.add_heading(element.text, level=3) elif element.name == "p": doc.add_paragraph(element.text) elif element.name == "ul": for li in element.find_all("li"): doc.add_paragraph(li.text, style="List Bullet") elif element.name == "ol": for li in element.find_all("li"): doc.add_paragraph(li.text, style="List Number") elif element.name == "blockquote": doc.add_paragraph(element.text, style="Intense Quote") else: doc.add_paragraph(element.text) docx_bytes = io.BytesIO() doc.save(docx_bytes) docx_bytes.seek(0) return docx_bytes.getvalue() except Exception as e: logger.error(f"Error converting markdown to DOCX: {str(e)}") raise ValueError("Failed to convert markdown to DOCX")