# Play-Scrapper / app.py
# Uploaded by WebashalarForML ("Upload 6 files", commit fee4aa3, verified)
"""
PlayPulse Intelligence — Flask App (v2)
─────────────────────────────────────────
Key improvements over v1
• Chat has conversation memory (per session, server-side deque)
• Intent router is enum-strict + falls back properly
• 6 inline chat tools (no agent needed for simple queries)
• Agent is one of those tools — called only for deep analysis
• /chat returns structured payload: reply + optional table / chart_data / agent_data
• "tabular format" requests produce real table JSON the frontend can render
"""
import urllib.parse
import math
import re
import json
import requests
from collections import deque, defaultdict
from datetime import datetime
from flask import Flask, request, render_template, jsonify, session
from google_play_scraper import reviews, Sort, search, app as app_info
import pandas as pd
from utils.agents import run_agent, build_llm
import os
app = Flask(__name__)
# NOTE(review): the hard-coded fallback secret is acceptable for local dev only;
# set FLASK_SECRET in any real deployment.
app.secret_key = os.getenv("FLASK_SECRET", "playpulse-secret-2026")
# ── Per-session conversation memory (server-side, max 20 turns) ───────────
# key: session_id → deque of {"role": "user"|"assistant", "content": str}
_CONV_MEMORY: dict[str, deque] = defaultdict(lambda: deque(maxlen=20))
MAX_HISTORY_FOR_LLM = 6  # last N turns sent to LLM for context
# ═══════════════════════════════════════════════════════════════════════════
# SCRAPER HELPERS (unchanged from v1)
# ═══════════════════════════════════════════════════════════════════════════
def extract_app_id(url_or_name: str) -> str:
    """Pull a package id out of a Play Store URL, or pass a bare id through.

    Returns "" when the input looks like a free-text app name instead of a
    URL or dotted package id.
    """
    candidate = url_or_name.strip()
    if "play.google.com" in candidate:
        parsed = urllib.parse.urlparse(candidate)
        ids = urllib.parse.parse_qs(parsed.query).get('id')
        if ids:
            return ids[0]
    # Bare package ids are dotted and contain no spaces (e.g. com.foo.bar).
    looks_like_package = "." in candidate and " " not in candidate
    return candidate if looks_like_package else ""
def scrape_store_ids(query: str, n_hits: int = 5):
    """Scrape up to ``n_hits`` package ids from the Play Store search page.

    Best-effort: any network or parse failure yields an empty list.
    """
    try:
        search_url = (
            "https://play.google.com/store/search?q="
            f"{urllib.parse.quote(query)}&c=apps"
        )
        resp = requests.get(search_url,
                            headers={"User-Agent": "Mozilla/5.0"},
                            timeout=10)
        if resp.status_code != 200:
            return []
        found: list[str] = []
        for pid in re.findall(r'details\?id=([a-zA-Z0-9._]+)', resp.text):
            # Keep first occurrence only; drop scraper artifacts containing "None".
            if pid not in found and "None" not in pid:
                found.append(pid)
        return found[:n_hits]
    except Exception:
        return []
def serialize_review(r: dict) -> dict:
    """Flatten one google-play-scraper review dict into a JSON-safe dict."""
    created = r.get("at")
    replied = r.get("repliedAt")
    return {
        "reviewId": r.get("reviewId", ""),
        "userName": r.get("userName", ""),
        "userImage": r.get("userImage", ""),
        "content": r.get("content", ""),
        "score": r.get("score", 0),
        "thumbsUpCount": r.get("thumbsUpCount", 0),
        "reviewCreatedVersion": r.get("reviewCreatedVersion", ""),
        # datetime values become ISO strings; missing/None → ""
        "at": created.isoformat() if created else "",
        "replyContent": r.get("replyContent", "") or "",
        "repliedAt": replied.isoformat() if replied else "",
    }
