import uuid from datetime import datetime from typing import Optional, List from .models import AnalysisResult, Session import json class DatabaseManager: """Manager for database operations""" @staticmethod def save_analysis_result( filename: str, file_format: str, file_size: int, text_length: int, word_count: int, overall_ai_score: float, overall_confidence: str, detector_results: dict, text_preview: str = "", user_id: Optional[str] = None, analysis_status: str = "completed", error_message: Optional[str] = None, report_html_path: Optional[str] = None ) -> Optional[AnalysisResult]: """ Save an analysis result to the database. Returns: AnalysisResult object or None if save failed """ try: session = Session.get_session() # Generate unique file ID file_id = str(uuid.uuid4()) result = AnalysisResult( file_id=file_id, filename=filename, file_format=file_format, file_size=file_size, text_length=text_length, word_count=word_count, overall_ai_score=overall_ai_score, overall_confidence=overall_confidence, detector_results=detector_results, text_preview=text_preview[:500], user_id=user_id, analysis_status=analysis_status, error_message=error_message, report_html_path=report_html_path, upload_timestamp=datetime.utcnow() ) session.add(result) session.commit() # Refresh to get ID session.refresh(result) return result except Exception as e: print(f"Error saving analysis result: {e}") return None finally: session.close() @staticmethod def get_result_by_id(result_id: int) -> Optional[AnalysisResult]: """Get analysis result by ID""" try: session = Session.get_session() result = session.query(AnalysisResult).filter( AnalysisResult.id == result_id ).first() return result except Exception as e: print(f"Error retrieving result: {e}") return None finally: session.close() @staticmethod def get_result_by_file_id(file_id: str) -> Optional[AnalysisResult]: """Get analysis result by file ID""" try: session = Session.get_session() result = session.query(AnalysisResult).filter( AnalysisResult.file_id == file_id ).first() return result except Exception as e: print(f"Error retrieving result: {e}") return None finally: session.close() @staticmethod def get_all_results( limit: int = 100, offset: int = 0, order_by: str = "upload_timestamp_desc" ) -> List[AnalysisResult]: """ Get all analysis results with pagination. Args: limit: Number of results to return offset: Number of results to skip order_by: Sort order (upload_timestamp_desc, upload_timestamp_asc, score_desc, score_asc) Returns: List of AnalysisResult objects """ try: session = Session.get_session() query = session.query(AnalysisResult) # Apply ordering if order_by == "upload_timestamp_desc": query = query.order_by(AnalysisResult.upload_timestamp.desc()) elif order_by == "upload_timestamp_asc": query = query.order_by(AnalysisResult.upload_timestamp.asc()) elif order_by == "score_desc": query = query.order_by(AnalysisResult.overall_ai_score.desc()) elif order_by == "score_asc": query = query.order_by(AnalysisResult.overall_ai_score.asc()) results = query.limit(limit).offset(offset).all() return results except Exception as e: print(f"Error retrieving results: {e}") return [] finally: session.close() @staticmethod def get_results_summary() -> dict: """Get summary statistics of all results""" try: session = Session.get_session() total = session.query(AnalysisResult).count() if total == 0: return { "total_analyses": 0, "average_ai_score": 0, "total_text_analyzed": 0, "likely_human": 0, "suspicious": 0, "likely_ai": 0, } from sqlalchemy import func # Calculate averages avg_score = session.query(func.avg(AnalysisResult.overall_ai_score)).scalar() or 0 total_text = session.query(func.sum(AnalysisResult.text_length)).scalar() or 0 # Count by confidence likely_human = session.query(AnalysisResult).filter( AnalysisResult.overall_ai_score < 0.3 ).count() suspicious = session.query(AnalysisResult).filter( (AnalysisResult.overall_ai_score >= 0.3) & (AnalysisResult.overall_ai_score < 0.7) ).count() likely_ai = session.query(AnalysisResult).filter( AnalysisResult.overall_ai_score >= 0.7 ).count() return { "total_analyses": total, "average_ai_score": round(avg_score, 3), "total_text_analyzed": total_text, "likely_human": likely_human, "suspicious": suspicious, "likely_ai": likely_ai, } except Exception as e: print(f"Error getting summary: {e}") return {} finally: session.close() @staticmethod def delete_result(result_id: int) -> bool: """Delete an analysis result""" try: session = Session.get_session() result = session.query(AnalysisResult).filter( AnalysisResult.id == result_id ).first() if result: session.delete(result) session.commit() return True return False except Exception as e: print(f"Error deleting result: {e}") return False finally: session.close() @staticmethod def update_result(result_id: int, **kwargs) -> Optional[AnalysisResult]: """Update an analysis result""" try: session = Session.get_session() result = session.query(AnalysisResult).filter( AnalysisResult.id == result_id ).first() if result: for key, value in kwargs.items(): if hasattr(result, key): setattr(result, key, value) session.commit() session.refresh(result) return result except Exception as e: print(f"Error updating result: {e}") return None finally: session.close()