"""
HuggingFace Spaces - Review Intelligence System (Streamlit)
Complete app with URL input, progress tracking, and interactive dashboard
FIXED VERSION - Better UI contrast + Proper field mapping
"""
import streamlit as st
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import os
from datetime import datetime
from typing import List, Dict, Optional
import time
from gradio_pipeline import GradioPipeline
# ============================================================================
# PAGE CONFIGURATION
# ============================================================================
st.set_page_config(
page_title="Review Intelligence System",
page_icon="đ¯",
layout="wide",
initial_sidebar_state="expanded"
)
# FIXED Custom CSS - Better Contrast
st.markdown("""
""", unsafe_allow_html=True)
# ============================================================================
# SESSION STATE INITIALIZATION
# ============================================================================
if 'processing_complete' not in st.session_state:
st.session_state.processing_complete = False
if 'results' not in st.session_state:
st.session_state.results = None
if 'insights' not in st.session_state:
st.session_state.insights = None
if 'scraped_count' not in st.session_state:
st.session_state.scraped_count = 0
# ============================================================================
# PROCESSING FUNCTIONS
# ============================================================================
def process_reviews_streamlit(app_store_urls: str, play_store_urls: str,
hf_api_key: str, review_limit: int):
"""
Process reviews with Streamlit progress tracking
"""
# Validate inputs
if not hf_api_key or not hf_api_key.strip():
st.error("â Please provide your HuggingFace API key")
return False
if not app_store_urls.strip() and not play_store_urls.strip():
st.error("â Please provide at least one App Store or Play Store URL")
return False
try:
# Set API key
os.environ['HUGGINGFACE_API_KEY'] = hf_api_key.strip()
# Progress indicators
progress_bar = st.progress(0)
status_text = st.empty()
# Initialize pipeline
status_text.text("đ Initializing pipeline...")
progress_bar.progress(5)
pipeline = GradioPipeline(review_limit=review_limit)
# Parse URLs
app_urls = [url.strip() for url in app_store_urls.split('\n') if url.strip()]
play_urls = [url.strip() for url in play_store_urls.split('\n') if url.strip()]
# Stage 0: Scraping
status_text.text("đˇī¸ Scraping reviews from stores...")
progress_bar.progress(10)
scraped_count = 0
total_apps = len(app_urls) + len(play_urls)
for i, app_id in enumerate(app_urls, 1):
status_text.text(f"đ Scraping App Store ({i}/{total_apps}): {app_id}")
reviews = pipeline.scraper.scrape_app_store_rss(app_id, country="ae", limit=review_limit)
saved = pipeline.scraper.save_reviews_to_db(reviews)
scraped_count += saved
progress_bar.progress(10 + int(20 * i / total_apps))
time.sleep(1)
for i, package in enumerate(play_urls, 1):
status_text.text(f"đ¤ Scraping Play Store ({i}/{total_apps}): {package}")
reviews = pipeline.scraper.scrape_play_store_api(package, country="ae", limit=review_limit)
saved = pipeline.scraper.save_reviews_to_db(reviews)
scraped_count += saved
progress_bar.progress(10 + int(20 * (len(app_urls) + i) / total_apps))
time.sleep(1)
if scraped_count == 0:
st.warning("â ī¸ No reviews scraped. Please check your URLs and try again.")
progress_bar.empty()
status_text.empty()
return False
st.session_state.scraped_count = scraped_count
# Stage 1-3: Processing
status_text.text("đ¤ Processing reviews with AI models...")
progress_bar.progress(30)
reviews = pipeline.db.get_pending_reviews(limit=review_limit)
total_reviews = len(reviews)
print(f"đ DEBUG: Found {total_reviews} reviews to process")
processed_states = []
for i, review in enumerate(reviews, 1):
review_id = review.get('review_id', 'unknown')[:20]
status_text.text(f"đ¤ Processing review {i}/{total_reviews}: {review_id}...")
progress_bar.progress(30 + int(60 * i / total_reviews))
try:
from langgraph_state import create_initial_state
state = create_initial_state(review)
config = {"configurable": {"thread_id": f"review_{review.get('review_id')}"}}
final_state = pipeline.review_graph.invoke(state, config=config)
# Convert to dict
state_dict = dict(final_state)
processed_states.append(state_dict)
# DEBUG: Print what we got
print(f"â
Processed {review_id}:")
print(f" Type: {state_dict.get('classification_type', 'MISSING')}")
print(f" Dept: {state_dict.get('department', 'MISSING')}")
print(f" Sentiment: {state_dict.get('final_sentiment', 'MISSING')}")
except Exception as e:
st.warning(f"â ī¸ Error processing review: {str(e)}")
print(f"â ERROR: {e}")
import traceback
print(traceback.format_exc())
continue
if len(processed_states) == 0:
st.error("â No reviews were processed successfully.")
progress_bar.empty()
status_text.empty()
return False
# Stage 4: Batch Analysis
status_text.text("đ Generating batch insights...")
progress_bar.progress(90)
insights = pipeline.analyze_batch(processed_states)
# Store in session state
st.session_state.results = processed_states
st.session_state.insights = insights
st.session_state.processing_complete = True
# Complete
progress_bar.progress(100)
status_text.text("â
Analysis complete!")
time.sleep(1)
progress_bar.empty()
status_text.empty()
return True
except Exception as e:
st.error(f"â Error during processing: {str(e)}")
import traceback
st.code(traceback.format_exc())
return False
# ============================================================================
# VISUALIZATION FUNCTIONS
# ============================================================================
def create_summary_section(scraped_count: int, results: List[Dict], insights: Dict):
"""Create summary metrics section"""
total = len(results)
positive = insights.get('sentiment_distribution', {}).get('POSITIVE', 0)
neutral = insights.get('sentiment_distribution', {}).get('NEUTRAL', 0)
negative = insights.get('sentiment_distribution', {}).get('NEGATIVE', 0)
critical = insights.get('priority_distribution', {}).get('critical', 0)
churn_risk = insights.get('churn_risk', 0)
# Success header
st.markdown(
f"""
â
Analysis Complete!
Review Intelligence System Results
""",
unsafe_allow_html=True
)
# Metrics with better styling
col1, col2, col3, col4, col5 = st.columns(5)
with col1:
st.metric("đ Total Reviews", total, f"Scraped: {scraped_count}")
with col2:
pos_pct = (positive / total * 100) if total > 0 else 0
st.metric("đ Positive", positive, f"{pos_pct:.1f}%")
with col3:
neg_pct = (negative / total * 100) if total > 0 else 0
st.metric("đ Negative", negative, f"{neg_pct:.1f}%")
with col4:
st.metric("đ¨ Critical", critical, "â ī¸" if critical > 0 else "â
")
with col5:
st.metric("đ Churn Risk", f"{churn_risk:.1f}%",
"đ´ High" if churn_risk > 30 else "đĸ Low")
# Recommendations
if insights.get('recommendations'):
st.markdown("### đĄ Key Recommendations")
for rec in insights.get('recommendations', []):
st.info(rec)
def create_sentiment_chart(insights: Dict):
"""Create sentiment distribution donut chart"""
sentiment_dist = insights.get('sentiment_distribution', {})
labels = list(sentiment_dist.keys())
values = list(sentiment_dist.values())
colors = ['#10b981', '#f59e0b', '#ef4444']
fig = go.Figure(data=[go.Pie(
labels=labels,
values=values,
hole=0.5,
marker_colors=colors,
textinfo='label+percent',
textposition='outside',
textfont_size=14
)])
fig.update_layout(
title="đ Sentiment Distribution",
showlegend=True,
height=400
)
return fig
def create_priority_chart(insights: Dict):
"""Create priority distribution bar chart"""
priority_dist = insights.get('priority_distribution', {})
priority_order = ['critical', 'high', 'medium', 'low']
labels = [p for p in priority_order if p in priority_dist]
values = [priority_dist.get(p, 0) for p in labels]
colors = ['#dc2626', '#f59e0b', '#3b82f6', '#10b981']
fig = go.Figure(data=[go.Bar(
x=labels,
y=values,
marker_color=colors[:len(labels)],
text=values,
textposition='auto'
)])
fig.update_layout(
title="đ¯ Priority Levels",
xaxis_title="Priority",
yaxis_title="Count",
height=400
)
return fig
def create_department_chart(insights: Dict):
"""Create department routing horizontal bar chart"""
dept_dist = insights.get('department_distribution', {})
labels = list(dept_dist.keys())
values = list(dept_dist.values())
fig = go.Figure(data=[go.Bar(
x=values,
y=labels,
orientation='h',
marker_color='#667eea',
text=values,
textposition='auto'
)])
fig.update_layout(
title="đĸ Department Routing",
xaxis_title="Number of Issues",
yaxis_title="Department",
height=400
)
return fig
def create_emotion_chart(insights: Dict):
"""Create emotion distribution chart"""
emotion_dist = insights.get('emotion_distribution', {})
labels = list(emotion_dist.keys())
values = list(emotion_dist.values())
fig = px.bar(
x=labels,
y=values,
labels={'x': 'Emotion', 'y': 'Count'},
color=values,
color_continuous_scale='Viridis'
)
fig.update_layout(
title="đ Emotional Analysis",
xaxis_title="Emotion Type",
yaxis_title="Number of Reviews",
height=300,
showlegend=False
)
return fig
def create_reviews_dataframe(results: List[Dict]) -> pd.DataFrame:
"""
FIXED: Create DataFrame with proper field mapping
Checks both state field names AND database field names
"""
df_data = []
for review in results:
# FIXED: Check state fields FIRST, fall back to database fields
df_data.append({
'Review ID': review.get('review_id', 'N/A')[:20],
'Rating': review.get('rating', 0),
'Review': (review.get('review_text', 'N/A') or '')[:100] + '...',
'Sentiment': review.get('final_sentiment', review.get('stage3_final_sentiment', 'N/A')),
'Type': review.get('classification_type', review.get('stage1_llm1_type', 'N/A')),
'Department': review.get('department', review.get('stage1_llm1_department', 'N/A')),
'Priority': review.get('priority', review.get('stage1_llm1_priority', 'N/A')),
'Emotion': review.get('emotion', review.get('stage1_llm2_emotion', 'N/A')),
'Needs Review': 'đ¨ Yes' if review.get('needs_human_review', review.get('stage3_needs_human_review')) else 'â
No'
})
return pd.DataFrame(df_data)
# ============================================================================
# MAIN APP
# ============================================================================
def main():
"""Main Streamlit app"""
# Title
st.title("đ¯ Review Intelligence System")
st.markdown("### Multi-Stage AI Analysis Dashboard")
st.markdown("Powered by **LangGraph** + **HuggingFace** âĸ 4-Stage Processing Pipeline")
st.markdown("---")
# Sidebar - Input or View Mode
with st.sidebar:
st.header("đī¸ Control Panel")
if st.session_state.processing_complete:
st.success("â
Analysis Complete!")
if st.button("đ Start New Analysis", use_container_width=True):
st.session_state.processing_complete = False
st.session_state.results = None
st.session_state.insights = None
st.rerun()
else:
st.info("đ Enter URLs below to start")
# Database Management Section
st.markdown("---")
st.markdown("### đī¸ Database Management")
# Show current database stats
try:
from database_enhanced import EnhancedDatabase
db = EnhancedDatabase()
db.connect()
cursor = db.conn.execute("SELECT COUNT(*) FROM reviews")
count = cursor.fetchone()[0]
db.close()
st.metric("Total Reviews in DB", count)
if count > 0:
st.caption(f"đĄ Database contains {count} reviews from previous analyses")
except:
st.metric("Total Reviews in DB", 0)
# Reset Database Button
if st.button("đī¸ Reset Database", type="secondary", use_container_width=True,
help="Delete all reviews and start fresh. Useful when switching between different apps."):
import os
if os.path.exists("review_database.db"):
os.remove("review_database.db")
st.success("â
Database deleted! Ready for fresh analysis.")
time.sleep(1)
st.rerun()
else:
st.info("âšī¸ No database found to delete")
# Main content - Input or Results
if not st.session_state.processing_complete:
show_input_form()
else:
show_results_dashboard()
def show_input_form():
"""Show input form for URLs and API key"""
st.markdown("### đ Step 1: Enter Store URLs")
col1, col2 = st.columns(2)
with col1:
st.markdown("#### đ App Store IDs")
st.markdown(
"""
**Format:** Just paste the app ID
- Example: `1158907446` (UAE)
- Example: `1234567890` (US)
"""
)
app_store_urls = st.text_area(
"App Store IDs (one per line)",
placeholder="1158907446\n1234567890",
height=150,
key="app_urls"
)
with col2:
st.markdown("#### đ¤ Play Store Packages")
st.markdown(
"""
**Format:** Package name
- Example: `com.yas.app`
- Example: `com.company.app`
"""
)
play_store_urls = st.text_area(
"Play Store Package Names (one per line)",
placeholder="com.yas.app\ncom.company.app",
height=150,
key="play_urls"
)
st.markdown("---")
st.markdown("### đ Step 2: Configure Settings")
col1, col2 = st.columns([2, 1])
with col1:
hf_api_key = st.text_input(
"đ HuggingFace API Key",
type="password",
placeholder="hf_...",
help="Get your key from: https://huggingface.co/settings/tokens",
key="hf_key"
)
with col2:
review_limit = st.slider(
"đ Reviews per App",
min_value=5,
max_value=100,
value=20,
step=5,
help="More reviews = longer processing time",
key="review_limit"
)
st.markdown("---")
# Submit button
col1, col2, col3 = st.columns([1, 1, 1])
with col2:
if st.button("đ Start Analysis", use_container_width=True, type="primary"):
with st.spinner("Processing..."):
success = process_reviews_streamlit(
app_store_urls,
play_store_urls,
hf_api_key,
review_limit
)
if success:
st.balloons()
st.rerun()
# Documentation
with st.expander("đ How to Use"):
st.markdown("""
### đ Quick Guide
**1. Get HuggingFace API Key:**
- Visit: https://huggingface.co/settings/tokens
- Create new token (Read access)
- Copy token (starts with `hf_`)
**2. Enter URLs:**
- **App Store**: Just the ID number (e.g., `1234567890`)
- **Play Store**: Package name (e.g., `com.company.app`)
- One per line
**3. Click Start:**
- Watch progress bar
- Wait for completion (~7 sec per review)
- View results automatically
### đī¸ What Happens:
- đˇī¸ **Stage 0**: Scrapes reviews from stores
- đ¤ **Stage 1**: Classifies with 3 AI models (Type, Department, Priority)
- đ **Stage 2**: Analyzes sentiment with dual BERT models
- đ **Stage 3**: Synthesizes insights and recommendations
- đĄ **Stage 4**: Generates batch analytics
### ⥠Performance:
- ~7 seconds per review
- 7 AI models working together
- Parallel execution for speed
""")
def show_results_dashboard():
"""Show results dashboard with charts and tables"""
results = st.session_state.results
insights = st.session_state.insights
scraped_count = st.session_state.scraped_count
# Summary section
create_summary_section(scraped_count, results, insights)
st.markdown("---")
# Tabs for different views
tab1, tab2, tab3, tab4 = st.tabs([
"đ Sentiment Analysis",
"đ¨ Critical Issues",
"đ All Reviews",
"đĨ Export"
])
# TAB 1: Sentiment Analysis
with tab1:
st.header("đ Sentiment Analysis Overview")
col1, col2 = st.columns(2)
with col1:
fig_sentiment = create_sentiment_chart(insights)
st.plotly_chart(fig_sentiment, use_container_width=True)
with col2:
fig_priority = create_priority_chart(insights)
st.plotly_chart(fig_priority, use_container_width=True)
st.markdown("### đĸ Department Routing")
fig_dept = create_department_chart(insights)
st.plotly_chart(fig_dept, use_container_width=True)
st.markdown("### đ Emotional Analysis")
fig_emotion = create_emotion_chart(insights)
st.plotly_chart(fig_emotion, use_container_width=True)
# TAB 2: Critical Issues
with tab2:
st.header("đ¨ Critical Issues Requiring Attention")
# Filter critical reviews
critical_reviews = [
r for r in results
if (r.get('priority') == 'critical' or
r.get('stage1_llm1_priority') == 'critical' or
r.get('needs_human_review', r.get('stage3_needs_human_review')) or
(r.get('final_sentiment', r.get('stage3_final_sentiment')) == 'NEGATIVE' and r.get('rating', 5) <= 2))
]
if len(critical_reviews) == 0:
st.success("â
No critical issues found! All reviews are in good shape.")
else:
st.warning(f"Found {len(critical_reviews)} critical issues")
for review in critical_reviews:
with st.expander(
f"â ī¸ {review.get('review_id', 'Unknown')[:30]} - "
f"Rating: {review.get('rating', 'N/A')}/5"
):
col1, col2 = st.columns([2, 1])
with col1:
st.markdown("**Review Text:**")
st.write(review.get('review_text', 'No text available'))
st.markdown("**Reasoning:**")
reasoning = review.get('reasoning', review.get('stage3_reasoning', 'No reasoning available'))
st.info(reasoning)
with col2:
st.markdown("**Classification:**")
st.write(f"đ Type: {review.get('classification_type', review.get('stage1_llm1_type', 'N/A'))}")
st.write(f"đĸ Department: {review.get('department', review.get('stage1_llm1_department', 'N/A'))}")
st.write(f"đ¯ Priority: {review.get('priority', review.get('stage1_llm1_priority', 'N/A'))}")
st.write(f"đ Emotion: {review.get('emotion', review.get('stage1_llm2_emotion', 'N/A'))}")
st.write(f"đ Sentiment: {review.get('final_sentiment', review.get('stage3_final_sentiment', 'N/A'))}")
st.markdown("**Action:**")
action = review.get('action_recommendation', review.get('stage3_action_recommendation', 'No action specified'))
st.error(action)
# TAB 3: All Reviews
with tab3:
st.header("đ Detailed Review Analysis")
# Create DataFrame
df = create_reviews_dataframe(results)
# Filters
col1, col2, col3 = st.columns(3)
with col1:
sentiment_filter = st.multiselect(
"Filter by Sentiment",
options=df['Sentiment'].unique(),
default=df['Sentiment'].unique()
)
with col2:
dept_filter = st.multiselect(
"Filter by Department",
options=df['Department'].unique(),
default=df['Department'].unique()
)
with col3:
priority_filter = st.multiselect(
"Filter by Priority",
options=df['Priority'].unique(),
default=df['Priority'].unique()
)
# Apply filters
filtered_df = df[
(df['Sentiment'].isin(sentiment_filter)) &
(df['Department'].isin(dept_filter)) &
(df['Priority'].isin(priority_filter))
]
st.info(f"Showing {len(filtered_df)} of {len(df)} reviews")
# Display table
st.dataframe(
filtered_df,
use_container_width=True,
height=600
)
# TAB 4: Export
with tab4:
st.header("đĨ Export Results")
st.markdown("### Download Options")
col1, col2 = st.columns(2)
with col1:
st.markdown("#### đ CSV Export")
st.write("Download complete analysis with all classifications")
df = create_reviews_dataframe(results)
csv = df.to_csv(index=False)
st.download_button(
label="đĨ Download CSV Report",
data=csv,
file_name=f"review_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
mime="text/csv",
use_container_width=True
)
with col2:
st.markdown("#### đ JSON Export")
st.write("Download raw data with all details")
import json
json_data = json.dumps({
'results': results,
'insights': insights,
'scraped_count': scraped_count,
'export_date': datetime.now().isoformat()
}, indent=2)
st.download_button(
label="đĨ Download JSON Data",
data=json_data,
file_name=f"review_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
mime="application/json",
use_container_width=True
)
st.markdown("---")
st.markdown("### đ Summary Statistics")
col1, col2, col3 = st.columns(3)
with col1:
st.metric("Total Reviews Analyzed", len(results))
with col2:
positive = insights.get('sentiment_distribution', {}).get('POSITIVE', 0)
total = len(results)
pct = (positive / total * 100) if total > 0 else 0
st.metric("Positive Rate", f"{pct:.1f}%")
with col3:
critical = insights.get('priority_distribution', {}).get('critical', 0)
st.metric("Critical Issues", critical)
# ============================================================================
# FOOTER
# ============================================================================
def show_footer():
"""Show footer with credits"""
st.markdown("---")
st.markdown(
"""
đ¤ Powered by Multi-Stage AI Pipeline |
Stage 1: Classification (Qwen, Mistral, Llama) |
Stage 2: Sentiment (Twitter-BERT) |
Stage 3: Finalization (Llama 70B) |
Stage 4: Batch Analysis
Built with â¤ī¸ using LangGraph + HuggingFace + Streamlit
""",
unsafe_allow_html=True
)
# ============================================================================
# RUN APP
# ============================================================================
if __name__ == "__main__":
main()
show_footer()