def fetch_app_reviews(app_id, review_count, sort_order, star_ratings_input):
    """Fetch app metadata and up to `review_count` reviews for one app.

    Args:
        app_id: Play Store package id (e.g. "com.king.candycrushsaga").
        review_count: desired total review count (coerced via _review_limit).
        sort_order: 'MOST_RELEVANT' | 'NEWEST' | 'RATING'; anything else
            falls back to MOST_RELEVANT.
        star_ratings_input: 'all', falsy, or an iterable of star values 1-5;
            reviews are fetched per star bucket and merged.

    Returns:
        (info, all_reviews): the app-info dict and a list of serialized
        review dicts, deduplicated by reviewId and tagged with appTitle/appId.
    """
    info = app_info(app_id, lang='en', country='us')
    sort_map = {
        'MOST_RELEVANT': Sort.MOST_RELEVANT,
        'NEWEST': Sort.NEWEST,
        'RATING': Sort.RATING,
    }
    selected_sort = sort_map.get(sort_order, Sort.MOST_RELEVANT)
    if star_ratings_input == 'all' or not star_ratings_input:
        star_filters = [None]  # None → no score filter
    else:
        star_filters = sorted(
            {int(s) for s in star_ratings_input if str(s).isdigit() and 1 <= int(s) <= 5},
            reverse=True
        )
        if not star_filters:
            # BUG FIX: if every requested star value was invalid the old code
            # divided by zero below; treat that case like 'all' instead.
            star_filters = [None]
    # Split the requested total evenly across the star buckets.
    per_bucket = math.ceil(_review_limit(review_count) / len(star_filters))
    all_reviews: list[dict] = []
    seen_ids: set[str] = set()
    for star in star_filters:
        result, _ = reviews(
            app_id, lang='en', country='us',
            sort=selected_sort, count=per_bucket,
            filter_score_with=star,
        )
        for r in result:
            rid = r.get('reviewId', '')
            if rid not in seen_ids:  # buckets can overlap; keep first copy
                seen_ids.add(rid)
                s = serialize_review(r)
                s['appTitle'] = info['title']
                s['appId'] = app_id
                all_reviews.append(s)
    return info, all_reviews
