import json
import re
import random
import sqlite3 as sql
from datetime import datetime
import string
import os
import tempfile
from PIL import Image
import pytesseract
import fitz
from werkzeug.utils import secure_filename
from flask import Flask, request, jsonify, redirect, url_for, render_template_string
from pathlib import Path
import requests
from typing import List, Dict
import time
from functools import wraps
from flask import Flask, request, jsonify, redirect, url_for, render_template_string
from flask import Flask, request, jsonify, redirect, url_for, render_template_string
import time
from functools import wraps
from rag_utils import get_comprehensive_context, format_context_for_prompt
try:
from groq import Groq
except ImportError:
Groq = None
# Flask application object; the @app.route decorators in this file register
# their views on it.
app = Flask(__name__)
# Module-level mutable state. Neither name is read or written by the code
# visible in this part of the file — presumably used by handlers elsewhere;
# TODO confirm before removing.
global_parameters = []
global_json_template = {}
# System prompt for checklist generation: call_groq_llm() uses it whenever
# is_digitization is False. It is domain-neutral (no product-, site- or
# standard-specific wording) and describes the assistant's role, the allowed
# parameter types, and the "**Summary:** / **Parameters Generated:**" reply
# layout that extract_top_level_json_array() searches for.
# NOTE(review): this prompt demands at least 15 parameters "regardless of user
# request", while the instructions call_groq_llm() appends after it say to
# generate exactly the number the user requested. The two conflict — confirm
# which behaviour is intended.
SYSTEM_PROMPT = """
You are the Swift Check AI assistant, specialized in creating comprehensive Quality Control (QC) checklists with MINIMUM 15 parameters.
# YOUR ROLE:
Generate comprehensive QC parameters for the specified product. You MUST create at least 15 meaningful parameters to ensure thorough quality control coverage.
# APPROACH:
1. PRIORITIZE comprehensive coverage - minimum 15 parameters required
2. ANALYZE the product to understand ALL aspects needing inspection
3. GENERATE parameters covering: visual, physical, packaging, safety, compliance, documentation
4. CREATE varied parameter types for complete assessment
5. USE retrieved context as reference material
# MANDATORY PARAMETER COVERAGE (minimum 15 total):
- Visual Inspection: appearance, color, defects, contamination (3-4 parameters)
- Physical Properties: weight, dimensions, texture, firmness (3-4 parameters)
- Packaging: integrity, labeling, sealing, materials (2-3 parameters)
- Safety & Compliance: foreign objects, temperature, hygiene (2-3 parameters)
- Documentation: batch codes, dates, certifications (2-3 parameters)
- Quality Assessment: overall condition, acceptability (2-3 parameters)
# PARAMETER TYPES AVAILABLE:
- Image Upload: For visual inspections and evidence documentation
- Toggle: For binary decisions (pass/fail, present/absent)
- Checklist: For multiple items that can be selected
- Numeric Input: For measurements and quantities
- Text Input: For codes, dates, and identifiers
- Remarks: For detailed observations and notes
- Dropdown: For multiple choice selections
# IMPORTANT GUIDELINES:
- ALWAYS generate minimum 15 parameters regardless of user request
- Create comprehensive coverage even for simple products
- Make parameters specific to the product type
- Use varied parameter types for better user experience
- Include regulatory compliance where applicable
- Organize parameters into logical sections
# OUTPUT FORMAT - VERY IMPORTANT:
You MUST respond with EXACTLY this format:
**Summary:** [Brief explanation of comprehensive QC coverage with parameter count]
**Parameters Generated:**
[
{
"action": "add",
"Parameter": "Parameter Name",
"Type": "Parameter Type",
"Spec": "Specification details",
"DropdownOptions": "Option1, Option2, Option3",
"IncludeRemarks": "Yes/No",
"Section": "Section Name",
"ClauseReference": "Reference if any"
}
]
Do NOT use ```json code blocks. Put the JSON array directly after "**Parameters Generated:**"
Generate MINIMUM 15 parameters for comprehensive quality control.
"""
# Generic, non-prescriptive instruction text for parameter generation.
# It is not referenced by the code visible in this part of the file —
# presumably a fallback user message used by a route elsewhere; TODO confirm.
DEFAULT_REFINE_PROMPT = """
Generate quality control parameters for the specified product based on its characteristics and requirements.
Focus on parameters that are:
- Relevant to the specific product
- Practical for quality inspectors to check
- Appropriate for the product's nature and use case
"""
# System prompt for digitizing scanned checklists: call_groq_llm() uses it
# instead of SYSTEM_PROMPT when is_digitization is True. It asks the model to
# keep the source document's structure and scope rather than add parameters.
# It does not define the reply layout itself; call_groq_llm() appends the
# "**Summary:** / **Parameters Generated:**" output instructions after it.
DIGITIZE_SYSTEM_PROMPT = """
You are the Swift Check AI digitization assistant. Your job is to analyze text from scanned QC checklists and convert them into structured parameters.
# YOUR TASKS:
1. Recognize and preserve the document's original structure
2. Identify quality control parameters and their types
3. Extract specifications and measurement units where present
4. Determine appropriate parameter types based on the content
5. Maintain the document's original intent without adding extra requirements
# PARAMETER TYPE DETECTION:
- Image Upload: When document mentions photos, visual inspection, or attachments
- Toggle: For binary choices (Yes/No, Pass/Fail, Present/Absent)
- Checklist: For lists of items to verify
- Numeric Input: For measurements, counts, or quantities
- Text Input: For codes, dates, names, or identifiers
- Remarks: For comments, observations, or detailed notes
- Dropdown: For multiple predefined options
# OUTPUT FORMAT:
Provide parameters as found in the document without adding extra requirements.
Maintain the original document's scope and intent.
"""
def init_db(db_path="swift_check.db"):
    """Create the application's SQLite tables if they do not already exist.

    Every statement is ``CREATE TABLE IF NOT EXISTS``, so calling this more
    than once is harmless. The connection is always closed, even when one of
    the statements fails.

    Args:
        db_path: Path of the SQLite database file. Defaults to the file the
            rest of this module connects to.

    Raises:
        sqlite3.Error: If the database cannot be opened or a statement fails.
    """
    schema_statements = (
        # One row per checklist request.
        """
        CREATE TABLE IF NOT EXISTS qc_requests (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            doc_type TEXT NOT NULL,
            product_name TEXT NOT NULL,
            supplier_name TEXT NOT NULL,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            user_message TEXT
        )""",
        # Raw and summarised LLM output for a request.
        """
        CREATE TABLE IF NOT EXISTS llm_responses (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            request_id INTEGER,
            llm_response TEXT,
            summary_text TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (request_id) REFERENCES qc_requests(id)
        )""",
        # Individual checklist parameters of a request.
        """
        CREATE TABLE IF NOT EXISTS parameters (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            request_id INTEGER,
            parameter_name TEXT,
            type TEXT,
            spec TEXT,
            dropdown_options TEXT,
            include_remarks TEXT,
            section TEXT,
            clause_reference TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (request_id) REFERENCES qc_requests(id)
        )""",
        # Serialized JSON template of a request.
        """
        CREATE TABLE IF NOT EXISTS json_templates (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            request_id INTEGER,
            template_json TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (request_id) REFERENCES qc_requests(id)
        )""",
        # Per-call API log written by log_api_request().
        """
        CREATE TABLE IF NOT EXISTS api_logs (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            request_id INTEGER,
            endpoint TEXT NOT NULL,
            method TEXT NOT NULL,
            client_ip TEXT,
            user_agent TEXT,
            request_data TEXT,
            response_data TEXT,
            file_info TEXT,
            processing_time_ms INTEGER,
            status_code INTEGER,
            error_message TEXT,
            created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
            FOREIGN KEY (request_id) REFERENCES qc_requests(id)
        )""",
    )

    con = sql.connect(db_path)
    try:
        cur = con.cursor()
        for statement in schema_statements:
            cur.execute(statement)
        con.commit()
    finally:
        # Release the file handle even if a statement above raised.
        con.close()
