Spaces:
Sleeping
Sleeping
| from database import ( | |
| get_session, BrandAnalysis, Article, BrandMention, | |
| ScheduledMonitoring, CoMention | |
| ) | |
| from datetime import datetime | |
| from typing import List, Dict | |
| import streamlit as st | |
# (result-dict list key, stored mention_type, key holding the mention text, confidence)
_MENTION_KINDS = (
    ('explicit_mentions', 'explicit', 'mention', 0.8),    # default confidence for explicit mentions
    ('indirect_mentions', 'indirect', 'reference', 0.6),  # lower confidence for indirect mentions
)


def _count_sentiments(analysis_results):
    """Return (positive, negative, neutral) tallies over all explicit and indirect mentions."""
    positive = negative = neutral = 0
    for result in analysis_results:
        analysis = result.get('analysis') or {}
        for list_key, _type, _text_key, _conf in _MENTION_KINDS:
            for mention in analysis.get(list_key) or []:
                sentiment = mention.get('sentiment', 'neutral')
                if sentiment == 'positive':
                    positive += 1
                elif sentiment == 'negative':
                    negative += 1
                else:
                    # Anything that is not positive/negative is counted as neutral.
                    neutral += 1
    return positive, negative, neutral


def _add_mentions(session, analysis_id, article_id, brand_name, analysis_data):
    """Add one BrandMention row per explicit/indirect mention found in analysis_data."""
    for list_key, mention_type, text_key, confidence in _MENTION_KINDS:
        for mention in analysis_data.get(list_key) or []:
            session.add(BrandMention(
                analysis_id=analysis_id,
                article_id=article_id,
                brand_name=brand_name,
                mention_type=mention_type,
                mention_text=mention.get(text_key, ''),
                context=mention.get('context', ''),
                sentiment=mention.get('sentiment', 'neutral'),
                confidence=confidence,
                explanation=mention.get('explanation', ''),
            ))


def save_analysis_to_db(search_query: str, brand_name: str, search_engine: str,
                        analysis_results: List[Dict]) -> int:
    """
    Save brand analysis results to database.

    Creates one BrandAnalysis row with aggregate counts, one Article row per
    entry in ``analysis_results`` and one BrandMention row per explicit or
    indirect mention listed under each entry's ``'analysis'`` dict.

    Args:
        search_query: Query string stored on the analysis record.
        brand_name: Brand stored on the analysis record and on every mention.
        search_engine: Search engine name stored on the analysis record.
        analysis_results: Per-article dicts; the keys read here are ``url``,
            ``title``, ``content``, ``total_mentions`` and ``analysis``.
            Missing or ``None`` values are treated as empty.

    Returns:
        The id of the new analysis record, or ``None`` if anything failed
        (the error is shown via ``st.error`` and the transaction rolled back).
    """
    session = None
    try:
        session = get_session()

        # Calculate aggregates
        total_articles = len(analysis_results)
        mention_totals = [r.get('total_mentions') or 0 for r in analysis_results]
        articles_with_mentions = sum(1 for count in mention_totals if count > 0)
        total_mentions = sum(mention_totals)
        positive_count, negative_count, neutral_count = _count_sentiments(analysis_results)

        # Create brand analysis record
        brand_analysis = BrandAnalysis(
            search_query=search_query,
            brand_name=brand_name,
            search_engine=search_engine,
            total_articles=total_articles,
            articles_with_mentions=articles_with_mentions,
            total_mentions=total_mentions,
            positive_count=positive_count,
            negative_count=negative_count,
            neutral_count=neutral_count,
        )
        session.add(brand_analysis)
        session.flush()  # Get the ID
        analysis_id = brand_analysis.id

        # Save articles and mentions
        for result in analysis_results:
            analysis_data = result.get('analysis') or {}
            article = Article(
                analysis_id=analysis_id,
                url=result.get('url', ''),
                title=result.get('title', ''),
                content=(result.get('content') or '')[:1000],  # Limit content size
                overall_sentiment=analysis_data.get('overall_sentiment', 'neutral'),
                summary=analysis_data.get('summary', ''),
            )
            session.add(article)
            session.flush()  # article.id is needed for the mention rows below
            _add_mentions(session, analysis_id, article.id, brand_name, analysis_data)

        session.commit()
        return analysis_id
    except Exception as e:
        st.error(f"Database error: {str(e)}")
        if session is not None:
            session.rollback()
        return None
    finally:
        # Always release the session, even if rollback itself raised.
        if session is not None:
            session.close()
def get_historical_analyses(brand_name: str = None, limit: int = 100):
    """Get historical analyses, optionally filtered by brand name.

    Args:
        brand_name: When truthy, only analyses whose ``brand_name`` equals
            this value are returned.
        limit: Maximum number of rows to return.

    Returns:
        A list of BrandAnalysis objects ordered by ``created_at`` descending,
        or an empty list if the query failed (the error is shown via
        ``st.error``).
    """
    session = None
    try:
        session = get_session()
        query = session.query(BrandAnalysis)
        if brand_name:
            query = query.filter(BrandAnalysis.brand_name == brand_name)
        return query.order_by(BrandAnalysis.created_at.desc()).limit(limit).all()
    except Exception as e:
        st.error(f"Database query error: {str(e)}")
        return []
    finally:
        # Close on both the success and failure paths so the session is never leaked.
        if session is not None:
            session.close()