def _review_limit(count):
try:
return int(count)
except Exception:
return 150
# ═══════════════════════════════════════════════════════════════════════════
# INLINE CHAT TOOLS (fast, no heavy agent needed for simple queries)
# ═══════════════════════════════════════════════════════════════════════════
def _tool_rating_breakdown(df: pd.DataFrame) -> dict:
"""Star rating distribution across all reviews."""
dist = df["score"].value_counts().sort_index()
total = max(1, len(df))
rows = [
{
"Stars": f"{'★' * int(s)} ({int(s)})",
"Count": int(c),
"Percentage": f"{round(c/total*100,1)}%",
}
for s, c in dist.items()
]
return {
"table": {
"title": "Rating Distribution",
"columns": ["Stars", "Count", "Percentage"],
"rows": rows,
},
"summary": f"{len(df)} reviews: avg {round(df['score'].mean(),2)}/5",
}
def _tool_app_comparison(df: pd.DataFrame) -> dict:
"""Per-app avg rating + negative % table."""
if "appId" not in df.columns and "appTitle" not in df.columns:
return {"error": "No app column in data"}
app_col = "appTitle" if "appTitle" in df.columns else "appId"
rows = []
for app_name, grp in df.groupby(app_col):
sc = pd.to_numeric(grp["score"], errors="coerce")
rows.append({
"App": str(app_name),
"Reviews": len(grp),
"Avg Rating": f"{round(float(sc.mean()),2)} ★",
"% Negative": f"{round(float((sc <= 2).mean()*100),1)}%",
"% Positive": f"{round(float((sc >= 4).mean()*100),1)}%",
})
rows.sort(key=lambda x: x["Avg Rating"])
return {
"table": {
"title": "App Comparison",
"columns": ["App", "Reviews", "Avg Rating", "% Negative", "% Positive"],
"rows": rows,
},
"summary": f"Compared {len(rows)} apps",
}
def _tool_top_reviews(df: pd.DataFrame, min_stars: int = 1,
max_stars: int = 2, n: int = 5,
app_filter: str = "") -> dict:
"""Filtered review list as table."""
sc = pd.to_numeric(df["score"], errors="coerce")
mask = (sc >= min_stars) & (sc <= max_stars)
if app_filter:
app_col = "appTitle" if "appTitle" in df.columns else "appId"
mask &= df[app_col].astype(str).str.lower().str.contains(
re.escape(app_filter.lower()), na=False)
subset = df[mask].head(n)
tc = "content" if "content" in df.columns else df.columns[0]
app_col = "appTitle" if "appTitle" in df.columns else ("appId" if "appId" in df.columns else None)
rows = []
for _, r in subset.iterrows():
row = {
"User": str(r.get("userName", ""))[:20],
"Stars": "★" * int(r.get("score", 0)),
"Review": str(r.get(tc, ""))[:120],
}
if app_col:
row["App"] = str(r.get(app_col, ""))
if "thumbsUpCount" in df.columns:
row["Helpful"] = int(r.get("thumbsUpCount", 0))
rows.append(row)
label = f"{min_stars}{max_stars} star"
cols = list(rows[0].keys()) if rows else []
return {
"table": {
"title": f"Top {label} Reviews" + (f" — {app_filter}" if app_filter else ""),
"columns": cols,
"rows": rows,
},
"summary": f"Showing {len(rows)} of {int(mask.sum())} matching reviews",
}
def _tool_top_helpful(df: pd.DataFrame, n: int = 5) -> dict:
"""Most helpful reviews."""
if "thumbsUpCount" not in df.columns:
return {"error": "No helpful votes column"}
df2 = df.copy()
df2["__h"] = pd.to_numeric(df2["thumbsUpCount"], errors="coerce").fillna(0)
subset = df2.nlargest(n, "__h")
tc = "content" if "content" in df.columns else df.columns[0]
app_col = "appTitle" if "appTitle" in df.columns else None
rows = []
for _, r in subset.iterrows():
row = {
"Stars": "★" * int(r.get("score", 0)),
"Helpful": int(r.get("thumbsUpCount", 0)),
"Review": str(r.get(tc, ""))[:120],
}
if app_col:
row["App"] = str(r.get(app_col, ""))
rows.append(row)
return {
"table": {
"title": "Most Helpful Reviews",
"columns": list(rows[0].keys()) if rows else [],
"rows": rows,
},
"summary": f"Top {len(rows)} most helpful reviews",
}
def _tool_keyword_search(df: pd.DataFrame, keyword: str, n: int = 8) -> dict:
"""Search review text for keyword."""
tc = "content" if "content" in df.columns else df.columns[0]
mask = df[tc].astype(str).str.lower().str.contains(
re.escape(keyword.lower()), na=False)
subset = df[mask].head(n)
app_col = "appTitle" if "appTitle" in df.columns else None
rows = []
for _, r in subset.iterrows():
row = {
"Stars": "★" * int(r.get("score", 0)),
"Review": str(r.get(tc, ""))[:150],
}
if app_col:
row["App"] = str(r.get(app_col, ""))
rows.append(row)
return {
"table": {
"title": f'Reviews mentioning "{keyword}"',
"columns": list(rows[0].keys()) if rows else [],
"rows": rows,
},
"summary": f"Found {int(mask.sum())} reviews mentioning '{keyword}'",
}
# ═══════════════════════════════════════════════════════════════════════════
# INTENT CLASSIFIER (enum-strict, multi-class)
# ═══════════════════════════════════════════════════════════════════════════
# Prompt for classify_intent(): forces the LLM to emit exactly one label from
# the closed set below; unrecognized outputs fall back to ANALYSIS in the caller.
INTENT_SYSTEM = """You are an intent classifier for a game-review chat assistant.
Classify the user message into EXACTLY ONE of these intents:
TABLE — user wants data in tabular / structured / list format
COMPARISON — comparing apps / games against each other
KEYWORD — wants to search for a specific word/phrase in reviews
HELPFUL — wants the most helpful / upvoted reviews
ANALYSIS — deep insight, summary, cluster analysis, sentiment, recommendations
FILTER — filtering the visible table (show only X stars, only app Y)
GREETING — hi, hello, thanks, small talk
GENERAL — questions about features, how to use the tool, unrelated
Return ONLY one word from the list above. No explanation."""
def classify_intent(message: str, llm) -> str:
    """Classify `message` into one intent label; defaults to ANALYSIS on any failure."""
    from langchain_core.messages import HumanMessage, SystemMessage
    valid = {"TABLE", "COMPARISON", "KEYWORD", "HELPFUL",
             "ANALYSIS", "FILTER", "GREETING", "GENERAL"}
    try:
        response = llm.invoke([
            SystemMessage(content=INTENT_SYSTEM),
            HumanMessage(content=f'Message: "{message}"'),
        ])
        text = getattr(response, "content", str(response))
        label = text.strip().upper().split()[0]
        return label if label in valid else "ANALYSIS"
    except Exception:
        return "ANALYSIS"
