# QC_Rules/src/core/analysis.py
# Hugging Face file-page header: uploaded by Jakecole1, commit 863cb78 ("Upload 18 files"), verified.
import os
import anthropic
import requests
import streamlit as st
import numpy as np
import json
import re
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from src.extract_text.google_document_api import GoogleDocumentAPI
# Anthropic Messages endpoint; every Claude request in this module is POSTed to this URL.
CLAUDE_API_URL = "https://api.anthropic.com/v1/messages"
class LLM:
    """Small client for the Claude Messages endpoint with retry handling.

    All public ``call_*`` methods return the text of Claude's reply, or an
    empty string when the request failed (the failure is reported through
    Streamlit messages instead of being raised).
    """

    def __init__(self):
        """Read the API key from the environment and build a retrying session.

        Raises:
            ValueError: If the ``CLAUDE_API_KEY`` environment variable is
                unset or empty.
        """
        self.claude_api_key = os.getenv('CLAUDE_API_KEY')
        if not self.claude_api_key:
            raise ValueError("Please set the CLAUDE_API_KEY environment variable.")

        # Transport-level retries for rate limiting and server-side errors.
        # NOTE(review): urllib3's Retry defaults to raise_on_status=True, so
        # once these retries are exhausted on a listed status the adapter
        # raises instead of returning the response; the explicit 529 branch
        # in _post_messages may therefore rarely be reached -- confirm.
        retry_strategy = Retry(
            total=5,
            backoff_factor=2,
            status_forcelist=[429, 500, 502, 503, 504, 529],
            allowed_methods=["POST"],
            respect_retry_after_header=True,
        )
        self.session = requests.Session()
        self.session.mount("https://", HTTPAdapter(max_retries=retry_strategy))

    @staticmethod
    def _first_text_block(response_data):
        """Return the text of the first text-bearing content block, or None.

        Guards against replies whose ``content`` is missing, empty, or whose
        first block carries no ``text`` field.
        """
        if not isinstance(response_data, dict):
            return None
        blocks = response_data.get("content")
        if not isinstance(blocks, list):
            return None
        for block in blocks:
            if isinstance(block, dict) and isinstance(block.get("text"), str):
                return block["text"]
        return None

    @staticmethod
    def _append_pdf_text(prompt, pdf_text):
        """Return ``prompt`` with ``pdf_text`` appended unless it is empty or already present."""
        cleaned = (pdf_text or "").strip()
        if not cleaned or cleaned in prompt:
            return prompt
        return f"{prompt}\n\nText extracted from the PDF document:\n{cleaned}\n"

    def _post_messages(self, payload, timeout, api_label, max_retries=3) -> str:
        """POST ``payload`` to the Claude endpoint and return the reply text.

        Args:
            payload: JSON-serialisable request body.
            timeout: Per-request timeout in seconds.
            api_label: Name used in user-facing messages (e.g. "Claude API").
            max_retries: Number of attempts made by this method's own loop.

        Returns:
            The reply text, or "" on any failure.
        """
        import time  # only needed for the backoff sleep below

        headers = {
            "x-api-key": self.claude_api_key,
            "anthropic-version": "2023-06-01",
            "Content-Type": "application/json",
        }

        for attempt in range(max_retries):
            is_last_attempt = attempt == max_retries - 1
            try:
                response = self.session.post(
                    CLAUDE_API_URL,
                    headers=headers,
                    json=payload,
                    verify=True,  # explicitly keep SSL verification on
                    timeout=timeout,
                )
                if response.status_code == 529:
                    st.warning(f"Server overload (529) on attempt {attempt + 1}/{max_retries}. Retrying with exponential backoff...")
                    if is_last_attempt:
                        st.error("Server overload after all retries. Please try again later.")
                        return ""
                    # Sleeps 1s, 2s, ... between attempts (no sleep after the last one).
                    time.sleep(2 ** attempt)
                    continue
                response.raise_for_status()
            except requests.exceptions.SSLError as ssl_err:
                st.error(f"SSL Error when calling {api_label}. Please check your SSL certificates and network connection. Error: {ssl_err}")
                return ""
            except requests.exceptions.Timeout:
                st.warning(f"Timeout on attempt {attempt + 1}/{max_retries}. Retrying...")
                if is_last_attempt:
                    st.error("Request timed out after all retries")
                    return ""
                continue
            except requests.exceptions.RequestException as e:
                st.error(f"Error calling {api_label}: {str(e)}")
                return ""

            # Decode separately: the decode error raised by requests is also a
            # RequestException, so it must not share the try block above or
            # the generic handler would swallow it.
            try:
                response_data = response.json()
            except ValueError as json_err:
                st.error(f"Invalid JSON response from {api_label}: {json_err}")
                return ""

            reply_text = self._first_text_block(response_data)
            if reply_text is None:
                st.error(f"Unexpected response format from {api_label}")
                return ""
            return reply_text
        return ""

    def call_claude_api(self, prompt, system_prompt, model="claude-sonnet-4-20250514", max_tokens=2000) -> str:
        """Send a text-only prompt to Claude.

        Args:
            prompt: User message text.
            system_prompt: System instruction for the request.
            model: Claude model identifier.
            max_tokens: Upper bound on generated tokens.

        Returns:
            The reply text, or "" if the call failed.
        """
        payload = {
            "model": model,
            "max_tokens": max_tokens,
            "temperature": 0.1,
            "messages": [{"role": "user", "content": prompt}],
            "system": system_prompt,
        }
        return self._post_messages(payload, timeout=60, api_label="Claude API")

    def call_claude_vision_api(self, prompt, system_prompt, image_base64, model="claude-sonnet-4-20250514", max_tokens=2000, media_type="image/png") -> str:
        """Send a prompt together with one base64-encoded image to Claude.

        Args:
            prompt: User message text.
            system_prompt: System instruction for the request.
            image_base64: Base64-encoded image data.
            model: Claude model identifier.
            max_tokens: Upper bound on generated tokens.
            media_type: MIME type declared for the image (defaults to PNG,
                which was previously hard-coded).

        Returns:
            The reply text, or "" if the call failed.
        """
        content = [
            {"type": "text", "text": prompt},
            {
                "type": "image",
                "source": {
                    "type": "base64",
                    "media_type": media_type,
                    "data": image_base64,
                },
            },
        ]
        payload = {
            "model": model,
            "max_tokens": max_tokens,
            "temperature": 0,
            "messages": [{"role": "user", "content": content}],
            "system": system_prompt,
        }
        # Vision requests get a longer timeout than text-only ones.
        return self._post_messages(payload, timeout=90, api_label="Claude Vision API")

    def _extract_pdf_text(self, pdf_base64) -> str:
        """Best-effort text extraction from a base64-encoded PDF via PyPDF2.

        Returns "" (after a Streamlit warning) when PyPDF2 is not installed
        or the document cannot be decoded or parsed.
        """
        import base64
        import io

        try:
            from PyPDF2 import PdfReader
        except ImportError:
            st.warning("PyPDF2 not available. Using basic text processing for PDF.")
            return ""

        try:
            reader = PdfReader(io.BytesIO(base64.b64decode(pdf_base64)))
            # Treat a page with no extractable text as empty instead of failing.
            return "\n".join((page.extract_text() or "") for page in reader.pages)
        except Exception as e:  # best effort: any decode/parse failure just falls back
            st.warning(f"PDF text extraction failed: {e}")
            st.warning("Falling back to basic text processing")
            return ""

    def call_claude_pdf_api(self, prompt, system_prompt, pdf_base64, model="claude-sonnet-4-20250514", max_tokens=4000, include_extracted_text=False) -> str:
        """Handle a PDF requirements document using text-based processing.

        The PDF itself is not sent to Claude; the request goes through
        ``call_claude_api``. By default the prompt is forwarded unchanged
        (matching the previous behaviour, where callers embed the document
        text in the prompt themselves).

        Args:
            prompt: User message text.
            system_prompt: System instruction for the request.
            pdf_base64: Base64-encoded PDF content.
            model: Claude model identifier.
            max_tokens: Upper bound on generated tokens.
            include_extracted_text: When True, text extracted from the PDF is
                appended to the prompt unless the prompt already contains it.

        Returns:
            The reply text, or "" if the call failed.
        """
        st.info("📄 PDF requirements detected. Using text-based processing for now.")
        st.info("💡 For full visual PDF analysis, consider using the Converse API with citations enabled.")

        # Extraction always runs so that its warnings are still surfaced.
        pdf_text = self._extract_pdf_text(pdf_base64)
        if include_extracted_text:
            prompt = self._append_pdf_text(prompt, pdf_text)
        return self.call_claude_api(prompt, system_prompt, model=model, max_tokens=max_tokens)
