Spaces:
Sleeping
Sleeping
| """ | |
| PlayPulse Intelligence — Flask App (v2) | |
| ───────────────────────────────────────── | |
| Key improvements over v1 | |
| • Chat has conversation memory (per session, server-side deque) | |
| • Intent router is enum-strict + falls back properly | |
| • 6 inline chat tools (no agent needed for simple queries) | |
| • Agent is one of those tools — called only for deep analysis | |
| • /chat returns structured payload: reply + optional table / chart_data / agent_data | |
| • "tabular format" requests produce real table JSON the frontend can render | |
| """ | |
| import urllib.parse | |
| import math | |
| import re | |
| import json | |
| import requests | |
| from collections import deque, defaultdict | |
| from datetime import datetime | |
| from flask import Flask, request, render_template, jsonify, session | |
| from google_play_scraper import reviews, Sort, search, app as app_info | |
| import pandas as pd | |
| from utils.agents import run_agent, build_llm | |
| import os | |
| app = Flask(__name__) | |
| app.secret_key = os.getenv("FLASK_SECRET", "playpulse-secret-2026") | |
| # ── Per-session conversation memory (server-side, max 20 turns) ─────────── | |
| # key: session_id → deque of {"role": "user"|"assistant", "content": str} | |
| _CONV_MEMORY: dict[str, deque] = defaultdict(lambda: deque(maxlen=20)) | |
| MAX_HISTORY_FOR_LLM = 6 # last N turns sent to LLM for context | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # SCRAPER HELPERS (unchanged from v1) | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| def extract_app_id(url_or_name: str) -> str: | |
| url_or_name = url_or_name.strip() | |
| if "play.google.com" in url_or_name: | |
| parsed = urllib.parse.urlparse(url_or_name) | |
| qp = urllib.parse.parse_qs(parsed.query) | |
| if 'id' in qp: | |
| return qp['id'][0] | |
| if "." in url_or_name and " " not in url_or_name: | |
| return url_or_name | |
| return "" | |
| def scrape_store_ids(query: str, n_hits: int = 5): | |
| try: | |
| url = f"https://play.google.com/store/search?q={urllib.parse.quote(query)}&c=apps" | |
| headers = {"User-Agent": "Mozilla/5.0"} | |
| resp = requests.get(url, headers=headers, timeout=10) | |
| if resp.status_code != 200: | |
| return [] | |
| pids = re.findall(r'details\?id=([a-zA-Z0-9._]+)', resp.text) | |
| unique: list[str] = [] | |
| for p in pids: | |
| if p not in unique and "None" not in p: | |
| unique.append(p) | |
| return unique[:n_hits] | |
| except Exception: | |
| return [] | |
| def serialize_review(r: dict) -> dict: | |
| return { | |
| "reviewId": r.get("reviewId", ""), | |
| "userName": r.get("userName", ""), | |
| "userImage": r.get("userImage", ""), | |
| "content": r.get("content", ""), | |
| "score": r.get("score", 0), | |
| "thumbsUpCount": r.get("thumbsUpCount", 0), | |
| "reviewCreatedVersion": r.get("reviewCreatedVersion", ""), | |
| "at": r["at"].isoformat() if r.get("at") else "", | |
| "replyContent": r.get("replyContent", "") or "", | |
| "repliedAt": r["repliedAt"].isoformat() if r.get("repliedAt") else "", | |
| } | |
| def fetch_app_reviews(app_id, review_count, sort_order, star_ratings_input): | |
| info = app_info(app_id, lang='en', country='us') | |
| sort_map = { | |
| 'MOST_RELEVANT': Sort.MOST_RELEVANT, | |
| 'NEWEST': Sort.NEWEST, | |
| 'RATING': Sort.RATING, | |
| } | |
| selected_sort = sort_map.get(sort_order, Sort.MOST_RELEVANT) | |
| if star_ratings_input == 'all' or not star_ratings_input: | |
| star_filters = [None] | |
| else: | |
| star_filters = sorted( | |
| {int(s) for s in star_ratings_input if str(s).isdigit() and 1 <= int(s) <= 5}, | |
| reverse=True | |
| ) | |
| per_bucket = math.ceil(_review_limit(review_count) / len(star_filters)) | |
| all_reviews: list[dict] = [] | |
| seen_ids: set[str] = set() | |
| for star in star_filters: | |
| result, _ = reviews( | |
| app_id, lang='en', country='us', | |
| sort=selected_sort, count=per_bucket, | |
| filter_score_with=star, | |
| ) | |
| for r in result: | |
| rid = r.get('reviewId', '') | |
| if rid not in seen_ids: | |
| seen_ids.add(rid) | |
| s = serialize_review(r) | |
| s['appTitle'] = info['title'] | |
| s['appId'] = app_id | |
| all_reviews.append(s) | |
| return info, all_reviews | |
| def _review_limit(count): | |
| try: | |
| return int(count) | |
| except Exception: | |
| return 150 | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # INLINE CHAT TOOLS (fast, no heavy agent needed for simple queries) | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| def _tool_rating_breakdown(df: pd.DataFrame) -> dict: | |
| """Star rating distribution across all reviews.""" | |
| dist = df["score"].value_counts().sort_index() | |
| total = max(1, len(df)) | |
| rows = [ | |
| { | |
| "Stars": f"{'★' * int(s)} ({int(s)})", | |
| "Count": int(c), | |
| "Percentage": f"{round(c/total*100,1)}%", | |
| } | |
| for s, c in dist.items() | |
| ] | |
| return { | |
| "table": { | |
| "title": "Rating Distribution", | |
| "columns": ["Stars", "Count", "Percentage"], | |
| "rows": rows, | |
| }, | |
| "summary": f"{len(df)} reviews: avg {round(df['score'].mean(),2)}/5", | |
| } | |
| def _tool_app_comparison(df: pd.DataFrame) -> dict: | |
| """Per-app avg rating + negative % table.""" | |
| if "appId" not in df.columns and "appTitle" not in df.columns: | |
| return {"error": "No app column in data"} | |
| app_col = "appTitle" if "appTitle" in df.columns else "appId" | |
| rows = [] | |
| for app_name, grp in df.groupby(app_col): | |
| sc = pd.to_numeric(grp["score"], errors="coerce") | |
| rows.append({ | |
| "App": str(app_name), | |
| "Reviews": len(grp), | |
| "Avg Rating": f"{round(float(sc.mean()),2)} ★", | |
| "% Negative": f"{round(float((sc <= 2).mean()*100),1)}%", | |
| "% Positive": f"{round(float((sc >= 4).mean()*100),1)}%", | |
| }) | |
| rows.sort(key=lambda x: x["Avg Rating"]) | |
| return { | |
| "table": { | |
| "title": "App Comparison", | |
| "columns": ["App", "Reviews", "Avg Rating", "% Negative", "% Positive"], | |
| "rows": rows, | |
| }, | |
| "summary": f"Compared {len(rows)} apps", | |
| } | |
| def _tool_top_reviews(df: pd.DataFrame, min_stars: int = 1, | |
| max_stars: int = 2, n: int = 5, | |
| app_filter: str = "") -> dict: | |
| """Filtered review list as table.""" | |
| sc = pd.to_numeric(df["score"], errors="coerce") | |
| mask = (sc >= min_stars) & (sc <= max_stars) | |
| if app_filter: | |
| app_col = "appTitle" if "appTitle" in df.columns else "appId" | |
| mask &= df[app_col].astype(str).str.lower().str.contains( | |
| re.escape(app_filter.lower()), na=False) | |
| subset = df[mask].head(n) | |
| tc = "content" if "content" in df.columns else df.columns[0] | |
| app_col = "appTitle" if "appTitle" in df.columns else ("appId" if "appId" in df.columns else None) | |
| rows = [] | |
| for _, r in subset.iterrows(): | |
| row = { | |
| "User": str(r.get("userName", ""))[:20], | |
| "Stars": "★" * int(r.get("score", 0)), | |
| "Review": str(r.get(tc, ""))[:120], | |
| } | |
| if app_col: | |
| row["App"] = str(r.get(app_col, "")) | |
| if "thumbsUpCount" in df.columns: | |
| row["Helpful"] = int(r.get("thumbsUpCount", 0)) | |
| rows.append(row) | |
| label = f"{min_stars}–{max_stars} star" | |
| cols = list(rows[0].keys()) if rows else [] | |
| return { | |
| "table": { | |
| "title": f"Top {label} Reviews" + (f" — {app_filter}" if app_filter else ""), | |
| "columns": cols, | |
| "rows": rows, | |
| }, | |
| "summary": f"Showing {len(rows)} of {int(mask.sum())} matching reviews", | |
| } | |
| def _tool_top_helpful(df: pd.DataFrame, n: int = 5) -> dict: | |
| """Most helpful reviews.""" | |
| if "thumbsUpCount" not in df.columns: | |
| return {"error": "No helpful votes column"} | |
| df2 = df.copy() | |
| df2["__h"] = pd.to_numeric(df2["thumbsUpCount"], errors="coerce").fillna(0) | |
| subset = df2.nlargest(n, "__h") | |
| tc = "content" if "content" in df.columns else df.columns[0] | |
| app_col = "appTitle" if "appTitle" in df.columns else None | |
| rows = [] | |
| for _, r in subset.iterrows(): | |
| row = { | |
| "Stars": "★" * int(r.get("score", 0)), | |
| "Helpful": int(r.get("thumbsUpCount", 0)), | |
| "Review": str(r.get(tc, ""))[:120], | |
| } | |
| if app_col: | |
| row["App"] = str(r.get(app_col, "")) | |
| rows.append(row) | |
| return { | |
| "table": { | |
| "title": "Most Helpful Reviews", | |
| "columns": list(rows[0].keys()) if rows else [], | |
| "rows": rows, | |
| }, | |
| "summary": f"Top {len(rows)} most helpful reviews", | |
| } | |
| def _tool_keyword_search(df: pd.DataFrame, keyword: str, n: int = 8) -> dict: | |
| """Search review text for keyword.""" | |
| tc = "content" if "content" in df.columns else df.columns[0] | |
| mask = df[tc].astype(str).str.lower().str.contains( | |
| re.escape(keyword.lower()), na=False) | |
| subset = df[mask].head(n) | |
| app_col = "appTitle" if "appTitle" in df.columns else None | |
| rows = [] | |
| for _, r in subset.iterrows(): | |
| row = { | |
| "Stars": "★" * int(r.get("score", 0)), | |
| "Review": str(r.get(tc, ""))[:150], | |
| } | |
| if app_col: | |
| row["App"] = str(r.get(app_col, "")) | |
| rows.append(row) | |
| return { | |
| "table": { | |
| "title": f'Reviews mentioning "{keyword}"', | |
| "columns": list(rows[0].keys()) if rows else [], | |
| "rows": rows, | |
| }, | |
| "summary": f"Found {int(mask.sum())} reviews mentioning '{keyword}'", | |
| } | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # INTENT CLASSIFIER (enum-strict, multi-class) | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| INTENT_SYSTEM = """You are an intent classifier for a game-review chat assistant. | |
| Classify the user message into EXACTLY ONE of these intents: | |
| TABLE — user wants data in tabular / structured / list format | |
| COMPARISON — comparing apps / games against each other | |
| KEYWORD — wants to search for a specific word/phrase in reviews | |
| HELPFUL — wants the most helpful / upvoted reviews | |
| ANALYSIS — deep insight, summary, cluster analysis, sentiment, recommendations | |
| FILTER — filtering the visible table (show only X stars, only app Y) | |
| GREETING — hi, hello, thanks, small talk | |
| GENERAL — questions about features, how to use the tool, unrelated | |
| Return ONLY one word from the list above. No explanation.""" | |
| def classify_intent(message: str, llm) -> str: | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| try: | |
| resp = llm.invoke([ | |
| SystemMessage(content=INTENT_SYSTEM), | |
| HumanMessage(content=f'Message: "{message}"'), | |
| ]) | |
| raw = getattr(resp, "content", str(resp)).strip().upper().split()[0] | |
| valid = {"TABLE","COMPARISON","KEYWORD","HELPFUL","ANALYSIS","FILTER","GREETING","GENERAL"} | |
| return raw if raw in valid else "ANALYSIS" | |
| except Exception: | |
| return "ANALYSIS" | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # PARAMETER EXTRACTOR (LLM extracts structured params from natural language) | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| def extract_params(message: str, intent: str, llm, apps: list[str]) -> dict: | |
| """Extract structured parameters from a message given its intent.""" | |
| app_list_str = ", ".join(apps[:10]) if apps else "none" | |
| system = f"""Extract parameters from the user message for intent={intent}. | |
| Known app names in dataset: [{app_list_str}] | |
| Return ONLY valid JSON (no markdown): | |
| {{ | |
| "min_stars": 1-5 or null, | |
| "max_stars": 1-5 or null, | |
| "n": integer count or 5, | |
| "app_filter": "exact app name or title from known list, or empty string", | |
| "keyword": "search term or empty string", | |
| "metric": "avg_rating|pct_negative|pct_positive|count or empty" | |
| }}""" | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| try: | |
| resp = llm.invoke([ | |
| SystemMessage(content=system), | |
| HumanMessage(content=message), | |
| ]) | |
| raw = getattr(resp, "content", str(resp)).strip() | |
| raw = re.sub(r"^```(?:json)?", "", raw).strip().rstrip("```") | |
| return json.loads(raw) | |
| except Exception: | |
| return {"min_stars": None, "max_stars": None, "n": 5, | |
| "app_filter": "", "keyword": "", "metric": ""} | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # RESPONSE FORMATTER (converts tool output + agent report → rich reply) | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| def _format_agent_report(report: dict) -> str: | |
| """Convert agent report dict into a well-structured markdown-like text reply.""" | |
| parts = [] | |
| if report.get("direct_answer"): | |
| parts.append(report["direct_answer"]) | |
| problems = report.get("top_problems", []) | |
| if problems: | |
| parts.append("\n**Top Issues:**") | |
| for i, p in enumerate(problems[:4], 1): | |
| sev = p.get("severity","").upper() | |
| issue = p.get("issue","") | |
| desc = p.get("description","") | |
| ev = p.get("evidence","") | |
| parts.append(f"{i}. **{issue}** [{sev}] — {desc}" + (f' _"{ev}"_' if ev else "")) | |
| strengths = report.get("key_strengths", []) | |
| if strengths: | |
| parts.append("\n**What Users Love:**") | |
| for s in strengths[:3]: | |
| parts.append(f"• **{s.get('strength','')}** — {s.get('description','')}") | |
| recs = report.get("recommendations", []) | |
| if recs: | |
| parts.append("\n**Recommendations:**") | |
| for i, r in enumerate(recs[:3], 1): | |
| parts.append(f"{i}. [{r.get('priority','').upper()}] {r.get('action','')} — {r.get('rationale','')}") | |
| return "\n".join(parts) if parts else report.get("executive_summary", "Analysis complete.") | |
| def _build_agent_table(report: dict, app_breakdown: list) -> dict | None: | |
| """If agent ran app_comparison tool, surface it as a table.""" | |
| if not app_breakdown: | |
| return None | |
| rows = [ | |
| { | |
| "App": a.get("app",""), | |
| "Reviews": a.get("count",""), | |
| "Avg Rating": f"{a.get('avg_rating','?')} ★", | |
| "% Negative": f"{a.get('pct_negative','?')}%", | |
| "% Positive": f"{a.get('pct_positive','?')}%", | |
| } | |
| for a in app_breakdown | |
| ] | |
| return { | |
| "title": "App Breakdown", | |
| "columns": ["App","Reviews","Avg Rating","% Negative","% Positive"], | |
| "rows": rows, | |
| } | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # /chat ENDPOINT — the core of PlayPulse Intelligence | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| def chat(): | |
| try: | |
| data = request.json or {} | |
| user_message = data.get('message', '').strip() | |
| current_reviews = data.get('reviews', []) | |
| session_id = data.get('session_id') or request.remote_addr or "default" | |
| if not user_message: | |
| return jsonify({"error": "No message provided"}), 400 | |
| llm = build_llm() | |
| if not llm: | |
| return jsonify({"reply": "AI service unavailable — no API key configured.", "type": "error"}) | |
| # ── Conversation memory ──────────────────────────────────────────── | |
| memory = _CONV_MEMORY[session_id] | |
| memory.append({"role": "user", "content": user_message}) | |
| # ── Build context from reviews ───────────────────────────────────── | |
| df = pd.DataFrame(current_reviews) if current_reviews else pd.DataFrame() | |
| has_data = not df.empty | |
| # Detected app names for parameter extraction | |
| apps: list[str] = [] | |
| if has_data: | |
| for col in ["appTitle", "appId"]: | |
| if col in df.columns: | |
| apps = df[col].dropna().astype(str).unique().tolist() | |
| break | |
| # ── Classify intent ──────────────────────────────────────────────── | |
| intent = classify_intent(user_message, llm) | |
| print(f"[ChatRouter] Intent: {intent} | has_data: {has_data} | apps: {apps[:3]}") | |
| # ── Handle GREETING / GENERAL ────────────────────────────────────── | |
| if intent in ("GREETING", "GENERAL"): | |
| from langchain_core.messages import HumanMessage, SystemMessage | |
| history_msgs = [] | |
| for turn in list(memory)[-MAX_HISTORY_FOR_LLM:]: | |
| if turn["role"] == "user": | |
| history_msgs.append(HumanMessage(content=turn["content"])) | |
| else: | |
| from langchain_core.messages import AIMessage | |
| history_msgs.append(AIMessage(content=turn["content"])) | |
| sys_msg = SystemMessage(content=( | |
| "You are PlayPulse Intelligence, a friendly AI assistant for analyzing " | |
| "Google Play Store reviews. Be helpful, concise, and conversational. " | |
| "If the user greets you, greet back briefly. " | |
| "If they ask what you can do, explain you can analyze reviews, compare apps, " | |
| "find issues, show ratings, and answer questions about the scraped data." | |
| )) | |
| resp = llm.invoke([sys_msg] + history_msgs) | |
| reply = getattr(resp, "content", str(resp)).strip() | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({"reply": reply, "type": "general"}) | |
| # ── No data loaded — ask user to scrape first ───────────────────── | |
| if not has_data and intent not in ("GREETING","GENERAL"): | |
| reply = ("No reviews loaded yet. Please scrape an app first using the search bar, " | |
| "then I can analyze the data for you! 🎮") | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({"reply": reply, "type": "general"}) | |
| # ── FILTER intent ───────────────────────────────────────────────── | |
| if intent == "FILTER": | |
| params = extract_params(user_message, intent, llm, apps) | |
| filter_payload: dict = {} | |
| if params.get("min_stars"): | |
| stars = list(range( | |
| int(params.get("min_stars",1)), | |
| int(params.get("max_stars",params.get("min_stars",1)))+1 | |
| )) | |
| filter_payload["stars"] = stars | |
| if params.get("app_filter"): | |
| filter_payload["app"] = params["app_filter"] | |
| if params.get("keyword"): | |
| filter_payload["query"] = params["keyword"] | |
| # Also show a summary table via TABLE tool | |
| result = _tool_top_reviews( | |
| df, | |
| min_stars=int(params.get("min_stars") or 1), | |
| max_stars=int(params.get("max_stars") or 5), | |
| n=int(params.get("n") or 8), | |
| app_filter=params.get("app_filter",""), | |
| ) | |
| reply = result.get("summary","Filters applied.") | |
| table = result.get("table") | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({ | |
| "reply": reply, | |
| "filters": filter_payload, | |
| "table": table, | |
| "type": "filter", | |
| }) | |
| # ── COMPARISON intent ───────────────────────────────────────────── | |
| if intent == "COMPARISON": | |
| result = _tool_app_comparison(df) | |
| if "error" in result: | |
| reply = result["error"] | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({"reply": reply, "type": "general"}) | |
| # Also ask LLM to narrate | |
| narration_prompt = ( | |
| f"Here is a comparison table of apps by rating:\n" | |
| f"{json.dumps(result['table']['rows'], indent=2)}\n\n" | |
| f"User asked: '{user_message}'\n" | |
| f"Write a 2-3 sentence natural language summary highlighting " | |
| f"the worst and best performing apps." | |
| ) | |
| from langchain_core.messages import HumanMessage | |
| narr_resp = llm.invoke([HumanMessage(content=narration_prompt)]) | |
| narration = getattr(narr_resp, "content", str(narr_resp)).strip() | |
| memory.append({"role": "assistant", "content": narration}) | |
| return jsonify({ | |
| "reply": narration, | |
| "table": result["table"], | |
| "type": "comparison", | |
| }) | |
| # ── TABLE intent ────────────────────────────────────────────────── | |
| if intent == "TABLE": | |
| # Check what the PREVIOUS assistant message was about | |
| # so "get me this in tabular format" works correctly | |
| prev_context = "" | |
| history = list(memory) | |
| for turn in reversed(history[:-1]): # skip current user msg | |
| if turn["role"] == "assistant": | |
| prev_context = turn["content"] | |
| break | |
| # If previous answer was about app comparison / ratings → show comparison table | |
| comp_keywords = ["rating","low rating","negative","ranked","comparison","games"] | |
| if any(k in prev_context.lower() for k in comp_keywords) or "tabular" in user_message.lower(): | |
| result = _tool_app_comparison(df) | |
| if "table" in result: | |
| reply = f"Here's the comparison table. {result['summary']}" | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({ | |
| "reply": reply, | |
| "table": result["table"], | |
| "type": "table", | |
| }) | |
| # Otherwise extract params and show filtered reviews table | |
| params = extract_params(user_message, "TABLE", llm, apps) | |
| result = _tool_top_reviews( | |
| df, | |
| min_stars=int(params.get("min_stars") or 1), | |
| max_stars=int(params.get("max_stars") or 5), | |
| n=int(params.get("n") or 10), | |
| app_filter=params.get("app_filter",""), | |
| ) | |
| reply = result.get("summary","") | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({ | |
| "reply": reply, | |
| "table": result.get("table"), | |
| "type": "table", | |
| }) | |
| # ── KEYWORD intent ──────────────────────────────────────────────── | |
| if intent == "KEYWORD": | |
| params = extract_params(user_message, intent, llm, apps) | |
| kw = params.get("keyword","") | |
| if not kw: | |
| # Ask LLM to extract keyword from message | |
| from langchain_core.messages import HumanMessage | |
| kw_resp = llm.invoke([HumanMessage(content=( | |
| f'Extract the search keyword or phrase from: "{user_message}". ' | |
| f'Return ONLY the keyword, nothing else.' | |
| ))]) | |
| kw = getattr(kw_resp, "content", str(kw_resp)).strip().strip('"') | |
| result = _tool_keyword_search(df, kw, n=10) | |
| reply = result.get("summary","") | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({ | |
| "reply": reply, | |
| "table": result.get("table"), | |
| "type": "keyword", | |
| }) | |
| # ── HELPFUL intent ──────────────────────────────────────────────── | |
| if intent == "HELPFUL": | |
| params = extract_params(user_message, intent, llm, apps) | |
| result = _tool_top_helpful(df, n=int(params.get("n") or 5)) | |
| if "error" in result: | |
| reply = result["error"] | |
| else: | |
| reply = result.get("summary","") | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({ | |
| "reply": reply, | |
| "table": result.get("table"), | |
| "type": "helpful", | |
| }) | |
| # ── ANALYSIS intent (deep — calls LangGraph agent) ──────────────── | |
| # Also used as fallback for everything not caught above | |
| # Build conversation context string for agent | |
| history_context = "\n".join( | |
| f"{'User' if t['role']=='user' else 'Assistant'}: {t['content']}" | |
| for t in list(memory)[-MAX_HISTORY_FOR_LLM:] | |
| ) | |
| enriched_query = ( | |
| f"Conversation so far:\n{history_context}\n\n" | |
| f"User's current question: {user_message}" | |
| ) if len(memory) > 2 else user_message | |
| # Run the full LangGraph agent | |
| agent_state = run_agent(enriched_query, df=df if has_data else None) | |
| report = agent_state.get("report", {}) | |
| breakdown = agent_state.get("app_breakdown", []) | |
| # Format the reply text | |
| reply = _format_agent_report(report) | |
| if not reply.strip(): | |
| reply = report.get("executive_summary","I've completed the analysis.") | |
| # Build optional table from app breakdown | |
| table = _build_agent_table(report, breakdown) | |
| memory.append({"role": "assistant", "content": reply}) | |
| return jsonify({ | |
| "reply": reply, | |
| "table": table, | |
| "agent_data": { | |
| "top_problems": report.get("top_problems",[]), | |
| "key_strengths": report.get("key_strengths",[]), | |
| "recommendations": report.get("recommendations",[]), | |
| "clusters": agent_state.get("clusters",[]), | |
| "sentiment": agent_state.get("sentiment",{}), | |
| "stats": agent_state.get("stats",{}), | |
| }, | |
| "type": "analysis", | |
| }) | |
| except Exception as e: | |
| import traceback | |
| print(f"[Chat ERROR] {e}\n{traceback.format_exc()}") | |
| return jsonify({"error": str(e)}), 500 | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # SCRAPE ROUTES (unchanged from v1) | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| def scrape(): | |
| try: | |
| data = request.json | |
| identifier = data.get('identifier', '').strip() | |
| count_type = data.get('review_count_type', 'fixed') | |
| count = 100000 if count_type == 'all' else data.get('review_count', 150) | |
| app_id = extract_app_id(identifier) | |
| if not app_id: | |
| results = search(identifier, lang="en", country="us", n_hits=1) | |
| if results and results[0].get('appId'): | |
| app_id = results[0]['appId'] | |
| else: | |
| pids = scrape_store_ids(identifier, n_hits=1) | |
| if pids: | |
| app_id = pids[0] | |
| else: | |
| return jsonify({"error": f"App '{identifier}' not found"}), 404 | |
| info, all_reviews = fetch_app_reviews( | |
| app_id, count, data.get('sort_order'), data.get('star_ratings')) | |
| return jsonify({ | |
| "app_info": { | |
| "title": info['title'], | |
| "icon": info['icon'], | |
| "score": info['score'], | |
| "reviews": info['reviews'], | |
| "appId": app_id, | |
| }, | |
| "reviews": all_reviews, | |
| }) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def find_apps(): | |
| try: | |
| data = request.json | |
| query = data.get('query', '').strip() | |
| app_count = int(data.get('app_count', 10)) | |
| app_ids = scrape_store_ids(query, n_hits=app_count) | |
| if not app_ids: | |
| hits = search(query, lang="en", country="us", n_hits=app_count) | |
| app_ids = [h['appId'] for h in hits if h.get('appId')] | |
| results = [] | |
| for aid in app_ids: | |
| try: | |
| info = app_info(aid, lang='en', country='us') | |
| results.append({ | |
| "appId": aid, | |
| "title": info['title'], | |
| "icon": info['icon'], | |
| "score": info['score'], | |
| "developer": info.get('developer','Unknown'), | |
| "installs": info.get('installs','0+'), | |
| }) | |
| except Exception: | |
| continue | |
| return jsonify({"results": results}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def scrape_batch(): | |
| try: | |
| data = request.json | |
| app_ids = data.get('app_ids', []) | |
| count_type = data.get('review_count_type', 'fixed') | |
| reviews_per_app = 100000 if count_type == 'all' else int(data.get('reviews_per_app', 100)) | |
| if not app_ids: | |
| return jsonify({"error": "No app IDs provided"}), 400 | |
| batch_results: list[dict] = [] | |
| all_combined: list[dict] = [] | |
| for app_id in app_ids: | |
| try: | |
| info, app_reviews = fetch_app_reviews( | |
| app_id, reviews_per_app, data.get('sort_order'), data.get('star_ratings')) | |
| batch_results.append({ | |
| "title": info['title'], | |
| "icon": info['icon'], | |
| "score": info['score'], | |
| "appId": app_id, | |
| }) | |
| all_combined.extend(app_reviews) | |
| except Exception: | |
| continue | |
| return jsonify({"apps": batch_results, "reviews": all_combined}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| def search_suggestions(): | |
| try: | |
| query = (request.json or {}).get("query","").strip() | |
| if not query or len(query) < 2: | |
| return jsonify({"results": []}) | |
| hits = search(query, lang="en", country="us", n_hits=6) | |
| results = [] | |
| for h in hits: | |
| aid = h.get("appId","") | |
| if not aid or aid == "None" or "." not in aid: | |
| continue | |
| results.append({ | |
| "appId": aid, | |
| "storeUrl": f"https://play.google.com/store/apps/details?id={aid}", | |
| "title": h.get("title",""), | |
| "icon": h.get("icon",""), | |
| "score": round(h.get("score") or 0, 1), | |
| "developer": h.get("developer",""), | |
| "installs": h.get("installs",""), | |
| }) | |
| return jsonify({"results": results[:5]}) | |
| except Exception as e: | |
| return jsonify({"error": str(e)}), 500 | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # CLEAR CHAT MEMORY (optional endpoint for "New Chat" button) | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| def clear_chat(): | |
| session_id = (request.json or {}).get('session_id') or request.remote_addr or "default" | |
| _CONV_MEMORY[session_id].clear() | |
| return jsonify({"ok": True}) | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| # PAGE ROUTES | |
| # ═══════════════════════════════════════════════════════════════════════════ | |
| def scraper(): | |
| return render_template('index.html') | |
| def batch(): | |
| return render_template('batch.html') | |
| def landing(): | |
| return render_template('landing.html') | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", debug=True, port=7860) |