# ═══════════════════════════════════════════════════════════════════════════
# PARAMETER EXTRACTOR (LLM extracts structured params from natural language)
# ═══════════════════════════════════════════════════════════════════════════
def extract_params(message: str, intent: str, llm, apps: list[str]) -> dict:
    """Extract structured parameters from a message given its intent.

    Asks the LLM for a JSON object; on any failure returns a neutral default
    parameter dict so callers never have to branch on errors.
    """
    from langchain_core.messages import HumanMessage, SystemMessage
    known_apps = ", ".join(apps[:10]) if apps else "none"
    system = f"""Extract parameters from the user message for intent={intent}.
Known app names in dataset: [{known_apps}]
Return ONLY valid JSON (no markdown):
{{
"min_stars": 1-5 or null,
"max_stars": 1-5 or null,
"n": integer count or 5,
"app_filter": "exact app name or title from known list, or empty string",
"keyword": "search term or empty string",
"metric": "avg_rating|pct_negative|pct_positive|count or empty"
}}"""
    try:
        resp = llm.invoke([
            SystemMessage(content=system),
            HumanMessage(content=message),
        ])
        raw = getattr(resp, "content", str(resp)).strip()
        # Strip a possible markdown code fence around the JSON.
        raw = re.sub(r"^```(?:json)?", "", raw).strip().rstrip("```")
        return json.loads(raw)
    except Exception:
        return {"min_stars": None, "max_stars": None, "n": 5,
                "app_filter": "", "keyword": "", "metric": ""}
