|
|
|
|
|
|
|
|
import asyncio
import html
import json
import logging
import os
import re
import textwrap

import google.generativeai as genai
import httpx
import langextract as lx
from bs4 import BeautifulSoup
from dotenv import load_dotenv
|
|
|
|
|
|
|
|
# Pull variables from a local .env file into the process environment before
# any configuration value is read.
load_dotenv()

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Configure Gemini access at import time. A missing key is reported as a
# warning instead of aborting the import, so the module can still be loaded.
try:
    api_key = os.getenv("GEMINI_API_KEY")
    if api_key:
        # Mirror the key under the name LANGEXTRACT_API_KEY as well
        # (presumably the variable langextract looks up -- confirm).
        os.environ["LANGEXTRACT_API_KEY"] = api_key
        genai.configure(api_key=api_key)
    else:
        raise ValueError("GEMINI_API_KEY not found in environment variables.")
except ValueError as e:
    logger.warning(f"API not configured. Tool will fail. Reason: {e}")
|
|
|
|
|
|
|
|
def extract_text_from_html(html_content: str) -> str:
    """Return the human-readable text contained in an HTML string.

    ``<script>`` and ``<style>`` elements are removed before the text is
    collected. The remaining text fragments are stripped and joined with a
    single space.

    Args:
        html_content: Raw HTML markup. A falsy value (``""`` or ``None``)
            short-circuits to an empty result.

    Returns:
        The extracted text, or ``""`` when there is no input.
    """
    if html_content:
        parsed = BeautifulSoup(html_content, "html.parser")
        # Script and style bodies are not reader-visible text; drop them.
        for hidden_tag in parsed(["script", "style"]):
            hidden_tag.decompose()
        return parsed.get_text(separator=" ", strip=True)
    return ""
|
|
|
|
|
|
|
|
async def _pre_clean_text_with_gemini(messy_text: str) -> str:
    """Ask Gemini to tidy messy OCR text into a coherent document.

    This step is best-effort: whenever the model cannot be used, the call
    fails, or the reply is empty, the input text is returned unchanged so the
    rest of the pipeline can continue.

    Args:
        messy_text: Raw text with OCR artefacts (stray spaces, broken lines).

    Returns:
        The cleaned text, or ``messy_text`` itself when cleaning was skipped
        or failed.
    """
    # Nothing to clean: avoid a pointless model round-trip.
    if not messy_text or not messy_text.strip():
        return messy_text

    # Dedent the template BEFORE inserting the document. Dedenting after
    # interpolation lets a multi-line document defeat the dedent (its lines
    # have no common indent) or have its own leading whitespace altered.
    template = textwrap.dedent(
        """
        The following text is from a messy OCR process. It contains extra spaces, incorrect line breaks, and jumbled words.
        Your task is to clean and reformat it into a single, coherent block of text that reads like a proper document.
        Do not summarize or change the content. Just fix the formatting and structure.
        Return ONLY the cleaned text, with no explanations.

        **Messy Text:**
        ---
        {messy_text}
        ---
        """
    )
    prompt = template.format(messy_text=messy_text)

    try:
        # Model construction is inside the try so that a configuration
        # problem also falls back to the original text.
        model = genai.GenerativeModel(model_name="gemini-2.5-flash")
        response = await model.generate_content_async(prompt)
        cleaned = response.text.strip()
    except Exception as e:
        logger.error(f"Error during text pre-cleaning: {e}")
        return messy_text

    # An empty reply would silently erase the document; keep the original.
    return cleaned or messy_text
