# BrandScanAI / db_operations.py
# Arun21102003
# Deployment preparation (removed binary files)
# 90fe073
from datetime import datetime
from typing import Dict, List, Optional

import streamlit as st

from database import (
    get_session, BrandAnalysis, Article, BrandMention,
    ScheduledMonitoring, CoMention
)
# Each entry: (key of the mention list inside a result's 'analysis' dict,
#              value stored in BrandMention.mention_type,
#              key of the mention dict holding the mention text,
#              fixed confidence stored for that kind of mention).
_MENTION_KINDS = (
    ('explicit_mentions', 'explicit', 'mention', 0.8),
    ('indirect_mentions', 'indirect', 'reference', 0.6),
)


def _analysis_of(result: Dict) -> Dict:
    """Return a result's 'analysis' dict, treating a missing or None value as empty."""
    return result.get('analysis') or {}


def _tally_sentiments(analysis_results: List[Dict]):
    """Count explicit + indirect mentions as (positive, negative, neutral).

    Any sentiment other than 'positive' or 'negative' (including a missing one)
    is counted as neutral.
    """
    counts = {'positive': 0, 'negative': 0, 'neutral': 0}
    for result in analysis_results:
        analysis = _analysis_of(result)
        for list_key, _mention_type, _text_key, _confidence in _MENTION_KINDS:
            for mention in analysis.get(list_key) or []:
                sentiment = mention.get('sentiment', 'neutral')
                bucket = sentiment if sentiment in ('positive', 'negative') else 'neutral'
                counts[bucket] += 1
    return counts['positive'], counts['negative'], counts['neutral']


def save_analysis_to_db(search_query: str, brand_name: str, search_engine: str,
                        analysis_results: List[Dict]) -> Optional[int]:
    """Persist one brand analysis run with its articles and mentions.

    Inserts one BrandAnalysis row with aggregate counts, one Article row per
    element of ``analysis_results``, and one BrandMention row per explicit or
    indirect mention, all in a single transaction.

    Each result dict may carry the keys 'url', 'title', 'content',
    'total_mentions' and 'analysis'. 'analysis' may in turn carry
    'explicit_mentions', 'indirect_mentions', 'overall_sentiment' and
    'summary'. Missing or None values are treated as empty.

    Args:
        search_query: The query that produced the articles.
        brand_name: Brand the analysis was run for (stored on every mention).
        search_engine: Name of the search engine used.
        analysis_results: Per-article analysis dicts as described above.

    Returns:
        The new BrandAnalysis id, or None if anything failed. On failure the
        error is shown via ``st.error`` and the transaction is rolled back.
    """
    session = None
    try:
        session = get_session()

        positive_count, negative_count, neutral_count = _tally_sentiments(analysis_results)
        brand_analysis = BrandAnalysis(
            search_query=search_query,
            brand_name=brand_name,
            search_engine=search_engine,
            total_articles=len(analysis_results),
            articles_with_mentions=sum(
                1 for r in analysis_results if (r.get('total_mentions') or 0) > 0
            ),
            total_mentions=sum(r.get('total_mentions') or 0 for r in analysis_results),
            positive_count=positive_count,
            negative_count=negative_count,
            neutral_count=neutral_count,
        )
        session.add(brand_analysis)
        # Flush so the database assigns the primary key needed by the child rows;
        # reading it here also avoids a refresh query after commit.
        session.flush()
        analysis_id = brand_analysis.id

        for result in analysis_results:
            analysis = _analysis_of(result)
            article = Article(
                analysis_id=analysis_id,
                url=result.get('url', ''),
                title=result.get('title', ''),
                content=(result.get('content') or '')[:1000],  # Limit content size
                overall_sentiment=analysis.get('overall_sentiment', 'neutral'),
                summary=analysis.get('summary', ''),
            )
            session.add(article)
            session.flush()  # assigns article.id for the mentions below

            for list_key, mention_type, text_key, confidence in _MENTION_KINDS:
                for mention in analysis.get(list_key) or []:
                    session.add(BrandMention(
                        analysis_id=analysis_id,
                        article_id=article.id,
                        brand_name=brand_name,
                        mention_type=mention_type,
                        mention_text=mention.get(text_key, ''),
                        context=mention.get('context', ''),
                        sentiment=mention.get('sentiment', 'neutral'),
                        confidence=confidence,
                        explanation=mention.get('explanation', ''),
                    ))

        session.commit()
        return analysis_id
    except Exception as e:
        st.error(f"Database error: {e}")
        if session is not None:
            session.rollback()
        return None
    finally:
        # Always release the session, even if commit or rollback raised.
        if session is not None:
            session.close()
