AIService / app /utils /utils.py
Gaykar's picture
kka
f3b2b2d
from sqlalchemy import and_
from app.database.connection import Complaint, ComplaintUser
import numpy as np
def handle_duplicate_complaint(db_session, db_id, current_user_id):
    """
    Record one more report against an already-stored complaint.

    Increments the complaint's ``frequency`` and, when no ``ComplaintUser``
    row exists yet for this (complaint, user) pair, adds one. Both changes
    are committed together.

    Args:
        db_session: Session object used for querying, adding and committing.
        db_id: Primary key of the complaint to update.
        current_user_id: Id of the user submitting the duplicate report.

    Returns:
        The incremented frequency, or None when no complaint has ``db_id``.

    Raises:
        Exception: Any error raised while querying or committing; the
            session is rolled back and the error is re-raised.
    """
    try:
        existing = (
            db_session.query(Complaint)
            .filter(Complaint.id == db_id)
            .first()
        )
        if not existing:
            print(f"Error: Complaint ID {db_id} not found in SQL.")
            return None

        # Bump the counter and capture the value before the commit below.
        existing.frequency += 1
        new_frequency = existing.frequency

        # Only link the user once per complaint.
        same_pair = and_(
            ComplaintUser.complaint_id == db_id,
            ComplaintUser.user_id == current_user_id
        )
        already_linked = db_session.query(ComplaintUser).filter(same_pair).first()
        if not already_linked:
            link = ComplaintUser(complaint_id=db_id, user_id=current_user_id)
            db_session.add(link)

        # Persist the frequency change and the (optional) new link together.
        db_session.commit()
    except Exception as sql_err:
        db_session.rollback()
        print(f"SQL Update Error in handle_duplicate_complaint: {sql_err}")
        raise sql_err
    else:
        return new_frequency
#----------------------------------------------------------------------------
def match_complaints(db_session, complaint_text, latitude, longitude, department,
                     current_user_id, retriever, similarity_threshold=0.80):
    """
    Decide whether a new complaint duplicates one that is already indexed.

    The retriever is queried with the complaint text, filtered on the
    department and on the coordinates rounded to three decimal places. Only
    the first returned document is considered. If its score reaches
    ``similarity_threshold`` and it carries a ``db_id``, the stored complaint
    is updated through ``handle_duplicate_complaint``.

    Args:
        db_session: Database session forwarded to ``handle_duplicate_complaint``.
        complaint_text: Text of the incoming complaint.
        latitude: Latitude; anything ``float()`` accepts.
        longitude: Longitude; anything ``float()`` accepts.
        department: Department name used as a metadata filter.
        current_user_id: Id of the user filing the complaint.
        retriever: Object exposing ``invoke(text, filter=...)`` that returns
            documents with a ``metadata`` mapping.
            NOTE(review): assumes the first result is the best match and that
            indexed documents store coordinates rounded the same way - confirm.
        similarity_threshold: Minimum score for a match to count as a
            duplicate. Defaults to 0.80.

    Returns:
        Tuple ``(is_duplicate, db_id, frequency, similarity)``. For anything
        other than a confirmed duplicate the first three items are
        ``False, None, 0``.

    Raises:
        TypeError, ValueError: If ``latitude`` or ``longitude`` cannot be
            converted to ``float``.
    """
    lat_filter = round(float(latitude), 3)
    lng_filter = round(float(longitude), 3)

    try:
        results = retriever.invoke(
            complaint_text,
            filter={
                "department": department,
                "latitude": lat_filter,
                "longitude": lng_filter,
            },
        )
    except Exception as e:
        print(f"Retrieval error: {e}")
        return False, None, 0, 0.0

    if not results:
        return False, None, 0, 0.0

    top_match = results[0]
    db_id = top_match.metadata.get("db_id")

    # A present-but-None or non-numeric score must not crash the comparison
    # below; treat it as "no similarity".
    raw_score = top_match.metadata.get("score")
    try:
        similarity_score = float(raw_score) if raw_score is not None else 0.0
    except (TypeError, ValueError):
        similarity_score = 0.0

    if similarity_score >= similarity_threshold and db_id:
        try:
            updated_freq = handle_duplicate_complaint(db_session, db_id, current_user_id)
        except Exception as dup_err:
            # Best effort: if the SQL update fails, fall back to treating the
            # complaint as new, but do not hide the failure.
            print(f"Duplicate update failed for complaint {db_id}: {dup_err}")
            return False, None, 0, similarity_score
        if updated_freq is not None:
            return True, db_id, updated_freq, similarity_score

    # Below threshold, no db_id, or the matched row no longer exists in SQL.
    return False, None, 0, similarity_score
#------------------------------------------------------------------------------------
def priority_calculator(complaint_text: str, department: str, frequency: int, retriever) -> dict:
    """
    Score a complaint's priority from urgency benchmarks and report volume.

    For each urgency level ("High", "Medium", "Low") the retriever is queried
    with the complaint text, filtered on the department and that level. The
    level whose first result has the highest score wins; on a tie the level
    checked first (higher urgency) is kept. If no level scores above 0 the
    label stays "Low".

    Score = base weight of the winning level (High 6, Medium 3, Low 1)
            + 2 * best similarity
            + log1p(frequency) when frequency > 1 (otherwise 0),
    capped at 10.0.

    Args:
        complaint_text: Text of the complaint.
        department: Department name used as a metadata filter.
        frequency: Number of times the complaint has been reported.
        retriever: Object exposing ``invoke(text, filter=...)`` that returns
            documents whose ``metadata["score"]`` holds the similarity.
            NOTE(review): the "+2 max" similarity boost only holds if scores
            are in [0, 1] - confirm against the retriever configuration.

    Returns:
        Dict with ``final_score`` (rounded to 2 places), ``winning_label``
        and ``similarity`` (rounded to 3 places).
    """
    sub_classes = ["High", "Medium", "Low"]
    base_weights = {"High": 6.0, "Medium": 3.0, "Low": 1.0}
    winning_label = "Low"
    max_sim = 0.0

    # Find the urgency level this complaint matches best semantically.
    for subclass in sub_classes:
        try:
            results = retriever.invoke(
                complaint_text,
                filter={
                    "department": department,
                    "urgency_level": subclass,
                },
            )
            # Coerce inside the try so a missing, None or non-numeric score
            # is reported and treated as 0.0 rather than crashing the
            # comparison below.
            sim = float(results[0].metadata["score"]) if results else 0.0
        except Exception as e:
            print(f"Retriever error in subclass {subclass}: {e}")
            sim = 0.0

        if sim > max_sim:
            max_sim = sim
            winning_label = subclass

    # --- The Formula ---
    # 1. Base weight of the best semantic match.
    base = base_weights[winning_label]

    # 2. Boost proportional to similarity strength.
    similarity_boost = max_sim * 2.0

    # 3. Log-based frequency boost (diminishing returns). A complaint reported
    #    at most once gets no boost; from 2 reports on it is log1p(frequency).
    frequency_boost = np.log1p(frequency) if frequency > 1 else 0.0

    # 4. Sum and cap at 10.
    final_score = min(base + similarity_boost + frequency_boost, 10.0)

    return {
        "final_score": round(float(final_score), 2),
        "winning_label": winning_label,
        "similarity": round(float(max_sim), 3),
    }
#------------------------------------------------------------------------------------