"""City Scout: a small CLI that finds, ranks and compares local places.

Pipeline: a "Scout" agent parses the user's request into search parameters,
SerpApi's Google Maps engine returns candidates, the candidates are ranked by
a simple rating/popularity score, recent reviews are fetched for the top
three, and a "Critic" agent writes a short comparison.
"""

import json
import math
import os
import re
import sys
from typing import Dict, Iterator, List, Optional

# Agno Imports
from agno.agent import Agent
from agno.models.llama_cpp import LlamaCpp
from agno.workflow import Workflow
from dotenv import load_dotenv

# Rich UI Imports
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from serpapi import GoogleSearch

# Load environment variables
load_dotenv()

console = Console()

# NOTE(review): the emoji prefixes in several user-facing strings below look
# mis-encoded (mojibake). They are reproduced unchanged here; confirm the
# file's encoding and restore the intended characters.

# Returned whenever a place has no usable review text.
_NO_REVIEWS_MESSAGE = "No detailed reviews available."

# Greedy on purpose: grabs from the first "{" to the last "}" so that nested
# objects inside the model's answer stay intact.
_JSON_OBJECT_RE = re.compile(r"(\{.*\})", re.DOTALL)


# ==============================================================================
# 1. ROBUST UTILITIES
# ==============================================================================
def validate_api_key():
    """Exit with status 1 if SERPAPI_API_KEY is missing from the environment.

    Prints a red error panel before exiting so the user knows what to fix.
    """
    if not os.getenv("SERPAPI_API_KEY"):
        console.print(
            Panel(
                "[bold red]CRITICAL ERROR: SERPAPI_API_KEY not found in .env file.[/bold red]",
                border_style="red",
            )
        )
        # sys.exit (rather than os._exit) lets buffered output flush and
        # cleanup handlers run before the interpreter stops.
        sys.exit(1)


def clean_json_output(text: str) -> Dict:
    """Extract a JSON object from LLM output.

    The text is first parsed as-is. If that fails, or yields something other
    than an object, the outermost ``{...}`` span is extracted and parsed.

    Args:
        text: Raw model output, possibly with prose around the JSON.

    Returns:
        The decoded JSON object as a dict.

    Raises:
        ValueError: If ``text`` is not a string or no JSON object can be
            decoded from it.
    """
    if isinstance(text, str):
        try:
            parsed = json.loads(text)
        except json.JSONDecodeError:
            parsed = None
        if isinstance(parsed, dict):
            return parsed

        match = _JSON_OBJECT_RE.search(text)
        if match:
            try:
                embedded = json.loads(match.group(1))
            except json.JSONDecodeError:
                embedded = None
            if isinstance(embedded, dict):
                return embedded

    raise ValueError("Could not extract valid JSON from model output.")


# ==============================================================================
# 2. HELPER TOOLS
# ==============================================================================
def search_google_maps(query: str, location: str) -> List[Dict]:
    """Search Google Maps via SerpApi.

    Args:
        query: What to look for (e.g. a cuisine or business type).
        location: Place name appended to the query as ``"<query> in <location>"``.

    Returns:
        The ``local_results`` list from the API response, or an empty list
        when there are no results or the request fails (the error is printed).
    """
    params = {
        "engine": "google_maps",
        "q": f"{query} in {location}",
        "type": "search",
        "api_key": os.getenv("SERPAPI_API_KEY"),
        "hl": "en",
    }
    try:
        # "or []" also covers a response that carries an explicit null.
        return GoogleSearch(params).get_dict().get("local_results") or []
    except Exception as e:
        # Best-effort boundary around a third-party network call.
        console.print(f"[red]API Error:[/red] {e}")
        return []