# ═══════════════════════════════════════════════════════════════════════════
# RESPONSE FORMATTER (converts tool output + agent report → rich reply)
# ═══════════════════════════════════════════════════════════════════════════
def _format_agent_report(report: dict) -> str:
"""Convert agent report dict into a well-structured markdown-like text reply."""
parts = []
if report.get("direct_answer"):
parts.append(report["direct_answer"])
problems = report.get("top_problems", [])
if problems:
parts.append("\n**Top Issues:**")
for i, p in enumerate(problems[:4], 1):
sev = p.get("severity","").upper()
issue = p.get("issue","")
desc = p.get("description","")
ev = p.get("evidence","")
parts.append(f"{i}. **{issue}** [{sev}] — {desc}" + (f' _"{ev}"_' if ev else ""))
strengths = report.get("key_strengths", [])
if strengths:
parts.append("\n**What Users Love:**")
for s in strengths[:3]:
parts.append(f"• **{s.get('strength','')}** — {s.get('description','')}")
recs = report.get("recommendations", [])
if recs:
parts.append("\n**Recommendations:**")
for i, r in enumerate(recs[:3], 1):
parts.append(f"{i}. [{r.get('priority','').upper()}] {r.get('action','')}{r.get('rationale','')}")
return "\n".join(parts) if parts else report.get("executive_summary", "Analysis complete.")
def _build_agent_table(report: dict, app_breakdown: list) -> dict | None:
"""If agent ran app_comparison tool, surface it as a table."""
if not app_breakdown:
return None
rows = [
{
"App": a.get("app",""),
"Reviews": a.get("count",""),
"Avg Rating": f"{a.get('avg_rating','?')} ★",
"% Negative": f"{a.get('pct_negative','?')}%",
"% Positive": f"{a.get('pct_positive','?')}%",
}
for a in app_breakdown
]
return {
"title": "App Breakdown",
"columns": ["App","Reviews","Avg Rating","% Negative","% Positive"],
"rows": rows,
}
# ═══════════════════════════════════════════════════════════════════════════
# /chat ENDPOINT — the core of PlayPulse Intelligence
# ═══════════════════════════════════════════════════════════════════════════
@app.route('/chat', methods=['POST'])
def chat():
    """Core conversational endpoint for PlayPulse Intelligence.

    Routes each user message by LLM-classified intent:
      • GREETING/GENERAL → direct LLM chat using recent history
      • FILTER/COMPARISON/TABLE/KEYWORD/HELPFUL → fast inline pandas tools
      • ANALYSIS (and any fallback) → the full LangGraph agent

    Expects JSON {message, reviews, session_id}; returns a payload with
    `reply` plus optional `table`, `filters`, or `agent_data`.
    """
    try:
        data = request.json or {}
        user_message = data.get('message', '').strip()
        current_reviews = data.get('reviews', [])
        # Fall back to the client IP so memory still works without an explicit id.
        session_id = data.get('session_id') or request.remote_addr or "default"
        if not user_message:
            return jsonify({"error": "No message provided"}), 400
        llm = build_llm()
        if not llm:
            return jsonify({"reply": "AI service unavailable — no API key configured.", "type": "error"})
        # ── Conversation memory ────────────────────────────────────────────
        memory = _CONV_MEMORY[session_id]
        memory.append({"role": "user", "content": user_message})
        # ── Build context from reviews ─────────────────────────────────────
        df = pd.DataFrame(current_reviews) if current_reviews else pd.DataFrame()
        has_data = not df.empty
        # Detected app names for parameter extraction
        apps: list[str] = []
        if has_data:
            for col in ["appTitle", "appId"]:
                if col in df.columns:
                    apps = df[col].dropna().astype(str).unique().tolist()
                    break
        # ── Classify intent ────────────────────────────────────────────────
        intent = classify_intent(user_message, llm)
        print(f"[ChatRouter] Intent: {intent} | has_data: {has_data} | apps: {apps[:3]}")
        # ── Handle GREETING / GENERAL ──────────────────────────────────────
        if intent in ("GREETING", "GENERAL"):
            from langchain_core.messages import HumanMessage, SystemMessage
            history_msgs = []
            for turn in list(memory)[-MAX_HISTORY_FOR_LLM:]:
                if turn["role"] == "user":
                    history_msgs.append(HumanMessage(content=turn["content"]))
                else:
                    from langchain_core.messages import AIMessage
                    history_msgs.append(AIMessage(content=turn["content"]))
            sys_msg = SystemMessage(content=(
                "You are PlayPulse Intelligence, a friendly AI assistant for analyzing "
                "Google Play Store reviews. Be helpful, concise, and conversational. "
                "If the user greets you, greet back briefly. "
                "If they ask what you can do, explain you can analyze reviews, compare apps, "
                "find issues, show ratings, and answer questions about the scraped data."
            ))
            resp = llm.invoke([sys_msg] + history_msgs)
            reply = getattr(resp, "content", str(resp)).strip()
            memory.append({"role": "assistant", "content": reply})
            return jsonify({"reply": reply, "type": "general"})
        # ── No data loaded — ask user to scrape first ─────────────────────
        if not has_data and intent not in ("GREETING","GENERAL"):
            reply = ("No reviews loaded yet. Please scrape an app first using the search bar, "
                     "then I can analyze the data for you! 🎮")
            memory.append({"role": "assistant", "content": reply})
            return jsonify({"reply": reply, "type": "general"})
        # ── FILTER intent ─────────────────────────────────────────────────
        if intent == "FILTER":
            params = extract_params(user_message, intent, llm, apps)
            filter_payload: dict = {}
            if params.get("min_stars"):
                stars = list(range(
                    int(params.get("min_stars",1)),
                    int(params.get("max_stars",params.get("min_stars",1)))+1
                ))
                filter_payload["stars"] = stars
            if params.get("app_filter"):
                filter_payload["app"] = params["app_filter"]
            if params.get("keyword"):
                filter_payload["query"] = params["keyword"]
            # Also show a summary table via TABLE tool
            result = _tool_top_reviews(
                df,
                min_stars=int(params.get("min_stars") or 1),
                max_stars=int(params.get("max_stars") or 5),
                n=int(params.get("n") or 8),
                app_filter=params.get("app_filter",""),
            )
            reply = result.get("summary","Filters applied.")
            table = result.get("table")
            memory.append({"role": "assistant", "content": reply})
            return jsonify({
                "reply": reply,
                "filters": filter_payload,
                "table": table,
                "type": "filter",
            })
        # ── COMPARISON intent ─────────────────────────────────────────────
        if intent == "COMPARISON":
            result = _tool_app_comparison(df)
            if "error" in result:
                reply = result["error"]
                memory.append({"role": "assistant", "content": reply})
                return jsonify({"reply": reply, "type": "general"})
            # Also ask LLM to narrate
            narration_prompt = (
                f"Here is a comparison table of apps by rating:\n"
                f"{json.dumps(result['table']['rows'], indent=2)}\n\n"
                f"User asked: '{user_message}'\n"
                f"Write a 2-3 sentence natural language summary highlighting "
                f"the worst and best performing apps."
            )
            from langchain_core.messages import HumanMessage
            narr_resp = llm.invoke([HumanMessage(content=narration_prompt)])
            narration = getattr(narr_resp, "content", str(narr_resp)).strip()
            memory.append({"role": "assistant", "content": narration})
            return jsonify({
                "reply": narration,
                "table": result["table"],
                "type": "comparison",
            })
        # ── TABLE intent ──────────────────────────────────────────────────
        if intent == "TABLE":
            # Check what the PREVIOUS assistant message was about
            # so "get me this in tabular format" works correctly
            prev_context = ""
            history = list(memory)
            for turn in reversed(history[:-1]):  # skip current user msg
                if turn["role"] == "assistant":
                    prev_context = turn["content"]
                    break
            # If previous answer was about app comparison / ratings → show comparison table
            comp_keywords = ["rating","low rating","negative","ranked","comparison","games"]
            if any(k in prev_context.lower() for k in comp_keywords) or "tabular" in user_message.lower():
                result = _tool_app_comparison(df)
                if "table" in result:
                    reply = f"Here's the comparison table. {result['summary']}"
                    memory.append({"role": "assistant", "content": reply})
                    return jsonify({
                        "reply": reply,
                        "table": result["table"],
                        "type": "table",
                    })
            # Otherwise extract params and show filtered reviews table
            params = extract_params(user_message, "TABLE", llm, apps)
            result = _tool_top_reviews(
                df,
                min_stars=int(params.get("min_stars") or 1),
                max_stars=int(params.get("max_stars") or 5),
                n=int(params.get("n") or 10),
                app_filter=params.get("app_filter",""),
            )
            reply = result.get("summary","")
            memory.append({"role": "assistant", "content": reply})
            return jsonify({
                "reply": reply,
                "table": result.get("table"),
                "type": "table",
            })
        # ── KEYWORD intent ────────────────────────────────────────────────
        if intent == "KEYWORD":
            params = extract_params(user_message, intent, llm, apps)
            kw = params.get("keyword","")
            if not kw:
                # Ask LLM to extract keyword from message
                from langchain_core.messages import HumanMessage
                kw_resp = llm.invoke([HumanMessage(content=(
                    f'Extract the search keyword or phrase from: "{user_message}". '
                    f'Return ONLY the keyword, nothing else.'
                ))])
                kw = getattr(kw_resp, "content", str(kw_resp)).strip().strip('"')
            result = _tool_keyword_search(df, kw, n=10)
            reply = result.get("summary","")
            memory.append({"role": "assistant", "content": reply})
            return jsonify({
                "reply": reply,
                "table": result.get("table"),
                "type": "keyword",
            })
        # ── HELPFUL intent ────────────────────────────────────────────────
        if intent == "HELPFUL":
            params = extract_params(user_message, intent, llm, apps)
            result = _tool_top_helpful(df, n=int(params.get("n") or 5))
            if "error" in result:
                reply = result["error"]
            else:
                reply = result.get("summary","")
            memory.append({"role": "assistant", "content": reply})
            return jsonify({
                "reply": reply,
                "table": result.get("table"),
                "type": "helpful",
            })
        # ── ANALYSIS intent (deep — calls LangGraph agent) ────────────────
        # Also used as fallback for everything not caught above
        # Build conversation context string for agent
        history_context = "\n".join(
            f"{'User' if t['role']=='user' else 'Assistant'}: {t['content']}"
            for t in list(memory)[-MAX_HISTORY_FOR_LLM:]
        )
        # Only prepend history once a real back-and-forth exists (>2 entries).
        enriched_query = (
            f"Conversation so far:\n{history_context}\n\n"
            f"User's current question: {user_message}"
        ) if len(memory) > 2 else user_message
        # Run the full LangGraph agent
        agent_state = run_agent(enriched_query, df=df if has_data else None)
        report = agent_state.get("report", {})
        breakdown = agent_state.get("app_breakdown", [])
        # Format the reply text
        reply = _format_agent_report(report)
        if not reply.strip():
            reply = report.get("executive_summary","I've completed the analysis.")
        # Build optional table from app breakdown
        table = _build_agent_table(report, breakdown)
        memory.append({"role": "assistant", "content": reply})
        return jsonify({
            "reply": reply,
            "table": table,
            "agent_data": {
                "top_problems": report.get("top_problems",[]),
                "key_strengths": report.get("key_strengths",[]),
                "recommendations": report.get("recommendations",[]),
                "clusters": agent_state.get("clusters",[]),
                "sentiment": agent_state.get("sentiment",{}),
                "stats": agent_state.get("stats",{}),
            },
            "type": "analysis",
        })
    except Exception as e:
        import traceback
        print(f"[Chat ERROR] {e}\n{traceback.format_exc()}")
        return jsonify({"error": str(e)}), 500