def get_historical_analyses(brand_name: Optional[str] = None, limit: int = 100):
    """Return the most recent BrandAnalysis rows, newest first.

    Args:
        brand_name: If truthy, only analyses whose ``brand_name`` equals it exactly.
        limit: Maximum number of rows to return.

    Returns:
        A list of BrandAnalysis objects ordered by ``created_at`` descending, or
        ``[]`` if the query failed (the error is shown via ``st.error``).
        NOTE(review): the session is closed before returning, so the objects are
        detached; attributes that were not loaded by this query (e.g. lazy
        relationships) cannot be fetched from them afterwards.
    """
    session = None
    try:
        session = get_session()
        query = session.query(BrandAnalysis)
        if brand_name:
            query = query.filter(BrandAnalysis.brand_name == brand_name)
        return query.order_by(BrandAnalysis.created_at.desc()).limit(limit).all()
    except Exception as e:
        st.error(f"Database query error: {e}")
        return []
    finally:
        # Close on both paths; previously the session leaked when the query raised.
        if session is not None:
            session.close()
def get_all_mentions(analysis_id: Optional[int] = None, sentiment: Optional[str] = None):
    """Return BrandMention rows, newest first, with optional filters.

    Args:
        analysis_id: If truthy, only mentions belonging to this analysis.
        sentiment: If truthy, only mentions whose ``sentiment`` equals it exactly.

    Returns:
        A list of BrandMention objects ordered by ``created_at`` descending, or
        ``[]`` if the query failed (the error is shown via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        query = session.query(BrandMention)
        if analysis_id:
            query = query.filter(BrandMention.analysis_id == analysis_id)
        if sentiment:
            query = query.filter(BrandMention.sentiment == sentiment)
        return query.order_by(BrandMention.created_at.desc()).all()
    except Exception as e:
        st.error(f"Database query error: {e}")
        return []
    finally:
        # Close on both paths; previously the session leaked when the query raised.
        if session is not None:
            session.close()
def save_co_mentions(article_id: int, brands: List[str]):
    """Record that every pair of distinct brands in ``brands`` co-occurred in an article.

    Each pair is stored with its brand names in alphabetical order, so (A, B) and
    (B, A) map to the same row. An existing row for the same pair and article has
    its ``co_occurrence_count`` incremented; otherwise a new row is created with
    count 1. A brand paired with itself (a duplicate entry in ``brands``) is
    skipped. All changes are committed together. On failure the error is shown
    via ``st.error`` and the transaction is rolled back.

    Args:
        article_id: Article the brands were found in.
        brands: Brand names mentioned in that article.
    """
    from itertools import combinations

    session = None
    try:
        session = get_session()
        for first, second in combinations(brands, 2):
            # Ensure consistent ordering (alphabetical)
            b1, b2 = sorted([first, second])
            if b1 == b2:
                continue  # a brand listed twice is not a co-mention with itself
            existing = session.query(CoMention).filter(
                CoMention.brand1 == b1,
                CoMention.brand2 == b2,
                CoMention.article_id == article_id
            ).first()
            if existing:
                existing.co_occurrence_count += 1
            else:
                session.add(CoMention(
                    brand1=b1,
                    brand2=b2,
                    article_id=article_id,
                    co_occurrence_count=1
                ))
        session.commit()
    except Exception as e:
        st.error(f"Error saving co-mentions: {e}")
        # `session` is pre-initialised, so a failing get_session() no longer
        # raises UnboundLocalError here.
        if session is not None:
            session.rollback()
    finally:
        if session is not None:
            session.close()
def get_co_mention_network():
    """Return every CoMention row, e.g. for building a brand network graph.

    Returns:
        A list of all CoMention objects, or ``[]`` if the query failed (the error
        is shown via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        return session.query(CoMention).all()
    except Exception as e:
        st.error(f"Database query error: {e}")
        return []
    finally:
        # Close on both paths; previously the session leaked when the query raised.
        if session is not None:
            session.close()
def create_scheduled_job(search_query: str, brand_names: List[str],
                         search_engines: List[str], schedule_type: str = 'weekly') -> Optional[int]:
    """Create and commit a new, active ScheduledMonitoring job.

    Args:
        search_query: Query the job will run.
        brand_names: Brands to monitor; stored as one comma-joined string.
        search_engines: Engines to query; stored as one comma-joined string.
        schedule_type: Schedule label stored on the job (default ``'weekly'``).

    Returns:
        The new job's id, or None on failure. On failure the error is shown via
        ``st.error`` and the transaction is rolled back.
    """
    session = None
    try:
        session = get_session()
        job = ScheduledMonitoring(
            search_query=search_query,
            brand_names=','.join(brand_names),
            search_engines=','.join(search_engines),
            schedule_type=schedule_type,
            is_active=True
        )
        session.add(job)
        session.commit()
        return job.id
    except Exception as e:
        st.error(f"Error creating scheduled job: {e}")
        # `session` is pre-initialised, so a failing get_session() no longer
        # raises UnboundLocalError here.
        if session is not None:
            session.rollback()
        return None
    finally:
        if session is not None:
            session.close()
def get_scheduled_jobs(active_only: bool = True):
    """Return scheduled monitoring jobs, newest first.

    Args:
        active_only: When True (the default), only jobs with ``is_active`` set.

    Returns:
        A list of ScheduledMonitoring objects ordered by ``created_at`` descending,
        or ``[]`` if the query failed (the error is shown via ``st.error``).
    """
    session = None
    try:
        session = get_session()
        query = session.query(ScheduledMonitoring)
        if active_only:
            # `== True` builds a SQL comparison expression; `is True` would not.
            query = query.filter(ScheduledMonitoring.is_active == True)  # noqa: E712
        return query.order_by(ScheduledMonitoring.created_at.desc()).all()
    except Exception as e:
        st.error(f"Database query error: {e}")
        return []
    finally:
        # Close on both paths; previously the session leaked when the query raised.
        if session is not None:
            session.close()
def update_job_schedule(job_id: int, last_run: datetime, next_run: datetime):
    """Record a job's last and next run times and bump its ``updated_at``.

    If no ScheduledMonitoring row has ``id == job_id``, nothing is changed and no
    error is reported.

    Args:
        job_id: Id of the job to update.
        last_run: Value stored in ``last_run``.
        next_run: Value stored in ``next_run``.

    On failure the error is shown via ``st.error`` and the transaction is
    rolled back.
    """
    session = None
    try:
        session = get_session()
        job = session.query(ScheduledMonitoring).filter(
            ScheduledMonitoring.id == job_id
        ).first()
        if job:
            job.last_run = last_run
            job.next_run = next_run
            # NOTE(review): naive UTC timestamp, presumably to match how the
            # column is stored elsewhere - confirm before switching to aware times.
            job.updated_at = datetime.utcnow()
            session.commit()
    except Exception as e:
        st.error(f"Error updating job schedule: {e}")
        # `session` is pre-initialised, so a failing get_session() no longer
        # raises UnboundLocalError here.
        if session is not None:
            session.rollback()
    finally:
        if session is not None:
            session.close()