def get_place_reviews_text(data_id: str, limit: int = 3) -> str:
    """Fetch the newest review snippets for a place as a bulleted string.

    The default limit is 3 to avoid overflowing the 1.2B model's context
    window.

    Args:
        data_id: SerpApi ``data_id`` of the place.
        limit: Maximum number of newest reviews to consider.

    Returns:
        One ``"- <snippet>"`` line per review that has a snippet,
        ``"No detailed reviews available."`` when there is nothing usable
        (including a missing ``data_id``), or ``"Error fetching reviews."``
        when the request fails.
    """
    if not data_id:
        # Without an id the API call cannot succeed; skip it entirely.
        return _NO_REVIEWS_MESSAGE

    params = {
        "engine": "google_maps_reviews",
        "data_id": data_id,
        "api_key": os.getenv("SERPAPI_API_KEY"),
        "sort_by": "newestFirst",
        "hl": "en",
    }
    try:
        reviews = GoogleSearch(params).get_dict().get("reviews") or []
        # Take only the newest 'limit' reviews to save tokens
        snippets = [
            f"- {r.get('snippet')}" for r in reviews[:limit] if r.get("snippet")
        ]
    except Exception:
        # Best-effort: a failed lookup must not abort the whole comparison.
        return "Error fetching reviews."

    # Reviews without any snippet text are as good as no reviews.
    return "\n".join(snippets) if snippets else _NO_REVIEWS_MESSAGE


# ==============================================================================
# 3. THE WORKFLOW CLASS
# ==============================================================================
class CityScoutWorkflow(Workflow):
    """Parse a request, search and rank places, then compare the top three."""

    # Last location the user mentioned; reused when a request names none.
    current_location: str = "Lahore"

    # --- SCOUT (Parser) ---
    scout: Agent = Agent(
        name="Scout",
        model=LlamaCpp(
            id="lfm-1.2b", base_url="http://localhost:8080/v1", temperature=0.1
        ),
        instructions=[
            "You are a parser. Extract search parameters.",
            "If location is not explicitly mentioned, return 'UNKNOWN'.",
            'Output Format JSON: {"query": "...", "location": "...", "category": "..."}',
        ],
    )

    # --- CRITIC (Comparative Evaluator) ---
    critic: Agent = Agent(
        name="Critic",
        model=LlamaCpp(
            id="lfm-1.2b", base_url="http://localhost:8080/v1", temperature=0.1
        ),
        instructions=[
            "You are a local expert.",
            "You will receive a JSON list of the TOP 3 Places with their reviews.",
            "1. COMPARE the 3 options based on the reviews.",
            "2. Highlight the 'Best Overall' and 'Best Value' (if applicable).",
            "3. Mention any red flags (hygiene, rude staff) found in the text.",
            "4. Keep it concise.",
        ],
    )

    def _parse_intent(self, user_input: str) -> Optional[Dict]:
        """Run the Scout agent and turn its answer into search parameters.

        A usable location in the answer replaces ``self.current_location``;
        otherwise the previously stored location is reused.

        Args:
            user_input: The raw text typed by the user.

        Returns:
            A dict with ``query``, ``location`` and ``category`` keys, or
            ``None`` when the model output contains no JSON object.
        """
        response = self.scout.run(user_input)
        try:
            intent = clean_json_output(getattr(response, "content", None))
        except ValueError:
            return None

        loc = intent.get("location")
        # Only a real, non-placeholder string may overwrite the sticky
        # location; small models vary the case of the 'UNKNOWN' marker.
        if isinstance(loc, str) and loc.strip() and loc.strip().upper() != "UNKNOWN":
            self.current_location = loc

        return {
            # "or" also covers explicit nulls, not just missing keys.
            "query": intent.get("query") or "places",
            "location": self.current_location,
            "category": intent.get("category") or "General",
        }

    def _rank_places(self, places: List[Dict]) -> List[Dict]:
        """Score places as ``stars + 2 * ln(reviews + 1)`` and sort descending.

        Missing or null ratings and review counts count as 0. Entries whose
        values cannot be converted to numbers are skipped. The input dicts
        are left untouched; scored shallow copies are returned.

        Args:
            places: Place dicts with optional ``rating`` and ``reviews`` keys.

        Returns:
            Copies of the scorable places, each with an added ``math_score``
            key, highest score first (ties keep their input order).
        """
        scored = []
        for p in places:
            try:
                stars = float(p.get("rating") or 0)
                reviews = int(p.get("reviews") or 0)
                score = stars + (math.log(reviews + 1) * 2)
            except (AttributeError, TypeError, ValueError, OverflowError):
                # Malformed entry (non-dict, non-numeric or negative count).
                continue
            scored.append({**p, "math_score": score})
        return sorted(scored, key=lambda x: x["math_score"], reverse=True)

    def process_request(self, user_input: str) -> Iterator[str]:
        """Handle one user request end to end, yielding output chunks.

        Steps: parse the intent, search Google Maps, rank the results, print
        a table of the top three, fetch recent reviews for each, and stream
        the Critic agent's comparison.

        Args:
            user_input: The raw text typed by the user.

        Yields:
            Plain strings for status/error messages, followed by whatever
            chunks ``critic.run(..., stream=True)`` produces.
        """
        # 1. PARSE
        intent = self._parse_intent(user_input)
        if not intent:
            yield "āš ļø I couldn't understand the request."
            return

        query, loc, category = intent["query"], intent["location"], intent["category"]

        # 2. SEARCH & RANK
        with console.status(
            f"[bold green]šŸ”Ž Scouting for '{query}' in '{loc}'...[/bold green]"
        ):
            raw_places = search_google_maps(query, loc)

        if not raw_places:
            yield f"Sorry, I found no results for '{query}' in '{loc}'."
            return

        # Rank and slice TOP 3
        ranked_places = self._rank_places(raw_places)
        top_3 = ranked_places[:3]

        # UI Table
        table = Table(title=f"šŸ† Top 3 Candidates in {loc}")
        table.add_column("Name", style="cyan")
        table.add_column("Score", style="yellow")
        for p in top_3:
            # A result without a title must not crash the table rendering.
            title = p.get("title") or "Unknown"
            table.add_row(str(title)[:30], f"{p.get('math_score', 0):.2f}")
        console.print(table)

        # 3. FETCH DEEP DATA (Loop through Top 3)
        comparison_data = []
        # We loop through the top 3 and fetch reviews for EACH
        for place in top_3:
            p_name = place.get("title")
            with console.status(
                f"[bold magenta]šŸ“ Reading reviews for: {p_name}...[/bold magenta]"
            ):
                # API Call happens here (3 times total)
                reviews_text = get_place_reviews_text(place.get("data_id"), limit=3)
            comparison_data.append(
                {
                    "name": p_name,
                    "rating": place.get("rating"),
                    "total_reviews": place.get("reviews"),
                    "recent_feedback": reviews_text,
                }
            )

        # 4. CRITIQUE (Comparative)
        final_payload = json.dumps(
            {"category": category, "candidates": comparison_data}
        )
        yield from self.critic.run(final_payload, stream=True)