|
|
|
|
|
|
|
|
async def _translate_text_to_english_with_sealion(text: str) -> str:
    """Translate ``text`` to English through the Sea-Lion chat completions API.

    Translation is best-effort. The input is returned unchanged when it is
    empty, when ``SEALION_API_KEY`` is not set, when the HTTP request fails
    (transport error or non-2xx status), or when the response cannot be
    parsed.

    Args:
        text: The text to translate.

    Returns:
        The translated text with one leading/trailing double quote removed,
        or ``text`` itself on any of the fallback conditions above.
    """
    # Nothing to translate: skip the request entirely.
    if not text or not text.strip():
        return text

    api_key = os.getenv("SEALION_API_KEY")
    if not api_key:
        logger.warning("SEALION_API_KEY not found. Skipping translation.")
        return text

    url = "https://api.sea-lion.ai/v1/chat/completions"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    prompt = f'Translate the following text to English. Return ONLY the translated text, without any additional explanations, formatting, or quotation marks:\n\n"{text}"'
    payload = {
        "max_completion_tokens": 4096,
        "messages": [{"role": "user", "content": prompt}],
        "model": "aisingapore/Gemma-SEA-LION-v3-9B-IT",
    }

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                url, headers=headers, json=payload, timeout=60.0
            )
            response.raise_for_status()
            response_json = response.json()
            translated_text = response_json["choices"][0]["message"]["content"].strip()
        except httpx.HTTPError as e:
            # httpx.HTTPError covers both RequestError (transport problems)
            # and HTTPStatusError (raised by raise_for_status on 4xx/5xx).
            logger.error(f"Translation request to Sea-Lion failed: {e}")
            return text
        except (KeyError, IndexError, TypeError, AttributeError, ValueError) as e:
            # ValueError covers a non-JSON body; TypeError/AttributeError
            # cover an unexpected payload shape such as a null "content".
            logger.error(f"Could not parse Sea-Lion translation response: {e}")
            return text

    # The prompt wraps the source text in quotes, so the model may echo them.
    unquoted = re.sub(r'^"|"$', "", translated_text)
    # An empty translation would wipe the document; keep the original text.
    return unquoted or text
|
|
|
|
|
|
|
|
async def _generate_html_summary(extracted_data: dict) -> str:
    """Render the extracted data as an English HTML summary sheet via Gemini.

    Args:
        extracted_data: JSON-serialisable mapping of the extracted entities.

    Returns:
        The HTML produced by the model, with any surrounding Markdown code
        fence removed. If generation fails, a small error page is returned
        instead; the exception text in it is HTML-escaped.
    """
    prompt_data = json.dumps(extracted_data, indent=2, ensure_ascii=False)

    # Dedent the template BEFORE inserting the JSON. The indented JSON has
    # lines starting at column 0, so dedenting after interpolation would find
    # no common indent and leave the whole prompt indented.
    template = textwrap.dedent(
        """
        You are a web designer creating a one-page summary sheet.
        Your task is to convert the following JSON data into a simple, clean, and easy-to-read HTML document.
        The entire document MUST be in English.

        **JSON Data:**
        ```json
        {prompt_data}
        ```

        **Instructions:**
        1. Use a single HTML file structure. Include modern, clean CSS in a `<style>` tag.
        2. Create a main container and use a card-based layout.
        3. Use clear headings (e.g., `<h2>`, `<h3>`) for each section.
        4. Display the `summary` for each clause prominently.
        5. The final output must ONLY be the raw HTML code.
        """
    )
    prompt = template.format(prompt_data=prompt_data)

    try:
        # Model construction is inside the try so a configuration problem
        # also yields the error page instead of an unhandled exception.
        model = genai.GenerativeModel(model_name="gemini-2.5-flash")
        response = await model.generate_content_async(prompt)
        reply = response.text
        # The model may wrap its answer in a code fence despite instruction 5.
        # Accept "```html" or a bare "```", any newline style, and a closing
        # fence that is not preceded by a newline.
        fenced = re.search(
            r"```(?:html)?\s*(.*?)\s*```", reply, re.DOTALL | re.IGNORECASE
        )
        if fenced:
            return fenced.group(1).strip()
        return reply.strip()
    except Exception as e:
        logger.error(f"Error generating HTML summary: {e}")
        # Escape the message: exception text is not trusted markup.
        safe_reason = html.escape(str(e))
        return f"<html><body><h1>Error</h1><p>Could not generate the final summary sheet.</p><p>{safe_reason}</p></body></html>"
