backend / tools /tools.py
KevanSoon
stricter tools.py prompt
d9b934b
# ./tools/tools.py
import os
import json
import logging
import textwrap
import asyncio
import re
import httpx
import langextract as lx
from bs4 import BeautifulSoup
from dotenv import load_dotenv
import google.generativeai as genai
# Step 1: Load environment variables and configure API keys
load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
try:
api_key = os.getenv("GEMINI_API_KEY")
if not api_key:
raise ValueError("GEMINI_API_KEY not found in environment variables.")
os.environ["LANGEXTRACT_API_KEY"] = api_key
genai.configure(api_key=api_key)
except ValueError as e:
logger.warning(f"API not configured. Tool will fail. Reason: {e}")
def extract_text_from_html(html_content: str) -> str:
"""
Parses an HTML string and extracts all human-readable text from the body.
"""
if not html_content:
return ""
soup = BeautifulSoup(html_content, "html.parser")
for script_or_style in soup(["script", "style"]):
script_or_style.decompose()
text = soup.get_text(separator=" ", strip=True)
return text
async def _pre_clean_text_with_gemini(messy_text: str) -> str:
"""
Takes messy OCR text and uses Gemini to clean it into a coherent document.
"""
model = genai.GenerativeModel(model_name="gemini-2.5-flash")
prompt = textwrap.dedent(
f"""
The following text is from a messy OCR process. It contains extra spaces, incorrect line breaks, and jumbled words.
Your task is to clean and reformat it into a single, coherent block of text that reads like a proper document.
Do not summarize or change the content. Just fix the formatting and structure.
Return ONLY the cleaned text, with no explanations.
**Messy Text:**
---
{messy_text}
---
"""
)
try:
response = await model.generate_content_async(prompt)
return response.text.strip()
except Exception as e:
logger.error(f"Error during text pre-cleaning: {e}")
return messy_text
async def _translate_text_to_english_with_sealion(text: str) -> str:
"""
Translates the given text to English using the Sea-Lion model.
"""
url = "https://api.sea-lion.ai/v1/chat/completions"
api_key = os.getenv("SEALION_API_KEY")
if not api_key:
logger.warning("SEALION_API_KEY not found. Skipping translation.")
return text
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json",
}
prompt = f'Translate the following text to English. Return ONLY the translated text, without any additional explanations, formatting, or quotation marks:\n\n"{text}"'
payload = {
"max_completion_tokens": 4096,
"messages": [{"role": "user", "content": prompt}],
"model": "aisingapore/Gemma-SEA-LION-v3-9B-IT",
}
async with httpx.AsyncClient() as client:
try:
response = await client.post(
url, headers=headers, json=payload, timeout=60.0
)
response.raise_for_status()
response_json = response.json()
translated_text = response_json["choices"][0]["message"]["content"].strip()
return re.sub(r'^"|"$', "", translated_text)
except httpx.RequestError as e:
logger.error(f"Translation request to Sea-Lion failed: {e}")
return text
except (KeyError, IndexError) as e:
logger.error(f"Could not parse Sea-Lion translation response: {e}")
return text
async def _generate_html_summary(extracted_data: dict) -> str:
"""
Takes the structured data and generates a clean, user-friendly HTML summary sheet in English.
"""
model = genai.GenerativeModel(model_name="gemini-2.5-flash")
prompt_data = json.dumps(extracted_data, indent=2, ensure_ascii=False)
prompt = textwrap.dedent(
f"""
You are a web designer creating a one-page summary sheet.
Your task is to convert the following JSON data into a simple, clean, and easy-to-read HTML document.
The entire document MUST be in English.
**JSON Data:**
```json
{prompt_data}
```
**Instructions:**
1. Use a single HTML file structure. Include modern, clean CSS in a `<style>` tag.
2. Create a main container and use a card-based layout.
3. Use clear headings (e.g., `<h2>`, `<h3>`) for each section.
4. Display the `summary` for each clause prominently.
5. The final output must ONLY be the raw HTML code.
"""
)
try:
response = await model.generate_content_async(prompt)
html_match = re.search(r"```html\n(.*?)\n```", response.text, re.DOTALL)
if html_match:
return html_match.group(1).strip()
return response.text.strip()
except Exception as e:
logger.error(f"Error generating HTML summary: {e}")
return f"<html><body><h1>Error</h1><p>Could not generate the final summary sheet.</p><p>{str(e)}</p></body></html>"
async def analyze_contract(html_content: str) -> dict:
"""
Analyzes a contract by cleaning, translating, extracting data, and generating a summary.
"""
messy_document_text = extract_text_from_html(html_content)
if not messy_document_text.strip():
return {
"error": "Could not extract any meaningful text from the provided HTML content."
}
logger.info("Stage 1: Pre-cleaning raw text...")
cleaned_document_text = await _pre_clean_text_with_gemini(messy_document_text)
logger.info("Stage 1: Pre-cleaning complete.")
logger.info("Stage 2: Translating text to English with Sea-Lion...")
english_document_text = await _translate_text_to_english_with_sealion(
cleaned_document_text
)
logger.info("Stage 2: Translation complete.")
# --- START: IMPROVED PROMPT AND EXAMPLES ---
prompt = textwrap.dedent(
"""
You are a meticulous data extraction system specializing in payslips and employment contracts.
Your task is to extract specific entities from the provided English text. Follow these rules precisely:
**Extraction Rules:**
1. **Extract Exact Text:** The `extraction_text` must be the exact text from the document representing the entity's value, without including the label (e.g., for "Basic Pay: $2000", extract "$2000", not the whole phrase).
2. **Do Not Overlap:** Entities must not overlap.
3. **Be Comprehensive:** Extract all occurrences of each entity type. For example, if there are multiple bonuses or deductions, extract each one as a separate entity.
4. **No Inference:** If an entity is not explicitly mentioned, do not extract anything for it. Do not invent information.
**Entities to Extract:**
- `employer`: The name of the company or employer.
- `employee`: The name of the employee.
- `pay_period`: The specific date range for the payslip (e.g., "September 1, 2021 to September 30, 2021").
- `salary`: The primary or base salary amount.
- `deductions`: Any amount subtracted from the pay.
- `bonus`: Any additional payments like bonuses, allowances, or overtime pay.
**Attribute Generation:**
- For every extraction, you MUST generate a `summary` attribute.
- The summary should be a complete, simple English sentence describing the extracted entity. For example: "The employer is ABC PTE LTD." or "The base salary is $2000."
"""
)
examples = [
# Example 1: Clean, standard key-value format
lx.data.ExampleData(
text="Payslip for September 1, 2021 - September 30, 2021. Company: ABC PTE LTD. Staff: Tan Ah Kow. Basic Pay: $2000. Annual Bonus: $2000.",
extractions=[
lx.data.Extraction(
extraction_class="pay_period",
extraction_text="September 1, 2021 - September 30, 2021",
attributes={
"summary": "The pay period is from September 1, 2021 to September 30, 2021."
},
),
lx.data.Extraction(
extraction_class="employer",
extraction_text="ABC PTE LTD",
attributes={"summary": "The employer is ABC PTE LTD."},
),
lx.data.Extraction(
extraction_class="employee",
extraction_text="Tan Ah Kow",
attributes={"summary": "The employee's name is Tan Ah Kow."},
),
lx.data.Extraction(
extraction_class="salary",
extraction_text="$2000",
attributes={"summary": "The base salary is $2000."},
),
lx.data.Extraction(
extraction_class="bonus",
extraction_text="$2000",
attributes={"summary": "An annual bonus of $2000 was paid."},
),
],
),
# Example 2: Messier, tabular-style text without clear key-value pairs
lx.data.ExampleData(
text="Employer Name ABC Global Services Period of Pay 01/10/2022 to 31/10/2022 Employee John Doe Earnings Base Salary 3,500.00 Transport Allowance 150.00 Deductions CPF Contribution 700.00",
extractions=[
lx.data.Extraction(
extraction_class="employer",
extraction_text="ABC Global Services",
attributes={"summary": "The employer is ABC Global Services."},
),
lx.data.Extraction(
extraction_class="pay_period",
extraction_text="01/10/2022 to 31/10/2022",
attributes={
"summary": "The pay period is from 01/10/2022 to 31/10/2022."
},
),
lx.data.Extraction(
extraction_class="employee",
extraction_text="John Doe",
attributes={"summary": "The employee's name is John Doe."},
),
lx.data.Extraction(
extraction_class="salary",
extraction_text="3,500.00",
attributes={"summary": "The base salary is 3,500.00."},
),
lx.data.Extraction(
extraction_class="bonus",
extraction_text="150.00",
attributes={
"summary": "A transport allowance of 150.00 was provided."
},
),
lx.data.Extraction(
extraction_class="deductions",
extraction_text="700.00",
attributes={"summary": "A CPF deduction of 700.00 was made."},
),
],
),
# Example 3: Multiple entries for one class, and a missing class
lx.data.ExampleData(
text="Payslip for Jane Smith at Innovate Corp. For the month of November 2023. Salary: 4000 SGD. Deductions include a loan payment of 200 and a charity donation of 50. No bonus was issued.",
extractions=[
lx.data.Extraction(
extraction_class="employee",
extraction_text="Jane Smith",
attributes={"summary": "The employee's name is Jane Smith."},
),
lx.data.Extraction(
extraction_class="employer",
extraction_text="Innovate Corp",
attributes={"summary": "The employer is Innovate Corp."},
),
lx.data.Extraction(
extraction_class="pay_period",
extraction_text="November 2023",
attributes={
"summary": "The pay period is for the month of November 2023."
},
),
lx.data.Extraction(
extraction_class="salary",
extraction_text="4000 SGD",
attributes={"summary": "The salary is 4000 SGD."},
),
lx.data.Extraction(
extraction_class="deductions",
extraction_text="200",
attributes={"summary": "A loan payment deduction of 200 was made."},
),
lx.data.Extraction(
extraction_class="deductions",
extraction_text="50",
attributes={
"summary": "A charity donation deduction of 50 was made."
},
),
],
),
]
# --- END: IMPROVED PROMPT AND EXAMPLES ---
try:
logger.info("Stage 3: Starting structured data extraction from English text...")
annotated_document = await asyncio.to_thread(
lx.extract,
text_or_documents=english_document_text,
prompt_description=prompt,
examples=examples,
model_id="gemini-2.5-flash",
)
logger.info("Stage 3: Extraction complete.")
extracted_data = {}
debug_visualization_html = lx.visualize(annotated_document)
for extr in annotated_document.extractions:
if extr.attributes:
class_key = extr.extraction_class.replace(" ", "_")
if class_key not in extracted_data:
extracted_data[class_key] = []
extracted_data[class_key].append(
{
"text": extr.extraction_text,
"summary": extr.attributes.get(
"summary", "No summary provided."
),
}
)
logger.info("Stage 4: Generating final HTML summary sheet...")
summary_sheet_html = await _generate_html_summary(extracted_data)
logger.info("Stage 4: HTML summary sheet generated.")
return {
"language": "en",
"extracted_data": extracted_data,
"summary_sheet_html": summary_sheet_html,
"debug_visualization_html": debug_visualization_html,
}
except Exception as e:
logger.error(f"An error occurred during contract analysis: {e}", exc_info=True)
return {"error": f"An unexpected error occurred: {str(e)}"}