File size: 8,893 Bytes
f98de7e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import json
import math
import os
import re
from typing import Dict, Iterator, List, Optional

# Agno Imports
from agno.agent import Agent
from agno.models.llama_cpp import LlamaCpp
from agno.workflow import Workflow
from dotenv import load_dotenv

# Rich UI Imports
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from serpapi import GoogleSearch

# Load environment variables (e.g. SERPAPI_API_KEY) before any os.getenv lookup
# in the functions below runs.
load_dotenv()
# Single Rich console shared by the helpers, the workflow and the main loop.
console = Console()

# ==============================================================================
# 1. ROBUST UTILITIES
# ==============================================================================


def validate_api_key():
    """Abort start-up when the SerpApi key is not configured.

    Reads ``SERPAPI_API_KEY`` from the process environment. When it is set
    (and non-empty) the function simply returns. Otherwise it prints a red
    error panel and terminates the program with exit status 1.

    Raises:
        SystemExit: with code 1 if ``SERPAPI_API_KEY`` is missing or empty.
    """
    if os.getenv("SERPAPI_API_KEY"):
        return

    console.print(
        Panel(
            "[bold red]CRITICAL ERROR: SERPAPI_API_KEY not found in .env file.[/bold red]",
            border_style="red",
        )
    )
    # Raise SystemExit rather than calling os._exit(): the latter ends the
    # process immediately, skipping stdio flushing and cleanup handlers, so
    # the panel printed above could be lost when output is buffered.
    raise SystemExit(1)


def clean_json_output(text: str) -> Dict:
    """Extract a JSON value from raw LLM output.

    The text is first parsed as-is. If that fails, every ``{`` in the text
    is tried in order as the start of a JSON object and the first object that
    decodes cleanly is returned. This tolerates prose, code fences or stray
    braces before and after the object.

    Args:
        text: Raw model output.

    Returns:
        The decoded JSON value (whatever ``json.loads`` yields when the whole
        text is valid JSON, otherwise the first embedded JSON object).

    Raises:
        ValueError: If no valid JSON can be found in ``text``.
    """
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass  # Fall through to scanning for an embedded object.

    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # raw_decode stops at the end of the first complete value, so
            # trailing chatter (even chatter containing braces) is ignored.
            candidate, _end = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            start = text.find("{", start + 1)
        else:
            return candidate

    raise ValueError("Could not extract valid JSON from model output.")


# ==============================================================================
# 2. HELPER TOOLS
# ==============================================================================


def search_google_maps(query: str, location: str) -> List[Dict]:
    """Run a Google Maps search through SerpApi.

    Args:
        query: What to look for (e.g. a kind of place).
        location: Where to look; combined with ``query`` as "<query> in <location>".

    Returns:
        The ``local_results`` list from the SerpApi response, or an empty
        list when that key is absent or the request raises (the error is
        printed to the console in that case).
    """
    request = dict(
        engine="google_maps",
        q=f"{query} in {location}",
        type="search",
        api_key=os.getenv("SERPAPI_API_KEY"),
        hl="en",
    )
    try:
        payload = GoogleSearch(request).get_dict()
        return payload.get("local_results", [])
    except Exception as err:
        # Best-effort lookup: report the failure and let the caller treat it
        # as "no results".
        console.print(f"[red]API Error:[/red] {err}")
        return []


def get_place_reviews_text(data_id: str, limit: int = 3) -> str:
    """Fetch the newest text reviews of a place as a bulleted string.

    Args:
        data_id: Place identifier sent as ``data_id`` to SerpApi's
            ``google_maps_reviews`` engine.
        limit: Maximum number of review snippets to return. Defaults to 3 to
            avoid overflowing the 1.2B model's context window.

    Returns:
        One ``"- <snippet>"`` line per review, newest first;
        ``"No detailed reviews available."`` when there is no ``data_id`` or
        no review carries text; ``"Error fetching reviews."`` when the
        request or response handling raises.
    """
    no_reviews = "No detailed reviews available."

    # Without an identifier the request cannot succeed; skip the API call.
    if not data_id:
        return no_reviews

    params = {
        "engine": "google_maps_reviews",
        "data_id": data_id,
        "api_key": os.getenv("SERPAPI_API_KEY"),
        "sort_by": "newestFirst",
        "hl": "en",
    }
    try:
        reviews = GoogleSearch(params).get_dict().get("reviews", [])
        if not reviews:
            return no_reviews

        # Filter first, then truncate: rating-only reviews (no snippet) must
        # not use up the budget and leave the model with nothing to read.
        snippets = [f"- {r.get('snippet')}" for r in reviews if r.get("snippet")]
        snippets = snippets[: max(limit, 0)]
    except Exception:
        # Best-effort: the caller embeds this text in the critic's payload.
        return "Error fetching reviews."

    # Never hand back an empty string; say explicitly that nothing was found.
    return "\n".join(snippets) if snippets else no_reviews


