Spaces:
Sleeping
Sleeping
| import json | |
| import math | |
| import os | |
| import re | |
| from typing import Dict, Iterator, List, Optional | |
| # Agno Imports | |
| from agno.agent import Agent | |
| from agno.models.llama_cpp import LlamaCpp | |
| from agno.workflow import Workflow | |
| from dotenv import load_dotenv | |
| # Rich UI Imports | |
| from rich.console import Console | |
| from rich.panel import Panel | |
| from rich.table import Table | |
| from serpapi import GoogleSearch | |
| # Load environment variables | |
| load_dotenv() | |
| console = Console() | |
# ==============================================================================
# 1. ROBUST UTILITIES
# ==============================================================================
def validate_api_key() -> None:
    """Abort start-up with a visible error when the SerpApi key is missing.

    Reads ``SERPAPI_API_KEY`` from the environment. An unset or empty value
    prints a red error panel and terminates the program with exit status 1.

    Raises:
        SystemExit: When ``SERPAPI_API_KEY`` is unset or empty.
    """
    if os.getenv("SERPAPI_API_KEY"):
        return

    console.print(
        Panel(
            "[bold red]CRITICAL ERROR: SERPAPI_API_KEY not found in .env file.[/bold red]",
            border_style="red",
        )
    )
    # SystemExit lets the interpreter shut down normally (flushing stdio,
    # running cleanup handlers), whereas os._exit() skips all of that.
    raise SystemExit(1)
def clean_json_output(text: str) -> Dict:
    """Extract the first JSON object found in LLM output.

    The text is first parsed as a whole. If that fails (or yields something
    other than an object), every ``{`` is tried in turn as the start of an
    embedded JSON object, so surrounding chatter, code fences, or trailing
    text containing braces do not prevent extraction.

    Args:
        text: Raw model output.

    Returns:
        The decoded JSON object as a dict.

    Raises:
        ValueError: If ``text`` is not a string or contains no JSON object.
    """
    error_message = "Could not extract valid JSON from model output."
    if not isinstance(text, str):
        raise ValueError(error_message)

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        parsed = None
    if isinstance(parsed, dict):
        return parsed

    # raw_decode stops at the end of the first complete value, so unrelated
    # text after the object (even text containing braces) is ignored.
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            candidate, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            candidate = None
        if isinstance(candidate, dict):
            return candidate
        start = text.find("{", start + 1)

    raise ValueError(error_message)
# ==============================================================================
# 2. HELPER TOOLS
# ==============================================================================
def search_google_maps(query: str, location: str) -> List[Dict]:
    """Run a Google Maps search through SerpApi.

    Args:
        query: What to look for.
        location: Where to look; combined with ``query`` as "<query> in <location>".

    Returns:
        The ``local_results`` list from the response, or an empty list when
        the key is absent or the request raises.
    """
    request = dict(
        engine="google_maps",
        q=f"{query} in {location}",
        type="search",
        api_key=os.getenv("SERPAPI_API_KEY"),
        hl="en",
    )
    try:
        payload = GoogleSearch(request).get_dict()
        return payload.get("local_results", [])
    except Exception as err:
        # Best effort: report the failure and let the caller see "no results".
        console.print(f"[red]API Error:[/red] {err}")
        return []
def get_place_reviews_text(data_id: str, limit: int = 3) -> str:
    """Fetch the newest text reviews for a place as a bulleted string.

    LIMIT SET TO 3 by default to prevent overflowing the 1.2B model's
    context window.

    Args:
        data_id: SerpApi ``data_id`` of the place. A missing/empty value
            short-circuits without calling the API.
        limit: Maximum number of review snippets to include.

    Returns:
        One "- <snippet>" line per review, newest first;
        "No detailed reviews available." when there is nothing to show;
        "Error fetching reviews." when the request raises.
    """
    no_reviews = "No detailed reviews available."
    if not data_id:
        # Nothing to look up; avoid spending an API call on a request that
        # cannot succeed.
        return no_reviews

    params = {
        "engine": "google_maps_reviews",
        "data_id": data_id,
        "api_key": os.getenv("SERPAPI_API_KEY"),
        "sort_by": "newestFirst",
        "hl": "en",
    }
    try:
        reviews = GoogleSearch(params).get_dict().get("reviews", [])
        if not reviews:
            return no_reviews

        # Collect the first `limit` reviews that actually have text, rather
        # than slicing first and possibly ending up with fewer (or none).
        snippets = []
        for review in reviews:
            if len(snippets) >= limit:
                break
            snippet = review.get("snippet")
            if snippet:
                snippets.append(f"- {snippet}")

        # Rating-only reviews carry no snippet; report that explicitly
        # instead of returning an empty string.
        return "\n".join(snippets) if snippets else no_reviews
    except Exception:
        return "Error fetching reviews."