# Runs at import time (not only under __main__), so the tables are created
# whenever this module is loaded.
init_db()
def log_api_request(endpoint, method, request_id=None, file_info=None, processing_time=None,
                    status_code=200, error_message=None, request_data=None, response_data=None):
    """Insert one row describing an API call into the ``api_logs`` table.

    Logging is best-effort: any failure is printed and swallowed so that it
    can never break the request being logged. Must be called inside a Flask
    request context, because the client IP and User-Agent are read from the
    current ``request``.

    Args:
        endpoint: Name of the endpoint being logged.
        method: HTTP method of the call.
        request_id: Id of the related ``qc_requests`` row, if any.
        file_info: Description of uploaded files; stored truncated to 500 chars.
        processing_time: Handling time in milliseconds.
        status_code: HTTP status to record.
        error_message: Error text for failed calls.
        request_data: Request payload; stored as ``str()`` truncated to 2000 chars.
        response_data: Response summary; stored as ``str()`` truncated to 1000 chars.
    """
    con = None
    try:
        # Get client info
        client_ip = request.remote_addr or 'unknown'
        user_agent = request.headers.get('User-Agent', 'unknown')[:500]  # Limit length

        # Truncate large data so a single request cannot bloat the log table
        request_data_str = str(request_data)[:2000] if request_data else None
        response_data_str = str(response_data)[:1000] if response_data else None
        file_info_str = str(file_info)[:500] if file_info else None

        con = sql.connect("swift_check.db")
        con.execute("""
            INSERT INTO api_logs
            (request_id, endpoint, method, client_ip, user_agent, request_data,
            response_data, file_info, processing_time_ms, status_code, error_message)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (request_id, endpoint, method, client_ip, user_agent, request_data_str,
              response_data_str, file_info_str, processing_time, status_code, error_message))
        con.commit()
    except Exception as e:
        print(f"ā Failed to log API request: {str(e)}")
    finally:
        # Previously the connection stayed open whenever the INSERT raised.
        if con is not None:
            con.close()
def api_logger(endpoint_name):
    """Build a decorator that records every call of a Flask view in ``api_logs``.

    The wrapped view's return value and exceptions are passed through
    unchanged. For each call the wrapper logs the processing time, uploaded
    file names, a sanitised copy of the request data and:

    * on success, the status the view actually returned (taken from a
      ``(response, status)`` tuple or the response object's ``status_code``,
      defaulting to 200) plus the ``request_id`` found in a JSON dict body;
    * on an exception, status 500 and the exception text, after which the
      exception is re-raised.

    A failure while preparing the log entry never affects the view's result.

    Args:
        endpoint_name: Value stored in the ``endpoint`` column.

    Returns:
        A decorator for Flask view functions.
    """

    def _split_result(result):
        """Return (response_object, status_code) for a view return value."""
        response, status = result, None
        if isinstance(result, tuple) and result:
            response = result[0]
            # Flask also allows (body, headers); only an int is a status.
            if len(result) > 1 and isinstance(result[1], int):
                status = result[1]
        if status is None:
            status = getattr(response, 'status_code', 200)
        return response, status

    def _request_id_of(response):
        """Return the 'request_id' of a JSON dict body, else None."""
        if not hasattr(response, 'get_json'):
            return None
        payload = response.get_json(silent=True)
        return payload.get('request_id') if isinstance(payload, dict) else None

    def _uploaded_file_info():
        """Return 'field:filename, ...' for uploaded files, or None."""
        names = [f"{key}:{file.filename}"
                 for key, file in request.files.items()
                 if file and file.filename]
        return ", ".join(names) if names else None

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                # Log the failure, then let Flask handle the exception as usual.
                log_api_request(
                    endpoint=endpoint_name,
                    method=request.method,
                    processing_time=int((time.time() - start_time) * 1000),
                    status_code=500,
                    error_message=str(e),
                    request_data=_get_safe_request_data()
                )
                raise

            processing_time = int((time.time() - start_time) * 1000)
            try:
                response, status_code = _split_result(result)
                log_api_request(
                    endpoint=endpoint_name,
                    method=request.method,
                    request_id=_request_id_of(response),
                    file_info=_uploaded_file_info(),
                    processing_time=processing_time,
                    status_code=status_code,
                    request_data=_get_safe_request_data(),
                    response_data=_get_safe_response_data(response)
                )
            except Exception as log_error:
                # The view already succeeded; a logging problem must not fail it.
                print(f"Failed to log API request: {str(log_error)}")
            return result
        return wrapper
    return decorator
def _get_safe_request_data():
    """Return a loggable copy of the current request's payload.

    * JSON requests: the body if it is a dict, without the ``password``,
      ``token`` and ``api_key`` keys (matched case-insensitively).
    * Multipart requests: the form fields, each value cut to 100 characters;
      uploaded file contents are never included.
    * Anything else, or any failure while reading the request: ``None``.
    """
    sensitive_keys = {'password', 'token', 'api_key'}
    try:
        content_type = request.content_type or ''
        if 'application/json' in content_type:
            # silent=True: a malformed body yields None instead of an HTTP error.
            data = request.get_json(silent=True)
            if isinstance(data, dict):
                return {k: v for k, v in data.items()
                        if str(k).lower() not in sensitive_keys}
            return None
        if 'multipart/form-data' in content_type:
            # Get form data but not file contents
            return {key: value[:100] for key, value in request.form.items()}
        return None
    except Exception:
        # Best-effort helper for logging: never let it break the request.
        return None
def _get_safe_response_data(result):
"""Safely extract response data for logging"""
try:
if hasattr(result, 'get_json'):
data = result.get_json()
if isinstance(data, dict):
# Keep only essential response fields
safe_data = {
'success': data.get('success'),
'request_id': data.get('request_id'),
'parameters_count': data.get('parameters_count'),
'message': data.get('message', '')[:200] # Truncate message
}
return safe_data
return None
except:
return None
def _balanced_array_end(text, start):
    """Return the index just past the ']' closing the '[' at ``start``, or -1.

    Brackets inside double-quoted strings are ignored so that values such as
    "range [6-7]" do not upset the nesting count.
    """
    depth = 0
    in_string = False
    escaped = False
    for i in range(start, len(text)):
        ch = text[i]
        if in_string:
            if escaped:
                escaped = False
            elif ch == '\\':
                escaped = True
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == '[':
            depth += 1
        elif ch == ']':
            depth -= 1
            if depth == 0:
                return i + 1
    return -1


def _first_json_array(text, search_from):
    """Return the first top-level bracketed span at or after ``search_from``.

    A span that parses as JSON wins. If none parses, the first
    bracket-balanced span is returned so the caller can still strip it from
    the summary and report the parse error. Returns "" when nothing is found.
    """
    decoder = json.JSONDecoder()
    first_span = ""
    pos = text.find('[', search_from)
    while pos != -1:
        try:
            _, end = decoder.raw_decode(text, pos)
        except ValueError:
            end = _balanced_array_end(text, pos)
            if end == -1:
                # Unterminated array (e.g. a cut-off reply): stop here rather
                # than pick up a list nested inside it.
                break
            if not first_span:
                first_span = text[pos:end]
        else:
            return text[pos:end]
        pos = text.find('[', end)
    return first_span


def extract_top_level_json_array(text):
    """Extract the JSON array of parameters from an LLM reply.

    The array following the "**Parameters Generated:**" marker is preferred;
    without a usable one, the whole text is searched. Arrays may contain
    nested lists and ']' characters inside strings.

    Args:
        text: Full LLM reply.

    Returns:
        The array as a substring of ``text`` (so the caller can remove it
        from the reply), or "" when no complete array is present.
    """
    if not text:
        print("ā No JSON array found in text")
        return ""

    # Look for "**Parameters Generated:**" followed by JSON array
    marker = re.search(r'\*\*Parameters Generated:\*\*\s*', text)
    if marker:
        json_content = _first_json_array(text, marker.end())
        if json_content:
            print(f"š Found JSON after 'Parameters Generated:', length: {len(json_content)}")
            return json_content

    # Fallback: Look for any JSON array in the text
    json_content = _first_json_array(text, 0)
    if not json_content:
        print("ā No JSON array found in text")
        return ""
    print(f"š Found raw JSON array, length: {len(json_content)}")
    return json_content
def call_groq_llm(user_message, doc_type, product_name, supplier_name, existing_parameters=None, is_digitization=False):
    """Ask the Groq-hosted LLM for QC parameters, with retrieved context.

    Reference context for the product is fetched with
    ``get_comprehensive_context`` and embedded in the system prompt as
    suggestions; the user's message is sent as the user turn.

    The API key is read from the ``GROQ_API_KEY`` environment variable.
    SECURITY: a key used to be hard-coded here; it has been in source control
    and should be revoked and replaced.

    Args:
        user_message: The user's instructions, optionally containing
            "Reference document content".
        doc_type: Document type shown to the model.
        product_name: Product the checklist is for; also the retrieval query.
        supplier_name: Supplier shown to the model.
        existing_parameters: Accepted for backward compatibility; not used.
        is_digitization: Use DIGITIZE_SYSTEM_PROMPT instead of SYSTEM_PROMPT.

    Returns:
        The model's reply text, or a string starting with
        "Groq LLM call failed:" when the call cannot be made or fails.
    """
    if not Groq:
        return "Groq LLM call failed: 'groq' library not found or not installed."

    api_key = os.environ.get("GROQ_API_KEY")
    if not api_key:
        return "Groq LLM call failed: GROQ_API_KEY environment variable is not set."

    domain = "Food Manufacturing"

    # Get comprehensive context from all VDBs
    print(f"š Retrieving context for: {product_name}")
    comprehensive_context = get_comprehensive_context(product_name, domain)

    # Format context for prompt - make it suggestive, not prescriptive
    formatted_context = format_context_for_prompt(comprehensive_context, max_length=4500)

    # Check if user message contains reference document content
    has_reference = "Reference document content" in user_message

    # Select appropriate system prompt
    system_instructions = DIGITIZE_SYSTEM_PROMPT if is_digitization else SYSTEM_PROMPT

    # Context that emphasizes user requirements
    context = f"""
REFERENCE CONTEXT (Use as suggestions only, not requirements):
{formatted_context}
IMPORTANT: The user's specific requirements take absolute priority over any suggestions from the context above.
- If the user asks for specific parameters, provide only those
- If the user specifies a number of parameters, respect that number
- Don't add parameters the user didn't ask for
- Use context to understand the domain better, not to force requirements
"""
    if has_reference:
        context += f"""
The reference document is provided to understand structure and format, not to copy exactly.
Extract the pattern and adapt it specifically for {product_name}.
"""

    # Construct the final system prompt
    final_system_prompt = f"""
{system_instructions}
User context:
- Doc Type: {doc_type}
- Product: {product_name}
- Supplier: {supplier_name}
{context}
**VALID PARAMETER TYPES:**
Checklist, Dropdown, Image Upload, Remarks, Text Input, Numeric Input, Toggle
**USER INSTRUCTION PRIORITY:**
The user's message below contains their specific requirements. Follow these requirements exactly.
Do not add extra parameters unless the user explicitly asks for comprehensive coverage.
**OUTPUT INSTRUCTIONS:**
You MUST respond in EXACTLY this format:
**Summary:** [Brief explanation of what you're creating]
**Parameters Generated:**
[
{{
"action": "add",
"Parameter": "Parameter Name",
"Type": "Appropriate Type",
"Spec": "Specification if applicable",
"DropdownOptions": "Options if dropdown/checklist",
"IncludeRemarks": "Yes/No",
"Section": "Logical Section",
"ClauseReference": "Only if specifically relevant"
}}
]
CRITICAL: Do NOT use code blocks. Put the JSON array directly in the response.
Generate exactly the number of parameters the user requested.
"""
    messages = [
        {"role": "system", "content": final_system_prompt},
        {"role": "user", "content": user_message},
    ]
    try:
        # Client construction is inside the try so that a bad key or
        # configuration is reported like any other call failure.
        client = Groq(api_key=api_key)
        response = client.chat.completions.create(
            messages=messages,
            model="llama-3.3-70b-versatile",
            stream=False,
            temperature=0.1
        )
        return response.choices[0].message.content.strip()
    except Exception as e:
        return f"Groq LLM call failed: {str(e)}"
def parse_llm_changes(llm_text):
    """Split an LLM reply into (summary_text, changes).

    ``changes`` is whatever the embedded JSON array decodes to, or an empty
    list when no array is found or it cannot be decoded. ``summary_text`` is
    the reply with the array text removed and surrounding whitespace stripped.
    """
    array_source = extract_top_level_json_array(llm_text)
    if not array_source:
        # Nothing to parse: the whole reply is the summary.
        return llm_text.strip(), []

    try:
        parsed = json.loads(array_source)
    except Exception as err:
        print("JSON parse error:", err)
        parsed = []

    remainder = llm_text.replace(array_source, "")
    return remainder.strip(), parsed
def _options_to_text(options):
    """Normalise an options value (list, string, None, ...) to 'a, b, c' text."""
    if not options:
        return ""
    if isinstance(options, (list, tuple)):
        return ", ".join(str(item) for item in options)
    return str(options)


def _merge_option_text(existing, new):
    """Append the options in ``new`` that ``existing`` lacks, keeping order."""
    merged = [opt.strip() for opt in str(existing).split(",")]
    for candidate in (opt.strip() for opt in new.split(",")):
        if candidate not in merged:
            merged.append(candidate)
    return ", ".join(merged)


def apply_changes_to_params(parameters, changes):
    """Apply LLM-proposed add/remove/update changes to a parameter list.

    Existing parameters are located with ``find_parameter_fuzzy``. An "add"
    whose name matches an existing parameter is treated as an "update".
    Entries in ``changes`` that are not dicts are skipped, and null or
    non-string field values coming from the LLM are tolerated.

    Args:
        parameters: List of parameter dicts; modified in place.
        changes: Iterable of change dicts with an "action" key ("add",
            "remove" or "update") and the parameter fields. ``None`` is
            treated as no changes.

    Returns:
        The same ``parameters`` list.
    """
    valid_types = ["Checklist", "Dropdown", "Image Upload", "Remarks", "Text Input", "Numeric Input", "Toggle"]

    for change in changes or []:
        if not isinstance(change, dict):
            print(f"Skipping non-dict change: {change}")
            continue

        # `or` guards against explicit nulls in the LLM's JSON.
        action = str(change.get("action") or "").lower()
        p_name = str(change.get("Parameter") or "Unnamed")
        # Handle both DropdownOptions and ChecklistOptions
        options = _options_to_text(change.get("DropdownOptions") or change.get("ChecklistOptions"))

        if action == "add":
            if find_parameter_fuzzy(parameters, p_name):
                # The name already exists, so this is really an update.
                action = "update"
            else:
                new_type = change.get("Type", "Text Input")
                if new_type not in valid_types:
                    new_type = "Text Input"
                parameters.append({
                    "Parameter": p_name,
                    "Type": new_type,
                    "Spec": change.get("Spec", ""),
                    "DropdownOptions": options,
                    "IncludeRemarks": change.get("IncludeRemarks", "No"),
                    "Section": change.get("Section", "General"),
                    "ClauseReference": change.get("ClauseReference", ""),
                })
                continue

        if action == "remove":
            param_to_remove = find_parameter_fuzzy(parameters, p_name)
            if param_to_remove:
                parameters.remove(param_to_remove)
            else:
                # Exact (case-insensitive) name match as a fallback.
                wanted = p_name.lower()
                parameters[:] = [p for p in parameters
                                 if str(p.get("Parameter", "")).lower() != wanted]
            continue

        if action == "update":
            param_to_update = find_parameter_fuzzy(parameters, p_name)
            if not param_to_update:
                continue

            new_type = change.get("Type")
            if new_type and new_type in valid_types:
                param_to_update["Type"] = new_type
            if change.get("Spec") is not None:
                param_to_update["Spec"] = change.get("Spec", "")

            if options:
                existing_options = param_to_update.get("DropdownOptions", "")
                is_choice_type = param_to_update.get("Type") in ["Dropdown", "Checklist"]
                if is_choice_type and existing_options:
                    # Choice types accumulate options instead of replacing them.
                    param_to_update["DropdownOptions"] = _merge_option_text(existing_options, options)
                else:
                    param_to_update["DropdownOptions"] = options

            if change.get("IncludeRemarks") is not None:
                param_to_update["IncludeRemarks"] = change.get("IncludeRemarks", "No")
            if change.get("Section") is not None:
                param_to_update["Section"] = change.get("Section", "General")
            if change.get("ClauseReference") is not None:
                param_to_update["ClauseReference"] = change.get("ClauseReference", "")

    return parameters
def _analyze_llm_response(llm_response):
"""Analyze LLM response to extract key information"""
if not llm_response:
return ""
analysis_html = '
š Response Analysis
'
# Try to extract JSON array
import re
json_match = re.search(r'\[.*?\]', llm_response, re.DOTALL)
if json_match:
try:
import json
json_content = json_match.group(0)
parsed_json = json.loads(json_content)
if isinstance(parsed_json, list):
param_count = len(parsed_json)
analysis_html += f'
Parameters Generated:{param_count}
'
# Analyze parameter types
param_types = {}
for param in parsed_json:
if isinstance(param, dict):
param_type = param.get('Type', 'Unknown')
param_types[param_type] = param_types.get(param_type, 0) + 1
if param_types:
analysis_html += '
Parameter Types:
'
for ptype, count in param_types.items():
analysis_html += f'
{ptype}: {count}
'
analysis_html += '
'
# Show first few parameter names
param_names = [p.get('Parameter', 'Unnamed') for p in parsed_json[:5] if isinstance(p, dict)]
if param_names:
analysis_html += f'
Sample Parameters: {", ".join(param_names)}
'
if len(parsed_json) > 5:
analysis_html += f'
...and {len(parsed_json) - 5} more parameters
'
except json.JSONDecodeError:
analysis_html += '
ā ļø JSON found but could not parse completely
'
else:
analysis_html += '
ā No JSON array found in response
'
# Check response length
response_length = len(llm_response)
if response_length > 10000:
analysis_html += f'
Legend:
šÆ 15+ params (Professional) |
ā ļø 10-14 params (Good) |
ā <10 params (Basic) New: š Click "Logs" to see detailed API request information for each template
"""
return html
except Exception as e:
return f"
Error
{str(e)}
", 500
@app.route("/template/<int:request_id>", methods=["GET"])
def get_template_json(request_id):
    """Return the stored JSON template for a request.

    The URL rule carries ``request_id`` as an integer path segment; without
    it Flask cannot supply the view's argument.

    Args:
        request_id: Id of the ``qc_requests`` row the template belongs to.

    Returns:
        The template as JSON, a 404 JSON error when no template exists, or a
        500 JSON error when the lookup or decoding fails.
    """
    try:
        con = sql.connect("swift_check.db")
        try:
            row = con.execute("""
                SELECT template_json
                FROM json_templates
                WHERE request_id = ?
            """, (request_id,)).fetchone()
        finally:
            # Close the connection even if the query raises.
            con.close()

        if row is None:
            return jsonify({"error": f"template not found for request ID {request_id}"}), 404
        return jsonify(json.loads(row[0]))
    except Exception as e:
        print(f"ā Error in /template/{request_id}: {str(e)}")
        return jsonify({"error": str(e)}), 500
# API log list: JSON when the Accept header is application/json or ?format=json is given, otherwise HTML.
@app.route("/logs", methods=["GET"])
def view_logs():
"""View API logs with filtering options"""
# Get query parameters for filtering
request_id = request.args.get('request_id')
endpoint = request.args.get('endpoint')
limit = int(request.args.get('limit', 50))
if request.headers.get('Accept') == 'application/json' or request.args.get('format') == 'json':
try:
con = sql.connect("swift_check.db")
cur = con.cursor()
# Build query with filters
query = """
SELECT
l.id, l.request_id, l.endpoint, l.method, l.client_ip,
l.file_info, l.processing_time_ms, l.status_code,
l.error_message, l.created_at,
r.product_name, r.supplier_name, r.doc_type,
lr.summary_text,
CASE WHEN lr.llm_response IS NOT NULL THEN 'Yes' ELSE 'No' END as has_llm_response
FROM api_logs l
LEFT JOIN qc_requests r ON l.request_id = r.id
LEFT JOIN llm_responses lr ON l.request_id = lr.request_id
WHERE 1=1
"""
params = []
if request_id:
query += " AND l.request_id = ?"
params.append(request_id)
if endpoint:
query += " AND l.endpoint LIKE ?"
params.append(f"%{endpoint}%")
query += " ORDER BY l.created_at DESC LIMIT ?"
params.append(limit)
cur.execute(query, params)
rows = cur.fetchall()
con.close()
logs = []
for row in rows:
logs.append({
"log_id": row[0],
"request_id": row[1],
"endpoint": row[2],
"method": row[3],
"client_ip": row[4],
"file_info": row[5],
"processing_time_ms": row[6],
"status_code": row[7],
"error_message": row[8],
"created_at": row[9],
"product_name": row[10],
"supplier_name": row[11],
"doc_type": row[12],
"llm_summary": row[13],
"has_llm_response": row[14]
})
return jsonify({
"success": True,
"logs": logs,
"total_logs": len(logs),
"filters_applied": {
"request_id": request_id,
"endpoint": endpoint,
"limit": limit
}
})
except Exception as e:
return jsonify({"error": str(e)}), 500
# HTML view
try:
con = sql.connect("swift_check.db")
cur = con.cursor()
# Get logs with request details
cur.execute("""
SELECT
l.id, l.request_id, l.endpoint, l.method, l.client_ip,
l.file_info, l.processing_time_ms, l.status_code,
l.error_message, l.created_at,
r.product_name, r.supplier_name, r.doc_type,
lr.summary_text,
CASE WHEN lr.llm_response IS NOT NULL THEN 'Yes' ELSE 'No' END as has_llm_response
FROM api_logs l
LEFT JOIN qc_requests r ON l.request_id = r.id
LEFT JOIN llm_responses lr ON l.request_id = lr.request_id
ORDER BY l.created_at DESC
LIMIT 100
""")
rows = cur.fetchall()
# Get summary statistics
cur.execute("""
SELECT
endpoint,
COUNT(*) as total_requests,
AVG(processing_time_ms) as avg_time_ms,
COUNT(CASE WHEN status_code >= 400 THEN 1 END) as error_count
FROM api_logs
GROUP BY endpoint
ORDER BY total_requests DESC
""")
stats = cur.fetchall()
con.close()
html = """
API Logs - Swift Check
š API Logs Dashboard
š Endpoint Statistics
"""
for stat in stats:
endpoint, total, avg_time, errors = stat
success_rate = ((total - errors) / total * 100) if total > 0 else 0
html += f"""
{endpoint}
Total Requests: {total}
Avg Time: {avg_time:.1f}ms
Success Rate: {success_rate:.1f}%
Errors: {errors}
"""
html += """
š Filter Logs
Log ID
Request ID
Endpoint
Product
Supplier
File Info
Time (ms)
Status
LLM Used
Error
Created At
Actions
"""
for row in rows:
log_id, request_id, endpoint, method, client_ip, file_info, processing_time, status_code, error_message, created_at, product_name, supplier_name, doc_type, llm_summary, has_llm_response = row
# Style endpoint
endpoint_class = endpoint.replace('/', '').lower()
endpoint_badge = f'{endpoint}'
# Style status
status_class = "success" if status_code == 200 else "error"
status_text = f'{status_code}'
# Format processing time
time_class = "time-ms"
if processing_time and processing_time > 5000:
time_class += " error"
elif processing_time and processing_time > 2000:
time_class += " warning"
time_text = f'{processing_time or "N/A"}'
# Format file info
file_display = f'{file_info or "No file"}'
# Format error message
error_display = f'{(error_message or "")[:50]}{"..." if error_message and len(error_message) > 50 else ""}'
llm_indicator = "š¤ Yes" if has_llm_response == "Yes" else "ā No"
llm_color = "#28a745" if has_llm_response == "Yes" else "#6c757d"
html += f"""
", 500
# Detail view for a single api_logs row: JSON when requested through the
# Accept header or ?format=json, otherwise an HTML page.
# NOTE(review): the URL rule below has no variable segment although the view
# takes a `log_id` argument — presumably the rule originally ended in a
# converter such as <int:log_id>; confirm and restore.
# NOTE(review): several string literals in this function are split across
# lines with no visible markup — the HTML appears to have been stripped from
# this copy of the file; restore it from the original before relying on this view.
@app.route("/logs/", methods=["GET"])
def view_detailed_log(log_id):
"""View detailed information for a specific log entry"""
try:
con = sql.connect("swift_check.db")
cur = con.cursor()
# Get detailed log information
# With the api_logs schema created by init_db(), `l.*` yields its 13 columns
# in table order (log_data[0]..[12]); the six joined columns follow in SELECT
# order (log_data[13]..[18]).
cur.execute("""
SELECT
l.*,
r.doc_type, r.product_name, r.supplier_name, r.user_message,
lr.llm_response, lr.summary_text
FROM api_logs l
LEFT JOIN qc_requests r ON l.request_id = r.id
LEFT JOIN llm_responses lr ON l.request_id = lr.request_id
WHERE l.id = ?
""", (log_id,))
log_data = cur.fetchone()
con.close()
if not log_data:
return jsonify({"error": f"Log ID {log_id} not found"}), 404
# JSON is returned only for an Accept header that is exactly
# 'application/json' or for an explicit ?format=json.
if request.headers.get('Accept') == 'application/json' or request.args.get('format') == 'json':
# Return JSON format
return jsonify({
"log_id": log_data[0],
"request_id": log_data[1],
"endpoint": log_data[2],
"method": log_data[3],
"client_ip": log_data[4],
"user_agent": log_data[5],
"request_data": log_data[6],
"response_data": log_data[7],
"file_info": log_data[8],
"processing_time_ms": log_data[9],
"status_code": log_data[10],
"error_message": log_data[11],
"created_at": log_data[12],
"doc_type": log_data[13],
"product_name": log_data[14],
"supplier_name": log_data[15],
"user_message": log_data[16],
"llm_response": log_data[17],
"llm_response_summary": log_data[18]
})
# NOTE(review): from here on, database values (user agent, user message, LLM
# output, error text) are interpolated into the page without HTML escaping in
# the code visible here — confirm and escape before serving untrusted data.
# Start building HTML
html = f"""
Log Details - #{log_id}
"""
# Add request information section if we have request data
if log_data[1]: # If we have a request_id
html += f"""
š Request Information
Product: {log_data[14] or 'N/A'}
Document Type: {log_data[13] or 'N/A'}
Supplier: {log_data[15] or 'N/A'}
{f'
File Info: {log_data[8]}
' if log_data[8] else '
File Info: No file uploaded
'}
"""
if log_data[16]: # User message
# Only the first 500 characters of the user message are shown.
user_message_preview = log_data[16][:500]
if len(log_data[16]) > 500:
user_message_preview += "..."
html += f"""
User Message:
{user_message_preview}
"""
html += "
"
# Add LLM Processing section if we have LLM data
if log_data[17] or log_data[7]:
html += f"""
š¤ LLM Processing Chain
Summary: {log_data[18] or 'No summary available'}
"""
# Raw LLM Response section
if log_data[17]:
# Escape the LLM response for safe JavaScript usage
# NOTE(review): llm_response_js_safe is not referenced again in the code
# visible here — presumably it was used by markup that is now missing.
llm_response_js_safe = str(log_data[17]).replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r')
html += f"""
ā¼ Raw LLM Response
{log_data[17]}
"""
# Add LLM response analysis
html += _analyze_llm_response(log_data[17])
# Final API Response section
if log_data[7]:
# Escape the final response for safe JavaScript usage
# NOTE(review): final_response_js_safe is likewise unused in the visible code.
final_response_js_safe = str(log_data[7]).replace("\\", "\\\\").replace("'", "\\'").replace('"', '\\"').replace('\n', '\\n').replace('\r', '\\r')
html += f"""
ā¼ Final API Response
{log_data[7]}
"""
html += "
"
# Add error information if present
if log_data[11]:
html += f"""
ā Error Information
Error Message:
{log_data[11]}
"""
# Add technical details section
html += f"""
š Technical Details
User Agent: {log_data[5] or 'Not available'}
"""
if log_data[6]:
html += f"""
Request Data:
{log_data[6]}
"""
else:
html += "
No request data captured
"
html += "
"
# Add JavaScript section safely
html += """
"""
return html
except Exception as e:
return f"
Error
{str(e)}
", 500
# HTML preview of a stored template: a text rendering of the checklist grouped
# by section, followed by the template JSON.
# NOTE(review): the URL rule below has no variable segment although the view
# takes a `request_id` argument — presumably the rule originally ended in a
# converter such as <int:request_id>; confirm and restore.
# NOTE(review): the page strings contain no visible markup and the error
# return is split across lines — the HTML appears to have been stripped from
# this copy of the file; restore it from the original before relying on this view.
@app.route("/preview/", methods=["GET"])
def preview_page(request_id):
"""preview with better formatting and metadata"""
try:
con = sql.connect("swift_check.db")
cur = con.cursor()
# Get template JSON
cur.execute("""
SELECT template_json
FROM json_templates
WHERE request_id = ?
""", (request_id,))
template_result = cur.fetchone()
# Get parameters
# Each row is (parameter_name, type, spec, dropdown_options, include_remarks,
# section, clause_reference); the tuple unpacking below relies on this order.
cur.execute("""
SELECT parameter_name, type, spec, dropdown_options, include_remarks, section, clause_reference
FROM parameters
WHERE request_id = ?
ORDER BY id
""", (request_id,))
parameters = cur.fetchall()
# Get request details
cur.execute("""
SELECT doc_type, product_name, supplier_name
FROM qc_requests
WHERE id = ?
""", (request_id,))
request_details = cur.fetchone()
con.close()
if not template_result:
return f"""
Not Found
Template not found
No template exists for request ID {request_id}
View History
""", 404
json_template = json.loads(template_result[0])
# Generate ASCII preview with sections
ascii_preview = "āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā\n"
if request_details:
header = f"{request_details[1]} {request_details[0]}"
else:
header = "QC Template"
# Centre the header in a 70-column field; a longer header gets no padding
# because multiplying a string by a negative count yields ''.
header_padding = (70 - len(header)) // 2
ascii_preview += f"ā{' ' * header_padding}{header}{' ' * (70 - header_padding - len(header))}ā\n"
if request_details and request_details[2]:
supplier = f"Supplier: {request_details[2]}"
supplier_padding = (70 - len(supplier)) // 2
ascii_preview += f"ā{' ' * supplier_padding}{supplier}{' ' * (70 - supplier_padding - len(supplier))}ā\n"
ascii_preview += "āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā\n\n"
# Group parameters by section
# Rows without a section are grouped under "General Parameters"; sections
# keep the order in which they first appear.
sections = {}
for param in parameters:
param_name, param_type, spec, options, include_remarks, section, clause_ref = param
section = section or "General Parameters"
if section not in sections:
sections[section] = []
sections[section].append(param)
# Add parameters organized by sections
for section_name, section_params in sections.items():
ascii_preview += f"\nš¹ {section_name.upper()}\n"
ascii_preview += "ā" * 60 + "\n"
for param in section_params:
param_name, param_type, spec, options, include_remarks, section, clause_ref = param
# Add clause reference if available
display_name = param_name
if clause_ref:
display_name += f" ({clause_ref})"
# One rendering per parameter type; a type not listed below produces no line
# of its own (only the optional remarks line and the blank separator).
if param_type == "Image Upload":
ascii_preview += f"[š·] {display_name}: [ Upload Photo ] + Toggle Assessment\n"
elif param_type == "Toggle":
ascii_preview += f"[ā] {display_name}: ā Acceptable ā Not Acceptable\n"
elif param_type == "Dropdown":
ascii_preview += f"[ā¼] {display_name}: _________________ "
if options:
# Show at most the first three dropdown options.
option_list = [opt.strip() for opt in options.split(",")[:3]]
ascii_preview += f"({', '.join(option_list)}{'...' if len(options.split(',')) > 3 else ''})\n"
else:
ascii_preview += "\n"
elif param_type == "Checklist":
ascii_preview += f" {display_name}:\n"
if options:
# Show at most the first five checklist items.
option_list = [opt.strip() for opt in options.split(",")]
for opt in option_list[:5]:
ascii_preview += f" ā {opt}\n"
if len(option_list) > 5:
ascii_preview += f" ... and {len(option_list) - 5} more items\n"
else:
ascii_preview += " ā Item 1\n"
elif param_type == "Numeric Input":
ascii_preview += f"[#ļøā£] {display_name}: _____________"
if spec:
ascii_preview += f" (Spec: {spec})\n"
else:
ascii_preview += "\n"
elif param_type == "Text Input":
ascii_preview += f"[āļø] {display_name}: _____________________________\n"
elif param_type == "Remarks":
ascii_preview += f"[š] {display_name}:\n"
ascii_preview += " āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā\n"
ascii_preview += " ā ā\n"
ascii_preview += " ā ā\n"
ascii_preview += " āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā\n"
if include_remarks == "Yes" and param_type != "Remarks":
ascii_preview += f" āā Additional Remarks: _______________________\n"
ascii_preview += "\n"
# Add final assessment
ascii_preview += "ā" * 70 + "\n"
ascii_preview += "šÆ FINAL ASSESSMENT\n"
ascii_preview += "ā" * 70 + "\n"
ascii_preview += "[ā ] Overall Quality Assessment: ā APPROVED ā REJECTED\n\n"
ascii_preview += "[š¤] Inspector Name & Signature: _________________________________\n\n"
ascii_preview += "[š] Final Comprehensive Remarks:\n"
ascii_preview += " āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā\n"
ascii_preview += " ā Overall assessment, corrective actions, and observations ā\n"
ascii_preview += " ā ā\n"
ascii_preview += " ā ā\n"
ascii_preview += " āāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāāā\n"
# statistics
# NOTE(review): total_params, param_types, sections_count and regulatory_refs
# are computed but not referenced in the code visible here — presumably they
# were shown by markup that is now missing.
total_params = len(parameters)
param_types = {}
sections_count = len(sections)
regulatory_refs = sum(1 for param in parameters if param[6]) # clause references
for param in parameters:
param_type = param[1]
param_types[param_type] = param_types.get(param_type, 0) + 1
# NOTE(review): ascii_preview and the template JSON are interpolated without
# HTML escaping in the code visible here — confirm and escape.
html = f"""
QC Template Preview - Request #{request_id}
QC Template Preview - Request #{request_id}
š„ļø ASCII Preview
{ascii_preview}
š JSON Template
{json.dumps(json_template, indent=2)}
"""
return html
except Exception as e:
print(f"ā Error in /preview/{request_id}: {str(e)}")
return f"
Error
{str(e)}
", 500
if __name__ == "__main__":
    print("š Starting Swift Check API v2.0...")
    # Debug mode enables Werkzeug's interactive debugger, which can execute
    # arbitrary code; it stays on by default for local development (the server
    # only listens on 127.0.0.1) and is switched off with FLASK_DEBUG=0.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1") != "0"
    app.run(host="127.0.0.1", port=5000, debug=debug_enabled)