aaa / app.py
Ashar086's picture
Rename src/lawgenie/app.py to app.py
5819611 verified
import json
import os
import re
import regex
import requests
from dotenv import load_dotenv
from flask import Flask, jsonify, request
from together import Together
from werkzeug.utils import secure_filename
from lawgenie.crew import get_agent_output
app = Flask(__name__)
UPLOAD_FOLDER = "uploads"
ALLOWED_EXTENSIONS = {"pdf", "docx"}
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
load_dotenv()
TOGETHER_API_KEY = os.environ.get("TOGETHER_API_KEY")
print("TOGETHER_API_KEY: ", TOGETHER_API_KEY)
client = Together(api_key=TOGETHER_API_KEY)
def allowed_file(filename):
return "." in filename and filename.rsplit(".", 1)[1].lower() in ALLOWED_EXTENSIONS
def inspect_and_serialize(obj):
"""
Inspect the object and return a JSON-serializable version of it.
"""
if isinstance(obj, (str, int, float, bool, type(None))):
return obj
elif isinstance(obj, list):
return [inspect_and_serialize(item) for item in obj]
elif isinstance(obj, dict):
return {key: inspect_and_serialize(value) for key, value in obj.items()}
elif hasattr(obj, "__dict__"):
return inspect_and_serialize(obj.__dict__)
else:
return str(obj)
def debug_crew_output(crew_output):
print("Type of crew_output:", type(crew_output))
print("Content of crew_output:")
print(json.dumps(inspect_and_serialize(crew_output), indent=2))
def parse_combined_output(combined_output):
sections = {}
current_section = None
current_summary = ""
current_full_text = ""
lines = combined_output.splitlines()
for line in lines:
line = line.strip()
if line.startswith("Section Name:"):
if current_section:
sections[current_section] = {
"summary": current_summary.strip(),
"full_text": current_full_text.strip(),
}
current_section = line[len("Section Name:") :].strip()
current_summary = ""
current_full_text = ""
elif line.startswith("Summary:"):
current_summary = line[len("Summary:") :].strip()
elif line.startswith("Full Text:"):
current_full_text = line[len("Full Text:") :].strip()
elif current_section:
current_full_text += line + " "
if current_section:
sections[current_section] = {
"summary": current_summary.strip(),
"full_text": current_full_text.strip(),
}
return sections
def extract_json(text):
match = regex.search(r"\{(?:[^{}]|(?R))*\}", text, regex.DOTALL)
if match:
return match.group(0)
else:
raise ValueError("No JSON object found in the response.")
def call_llama_via_together_ai(prompt):
response = client.chat.completions.create(
model="meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo",
messages=[{"role": "user", "content": prompt}],
max_tokens=2048,
temperature=0.0,
top_p=0.7,
top_k=50,
repetition_penalty=1,
stop=["<|eot_id|>", "<|eom_id|>"],
stream=False,
)
return response.choices[0].message.content
def segment_contract(contract_text):
print("In segment")
print(f"Contract text length: {len(contract_text)}")
chunk_size = 16000
chunks = [
contract_text[i : i + chunk_size]
for i in range(0, len(contract_text), chunk_size)
]
print(f"Number of chunks: {len(chunks)}")
combined_output = ""
for idx, chunk in enumerate(chunks):
prompt = f"""
Analyze the following part {idx+1}/{len(chunks)} of a Non-Disclosure Agreement (NDA) and segment it into key sections.
Focus on identifying these common NDA sections:
1. Parties
2. Definition of Confidential Information
3. Obligations of Receiving Party
4. Exclusions from Confidential Information
5. Term and Termination
6. Return of Confidential Information
7. Remedies
For each identified section, provide:
- Section Name: [Name of the section]
- Summary: [A brief summary of what the section covers (1-2 sentences)]
- Full Text: [The full text of the section. Do not skip any text within each section.]
If a section is not present in this part, ignore it.
If you find additional important sections not listed above, include them as well.
Output Format:
For each section, output in the following format:
Section Name: [Name] Summary: [Summary] Full Text: [Full text]
Do not include any additional text outside this format.
NDA text part {idx+1}/{len(chunks)}:
{chunk}
"""
try:
print(f"Processing chunk {idx+1}/{len(chunks)}")
response_content = call_llama_via_together_ai(prompt)
print(f"Received response for chunk {idx+1}")
combined_output += response_content + "\n"
except Exception as e:
print(f"An error occurred while processing chunk {idx+1}: {e}")
continue
print("Segmentation complete.")
return combined_output
def parse_contract(file_path):
api_key = os.getenv("UPSTAGE_API_KEY")
if not api_key:
raise Exception("API key is missing")
print(f"API Key: {api_key}")
url = "https://api.upstage.ai/v1/document-ai/document-parse"
headers = {"Authorization": f"Bearer {api_key}"}
try:
with open(file_path, "rb") as file:
files = {"document": file}
data = {"ocr": "auto", "coordinates": "false", "output_formats": "['text']"}
print("Sending request to Document Parse API...")
response = requests.post(url, headers=headers, files=files, data=data)
if response.status_code == 200:
result = response.json()
print(f"API Response: {json.dumps(result, indent=2)}")
contract_text = ""
if "content" in result and "text" in result["content"]:
contract_text = result["content"]["text"]
else:
print("Warning: 'content' or 'text' not found in API response")
for page in result.get("pages", []):
for element in page.get("elements", []):
if element.get("category") == "text":
contract_text += element.get("text", "") + "\n"
print(f"Extracted text length: {len(contract_text)}")
print(f"First 500 characters of extracted text: {contract_text[:500]}")
if len(contract_text) == 0:
print("Warning: No text extracted from the document")
print("API Response structure:")
print(json.dumps(result, indent=2))
return contract_text
else:
raise Exception(
f"Error in Document Parse API: {response.status_code}, {response.text}"
)
except Exception as e:
print(f"An error occurred: {e}")
raise
def segment_clauses(text):
return [
clause.strip() for clause in re.split(r"\n\n|\r\n\r\n", text) if clause.strip()
]
def generate_recommendation(clause, analysis):
recommendations = []
if "payment" in analysis:
recommendations.append(
"Ensure payment terms are clearly defined and favorable."
)
if "deadline" in analysis:
recommendations.append(
"Review deadlines to ensure they are realistic and include buffer time."
)
if "confidentiality" in analysis:
recommendations.append(
"Verify that confidentiality clauses protect your interests adequately."
)
if "termination" in analysis:
recommendations.append(
"Check termination conditions and ensure they are fair to both parties."
)
return (
recommendations
if recommendations
else ["No specific recommendations. The clause appears standard."]
)
@app.route("/analyze", methods=["POST"])
def analyze_contract():
data = request.json
contract_text = data.get("text", "")
clauses = segment_clauses(contract_text)
analysis = [
{"clause": clause, "analysis": analyze_clause(clause)} # noqa: F821
for clause in clauses
]
return jsonify({"analysis": analysis})
@app.route("/upload", methods=["POST"])
def upload_file():
if "file" not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files["file"]
if file.filename == "":
return jsonify({"error": "No selected file"}), 400
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
file.save(file_path)
print(f"File saved: {file_path}")
print(f"File size: {os.path.getsize(file_path)} bytes")
try:
print("Parsing contract...")
contract_text = parse_contract(file_path)
print(f"Parsed contract text length: {len(contract_text)}")
print("Contract parsed. Starting segmentation...")
combined_output = segment_contract(contract_text)
print("Parsing combined output...")
segmented_contract = parse_combined_output(combined_output)
print("Segmentation complete.")
crew_output = get_agent_output(segmented_contract)
debug_crew_output(crew_output)
response_data = {
"message": "File uploaded and processed successfully",
"segmented_contract": segmented_contract,
"crew_analysis": inspect_and_serialize(crew_output),
}
print("Response Data:", response_data)
return jsonify(response_data)
except Exception as e:
print(f"An error occurred: {str(e)}")
import traceback
traceback.print_exc()
return jsonify({"error": str(e)}), 500
return jsonify({"error": "File type not allowed"}), 400
@app.route("/recommend", methods=["POST"])
def recommend():
data = request.json
clause = data.get("clause", "")
analysis = data.get("analysis", [])
recommendations = generate_recommendation(clause, " ".join(analysis))
return jsonify({"recommendations": recommendations})
if __name__ == "__main__":
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
app.run(host="0.0.0.0", debug=True, port=5002)