| from sqlalchemy import and_ |
| from app.database.connection import Complaint, ComplaintUser |
| import numpy as np |
|
|
|
|
| def handle_duplicate_complaint(db_session, db_id, current_user_id): |
| """ |
| Helper function to update an existing complaint's frequency and |
| link a new user if they haven't already reported this specific issue. |
| Returns the updated frequency count. |
| """ |
| try: |
| |
| complaint = db_session.query(Complaint).filter(Complaint.id == db_id).first() |
| |
| if not complaint: |
| print(f"Error: Complaint ID {db_id} not found in SQL.") |
| return None |
|
|
| |
| complaint.frequency += 1 |
| current_freq = complaint.frequency |
| |
| |
| user_exists = db_session.query(ComplaintUser).filter( |
| and_( |
| ComplaintUser.complaint_id == db_id, |
| ComplaintUser.user_id == current_user_id |
| ) |
| ).first() |
|
|
| if not user_exists: |
| db_session.add(ComplaintUser(complaint_id=db_id, user_id=current_user_id)) |
| |
| |
| db_session.commit() |
| return current_freq |
| |
| except Exception as sql_err: |
| db_session.rollback() |
| print(f"SQL Update Error in handle_duplicate_complaint: {sql_err}") |
| raise sql_err |
|
|
| |
| def match_complaints(db_session, complaint_text, latitude, longitude, department, current_user_id, retriever): |
| """ |
| Returns: (is_duplicate, db_id, frequency, similarity) |
| """ |
|
|
| lat_filter = round(float(latitude), 3) |
| lng_filter = round(float(longitude), 3) |
|
|
| try: |
| results = retriever.invoke( |
| complaint_text, |
| filter={ |
| "department": department, |
| "latitude": lat_filter, |
| "longitude": lng_filter |
| } |
| ) |
| except Exception as e: |
| print(f"Retrieval error: {e}") |
| return False, None, 0, 0.0 |
|
|
| if not results: |
| return False, None, 0, 0.0 |
|
|
| |
| top_match = results[0] |
| similarity_score = top_match.metadata.get("score", 0.0) |
| db_id = top_match.metadata.get("db_id") |
|
|
| |
| if similarity_score >= 0.80 and db_id: |
| try: |
| updated_freq = handle_duplicate_complaint(db_session, db_id, current_user_id) |
| if updated_freq is not None: |
| return True, db_id, updated_freq, similarity_score |
| except Exception: |
| return False, None, 0, similarity_score |
|
|
| |
| return False, None, 0, similarity_score |
|
|
|
|
| |
|
|
| def priority_calculator(complaint_text: str, department: str, frequency: int, retriever): |
| """ |
| Core logic to calculate priority based on semantic similarity to |
| departmental urgency benchmarks and frequency volume. |
| """ |
| sub_classes = ["High", "Medium", "Low"] |
| base_weights = {"High": 6.0, "Medium": 3.0, "Low": 1.0} |
|
|
| winning_label = "Low" |
| max_sim = 0.0 |
|
|
| |
| for subclass in sub_classes: |
| try: |
| results = retriever.invoke( |
| complaint_text, |
| filter={ |
| "department": department, |
| "urgency_level": subclass |
| } |
| ) |
|
|
| print(len(results)) |
| |
| if results: |
| |
| |
| sim = results[0].metadata['score'] |
| else: |
| sim = 0.0 |
| except Exception as e: |
| print(f"Retriever error in subclass {subclass}: {e}") |
| sim = 0.0 |
|
|
| if sim > max_sim: |
| max_sim = sim |
| winning_label = subclass |
|
|
| |
| |
| base = base_weights[winning_label] |
| |
| |
| similarity_boost = max_sim * 2.0 |
| |
| |
| |
| if frequency > 1: |
| frequency_boost = np.log1p(frequency) |
| else: |
| frequency_boost = 0.0 |
|
|
| |
| final_score = base + similarity_boost + frequency_boost |
| final_score = min(final_score, 10.0) |
|
|
| return { |
| "final_score": round(float(final_score), 2), |
| "winning_label": winning_label, |
| "similarity": round(float(max_sim), 3), |
| } |
| |
|
|
|
|