|
|
|
|
|
|
|
|
def _build_extraction_prompt() -> str:
    """Return the instruction prompt given to langextract for entity extraction."""
    return textwrap.dedent(
        """
        You are a meticulous data extraction system specializing in payslips and employment contracts.
        Your task is to extract specific entities from the provided English text. Follow these rules precisely:

        **Extraction Rules:**
        1. **Extract Exact Text:** The `extraction_text` must be the exact text from the document representing the entity's value, without including the label (e.g., for "Basic Pay: $2000", extract "$2000", not the whole phrase).
        2. **Do Not Overlap:** Entities must not overlap.
        3. **Be Comprehensive:** Extract all occurrences of each entity type. For example, if there are multiple bonuses or deductions, extract each one as a separate entity.
        4. **No Inference:** If an entity is not explicitly mentioned, do not extract anything for it. Do not invent information.

        **Entities to Extract:**
        - `employer`: The name of the company or employer.
        - `employee`: The name of the employee.
        - `pay_period`: The specific date range for the payslip (e.g., "September 1, 2021 to September 30, 2021").
        - `salary`: The primary or base salary amount.
        - `deductions`: Any amount subtracted from the pay.
        - `bonus`: Any additional payments like bonuses, allowances, or overtime pay.

        **Attribute Generation:**
        - For every extraction, you MUST generate a `summary` attribute.
        - The summary should be a complete, simple English sentence describing the extracted entity. For example: "The employer is ABC PTE LTD." or "The base salary is $2000."
        """
    )


def _build_extraction_examples() -> list:
    """Return the few-shot ``lx.data.ExampleData`` examples for ``lx.extract``."""
    # Each entry: (document text, [(extraction_class, extraction_text, summary), ...]).
    # Order of the extractions within an example is kept as originally authored.
    raw_examples = [
        (
            "Payslip for September 1, 2021 - September 30, 2021. Company: ABC PTE LTD. Staff: Tan Ah Kow. Basic Pay: $2000. Annual Bonus: $2000.",
            [
                (
                    "pay_period",
                    "September 1, 2021 - September 30, 2021",
                    "The pay period is from September 1, 2021 to September 30, 2021.",
                ),
                ("employer", "ABC PTE LTD", "The employer is ABC PTE LTD."),
                ("employee", "Tan Ah Kow", "The employee's name is Tan Ah Kow."),
                ("salary", "$2000", "The base salary is $2000."),
                ("bonus", "$2000", "An annual bonus of $2000 was paid."),
            ],
        ),
        (
            "Employer Name ABC Global Services Period of Pay 01/10/2022 to 31/10/2022 Employee John Doe Earnings Base Salary 3,500.00 Transport Allowance 150.00 Deductions CPF Contribution 700.00",
            [
                (
                    "employer",
                    "ABC Global Services",
                    "The employer is ABC Global Services.",
                ),
                (
                    "pay_period",
                    "01/10/2022 to 31/10/2022",
                    "The pay period is from 01/10/2022 to 31/10/2022.",
                ),
                ("employee", "John Doe", "The employee's name is John Doe."),
                ("salary", "3,500.00", "The base salary is 3,500.00."),
                (
                    "bonus",
                    "150.00",
                    "A transport allowance of 150.00 was provided.",
                ),
                ("deductions", "700.00", "A CPF deduction of 700.00 was made."),
            ],
        ),
        (
            "Payslip for Jane Smith at Innovate Corp. For the month of November 2023. Salary: 4000 SGD. Deductions include a loan payment of 200 and a charity donation of 50. No bonus was issued.",
            [
                ("employee", "Jane Smith", "The employee's name is Jane Smith."),
                ("employer", "Innovate Corp", "The employer is Innovate Corp."),
                (
                    "pay_period",
                    "November 2023",
                    "The pay period is for the month of November 2023.",
                ),
                ("salary", "4000 SGD", "The salary is 4000 SGD."),
                ("deductions", "200", "A loan payment deduction of 200 was made."),
                (
                    "deductions",
                    "50",
                    "A charity donation deduction of 50 was made.",
                ),
            ],
        ),
    ]
    return [
        lx.data.ExampleData(
            text=document_text,
            extractions=[
                lx.data.Extraction(
                    extraction_class=entity_class,
                    extraction_text=entity_text,
                    attributes={"summary": summary},
                )
                for entity_class, entity_text, summary in rows
            ],
        )
        for document_text, rows in raw_examples
    ]