def get_all_mentions(analysis_id: int = None, sentiment: str = None):
    """Get mentions, optionally filtered by analysis_id and sentiment.

    Args:
        analysis_id: When truthy, restrict to mentions of this analysis.
        sentiment: When truthy, restrict to mentions with this sentiment.

    Returns:
        A list of BrandMention objects ordered by ``created_at`` descending,
        or an empty list if the query failed (the error is shown via
        ``st.error``).
    """
    session = None
    try:
        session = get_session()
        query = session.query(BrandMention)
        # Falsy values (None, 0, '') disable the corresponding filter.
        if analysis_id:
            query = query.filter(BrandMention.analysis_id == analysis_id)
        if sentiment:
            query = query.filter(BrandMention.sentiment == sentiment)
        return query.order_by(BrandMention.created_at.desc()).all()
    except Exception as e:
        st.error(f"Database query error: {str(e)}")
        return []
    finally:
        # Close on both the success and failure paths so the session is never leaked.
        if session is not None:
            session.close()
def save_co_mentions(article_id: int, brands: List[str]):
    """Save co-mention relationships for brands in the same article.

    Every unordered pair of distinct brand names is stored once per article,
    with ``brand1``/``brand2`` in alphabetical order. If a row for the pair
    and article already exists its ``co_occurrence_count`` is incremented;
    otherwise a new row with count 1 is added.

    Args:
        article_id: Id of the article the brands appeared in.
        brands: Brand names found in the article. Duplicates are ignored;
            fewer than two distinct names means there is nothing to save.

    Returns:
        None. Failures are shown via ``st.error`` and rolled back.
    """
    session = None
    try:
        # De-duplicate so a repeated name cannot form a (brand, brand) self-pair
        # or count the same pair twice; sorting gives the alphabetical ordering.
        unique_brands = sorted(set(brands or []))
        if len(unique_brands) < 2:
            return

        session = get_session()
        for index, b1 in enumerate(unique_brands):
            for b2 in unique_brands[index + 1:]:
                # Check if co-mention already exists
                existing = session.query(CoMention).filter(
                    CoMention.brand1 == b1,
                    CoMention.brand2 == b2,
                    CoMention.article_id == article_id
                ).first()
                if existing:
                    existing.co_occurrence_count += 1
                else:
                    session.add(CoMention(
                        brand1=b1,
                        brand2=b2,
                        article_id=article_id,
                        co_occurrence_count=1,
                    ))
        session.commit()
    except Exception as e:
        st.error(f"Error saving co-mentions: {str(e)}")
        # session stays None when get_session() itself failed.
        if session is not None:
            session.rollback()
    finally:
        if session is not None:
            session.close()
def get_co_mention_network():
    """Get all co-mention relationships for network visualization.

    Returns:
        A list of every CoMention row, or an empty list if the query failed
        (the error is shown via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        return session.query(CoMention).all()
    except Exception as e:
        st.error(f"Database query error: {str(e)}")
        return []
    finally:
        # Close on both the success and failure paths so the session is never leaked.
        if session is not None:
            session.close()
def create_scheduled_job(search_query: str, brand_names: List[str],
                         search_engines: List[str], schedule_type: str = 'weekly'):
    """Create a new scheduled monitoring job.

    Args:
        search_query: Query string stored on the job.
        brand_names: Brand names; stored as one comma-joined string.
        search_engines: Search engine names; stored as one comma-joined string.
        schedule_type: Schedule label stored on the job (default ``'weekly'``).

    Returns:
        The id of the new job, or ``None`` if creation failed (the error is
        shown via ``st.error`` and the transaction rolled back).
    """
    session = None
    try:
        session = get_session()
        job = ScheduledMonitoring(
            search_query=search_query,
            brand_names=','.join(brand_names),
            search_engines=','.join(search_engines),
            schedule_type=schedule_type,
            is_active=True,
        )
        session.add(job)
        session.commit()
        # Read the id while the session is still open.
        return job.id
    except Exception as e:
        st.error(f"Error creating scheduled job: {str(e)}")
        # session stays None when get_session() itself failed.
        if session is not None:
            session.rollback()
        return None
    finally:
        if session is not None:
            session.close()
def get_scheduled_jobs(active_only: bool = True):
    """Get all scheduled monitoring jobs.

    Args:
        active_only: When true, only jobs whose ``is_active`` flag is set
            are returned.

    Returns:
        A list of ScheduledMonitoring objects ordered by ``created_at``
        descending, or an empty list if the query failed (the error is shown
        via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        query = session.query(ScheduledMonitoring)
        if active_only:
            # `== True` builds a SQL comparison on the column; it is not a Python truth test.
            query = query.filter(ScheduledMonitoring.is_active == True)  # noqa: E712
        return query.order_by(ScheduledMonitoring.created_at.desc()).all()
    except Exception as e:
        st.error(f"Database query error: {str(e)}")
        return []
    finally:
        # Close on both the success and failure paths so the session is never leaked.
        if session is not None:
            session.close()
def update_job_schedule(job_id: int, last_run: datetime, next_run: datetime):
    """Update job schedule after execution.

    Looks up the job by id and, if it exists, stores the new ``last_run`` and
    ``next_run`` values and refreshes ``updated_at``. A missing job is
    silently ignored.

    Args:
        job_id: Id of the ScheduledMonitoring row to update.
        last_run: Timestamp of the run that just finished.
        next_run: Timestamp of the next planned run.

    Returns:
        None. Failures are shown via ``st.error`` and rolled back.
    """
    session = None
    try:
        session = get_session()
        job = session.query(ScheduledMonitoring).filter(
            ScheduledMonitoring.id == job_id
        ).first()
        if job:
            job.last_run = last_run
            job.next_run = next_run
            # NOTE(review): utcnow() yields a naive datetime; kept as-is so stored
            # values stay comparable with existing rows.
            job.updated_at = datetime.utcnow()
            session.commit()
    except Exception as e:
        st.error(f"Error updating job schedule: {str(e)}")
        # session stays None when get_session() itself failed.
        if session is not None:
            session.rollback()
    finally:
        if session is not None:
            session.close()