# ═══════════════════════════════════════════════════════════════════════════
# SCRAPE ROUTES (unchanged from v1)
# ═══════════════════════════════════════════════════════════════════════════
@app.route('/scrape', methods=['POST'])
def scrape():
    """Scrape reviews for a single app given a URL, package id, or name."""
    try:
        payload = request.json
        identifier = payload.get('identifier', '').strip()
        count_type = payload.get('review_count_type', 'fixed')
        # 'all' is approximated by a very large fixed cap.
        count = 100000 if count_type == 'all' else payload.get('review_count', 150)
        app_id = extract_app_id(identifier)
        if not app_id:
            # Resolve a free-text name: official search API first, then the
            # HTML-scrape fallback.
            results = search(identifier, lang="en", country="us", n_hits=1)
            if results and results[0].get('appId'):
                app_id = results[0]['appId']
            else:
                pids = scrape_store_ids(identifier, n_hits=1)
                if not pids:
                    return jsonify({"error": f"App '{identifier}' not found"}), 404
                app_id = pids[0]
        info, all_reviews = fetch_app_reviews(
            app_id, count, payload.get('sort_order'), payload.get('star_ratings'))
        app_summary = {
            "title": info['title'],
            "icon": info['icon'],
            "score": info['score'],
            "reviews": info['reviews'],
            "appId": app_id,
        }
        return jsonify({"app_info": app_summary, "reviews": all_reviews})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/find-apps', methods=['POST'])