# ==============================================================================
# 3. THE WORKFLOW CLASS
# ==============================================================================
class CityScoutWorkflow(Workflow):
    """Parse a request, search and rank places, then compare the top three.

    The ``scout`` agent turns free text into search parameters; the
    ``critic`` agent receives the three best-scored places together with
    their recent reviews and writes a comparison.
    """

    # Location used for searches; replaced whenever a request names one and
    # reused when a request does not.
    current_location: str = "Lahore"

    # --- SCOUT (Parser) ---
    scout: Agent = Agent(
        name="Scout",
        model=LlamaCpp(
            id="lfm-1.2b", base_url="http://localhost:8080/v1", temperature=0.1
        ),
        instructions=[
            "You are a parser. Extract search parameters.",
            "If location is not explicitly mentioned, return 'UNKNOWN'.",
            'Output Format JSON: {"query": "...", "location": "...", "category": "..."}',
        ],
    )

    # --- CRITIC (Comparative Evaluator) ---
    critic: Agent = Agent(
        name="Critic",
        model=LlamaCpp(
            id="lfm-1.2b", base_url="http://localhost:8080/v1", temperature=0.1
        ),
        instructions=[
            "You are a local expert.",
            "You will receive a JSON list of the TOP 3 Places with their reviews.",
            "1. COMPARE the 3 options based on the reviews.",
            "2. Highlight the 'Best Overall' and 'Best Value' (if applicable).",
            "3. Mention any red flags (hygiene, rude staff) found in the text.",
            "4. Keep it concise.",
        ],
    )

    def _parse_intent(self, user_input: str) -> Optional[Dict]:
        """Run Scout on the user's text and normalise the result.

        Args:
            user_input: Raw text typed by the user.

        Returns:
            A dict with ``query``, ``location`` and ``category`` keys, or
            ``None`` when the model output cannot be interpreted. A location
            named in the request also updates ``current_location``.
        """
        response = self.scout.run(user_input)
        try:
            intent = clean_json_output(response.content)
            loc = intent.get("location", "UNKNOWN")
            # Only a real, non-placeholder string may replace the remembered
            # location; small models vary the casing of "UNKNOWN".
            if isinstance(loc, str):
                loc = loc.strip()
                if loc and loc.upper() != "UNKNOWN":
                    self.current_location = loc
            return {
                "query": intent.get("query") or "places",
                "location": self.current_location,
                "category": intent.get("category") or "General",
            }
        except (ValueError, TypeError, AttributeError):
            # Unparseable or malformed model output: let the caller report it.
            return None

    def _rank_places(self, places: List[Dict]) -> List[Dict]:
        """Score places as ``stars + 2 * ln(reviews + 1)`` and sort, best first.

        Each scored place dict gets a ``math_score`` key added in place.
        Entries whose rating or review count cannot be interpreted as a
        number are left out of the result.

        Args:
            places: Search results, each optionally carrying ``rating`` and
                ``reviews``.

        Returns:
            The scored places in descending ``math_score`` order.
        """
        scored = []
        for place in places:
            try:
                # `or 0` also covers keys that are present but set to None.
                stars = float(place.get("rating") or 0)
                reviews = int(place.get("reviews") or 0)
                score = stars + (math.log(reviews + 1) * 2)
            except (TypeError, ValueError, AttributeError):
                continue
            place["math_score"] = score
            scored.append(place)
        return sorted(scored, key=lambda x: x["math_score"], reverse=True)

    def process_request(self, user_input: str) -> Iterator[str]:
        """Handle one user request end to end, yielding the reply in pieces.

        Steps: parse the intent, search Google Maps, rank the results, show
        the top three in a table, fetch recent reviews for each, and stream
        the critic's comparison.

        Args:
            user_input: Raw text typed by the user.

        Yields:
            Plain-string status messages, followed by whatever the critic's
            streaming run yields.
        """
        # NOTE(review): the leading glyphs in the messages below look like
        # mis-encoded emoji; they are kept as found - confirm the intended
        # characters.

        # 1. PARSE
        intent = self._parse_intent(user_input)
        if not intent:
            yield "โ ๏ธ I couldn't understand the request."
            return
        query, loc, category = intent["query"], intent["location"], intent["category"]

        # 2. SEARCH & RANK
        with console.status(
            f"[bold green]๐ Scouting for '{query}' in '{loc}'...[/bold green]"
        ):
            raw_places = search_google_maps(query, loc)
        if not raw_places:
            yield f"Sorry, I found no results for '{query}' in '{loc}'."
            return

        # Rank and slice TOP 3
        ranked_places = self._rank_places(raw_places)
        top_3 = ranked_places[:3]
        if not top_3:
            # Every result was unscorable; do not ask the critic to compare
            # an empty candidate list.
            yield f"Sorry, I found no results for '{query}' in '{loc}'."
            return

        # UI Table
        table = Table(title=f"๐ Top 3 Candidates in {loc}")
        table.add_column("Name", style="cyan")
        table.add_column("Score", style="yellow")
        for p in top_3:
            # A result without a title must not crash the slice below.
            title = str(p.get("title") or "Unknown")
            table.add_row(title[:30], f"{p.get('math_score', 0):.2f}")
        console.print(table)

        # 3. FETCH DEEP DATA (Loop through Top 3)
        comparison_data = []
        for place in top_3:
            p_name = place.get("title") or "Unknown"
            with console.status(
                f"[bold magenta]๐ Reading reviews for: {p_name}...[/bold magenta]"
            ):
                # One API call per place (up to 3 in total).
                reviews_text = get_place_reviews_text(place.get("data_id"), limit=3)
            comparison_data.append(
                {
                    "name": p_name,
                    "rating": place.get("rating"),
                    "total_reviews": place.get("reviews"),
                    "recent_feedback": reviews_text,
                }
            )

        # 4. CRITIQUE (Comparative)
        final_payload = json.dumps(
            {"category": category, "candidates": comparison_data}
        )
        yield from self.critic.run(final_payload, stream=True)
