# Database helper functions for brand analyses, mentions, co-mentions and
# scheduled monitoring jobs.
#
# Every helper opens its own session via get_session(), reports failures to
# the UI with st.error() instead of raising, and always closes the session
# in a `finally` clause so a failed query cannot leak it.

from datetime import datetime
from itertools import combinations
from typing import Dict, List, Optional

import streamlit as st

from database import (
    get_session,
    BrandAnalysis,
    Article,
    BrandMention,
    ScheduledMonitoring,
    CoMention,
)

# Maximum number of characters of article content persisted per article.
_MAX_CONTENT_CHARS = 1000

# One entry per kind of mention found in an article's 'analysis' dict:
# (key of the mention list, stored mention_type, key holding the mention
#  text, default confidence stored for that kind of mention).
_MENTION_KINDS = (
    ('explicit_mentions', 'explicit', 'mention', 0.8),
    ('indirect_mentions', 'indirect', 'reference', 0.6),
)


def _count_sentiments(analysis_results: List[Dict]) -> Dict[str, int]:
    """Count mentions per sentiment across all results.

    Both explicit and indirect mentions are counted. Any sentiment other
    than 'positive' or 'negative' (including a missing one) counts as
    'neutral'.
    """
    counts = {'positive': 0, 'negative': 0, 'neutral': 0}
    for result in analysis_results:
        analysis = result.get('analysis') or {}
        for list_key, _mention_type, _text_key, _confidence in _MENTION_KINDS:
            for mention in analysis.get(list_key) or []:
                sentiment = mention.get('sentiment', 'neutral')
                if sentiment not in ('positive', 'negative'):
                    sentiment = 'neutral'
                counts[sentiment] += 1
    return counts


def save_analysis_to_db(search_query: str, brand_name: str, search_engine: str,
                        analysis_results: List[Dict]) -> Optional[int]:
    """Save brand analysis results to the database.

    Creates one BrandAnalysis row with aggregate counts, one Article row per
    entry of ``analysis_results`` and one BrandMention row per explicit or
    indirect mention listed in the entry's ``'analysis'`` dict.

    Missing or ``None`` values for ``'analysis'``, ``'content'``,
    ``'total_mentions'`` and the mention lists are treated as empty.

    Args:
        search_query: Query string stored on the analysis record.
        brand_name: Brand stored on the analysis and on every mention.
        search_engine: Search engine name stored on the analysis record.
        analysis_results: One dict per analysed article.

    Returns:
        The id of the new BrandAnalysis row, or ``None`` if anything failed
        (the error is shown via ``st.error`` and the transaction is rolled
        back).
    """
    session = None
    try:
        session = get_session()

        # Aggregates stored on the analysis record.
        mention_totals = [r.get('total_mentions') or 0 for r in analysis_results]
        sentiment_counts = _count_sentiments(analysis_results)

        brand_analysis = BrandAnalysis(
            search_query=search_query,
            brand_name=brand_name,
            search_engine=search_engine,
            total_articles=len(analysis_results),
            articles_with_mentions=sum(1 for total in mention_totals if total > 0),
            total_mentions=sum(mention_totals),
            positive_count=sentiment_counts['positive'],
            negative_count=sentiment_counts['negative'],
            neutral_count=sentiment_counts['neutral'],
        )
        session.add(brand_analysis)
        session.flush()  # Flush so brand_analysis.id is available below

        for result in analysis_results:
            analysis = result.get('analysis') or {}
            article = Article(
                analysis_id=brand_analysis.id,
                url=result.get('url', ''),
                title=result.get('title', ''),
                content=(result.get('content') or '')[:_MAX_CONTENT_CHARS],
                overall_sentiment=analysis.get('overall_sentiment', 'neutral'),
                summary=analysis.get('summary', ''),
            )
            session.add(article)
            session.flush()  # Flush so article.id is available for mentions

            for list_key, mention_type, text_key, confidence in _MENTION_KINDS:
                for mention in analysis.get(list_key) or []:
                    session.add(BrandMention(
                        analysis_id=brand_analysis.id,
                        article_id=article.id,
                        brand_name=brand_name,
                        mention_type=mention_type,
                        mention_text=mention.get(text_key, ''),
                        context=mention.get('context', ''),
                        sentiment=mention.get('sentiment', 'neutral'),
                        confidence=confidence,
                        explanation=mention.get('explanation', ''),
                    ))

        session.commit()
        # Read the id while the session is still open; `finally` closes it
        # only after the return value has been evaluated.
        analysis_id = brand_analysis.id
        return analysis_id
    except Exception as e:
        st.error(f"Database error: {str(e)}")
        if session is not None:
            session.rollback()
        return None
    finally:
        if session is not None:
            session.close()