# ==============================================================================
# 3. THE WORKFLOW CLASS
# ==============================================================================


class CityScoutWorkflow(Workflow):
    """Parse a request, search Google Maps, rank the hits and critique the top 3.

    Stages (see ``process_request``):
      1. ``scout`` turns free text into ``{"query", "location", "category"}``.
      2. ``search_google_maps`` fetches candidates, ``_rank_places`` scores them.
      3. ``get_place_reviews_text`` fetches recent reviews for the top 3.
      4. ``critic`` compares the candidates and its output is streamed back.
    """

    # Location used when the user does not name one; updated whenever a
    # request mentions a location explicitly.
    current_location: str = "Lahore"

    # --- SCOUT (Parser) ---
    scout: Agent = Agent(
        name="Scout",
        model=LlamaCpp(
            id="lfm-1.2b", base_url="http://localhost:8080/v1", temperature=0.1
        ),
        instructions=[
            "You are a parser. Extract search parameters.",
            "If location is not explicitly mentioned, return 'UNKNOWN'.",
            'Output Format JSON: {"query": "...", "location": "...", "category": "..."}',
        ],
    )

    # --- CRITIC (Comparative Evaluator) ---
    critic: Agent = Agent(
        name="Critic",
        model=LlamaCpp(
            id="lfm-1.2b", base_url="http://localhost:8080/v1", temperature=0.1
        ),
        instructions=[
            "You are a local expert.",
            "You will receive a JSON list of the TOP 3 Places with their reviews.",
            "1. COMPARE the 3 options based on the reviews.",
            "2. Highlight the 'Best Overall' and 'Best Value' (if applicable).",
            "3. Mention any red flags (hygiene, rude staff) found in the text.",
            "4. Keep it concise.",
        ],
    )

    def _parse_intent(self, user_input: str) -> Optional[Dict]:
        """Run the scout agent and normalise its JSON answer.

        Args:
            user_input: Raw text typed by the user.

        Returns:
            A dict with ``query``, ``location`` and ``category`` keys, or
            ``None`` when the model output contains no usable JSON object.
            ``location`` falls back to ``self.current_location`` when the
            model reports none; an explicit location also updates
            ``self.current_location`` for later requests.
        """
        response = self.scout.run(user_input)
        try:
            intent = clean_json_output(response.content)
        except (TypeError, ValueError):
            # TypeError: the model returned no text; ValueError: no JSON found.
            return None
        if not isinstance(intent, dict):
            return None

        loc = intent.get("location")
        if isinstance(loc, str):
            loc = loc.strip()
            # Small models do not reliably keep the sentinel's casing.
            if loc and loc.upper() != "UNKNOWN":
                self.current_location = loc

        return {
            # `or` also covers keys that are present but empty or null.
            "query": intent.get("query") or "places",
            "location": self.current_location,
            "category": intent.get("category") or "General",
        }

    def _rank_places(self, places: List[Dict]) -> List[Dict]:
        """Score and sort places by ``rating + 2 * ln(reviews + 1)``.

        Args:
            places: Search results; each is read for ``rating`` and ``reviews``.

        Returns:
            New dicts (the inputs are not modified) carrying an extra
            ``math_score`` key, sorted best first. Missing or null
            rating/review counts are treated as 0; entries that are not
            dicts or whose values cannot be converted to numbers are skipped.
        """
        scored = []
        for place in places:
            if not isinstance(place, dict):
                continue
            try:
                stars = float(place.get("rating") or 0)
                reviews = int(place.get("reviews") or 0)
            except (TypeError, ValueError):
                continue
            # Clamp so a bogus negative count cannot push log() out of domain.
            score = stars + math.log(max(reviews, 0) + 1) * 2
            scored.append({**place, "math_score": score})
        return sorted(scored, key=lambda x: x["math_score"], reverse=True)

    def process_request(self, user_input: str) -> Iterator[str]:
        """Handle one user request end to end, yielding the answer in chunks.

        Args:
            user_input: Raw text typed by the user.

        Yields:
            A single plain-string message when the request cannot be parsed
            or nothing is found; otherwise whatever
            ``self.critic.run(..., stream=True)`` yields, passed through
            unchanged (consumers should accept both ``str`` chunks and
            objects exposing ``.content``).
        """
        # 1. PARSE
        intent = self._parse_intent(user_input)
        if not intent:
            yield "โš ๏ธ I couldn't understand the request."
            return

        query, loc, category = intent["query"], intent["location"], intent["category"]

        # 2. SEARCH & RANK
        with console.status(
            f"[bold green]๐Ÿ”Ž Scouting for '{query}' in '{loc}'...[/bold green]"
        ):
            raw_places = search_google_maps(query, loc)
            # Rank and slice TOP 3
            top_3 = self._rank_places(raw_places)[:3] if raw_places else []

        # Yield only after the spinner has closed, and also bail out when
        # every result was unrankable, so the critic never sees an empty list.
        if not top_3:
            yield f"Sorry, I found no results for '{query}' in '{loc}'."
            return

        # UI Table
        table = Table(title=f"๐Ÿ† Top 3 Candidates in {loc}")
        table.add_column("Name", style="cyan")
        table.add_column("Score", style="yellow")
        for p in top_3:
            # A result without a title must not crash the slice below.
            name = str(p.get("title") or "Unknown")
            table.add_row(name[:30], f"{p.get('math_score', 0):.2f}")
        console.print(table)

        # 3. FETCH DEEP DATA (one reviews request per candidate)
        comparison_data = []
        for place in top_3:
            p_name = place.get("title")
            with console.status(
                f"[bold magenta]๐Ÿ“ Reading reviews for: {p_name}...[/bold magenta]"
            ):
                reviews_text = get_place_reviews_text(place.get("data_id"), limit=3)

            comparison_data.append(
                {
                    "name": p_name,
                    "rating": place.get("rating"),
                    "total_reviews": place.get("reviews"),
                    "recent_feedback": reviews_text,
                }
            )

        # 4. CRITIQUE (Comparative)
        # ensure_ascii=False keeps non-ASCII review text readable instead of
        # expanding it to \uXXXX escapes, which also saves prompt space.
        final_payload = json.dumps(
            {"category": category, "candidates": comparison_data},
            ensure_ascii=False,
        )

        yield from self.critic.run(final_payload, stream=True)