def _group_extractions(extractions) -> dict:
    """Group extractions by class into ``{class_key: [{"text", "summary"}, ...]}``."""
    grouped = {}
    # ``extractions`` may be None when nothing was extracted.
    for extr in extractions or []:
        # Extractions without attributes are skipped, as before.
        if not extr.attributes:
            continue
        class_key = extr.extraction_class.replace(" ", "_")
        grouped.setdefault(class_key, []).append(
            {
                "text": extr.extraction_text,
                "summary": extr.attributes.get("summary", "No summary provided."),
            }
        )
    return grouped


async def analyze_contract(html_content: str) -> dict:
    """Analyze a contract/payslip supplied as HTML.

    Pipeline: extract text from the HTML, pre-clean it with Gemini, translate
    it to English with Sea-Lion, extract structured entities with langextract,
    and generate an HTML summary sheet.

    Args:
        html_content: The HTML document to analyze.

    Returns:
        On success, a dict with the keys ``language`` (always ``"en"``),
        ``extracted_data``, ``summary_sheet_html`` and
        ``debug_visualization_html``. On failure, a dict with a single
        ``error`` key. Failures in any pipeline stage are logged and reported
        through that dict rather than raised.
    """
    messy_document_text = extract_text_from_html(html_content)
    if not messy_document_text.strip():
        return {
            "error": "Could not extract any meaningful text from the provided HTML content."
        }

    # All stages run inside one try so that every failure is reported in the
    # same {"error": ...} shape instead of some stages raising to the caller.
    try:
        logger.info("Stage 1: Pre-cleaning raw text...")
        cleaned_document_text = await _pre_clean_text_with_gemini(messy_document_text)
        logger.info("Stage 1: Pre-cleaning complete.")

        logger.info("Stage 2: Translating text to English with Sea-Lion...")
        english_document_text = await _translate_text_to_english_with_sealion(
            cleaned_document_text
        )
        logger.info("Stage 2: Translation complete.")

        logger.info("Stage 3: Starting structured data extraction from English text...")
        # lx.extract is a blocking call; run it in a worker thread so the
        # event loop stays responsive.
        annotated_document = await asyncio.to_thread(
            lx.extract,
            text_or_documents=english_document_text,
            prompt_description=_build_extraction_prompt(),
            examples=_build_extraction_examples(),
            model_id="gemini-2.5-flash",
        )
        logger.info("Stage 3: Extraction complete.")

        visualization = lx.visualize(annotated_document)
        # In notebook environments lx.visualize may return an HTML display
        # object whose markup lives in ``.data``; unwrap it so the result is
        # always a plain string.
        debug_visualization_html = getattr(visualization, "data", visualization)

        extracted_data = _group_extractions(annotated_document.extractions)

        logger.info("Stage 4: Generating final HTML summary sheet...")
        summary_sheet_html = await _generate_html_summary(extracted_data)
        logger.info("Stage 4: HTML summary sheet generated.")

        return {
            "language": "en",
            "extracted_data": extracted_data,
            "summary_sheet_html": summary_sheet_html,
            "debug_visualization_html": debug_visualization_html,
        }
    except Exception as e:
        logger.error(f"An error occurred during contract analysis: {e}", exc_info=True)
        return {"error": f"An unexpected error occurred: {str(e)}"}
|
|
|