def get_historical_analyses(brand_name: Optional[str] = None, limit: int = 100):
    """Get historical analyses, newest first, optionally filtered by brand.

    Args:
        brand_name: If truthy, only analyses with exactly this brand name
            are returned.
        limit: Maximum number of rows to return.

    Returns:
        A list of BrandAnalysis objects, or ``[]`` if the query failed (the
        error is shown via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        query = session.query(BrandAnalysis)
        if brand_name:
            query = query.filter(BrandAnalysis.brand_name == brand_name)
        return query.order_by(BrandAnalysis.created_at.desc()).limit(limit).all()
    except Exception as e:
        st.error(f"Database query error: {str(e)}")
        return []
    finally:
        if session is not None:
            session.close()


def get_all_mentions(analysis_id: Optional[int] = None, sentiment: Optional[str] = None):
    """Get mentions, newest first, optionally filtered.

    Args:
        analysis_id: If truthy, only mentions of this analysis are returned.
            Note that ``0`` is treated like ``None`` (no filter).
        sentiment: If truthy, only mentions with exactly this sentiment are
            returned.

    Returns:
        A list of BrandMention objects, or ``[]`` if the query failed (the
        error is shown via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        query = session.query(BrandMention)
        if analysis_id:
            query = query.filter(BrandMention.analysis_id == analysis_id)
        if sentiment:
            query = query.filter(BrandMention.sentiment == sentiment)
        return query.order_by(BrandMention.created_at.desc()).all()
    except Exception as e:
        st.error(f"Database query error: {str(e)}")
        return []
    finally:
        if session is not None:
            session.close()


def save_co_mentions(article_id: int, brands: List[str]):
    """Save co-mention relationships for brands in the same article.

    For every pair of entries in ``brands`` the two names are stored in
    alphabetical order. If a CoMention row for that pair and article already
    exists its ``co_occurrence_count`` is incremented; otherwise a new row
    with a count of 1 is added. Errors are shown via ``st.error`` and the
    transaction is rolled back.
    """
    session = None
    try:
        session = get_session()

        for brand1, brand2 in combinations(brands, 2):
            # Consistent (alphabetical) ordering so each pair has one row.
            b1, b2 = sorted([brand1, brand2])

            existing = session.query(CoMention).filter(
                CoMention.brand1 == b1,
                CoMention.brand2 == b2,
                CoMention.article_id == article_id
            ).first()

            if existing:
                existing.co_occurrence_count += 1
            else:
                session.add(CoMention(
                    brand1=b1,
                    brand2=b2,
                    article_id=article_id,
                    co_occurrence_count=1,
                ))

        session.commit()
    except Exception as e:
        st.error(f"Error saving co-mentions: {str(e)}")
        if session is not None:
            session.rollback()
    finally:
        if session is not None:
            session.close()


def get_co_mention_network():
    """Get all co-mention relationships for network visualization.

    Returns:
        A list of every CoMention row, or ``[]`` if the query failed (the
        error is shown via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        return session.query(CoMention).all()
    except Exception as e:
        st.error(f"Database query error: {str(e)}")
        return []
    finally:
        if session is not None:
            session.close()


def create_scheduled_job(search_query: str, brand_names: List[str],
                         search_engines: List[str], schedule_type: str = 'weekly'):
    """Create a new, active scheduled monitoring job.

    ``brand_names`` and ``search_engines`` are each stored as a single
    comma-joined string.

    Returns:
        The id of the new ScheduledMonitoring row, or ``None`` if creation
        failed (the error is shown via ``st.error`` and the transaction is
        rolled back).
    """
    session = None
    try:
        session = get_session()
        job = ScheduledMonitoring(
            search_query=search_query,
            brand_names=','.join(brand_names),
            search_engines=','.join(search_engines),
            schedule_type=schedule_type,
            is_active=True,
        )
        session.add(job)
        session.commit()
        # Read the id before `finally` closes the session.
        job_id = job.id
        return job_id
    except Exception as e:
        st.error(f"Error creating scheduled job: {str(e)}")
        if session is not None:
            session.rollback()
        return None
    finally:
        if session is not None:
            session.close()


def get_scheduled_jobs(active_only: bool = True):
    """Get scheduled monitoring jobs, newest first.

    Args:
        active_only: If true, only jobs whose ``is_active`` flag is set are
            returned.

    Returns:
        A list of ScheduledMonitoring objects, or ``[]`` if the query failed
        (the error is shown via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        query = session.query(ScheduledMonitoring)
        if active_only:
            # `== True` is intentional: it builds a SQL comparison.
            query = query.filter(ScheduledMonitoring.is_active == True)  # noqa: E712
        return query.order_by(ScheduledMonitoring.created_at.desc()).all()
    except Exception as e:
        st.error(f"Database query error: {str(e)}")
        return []
    finally:
        if session is not None:
            session.close()


def update_job_schedule(job_id: int, last_run: datetime, next_run: datetime):
    """Update a job's schedule after execution.

    Sets ``last_run`` and ``next_run`` on the job with the given id and
    stamps ``updated_at`` with the current UTC time. Nothing is changed if
    no such job exists. Errors are shown via ``st.error`` and the
    transaction is rolled back.
    """
    session = None
    try:
        session = get_session()
        job = session.query(ScheduledMonitoring).filter(
            ScheduledMonitoring.id == job_id
        ).first()

        if job:
            job.last_run = last_run
            job.next_run = next_run
            job.updated_at = datetime.utcnow()

        session.commit()
    except Exception as e:
        st.error(f"Error updating job schedule: {str(e)}")
        if session is not None:
            session.rollback()
    finally:
        if session is not None:
            session.close()