File size: 5,048 Bytes
from sqlalchemy import and_
from app.database.connection import Complaint, ComplaintUser
import numpy as np
def handle_duplicate_complaint(db_session, db_id, current_user_id):
    """
    Register one more report against an already-stored complaint.

    Raises the complaint's ``frequency`` by one and, when no ``ComplaintUser``
    row exists for (``db_id``, ``current_user_id``), adds that link. Both
    changes are committed in a single ``commit()``.

    Returns:
        The incremented frequency, or ``None`` (after printing an error)
        when no complaint with ``db_id`` is found.

    Raises:
        Any exception raised while querying or committing; the session is
        rolled back and the error printed before it is re-raised.
    """
    try:
        record = db_session.query(Complaint).filter(Complaint.id == db_id).first()
        if not record:
            print(f"Error: Complaint ID {db_id} not found in SQL.")
            return None

        # The counter goes up on every call, whether or not this user is
        # already linked to the complaint.
        new_frequency = record.frequency + 1
        record.frequency = new_frequency

        # Only attach the user when no link row exists yet, so the same
        # user is never linked to the same complaint twice.
        same_complaint_and_user = and_(
            ComplaintUser.complaint_id == db_id,
            ComplaintUser.user_id == current_user_id,
        )
        already_linked = (
            db_session.query(ComplaintUser).filter(same_complaint_and_user).first()
        )
        if not already_linked:
            link = ComplaintUser(complaint_id=db_id, user_id=current_user_id)
            db_session.add(link)

        # Persist the frequency change and (if added) the new link together.
        db_session.commit()
        return new_frequency
    except Exception as sql_err:
        db_session.rollback()
        print(f"SQL Update Error in handle_duplicate_complaint: {sql_err}")
        raise sql_err
#----------------------------------------------------------------------------
def match_complaints(db_session, complaint_text, latitude, longitude, department,
                     current_user_id, retriever, similarity_threshold=0.80):
    """
    Decide whether a new complaint duplicates one that is already stored.

    The retriever is queried with ``complaint_text`` and a metadata filter on
    ``department`` plus the latitude/longitude rounded to 3 decimal places.
    Only the first result is considered. If its ``score`` metadata reaches
    ``similarity_threshold`` and it carries a ``db_id``, the stored complaint
    is updated through ``handle_duplicate_complaint``.

    Args:
        db_session: Session forwarded to ``handle_duplicate_complaint``.
        complaint_text: Text of the incoming complaint.
        latitude: Value convertible with ``float()``.
        longitude: Value convertible with ``float()``.
        department: Department used in the retriever filter.
        current_user_id: User filing the complaint.
        retriever: Object exposing ``invoke(text, filter=...)`` that returns a
            sequence of results with a ``metadata`` mapping.
            NOTE(review): assumes the retriever puts ``score`` and ``db_id``
            into ``metadata`` - confirm against the retriever configuration.
        similarity_threshold: Minimum score for a duplicate (default 0.80).

    Returns:
        ``(is_duplicate, db_id, frequency, similarity)``. For every
        non-duplicate outcome the first three items are ``False, None, 0``;
        ``similarity`` is the top score, or ``0.0`` if retrieval failed or
        returned nothing.

    Raises:
        TypeError / ValueError: if ``latitude`` or ``longitude`` cannot be
            converted by ``float()``.
    """
    lat_filter = round(float(latitude), 3)
    lng_filter = round(float(longitude), 3)

    try:
        results = retriever.invoke(
            complaint_text,
            filter={
                "department": department,
                "latitude": lat_filter,
                "longitude": lng_filter,
            },
        )
    except Exception as e:
        print(f"Retrieval error: {e}")
        return False, None, 0, 0.0

    if not results:
        return False, None, 0, 0.0

    # Only the top-ranked result is considered.
    top_match = results[0]
    metadata = top_match.metadata or {}
    # A key that is present but None would make the comparison below raise
    # TypeError, so a missing or None score is treated as "no similarity".
    similarity_score = metadata.get("score") or 0.0
    db_id = metadata.get("db_id")

    if similarity_score >= similarity_threshold and db_id:
        try:
            updated_freq = handle_duplicate_complaint(db_session, db_id, current_user_id)
        except Exception as e:
            # Best effort: a failed update is reported and the complaint is
            # then treated as a non-duplicate instead of failing the caller.
            print(f"Duplicate update failed for complaint ID {db_id}: {e}")
        else:
            if updated_freq is not None:
                return True, db_id, updated_freq, similarity_score

    # Not a duplicate (or the duplicate update did not go through).
    return False, None, 0, similarity_score
#------------------------------------------------------------------------------------
def priority_calculator(complaint_text: str, department: str, frequency: int, retriever):
    """
    Score a complaint's priority on a scale capped at 10.0.

    The retriever is queried once per urgency level ("High", "Medium",
    "Low"), filtered by ``department`` and ``urgency_level``. The level whose
    top result has the highest ``score`` metadata wins; with no positive
    score the label stays "Low". Ties go to the level checked first.

    Formula::

        base weight (High 6.0 / Medium 3.0 / Low 1.0)
        + 2.0 * best similarity
        + log1p(frequency) when frequency > 1, else 0
        -> capped at 10.0

    Args:
        complaint_text: Text of the complaint.
        department: Department used in the retriever filter.
        frequency: Number of reports of this complaint.
        retriever: Object exposing ``invoke(text, filter=...)`` that returns a
            sequence of results with a ``metadata`` mapping.
            NOTE(review): assumes the retriever stores the similarity under
            ``metadata["score"]`` - confirm against its configuration.

    Returns:
        dict with ``final_score`` (rounded to 2 places), ``winning_label``
        and ``similarity`` (rounded to 3 places).
    """
    sub_classes = ["High", "Medium", "Low"]
    base_weights = {"High": 6.0, "Medium": 3.0, "Low": 1.0}

    winning_label = "Low"
    max_sim = 0.0

    # Find the urgency level this complaint matches best.
    for subclass in sub_classes:
        try:
            results = retriever.invoke(
                complaint_text,
                filter={
                    "department": department,
                    "urgency_level": subclass,
                },
            )
            # A missing or None score counts as 0.0, so the comparison below
            # never sees None.
            sim = (results[0].metadata.get("score") or 0.0) if results else 0.0
        except Exception as e:
            # A failing lookup for one level must not abort the others.
            print(f"Retriever error in subclass {subclass}: {e}")
            sim = 0.0

        # Strict ">" keeps the earlier (more urgent) level on a tie.
        if sim > max_sim:
            max_sim = sim
            winning_label = subclass

    # --- The Formula ---
    # 1. Base weight of the best-matching urgency level.
    base = base_weights[winning_label]

    # 2. Boost proportional to similarity strength.
    similarity_boost = max_sim * 2.0

    # 3. Log-based frequency boost (diminishing returns). A complaint
    #    reported once or less gets no boost; from two reports on it is
    #    log1p(frequency).
    frequency_boost = np.log1p(frequency) if frequency > 1 else 0.0

    # 4. Sum and cap at 10.0.
    final_score = min(base + similarity_boost + frequency_boost, 10.0)

    return {
        "final_score": round(float(final_score), 2),
        "winning_label": winning_label,
        "similarity": round(float(max_sim), 3),
    }
#------------------------------------------------------------------------------------
|