def find_apps():
    """Find up to `app_count` apps matching a query, with basic metadata."""
    try:
        payload = request.json
        query = payload.get('query', '').strip()
        app_count = int(payload.get('app_count', 10))
        app_ids = scrape_store_ids(query, n_hits=app_count)
        if not app_ids:
            # Fall back to the library's search API when HTML scraping fails.
            hits = search(query, lang="en", country="us", n_hits=app_count)
            app_ids = [h['appId'] for h in hits if h.get('appId')]
        results = []
        for aid in app_ids:
            try:
                meta = app_info(aid, lang='en', country='us')
                results.append({
                    "appId": aid,
                    "title": meta['title'],
                    "icon": meta['icon'],
                    "score": meta['score'],
                    "developer": meta.get('developer', 'Unknown'),
                    "installs": meta.get('installs', '0+'),
                })
            except Exception:
                continue  # skip apps whose detail page can't be fetched
        return jsonify({"results": results})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route('/scrape-batch', methods=['POST'])
def scrape_batch():
    """Scrape reviews for several apps in one call; per-app failures are skipped."""
    try:
        payload = request.json
        app_ids = payload.get('app_ids', [])
        count_type = payload.get('review_count_type', 'fixed')
        reviews_per_app = 100000 if count_type == 'all' else int(payload.get('reviews_per_app', 100))
        if not app_ids:
            return jsonify({"error": "No app IDs provided"}), 400
        batch_results: list[dict] = []
        all_combined: list[dict] = []
        for app_id in app_ids:
            try:
                info, app_reviews = fetch_app_reviews(
                    app_id, reviews_per_app, payload.get('sort_order'), payload.get('star_ratings'))
                batch_results.append({
                    "title": info['title'],
                    "icon": info['icon'],
                    "score": info['score'],
                    "appId": app_id,
                })
                all_combined.extend(app_reviews)
            except Exception:
                continue  # one bad app must not abort the whole batch
        return jsonify({"apps": batch_results, "reviews": all_combined})
    except Exception as e:
        return jsonify({"error": str(e)}), 500