# ==============================================================================
# 4. MAIN ENTRY
# ==============================================================================

def main() -> None:
    """Run the interactive City Scout prompt until the user quits.

    Validates the API key, prints the banner, then loops: read a line, pass
    it to ``CityScoutWorkflow.process_request`` and print the streamed
    chunks. The loop ends on "exit"/"quit", Ctrl-C or end of input (Ctrl-D).
    """
    validate_api_key()
    console.clear()
    console.print(
        Panel(
            "[bold green]City Scout (Comparative Mode)[/bold green]",
            border_style="cyan",
        )
    )

    pipeline = CityScoutWorkflow()

    while True:
        try:
            user_input = input("\n๐Ÿ‘ค You: ").strip()
        except (KeyboardInterrupt, EOFError):
            # EOFError: stdin closed (Ctrl-D / piped input exhausted).
            break

        if user_input.lower() in ["exit", "quit"]:
            break
        if not user_input:
            continue

        console.print("\n[bold magenta]๐Ÿค– Scout:[/bold magenta]")

        try:
            for chunk in pipeline.process_request(user_input):
                # Chunks are either plain strings or objects with `.content`.
                text = chunk if isinstance(chunk, str) else getattr(chunk, "content", None)
                if text:
                    # Model output is untrusted text: print it literally so
                    # stray "[...]" sequences are not parsed as Rich markup.
                    console.print(text, end="", markup=False)
        except KeyboardInterrupt:
            break
        except Exception as exc:
            # Keep the session alive when one request fails (e.g. the model
            # server is unreachable); report the error and prompt again.
            console.print("\n[red]Error:[/red]", end=" ")
            console.print(str(exc), markup=False)

        print()
        console.rule(style="dim")


if __name__ == "__main__":
    main()