# ==============================================================================
# 4. MAIN ENTRY
# ==============================================================================
def main() -> None:
    """Run the interactive prompt until the user exits.

    Typing ``exit``/``quit``, pressing Ctrl-C or sending EOF (Ctrl-D) ends
    the session. Errors raised while handling a request are reported and the
    prompt continues.
    """
    validate_api_key()
    console.clear()
    console.print(
        Panel(
            "[bold green]City Scout (Comparative Mode)[/bold green]",
            border_style="cyan",
        )
    )

    pipeline = CityScoutWorkflow()

    while True:
        try:
            user_input = input("\nšŸ‘¤ You: ").strip()
            if user_input.lower() in ["exit", "quit"]:
                break
            if not user_input:
                continue

            console.print("\n[bold magenta]šŸ¤– Scout:[/bold magenta]")
            for chunk in pipeline.process_request(user_input):
                text = chunk if isinstance(chunk, str) else getattr(chunk, "content", None)
                if text:
                    # markup=False: model output is arbitrary text and stray
                    # "[...]" sequences must not be parsed as Rich markup.
                    console.print(text, end="", markup=False)
            console.print()
            console.rule(style="dim")
        except (KeyboardInterrupt, EOFError):
            break
        except Exception as exc:
            # Top-level boundary: report (e.g. model server unreachable) and
            # keep the session alive instead of dumping a traceback.
            console.print()
            console.print(f"Error: {exc}", style="red", markup=False)


if __name__ == "__main__":
    main()