@app.route("/search-suggestions", methods=["POST"])
def search_suggestions():
try:
query = (request.json or {}).get("query","").strip()
if not query or len(query) < 2:
return jsonify({"results": []})
hits = search(query, lang="en", country="us", n_hits=6)
results = []
for h in hits:
aid = h.get("appId","")
if not aid or aid == "None" or "." not in aid:
continue
results.append({
"appId": aid,
"storeUrl": f"https://play.google.com/store/apps/details?id={aid}",
"title": h.get("title",""),
"icon": h.get("icon",""),
"score": round(h.get("score") or 0, 1),
"developer": h.get("developer",""),
"installs": h.get("installs",""),
})
return jsonify({"results": results[:5]})
except Exception as e:
return jsonify({"error": str(e)}), 500
# ═══════════════════════════════════════════════════════════════════════════
# CLEAR CHAT MEMORY (optional endpoint for "New Chat" button)
# ═══════════════════════════════════════════════════════════════════════════
@app.route('/chat/clear', methods=['POST'])
def clear_chat():
    """Reset the caller's server-side conversation memory ("New Chat")."""
    payload = request.json or {}
    session_id = payload.get('session_id') or request.remote_addr or "default"
    _CONV_MEMORY[session_id].clear()
    return jsonify({"ok": True})
# ═══════════════════════════════════════════════════════════════════════════
# PAGE ROUTES
# ═══════════════════════════════════════════════════════════════════════════
@app.route('/scraper')
def scraper():
    """Serve the single-app scraper page."""
    template = 'index.html'
    return render_template(template)
@app.route('/batch')
def batch():
    """Serve the multi-app (batch) scraper page."""
    template = 'batch.html'
    return render_template(template)
@app.route('/')
def landing():
    """Serve the landing page."""
    template = 'landing.html'
    return render_template(template)
if __name__ == "__main__":
app.run(host="0.0.0.0", debug=True, port=7860)