class ComplianceAnalysis:
    """Multi-step packaging compliance analysis driven by Claude.

    The flow is: extract structured requirements from a requirements
    document, verify each requirement against the packaging text, then ask
    Claude for a summary report.
    """

    # Control characters stripped from replies before JSON parsing (tab, LF and CR are kept).
    _CONTROL_CHARS = re.compile(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]')
    # A double-quoted JSON string literal; used to escape raw newlines inside strings.
    _JSON_STRING = re.compile(r'(?<!\\)"(?:[^"\\]|\\.)*?(?<!\\)"')

    def __init__(self):
        """Create the LLM client used for every Claude call."""
        self.llm = LLM()

    @classmethod
    def _extract_json_text(cls, response: str) -> str:
        """Pull the JSON payload out of a reply and sanitise it for ``json.loads``.

        Prefers a ```json fenced block, then any fenced block, then the whole
        reply. A ```json block without a closing fence is taken up to the end
        of the reply.
        """
        if "```json" in response:
            candidate = response.split("```json", 1)[1].split("```", 1)[0]
        elif "```" in response:
            candidate = response.split("```", 2)[1]
        else:
            candidate = response
        candidate = cls._CONTROL_CHARS.sub('', candidate.strip())
        # Raw newlines inside string literals are invalid JSON; escape them.
        return cls._JSON_STRING.sub(lambda m: m.group(0).replace('\n', '\\n'), candidate)

    @staticmethod
    def _infer_compliance_status(response: str) -> str:
        """Guess a compliance status from free text.

        The more specific labels are tested first because "COMPLIANT" is a
        substring of both "NON-COMPLIANT" and "PARTIALLY COMPLIANT".
        """
        upper = response.upper()
        if "NON-COMPLIANT" in upper:
            return "NON-COMPLIANT"
        if "PARTIALLY" in upper:
            return "PARTIALLY COMPLIANT"
        if "COMPLIANT" in upper:
            return "COMPLIANT"
        return "UNKNOWN"

    @staticmethod
    def _error_result(requirement_id, reasoning) -> dict:
        """Build the verification record returned when a requirement could not be verified."""
        return {
            "requirement_id": requirement_id,
            "evidence_found": [],
            "compliance_status": "ERROR",
            "reasoning": reasoning,
            "confidence": 0
        }

    def extract_structured_requirements(self, requirements_data) -> list[dict]:
        """Use Claude to extract structured requirements from a requirements document.

        Args:
            requirements_data: Either a string (text requirements) or a dict
                (PDF requirements) from which the keys ``text_content``,
                ``type`` and ``content`` are read.

        Returns:
            A list of requirement dicts with at least ``id``, ``description``
            and ``category`` keys. Returns an empty list on invalid input,
            an empty reply, or a reply that cannot be parsed as a JSON array.
        """
        pdf_base64 = None
        if isinstance(requirements_data, str):
            requirements_text = requirements_data
            requirements_type = "text"
        elif isinstance(requirements_data, dict):
            requirements_text = requirements_data.get('text_content', '')
            requirements_type = requirements_data.get('type', 'text')
            if requirements_type == 'pdf':
                pdf_base64 = requirements_data.get('content', '')
        else:
            st.error("Invalid requirements data format. Please upload a valid requirements document.")
            return []

        if not requirements_text or not requirements_text.strip():
            st.error("Requirements text is empty. Please upload a valid requirements document.")
            return []

        system_prompt = """You are an expert requirements analyst. Extract clear, structured requirements from documents. You must always return valid JSON, even if no specific requirements are found."""
        extraction_prompt = f"""
Extract all requirements from this document (not just allergen requirements):
{requirements_text}
For each requirement found, provide:
1. Unique ID (REQ001, REQ002, etc.)
2. Description (verbatim from the document)
3. Category (Font Size, Allergen List, Formatting, Placement, Barcode, Organic, Promotional, etc.)
4. Source reference (section/paragraph or line number)
If no requirements are found, return an empty array: []
Return as JSON array with fields: id, description, category, source_reference.
Example:
```json
[
{{
"id": "REQ001",
"description": "IF the product is labeled as organic, THEN a certified organic seal must be visible",
"category": "Organic",
"source_reference": "Line 1"
}},
{{
"id": "REQ002",
"description": "IF there is a promotional offer mentioned, THEN include the offer expiry date",
"category": "Promotional",
"source_reference": "Line 2"
}}
]
```
IMPORTANT: Always return valid JSON. If you cannot extract any requirements, return an empty array: []
"""

        # PDF requirements go through the PDF helper; everything else uses the plain text call.
        if requirements_type == 'pdf' and pdf_base64:
            response = self.llm.call_claude_pdf_api(extraction_prompt, system_prompt, pdf_base64, model='claude-sonnet-4-20250514')
        else:
            response = self.llm.call_claude_api(extraction_prompt, system_prompt, model='claude-3-5-haiku-20241022')

        if not response or not response.strip():
            st.error("Empty response received from Claude API")
            return []

        try:
            parsed = json.loads(self._extract_json_text(response))
        except ValueError as e:
            st.error(f"Error parsing extracted requirements: {e}")
            st.error(f"Raw response: {response}")
            return []

        if not isinstance(parsed, list):
            st.error("Error parsing extracted requirements: expected a JSON array")
            st.error(f"Raw response: {response}")
            return []

        # Keep only object entries and make sure the keys used by the later
        # steps exist, so one incomplete entry cannot crash the whole analysis.
        requirements = []
        for entry in parsed:
            if not isinstance(entry, dict):
                continue
            entry.setdefault("id", f"REQ{len(requirements) + 1:03d}")
            entry.setdefault("description", "")
            entry.setdefault("category", "Uncategorized")
            requirements.append(entry)
        return requirements

    def _parse_verification_response(self, response, requirement_id) -> dict:
        """Turn Claude's verification reply into a result dict.

        Tries direct parsing, then the outermost ``{...}`` span; if that span
        is still malformed, a low-confidence fallback record is built from
        keywords in the reply.

        Raises:
            ValueError: If no usable JSON object could be obtained.
        """
        json_content = self._extract_json_text(response)
        try:
            parsed = json.loads(json_content)
        except json.JSONDecodeError as first_err:
            st.warning(f"Initial JSON parsing failed: {first_err}")
            brace_span = re.search(r'\{.*\}', json_content, re.DOTALL)
            if brace_span is None:
                raise ValueError("All JSON parsing strategies failed") from first_err
            try:
                parsed = json.loads(brace_span.group(0))
            except json.JSONDecodeError as second_err:
                st.warning(f"JSON extraction failed: {second_err}")
                st.warning("Created fallback JSON structure due to parsing errors")
                return {
                    "requirement_id": requirement_id,
                    "criteria": ["Unable to parse criteria"],
                    "evidence_found": [],
                    "compliance_status": self._infer_compliance_status(response),
                    "reasoning": f"Response parsing failed. Raw response: {response[:200]}...",
                    "confidence": 0.1
                }
            else:
                st.info("Successfully extracted JSON from malformed response")

        # Later steps call .get() on the result, so anything but a non-empty object is unusable.
        if not isinstance(parsed, dict) or not parsed:
            raise ValueError("All JSON parsing strategies failed")
        return parsed

    def verify_individual_requirement(self, requirement, markdown_table, image=None, barcode_data=None, metadata=None, requirements_data=None, model="claude-sonnet-4-20250514"):
        """Ask Claude whether one requirement is met by the packaging.

        Args:
            requirement: Dict with ``id``, ``description`` and ``category``.
            markdown_table: Markdown table extracted from the packaging PDF.
            image: Base64 image of the packaging; when truthy the vision call
                is used instead of the text call (optional).
            barcode_data: List of barcode dicts; ``id``, ``type``, ``data``
                and ``valid`` are forwarded to the prompt (optional).
            metadata: Dict with font, font size and colour metadata; ignored
                when it carries a truthy ``error`` entry (optional).
            requirements_data: Original requirements data; accepted for
                interface compatibility and currently unused (optional).
            model: Claude model identifier used for the verification call.

        Returns:
            A dict with the verification result. On an empty reply or an
            unparseable reply, ``compliance_status`` is "ERROR".
        """
        req_id = requirement.get('id', 'Unknown')
        req_description = requirement.get('description', '')
        req_category = requirement.get('category', '')

        system_prompt = """You are a regulatory compliance expert. Provide detailed, objective compliance reports."""
        verification_prompt = f"""
You are a regulatory compliance expert. Provide detailed, objective compliance reports.
I need to verify if the following specific requirement is met in the packaging text:
Requirement ID: {req_id}
Requirement Description: {req_description}
Requirement Category: {req_category}
Here is the packaging text to analyze:
{markdown_table}
"""

        if barcode_data:
            # Forward only the fields Claude needs, to save tokens.
            barcode_summary = [
                {field: barcode.get(field) for field in ('id', 'type', 'data', 'valid')}
                for barcode in barcode_data
            ]
            verification_prompt += f"""
Barcode Information Found:
{json.dumps(barcode_summary, indent=2)}
When analyzing barcode-related requirements, consider:
- Barcode ID for evidence reference
- Barcode type and validation status
"""

        if metadata and not metadata.get('error'):
            # Condensed metadata view, again to save tokens.
            metadata_summary = {
                'extraction_method': metadata.get('extraction_method', 'unknown'),
                'has_selectable_text': metadata.get('has_selectable_text', False),
                'pages_processed': metadata.get('pages_processed', 0),
                'dominant_font': metadata.get('fonts', {}),
                'dominant_font_size': metadata.get('font_sizes', {}),
                'dominant_text_color': metadata.get('text_colors', {})
            }
            verification_prompt += f"""
Typography and Design Metadata:
{json.dumps(metadata_summary, indent=2)}
When analyzing typography and design requirements, consider:
- Font types and their usage frequency
- Font sizes and their distribution
- Text colors and their application
- Whether text is selectable or requires OCR
"""

        verification_prompt += f"""
Verify this requirement using these steps:
1. Break down into checkable criteria
2. Search for evidence in packaging text (provide Text ID)
3. For visual elements not in text, describe clearly (text_id = null)
4. For barcode evidence, use Barcode ID (text_id = null)
5. Provide specific examples/quotes
6. Determine: COMPLIANT/NON-COMPLIANT/PARTIALLY COMPLIANT
- Compliant: All applicable rules are fully met without any deviation.
- Partially Compliant: Some rules are met, but minor issues/omissions that don't constitute a full failure but need attention.
- Non-Compliant: One or more critical rules are violated or omitted, posing a regulatory, safety, or logistical risk.
7. Explain reasoning
For visual evidence, describe:
- Location (e.g., "top right corner", "bottom section")
- Visual characteristics (e.g., "large bold text", "red warning box")
- Content description (e.g., "allergen warning in red box")
If there is barcode evidence, include:
- Barcode ID
- Barcode type and validation status
Return JSON with structure:
```json
{{
"requirement_id": "{req_id}",
"criteria": ["criterion 1", "criterion 2"],
"evidence_found": [
{{"text_id": <Text ID or null>, "evidence_text": "<description>", "barcode_id": "<Barcode ID ONLY if applicable>"}}
],
"compliance_status": "COMPLIANT/NON-COMPLIANT/PARTIALLY COMPLIANT",
"reasoning": "Detailed explanation",
"confidence": 0.95
}}
```
"""

        if image:
            response = self.llm.call_claude_vision_api(verification_prompt, system_prompt, image, model=model)
        else:
            response = self.llm.call_claude_api(verification_prompt, system_prompt, model=model)

        if not response or not response.strip():
            st.error("Empty response received from Claude API")
            return self._error_result(req_id, "Empty response received from Claude API")

        try:
            return self._parse_verification_response(response, req_id)
        except ValueError as e:
            st.error(f"Error parsing verification result: {e}")
            st.error(f"Raw response: {response}")
            return self._error_result(req_id, f"Failed to verify requirement due to parsing error: {str(e)}")

    def analyze_compliance(self, requirements_data, packaging_text, packaging_data, image=None, barcode_data=None, metadata=None, model="claude-sonnet-4-20250514"):
        """Run the full compliance analysis.

        Steps:
            1. Extract structured requirements.
            2. Verify each requirement with structured reasoning.
            3. Generate the final compliance report.

        Args:
            requirements_data: Requirements data (text string or PDF dict).
            packaging_text: Markdown table extracted from the packaging PDF.
            packaging_data: Structured text with bounding boxes; returned
                unchanged in the result.
            image: Image of the packaging document (optional).
            barcode_data: List of barcode dicts (optional).
            metadata: Dict with font, font size and colour metadata (optional).
            model: Claude model used for the per-requirement verification
                calls. Extraction and the final summary use fixed models.

        Returns:
            A dict with ``requirements``, ``verifications``,
            ``compliance_report``, ``packaging_data``, ``barcode_data`` and
            ``metadata``; or a dict with an ``error`` entry when no
            requirements could be extracted.
        """
        # Step 1: Extract structured requirements
        st.info("Extracting structured requirements...")
        requirements = self.extract_structured_requirements(requirements_data)
        if not requirements:
            st.warning("No requirements found in the document. Please check that your requirements file contains valid requirement statements.")
            return {"error": "No requirements found", "requirements": [], "verifications": []}
        st.success(f"Extracted {len(requirements)} requirements")

        # Step 2: Verify each requirement with structured reasoning
        st.info("Verifying requirements...")
        verifications = []
        for position, req in enumerate(requirements, start=1):
            st.text(f"Verifying requirement {position}/{len(requirements)}: {req['id']}")
            verifications.append(
                self.verify_individual_requirement(
                    req, packaging_text, image, barcode_data, metadata, requirements_data, model=model
                )
            )

        # Step 3: Generate final compliance report
        system_prompt = """You are a regulatory compliance expert. Provide detailed, objective compliance reports."""

        # Send only a compact per-requirement summary to save tokens.
        compliance_summary = [
            {
                'requirement_id': verification.get('requirement_id', 'Unknown'),
                'compliance_status': verification.get('compliance_status', 'UNKNOWN'),
                'confidence': verification.get('confidence', 0),
                # ``or []`` covers a reply that sets evidence_found to null.
                'evidence_count': len(verification.get('evidence_found') or [])
            }
            for verification in verifications
        ]
        requirements_summary = [
            {'id': req['id'], 'description': req['description'], 'category': req['category']}
            for req in requirements
        ]

        summary_prompt = f"""
Based on the verification of {len(requirements)} requirements,
please provide a final compliance summary report.
Requirements Summary:
{json.dumps(requirements_summary, indent=2)}
Compliance Results Summary:
{json.dumps(compliance_summary, indent=2)}
Format your response in the following template:
## 🎯 **Analysis Requirements**
Summarize the overall compliance status with focus on:
1. **Quantitative Metrics**: Count of fully compliant, partially compliant, and non-compliant requirements
2. **Critical Issues**: Most urgent compliance gaps requiring immediate attention
3. **Strategic Recommendations**: Actionable steps for the artwork designer to fix the compliance issues
---
## 📋 **Response Template**
### 🔍 **Executive Summary**
Provide a single, clear statement of overall compliance status
*Example: "Organization achieved 70% compliance (14/20 requirements); moderate risk profile with 3 critical gaps identified."*
---
### 📈 **Compliance Statistics**
| **Metric** | **Count** | **Percentage** |
|------------|-----------|----------------|
| **Total Requirements** | `[total]` | `100%` |
| ✅ **Fully Compliant** | `[count]` | `[%]` |
| ⚠️ **Partially Compliant** | `[count]` | `[%]` |
| ❌ **Non-Compliant** | `[count]` | `[%]` |
---
### 🚨 **Priority Findings**
List 3-5 highest-severity issues in order of criticality:
1. **[REQ-ID]** - [Brief description of critical issue]
2. **[REQ-ID]** - [Brief description of high-priority gap]
3. **[REQ-ID]** - [Brief description of moderate-priority concern]
---
### 💡 **Targeted Recommendations**
For each Priority Finding, provide specific corrective actions:
| **Finding** | **Recommended Action** | **Priority** |
|-------------|------------------------|--------------|
| **[REQ-ID]** | [Specific artwork designer action] | 🔴 **Critical** |
| **[REQ-ID]** | [Specific artwork designer action] | 🟡 **High** |
| **[REQ-ID]** | [Specific artwork designer action] | 🟢 **Medium** |
---
### 📝 **Detailed Assessment Results**
*[Provide comprehensive breakdown of each requirement with status and supporting details]*
---
### 📊 **Supporting Evidence**
*[Include relevant data, metrics, or documentation that supports the compliance assessment]*
"""

        compliance_report = self.llm.call_claude_api(summary_prompt, system_prompt, model='claude-3-5-haiku-20241022')

        return {
            "requirements": requirements,
            "verifications": verifications,
            "compliance_report": compliance_report,
            "packaging_data": packaging_data,
            "barcode_data": barcode_data,
            "metadata": metadata
        }