# ==============================================================================
# 4. MAIN ENTRY
# ==============================================================================
if __name__ == "__main__":
    # Interactive loop: read a request, stream the workflow's reply, repeat
    # until the user types "exit"/"quit" or interrupts the program.
    validate_api_key()
    console.clear()
    console.print(
        Panel(
            "[bold green]City Scout (Comparative Mode)[/bold green]",
            border_style="cyan",
        )
    )
    pipeline = CityScoutWorkflow()

    # NOTE(review): the glyphs before "You:" and "Scout:" look like
    # mis-encoded emoji; they are kept as found - confirm the intended
    # characters.
    while True:
        try:
            user_input = input("\n๐ค You: ").strip()
            if user_input.lower() in ("exit", "quit"):
                break
            if not user_input:
                continue

            console.print("\n[bold magenta]๐ค Scout:[/bold magenta]")
            for chunk in pipeline.process_request(user_input):
                # The stream mixes plain strings with objects that carry
                # their text in `.content`.
                text = chunk if isinstance(chunk, str) else getattr(chunk, "content", None)
                if text is None:
                    # Chunks without text would otherwise print "None".
                    continue
                # Model output is untrusted text: print it verbatim so stray
                # square brackets are not parsed as Rich markup.
                console.print(text, end="", markup=False)
            console.print()
            console.rule(style="dim")
        except (KeyboardInterrupt, EOFError):
            # Ctrl-C, or Ctrl-D / closed stdin: leave the loop quietly.
            break