# ipns-sow / src /utils /_sow_client.py
# Aryan Jain
# fix json dump issue
# df99e8c
import io
import json
import os
from typing import BinaryIO, List
from uuid import UUID
from fastapi import UploadFile
import markdown
from docx import Document
from docx.shared import Inches
from bs4 import BeautifulSoup
import PyPDF2
from PIL import Image
import pytesseract
import aiofiles
import asyncio
from src.config import logger
import httpx
import shutil
from src.repositories import SOWRepository
class SOWClient:
    """Generate Statements of Work (SOW) and convert documents to/from text.

    The client reads SOW rows through ``SOWRepository``, posts them to the
    webhook configured in ``NN_SOW_URL``, stores the generated result, and
    provides helpers to extract text from uploaded files (images, PDF, DOCX,
    plain text) and to render markdown as a DOCX document.

    It can be used as an async context manager; entering and leaving the
    context performs no work.
    """

    # Row fields that are stripped from the payload sent to the webhook.
    _PAYLOAD_EXCLUDED_KEYS = ("id", "created_at", "updated_at")

    def __init__(self):
        """Store the repository factory and point pytesseract at Tesseract.

        The binary location is read from ``TESSERACT_PATH`` (default
        ``/usr/bin/tesseract``); a warning is logged when it does not exist.
        """
        # The class itself is stored; it is instantiated (as an async context
        # manager) at every use site.
        self._repository = SOWRepository
        tesseract_path = os.getenv("TESSERACT_PATH", "/usr/bin/tesseract")
        if os.path.exists(tesseract_path):
            pytesseract.pytesseract.tesseract_cmd = tesseract_path
            logger.info(f"Tesseract path configured: {tesseract_path}")
        else:
            logger.warning(
                f"Tesseract not found at {tesseract_path}. OCR functionality may not work."
            )

    async def __aenter__(self):
        """Return the client itself; nothing is acquired."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Nothing to release; exceptions are not suppressed."""
        pass

    async def generate_sow(self, sow_id: UUID) -> dict:
        """Generate the SOW text for ``sow_id`` and persist the result.

        Text extracted from files previously stored under
        ``<cwd>/temp/<sow_id>`` replaces the stored requirement when it is
        non-empty. Questions returned by the webhook are merged (with ``None``
        answers) into the JSON kept in ``additional_info``. The uploaded files
        are removed once the row has been updated.

        Returns:
            The value returned by ``repository.update_sow``.

        Raises:
            ValueError: If no SOW exists for ``sow_id`` or an uploaded file
                cannot be converted to text.
            httpx.HTTPStatusError: If the webhook answers with an error status.
        """
        async with self._repository() as repository:
            sows = await repository.get_sows(sow_id=sow_id)
        if not sows:
            raise ValueError(f"SOW not found: {sow_id}")
        sow = sows[0]

        content = None
        folder_path = os.path.join(os.getcwd(), "temp", str(sow_id))
        if os.path.exists(folder_path):
            content = await self._read_uploaded_files(id=sow_id)

        question_json = (
            json.loads(sow["additional_info"]) if sow["additional_info"] else None
        )
        requirement = content or sow["requirement"]

        # Build a filtered copy instead of deleting keys from the row that
        # the repository handed out.
        payload = {
            key: value
            for key, value in sow.items()
            if key not in self._PAYLOAD_EXCLUDED_KEYS
        }
        payload["requirement"] = requirement

        url = os.getenv(
            "NN_SOW_URL", "https://n8n.srv763317.hstgr.cloud/webhook/sow-generate"
        )
        async with httpx.AsyncClient() as client:
            # httpx timeouts are expressed in seconds.
            response = await client.post(url, json=payload, timeout=3000)
            # Fail on HTTP errors here rather than with an unrelated KeyError
            # or JSON decoding error further down.
            response.raise_for_status()
            generated_sow_json = response.json()["message"]["content"]

        if generated_sow_json["is_required_questions"]:
            if not question_json:
                question_json = {}
            for question in generated_sow_json["questions"]:
                question_json[question] = None

        # Serialize exactly once; non-dict values (e.g. None) pass through.
        additional_info = (
            json.dumps(question_json)
            if isinstance(question_json, dict)
            else question_json
        )
        generated_sow = generated_sow_json["generated_sow"] or None

        async with self._repository() as repository:
            updated_sow = await repository.update_sow(
                sow_id=sow_id,
                sow={
                    "sow_generated_text": generated_sow,
                    "additional_info": additional_info,
                    "requirement": requirement,
                },
            )
        await self._remove_uploaded_files(id=sow_id)
        return updated_sow

    @staticmethod
    def _safe_filename(filename) -> str:
        """Reduce a client-supplied file name to a bare, safe base name.

        Directory components (with either ``/`` or ``\\`` separators) are
        dropped so the name cannot point outside the upload folder.

        Raises:
            ValueError: If nothing usable remains (missing name, ``.``, ``..``
                or a name that ends with a separator).
        """
        name = os.path.basename((filename or "").replace("\\", "/"))
        if name in ("", ".", ".."):
            raise ValueError(f"Invalid upload file name: {filename!r}")
        return name

    async def save_uploaded_files(self, id: UUID, files: List[UploadFile]) -> str:
        """Write ``files`` into ``<cwd>/temp/<id>`` and return that folder.

        File names come from the client, so each one is reduced to its base
        name before being joined to the target folder.

        Raises:
            ValueError: If any file has no usable name; nothing is written in
                that case.
        """
        # Validate every name up front so a bad one does not leave a
        # half-populated folder behind.
        safe_names = [self._safe_filename(file.filename) for file in files]

        folder_path = os.path.join(os.getcwd(), "temp", str(id))
        os.makedirs(folder_path, exist_ok=True)
        for file, safe_name in zip(files, safe_names):
            file_path = os.path.join(folder_path, safe_name)
            async with aiofiles.open(file_path, "wb") as temp_file:
                await temp_file.write(await file.read())
        return folder_path

    async def _remove_uploaded_files(self, id: UUID):
        """Delete ``<cwd>/temp/<id>`` and its contents if it exists."""
        folder_path = os.path.join(os.getcwd(), "temp", str(id))
        if os.path.exists(folder_path):
            shutil.rmtree(folder_path)

    async def _read_uploaded_files(self, id: UUID) -> str:
        """Return the concatenated text of every file in ``<cwd>/temp/<id>``.

        Files are processed in sorted name order so the result does not depend
        on directory listing order. Returns ``""`` when the folder is missing.

        Raises:
            ValueError: If a file has an unsupported format or cannot be read.
        """
        folder_path = os.path.join(os.getcwd(), "temp", str(id))
        if not os.path.exists(folder_path):
            return ""

        parts = []
        for entry in sorted(os.listdir(folder_path)):
            file_path = os.path.join(folder_path, entry)
            if os.path.isfile(file_path):
                parts.append(await self._extract_text_from_file(file_path=file_path))
        return "".join(parts)

    async def _extract_text_from_file(self, file_path: str) -> str:
        """Extract text from various file formats.

        Supported extensions: ``.jpg``/``.jpeg``/``.png`` (OCR), ``.pdf``,
        ``.docx``, ``.txt`` and ``.md``.

        Raises:
            ValueError: If the extension is unsupported or extraction fails.
        """
        file_extension = os.path.splitext(file_path)[1].lower()
        image_extensions = (".jpg", ".jpeg", ".png")
        text_extensions = (".txt", ".md")
        supported = image_extensions + text_extensions + (".pdf", ".docx")
        # Reject unsupported formats before reading the file into memory.
        if file_extension not in supported:
            raise ValueError(f"Unsupported file format: {file_extension}")

        async with aiofiles.open(file_path, "rb") as file:
            content = await file.read()

        if file_extension in image_extensions:
            return await self._extract_text_from_image(content)
        if file_extension == ".pdf":
            return await self._extract_text_from_pdf(content)
        if file_extension == ".docx":
            return await self._extract_text_from_docx(content)
        # "utf-8-sig" decodes like UTF-8 but drops a leading byte-order mark.
        return content.decode("utf-8-sig")

    async def _extract_text_from_image(self, content: bytes) -> str:
        """Extract text from image using OCR.

        The image is decoded from memory, so no temporary file is created and
        nothing is left behind when OCR fails.

        Raises:
            ValueError: If the image cannot be opened or OCR fails.
        """
        try:
            loop = asyncio.get_running_loop()
            with Image.open(io.BytesIO(content)) as image:
                # OCR is blocking; run it in the default executor.
                return await loop.run_in_executor(
                    None, pytesseract.image_to_string, image
                )
        except Exception as e:
            logger.error(f"Error extracting text from image: {str(e)}")
            raise ValueError(
                "Failed to extract text from image. Please ensure Tesseract is properly installed."
            ) from e

    async def _extract_text_from_pdf(self, content: bytes) -> str:
        """Extract text from PDF.

        Raises:
            ValueError: If the PDF cannot be parsed.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
            # Guard against pages for which no text is returned.
            return "".join(page.extract_text() or "" for page in pdf_reader.pages)
        except Exception as e:
            logger.error(f"Error extracting text from PDF: {str(e)}")
            raise ValueError("Failed to extract text from PDF") from e

    async def _extract_text_from_docx(self, content: bytes) -> str:
        """Extract text from DOCX, one line per paragraph.

        Raises:
            ValueError: If the document cannot be parsed.
        """
        try:
            doc = Document(io.BytesIO(content))
            return "".join(f"{paragraph.text}\n" for paragraph in doc.paragraphs)
        except Exception as e:
            logger.error(f"Error extracting text from DOCX: {str(e)}")
            raise ValueError("Failed to extract text from DOCX") from e

    async def markdown_to_docx(self, markdown_text: str) -> bytes:
        """Convert markdown text to DOCX format.

        Top-level headings (``h1``-``h6``), paragraphs, bullet and numbered
        lists and block quotes are mapped to the matching DOCX constructs; any
        other top-level element is written as a plain paragraph.

        Returns:
            The serialized DOCX document.

        Raises:
            ValueError: If the conversion fails.
        """
        try:
            html = markdown.markdown(markdown_text)
            soup = BeautifulSoup(html, "html.parser")
            doc = Document()

            heading_levels = {f"h{level}": level for level in range(1, 7)}
            list_styles = {"ul": "List Bullet", "ol": "List Number"}

            for element in soup.contents:
                name = getattr(element, "name", None)
                text = element.text
                # Bare whitespace strings sit between block elements; skip
                # them so they do not turn into empty paragraphs.
                if name is None and not text.strip():
                    continue

                if name in heading_levels:
                    doc.add_heading(text, level=heading_levels[name])
                elif name in list_styles:
                    for li in element.find_all("li"):
                        doc.add_paragraph(li.text, style=list_styles[name])
                elif name == "blockquote":
                    doc.add_paragraph(text, style="Intense Quote")
                else:
                    doc.add_paragraph(text)

            docx_bytes = io.BytesIO()
            doc.save(docx_bytes)
            return docx_bytes.getvalue()
        except Exception as e:
            logger.error(f"Error converting markdown to DOCX: {str(e)}")
            raise ValueError("Failed to convert markdown to DOCX") from e