# Scraped page header (not part of the program): Spaces - Runtime error; file size: 8,624 bytes.
import io
import json
import os
from typing import BinaryIO, List
from uuid import UUID
from fastapi import UploadFile
import markdown
from docx import Document
from docx.shared import Inches
from bs4 import BeautifulSoup
import PyPDF2
from PIL import Image
import pytesseract
import aiofiles
import asyncio
from src.config import logger
import httpx
import shutil
from src.repositories import SOWRepository
class SOWClient:
    """Generate SOW documents from stored SOW records and uploaded files.

    The client reads a SOW record through ``SOWRepository``, optionally
    replaces its ``requirement`` text with text extracted from files uploaded
    under ``<cwd>/temp/<sow_id>``, posts the record to a generation webhook
    and stores the webhook's answer back through the repository.

    It can be used as an async context manager; entering and leaving the
    context performs no work.
    """

    def __init__(self):
        """Store the repository factory and configure the Tesseract binary.

        The binary location is read from the ``TESSERACT_PATH`` environment
        variable (default ``/usr/bin/tesseract``). A missing binary is only
        logged as a warning so that non-OCR features keep working.
        """
        # The class itself is stored; a fresh repository is opened per use.
        self._repository = SOWRepository
        tesseract_path = os.getenv("TESSERACT_PATH", "/usr/bin/tesseract")
        if os.path.exists(tesseract_path):
            pytesseract.pytesseract.tesseract_cmd = tesseract_path
            logger.info(f"Tesseract path configured: {tesseract_path}")
        else:
            logger.warning(
                f"Tesseract not found at {tesseract_path}. OCR functionality may not work."
            )

    async def __aenter__(self):
        """Return the client itself; nothing is acquired on entry."""
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        """Do nothing on exit; exceptions propagate to the caller."""
        return None

    async def generate_sow(self, sow_id: UUID) -> dict:
        """Generate the SOW text for ``sow_id`` and persist the result.

        Steps:
          1. Load the record via ``repository.get_sows``.
          2. If files were uploaded for this id, use their extracted text as
             the requirement instead of the stored one.
          3. POST the record (without ``id``/``created_at``/``updated_at``)
             to the URL in ``NN_SOW_URL``.
          4. Merge any follow-up questions returned by the webhook into
             ``additional_info`` and store the generated text.
          5. Remove the uploaded files.

        Args:
            sow_id: Identifier of the SOW record to process.

        Returns:
            Whatever ``repository.update_sow`` returns for the updated record.

        Raises:
            ValueError: If no record exists for ``sow_id`` or an uploaded
                file cannot be converted to text.
            httpx.HTTPStatusError: If the webhook answers with an error status.
        """
        async with self._repository() as repository:
            sows = await repository.get_sows(sow_id=sow_id)
        if not sows:
            raise ValueError(f"SOW not found: {sow_id}")
        sow = sows[0]

        # Returns "" when nothing was uploaded, which falls back to the
        # requirement stored on the record.
        content = await self._read_uploaded_files(id=sow_id)
        requirement = content or sow["requirement"]

        additional_info = sow["additional_info"]
        question_json = json.loads(additional_info) if additional_info else None

        # Build the request body as a copy so the repository row is not
        # mutated and absent bookkeeping keys do not raise.
        payload = {
            key: value
            for key, value in sow.items()
            if key not in ("id", "created_at", "updated_at")
        }
        payload["requirement"] = requirement

        url = os.getenv(
            "NN_SOW_URL", "https://n8n.srv763317.hstgr.cloud/webhook/sow-generate"
        )
        async with httpx.AsyncClient() as client:
            # httpx timeouts are expressed in seconds, so this waits up to
            # 50 minutes. NOTE(review): confirm milliseconds were not intended.
            response = await client.post(url, json=payload, timeout=3000)
            # Fail with the real HTTP error instead of a KeyError/JSON error
            # raised later while indexing into an error body.
            response.raise_for_status()
            generated_sow_json = response.json()["message"]["content"]

        if generated_sow_json["is_required_questions"]:
            question_json = question_json or {}
            # Every question returned by the webhook is stored unanswered.
            for question in generated_sow_json["questions"]:
                question_json[question] = None

        # Empty generated text is stored as None.
        generated_sow = generated_sow_json["generated_sow"] or None

        async with self._repository() as repository:
            updated_sow = await repository.update_sow(
                sow_id=sow_id,
                sow={
                    "sow_generated_text": generated_sow,
                    "additional_info": (
                        json.dumps(question_json)
                        if isinstance(question_json, dict)
                        else question_json
                    ),
                    "requirement": requirement,
                },
            )
        await self._remove_uploaded_files(id=sow_id)
        return updated_sow

    @staticmethod
    def _safe_filename(filename) -> str:
        """Reduce a client-supplied filename to its final path component.

        Both ``/`` and ``\\`` are treated as separators so the result is the
        same on every host OS.

        Raises:
            ValueError: If nothing usable remains (missing name, ``.``,
                ``..`` or a name ending in a separator).
        """
        name = os.path.basename((filename or "").replace("\\", "/"))
        if name in ("", ".", ".."):
            raise ValueError(f"Invalid upload filename: {filename!r}")
        return name

    async def save_uploaded_files(self, id: UUID, files: "List[UploadFile]") -> str:
        """Write uploaded files to ``<cwd>/temp/<id>`` and return that folder.

        Args:
            id: Identifier used as the folder name.
            files: Uploads to persist; each is read fully into memory.

        Returns:
            Path of the folder the files were written to.

        Raises:
            ValueError: If an upload has no usable filename.
        """
        folder_path = os.path.join(os.getcwd(), "temp", str(id))
        os.makedirs(folder_path, exist_ok=True)
        for file in files:
            # The filename comes from the client; keep only its last
            # component so it cannot escape folder_path.
            file_path = os.path.join(folder_path, self._safe_filename(file.filename))
            async with aiofiles.open(file_path, "wb") as temp_file:
                await temp_file.write(await file.read())
        return folder_path

    async def _remove_uploaded_files(self, id: UUID):
        """Delete the upload folder for ``id`` if it exists."""
        folder_path = os.path.join(os.getcwd(), "temp", str(id))
        if os.path.exists(folder_path):
            shutil.rmtree(folder_path)

    async def _read_uploaded_files(self, id: UUID) -> str:
        """Return the concatenated text of every file uploaded for ``id``.

        Files are processed in sorted name order so the result is
        deterministic. Returns an empty string when the folder is absent.

        Raises:
            ValueError: If a file has an unsupported format or cannot be read.
        """
        folder_path = os.path.join(os.getcwd(), "temp", str(id))
        if not os.path.isdir(folder_path):
            return ""
        parts = []
        for name in sorted(os.listdir(folder_path)):
            file_path = os.path.join(folder_path, name)
            if os.path.isfile(file_path):
                parts.append(await self._extract_text_from_file(file_path=file_path))
        return "".join(parts)

    async def _extract_text_from_file(self, file_path: str) -> str:
        """Extract text from a file, choosing the extractor by extension.

        Supported: ``.jpg``/``.jpeg``/``.png`` (OCR), ``.pdf``, ``.docx``,
        ``.txt`` and ``.md``.

        Raises:
            ValueError: If the extension is unsupported or extraction fails.
        """
        file_extension = os.path.splitext(file_path)[1].lower()
        image_extensions = (".jpg", ".jpeg", ".png")
        text_extensions = (".txt", ".md")
        supported = image_extensions + text_extensions + (".pdf", ".docx")
        # Reject unknown formats before spending time reading the file.
        if file_extension not in supported:
            raise ValueError(f"Unsupported file format: {file_extension}")

        async with aiofiles.open(file_path, "rb") as file:
            content = await file.read()

        if file_extension in image_extensions:
            return await self._extract_text_from_image(content)
        if file_extension == ".pdf":
            return await self._extract_text_from_pdf(content)
        if file_extension == ".docx":
            return await self._extract_text_from_docx(content)
        # Plain text: drop a UTF-8 BOM if present and substitute undecodable
        # bytes so one stray byte does not abort the whole generation.
        return content.decode("utf-8-sig", errors="replace")

    async def _extract_text_from_image(self, content: bytes) -> str:
        """Extract text from image bytes using Tesseract OCR.

        The image is opened from memory, so no temporary file has to be
        created or cleaned up. OCR runs in the default executor so the
        event loop is not blocked.

        Raises:
            ValueError: If the image cannot be opened or OCR fails.
        """
        try:
            loop = asyncio.get_running_loop()
            with Image.open(io.BytesIO(content)) as image:
                return await loop.run_in_executor(
                    None, pytesseract.image_to_string, image
                )
        except Exception as e:
            logger.error(f"Error extracting text from image: {str(e)}")
            raise ValueError(
                "Failed to extract text from image. Please ensure Tesseract is properly installed."
            ) from e

    async def _extract_text_from_pdf(self, content: bytes) -> str:
        """Extract the text of every page of a PDF, concatenated in page order.

        Raises:
            ValueError: If the PDF cannot be parsed.
        """
        try:
            pdf_reader = PyPDF2.PdfReader(io.BytesIO(content))
            # A page without extractable text contributes an empty string.
            return "".join(page.extract_text() or "" for page in pdf_reader.pages)
        except Exception as e:
            logger.error(f"Error extracting text from PDF: {str(e)}")
            raise ValueError("Failed to extract text from PDF") from e

    async def _extract_text_from_docx(self, content: bytes) -> str:
        """Extract the paragraph text of a DOCX, one paragraph per line.

        Raises:
            ValueError: If the document cannot be parsed.
        """
        try:
            doc = Document(io.BytesIO(content))
            return "".join(f"{paragraph.text}\n" for paragraph in doc.paragraphs)
        except Exception as e:
            logger.error(f"Error extracting text from DOCX: {str(e)}")
            raise ValueError("Failed to extract text from DOCX") from e

    async def markdown_to_docx(self, markdown_text: str) -> bytes:
        """Convert markdown text to DOCX format.

        The markdown is rendered to HTML and each top-level element is mapped
        to a DOCX construct: ``h1``-``h6`` become headings of the matching
        level, ``ul``/``ol`` items become bulleted/numbered paragraphs,
        ``blockquote`` uses the "Intense Quote" style, and anything else
        becomes a plain paragraph.

        Returns:
            The serialized ``.docx`` file.

        Raises:
            ValueError: If the conversion fails.
        """
        try:
            html = markdown.markdown(markdown_text)
            soup = BeautifulSoup(html, "html.parser")
            doc = Document()
            heading_levels = {f"h{level}": level for level in range(1, 7)}
            list_styles = {"ul": "List Bullet", "ol": "List Number"}

            for element in soup.contents:
                # Bare text nodes are str subclasses; the ones between block
                # tags are just newlines and must not become paragraphs.
                if isinstance(element, str):
                    stray_text = str(element).strip()
                    if stray_text:
                        doc.add_paragraph(stray_text)
                    continue

                tag = element.name
                if tag in heading_levels:
                    doc.add_heading(element.text, level=heading_levels[tag])
                elif tag in list_styles:
                    for li in element.find_all("li"):
                        doc.add_paragraph(li.text, style=list_styles[tag])
                elif tag == "blockquote":
                    doc.add_paragraph(element.text, style="Intense Quote")
                else:
                    doc.add_paragraph(element.text)

            docx_bytes = io.BytesIO()
            doc.save(docx_bytes)
            return docx_bytes.getvalue()
        except Exception as e:
            logger.error(f"Error converting markdown to DOCX: {str(e)}")
            raise ValueError("Failed to convert markdown to DOCX") from e
|