Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import json | |
| import os | |
| import requests | |
| from openai import OpenAI | |
| HF_TOKEN = os.environ["HF_TOKEN"] | |
| MODEL_NAME = "google/gemma-4-31B-it:novita" | |
| MEMORY_FILE = "memory.json" | |
| TASKS_FILE = "tasks.json" | |
| client = OpenAI( | |
| base_url="https://router.huggingface.co/v1", | |
| api_key=HF_TOKEN, | |
| ) | |
| # ---------------- MEMORY ---------------- | |
| def load_memory(): | |
| if os.path.exists(MEMORY_FILE): | |
| try: | |
| with open(MEMORY_FILE, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| except Exception: | |
| return {} | |
| return {} | |
| def save_memory(memory): | |
| with open(MEMORY_FILE, "w", encoding="utf-8") as f: | |
| json.dump(memory, f, ensure_ascii=False, indent=2) | |
| def update_memory(user_message: str): | |
| text = user_message.strip() | |
| lower = text.lower() | |
| memory = load_memory() | |
| updated = False | |
| if lower.startswith("remember my name is "): | |
| memory["name"] = text[len("remember my name is "):].strip() | |
| updated = True | |
| elif lower.startswith("remember my goal is "): | |
| memory["goal"] = text[len("remember my goal is "):].strip() | |
| updated = True | |
| elif lower.startswith("remember my focus is "): | |
| memory["focus"] = text[len("remember my focus is "):].strip() | |
| updated = True | |
| elif lower.startswith("remember i prefer "): | |
| memory["preference"] = text[len("remember i prefer "):].strip() | |
| updated = True | |
| if updated: | |
| save_memory(memory) | |
| return updated | |
| # ---------------- TASKS ---------------- | |
| def load_tasks(): | |
| if os.path.exists(TASKS_FILE): | |
| try: | |
| with open(TASKS_FILE, "r", encoding="utf-8") as f: | |
| return json.load(f) | |
| except Exception: | |
| return [] | |
| return [] | |
| def save_tasks(tasks): | |
| with open(TASKS_FILE, "w", encoding="utf-8") as f: | |
| json.dump(tasks, f, ensure_ascii=False, indent=2) | |
| def add_task(task: str): | |
| tasks = load_tasks() | |
| tasks.append(task) | |
| save_tasks(tasks) | |
| return f"Task added: {task}" | |
| def delete_task(task: str): | |
| tasks = load_tasks() | |
| if task in tasks: | |
| tasks.remove(task) | |
| save_tasks(tasks) | |
| return f"Deleted task: {task}" | |
| return "Task not found." | |
| def show_tasks(): | |
| tasks = load_tasks() | |
| if not tasks: | |
| return "You have no tasks right now." | |
| lines = [f"{i+1}. {task}" for i, task in enumerate(tasks)] | |
| return "Your tasks:\n" + "\n".join(lines) | |
| # ---------------- WEATHER ---------------- | |
| def get_weather_forecast(location: str) -> str: | |
| try: | |
| geo_url = "https://geocoding-api.open-meteo.com/v1/search" | |
| geo_res = requests.get( | |
| geo_url, | |
| params={"name": location, "count": 1}, | |
| timeout=15, | |
| ) | |
| geo_res.raise_for_status() | |
| geo_data = geo_res.json() | |
| results = geo_data.get("results") | |
| if not results: | |
| return f"Could not find location: {location}" | |
| place = results[0] | |
| name = place.get("name", location) | |
| country = place.get("country", "") | |
| latitude = place["latitude"] | |
| longitude = place["longitude"] | |
| timezone = place.get("timezone", "auto") | |
| weather_url = "https://api.open-meteo.com/v1/forecast" | |
| weather_res = requests.get( | |
| weather_url, | |
| params={ | |
| "latitude": latitude, | |
| "longitude": longitude, | |
| "current": "temperature_2m,wind_speed_10m", | |
| "daily": "temperature_2m_max,temperature_2m_min,precipitation_probability_max", | |
| "timezone": timezone, | |
| "forecast_days": 3, | |
| }, | |
| timeout=15, | |
| ) | |
| weather_res.raise_for_status() | |
| weather_data = weather_res.json() | |
| current = weather_data.get("current", {}) | |
| daily = weather_data.get("daily", {}) | |
| times = daily.get("time", []) | |
| max_temps = daily.get("temperature_2m_max", []) | |
| min_temps = daily.get("temperature_2m_min", []) | |
| rain_probs = daily.get("precipitation_probability_max", []) | |
| lines = [] | |
| place_label = f"{name}, {country}".strip(", ") | |
| lines.append(f"Weather for {place_label}") | |
| if "temperature_2m" in current: | |
| lines.append(f"Current temperature: {current['temperature_2m']}°C") | |
| if "wind_speed_10m" in current: | |
| lines.append(f"Current wind speed: {current['wind_speed_10m']} km/h") | |
| if times: | |
| lines.append("") | |
| lines.append("3-day forecast:") | |
| for i in range(min(3, len(times))): | |
| day = times[i] | |
| max_temp = max_temps[i] if i < len(max_temps) else "N/A" | |
| min_temp = min_temps[i] if i < len(min_temps) else "N/A" | |
| rain = rain_probs[i] if i < len(rain_probs) else "N/A" | |
| lines.append( | |
| f"- {day}: {min_temp}°C to {max_temp}°C, rain chance {rain}%" | |
| ) | |
| return "\n".join(lines) | |
| except requests.RequestException as e: | |
| return f"Weather error: {str(e)}" | |
| except Exception as e: | |
| return f"Weather error: {str(e)}" | |
| # ---------------- PROMPT ---------------- | |
| def build_system_prompt(): | |
| memory = load_memory() | |
| tasks = load_tasks() | |
| tasks_text = "\n".join([f"- {t}" for t in tasks]) if tasks else "No current tasks" | |
| return f""" | |
| You are Madhu's personal assistant. | |
| You are not a casual friend. You are focused, practical, warm, and organized. | |
| Known details: | |
| - Name: {memory.get("name", "Unknown")} | |
| - Goal: {memory.get("goal", "Unknown")} | |
| - Focus: {memory.get("focus", "Unknown")} | |
| - Preference: {memory.get("preference", "Unknown")} | |
| Current tasks: | |
| {tasks_text} | |
| Rules: | |
| - Use saved personal details naturally when helpful. | |
| - Use current tasks when helping with planning. | |
| - Be concise unless the user asks for more detail. | |
| - Do not invent facts. | |
| """.strip() | |
| # ---------------- HISTORY ---------------- | |
| def extract_text_content(content): | |
| if isinstance(content, str): | |
| return content | |
| if isinstance(content, list): | |
| parts = [] | |
| for block in content: | |
| if isinstance(block, dict): | |
| if block.get("type") == "text": | |
| parts.append(block.get("text", "")) | |
| elif "text" in block: | |
| parts.append(str(block.get("text", ""))) | |
| return "\n".join([p for p in parts if p]).strip() | |
| if isinstance(content, dict): | |
| if content.get("type") == "text": | |
| return content.get("text", "") | |
| if "text" in content: | |
| return str(content.get("text", "")) | |
| return str(content) | |
| def history_to_messages(history): | |
| messages = [] | |
| for item in history or []: | |
| if not isinstance(item, dict): | |
| continue | |
| role = item.get("role") | |
| content = extract_text_content(item.get("content", "")) | |
| if role in ("user", "assistant") and content: | |
| messages.append({"role": role, "content": content}) | |
| return messages | |
| # ---------------- PLANNER ---------------- | |
| def plan_day(): | |
| memory = load_memory() | |
| tasks = load_tasks() | |
| if not tasks: | |
| return "You have no tasks yet. Add tasks first using: add task ..." | |
| goal = memory.get("goal", "your current goal") | |
| focus = memory.get("focus", "your current focus") | |
| lines = [ | |
| f"Here is a simple plan for today based on your goal ({goal}) and focus ({focus}):", | |
| "" | |
| ] | |
| for i, task in enumerate(tasks[:5], start=1): | |
| lines.append(f"{i}. {task}") | |
| lines.append("") | |
| lines.append("Start with the most important or hardest task first.") | |
| return "\n".join(lines) | |
| # ---------------- MAIN CHAT ---------------- | |
| def chat_fn(message, history): | |
| if not message or not message.strip(): | |
| return "Please type a message." | |
| lower = message.strip().lower() | |
| # Memory commands | |
| if lower == "what do you remember about me": | |
| memory = load_memory() | |
| if memory: | |
| return json.dumps(memory, indent=2) | |
| return "I don't have anything saved about you yet." | |
| if lower.startswith("forget "): | |
| key = lower.replace("forget ", "", 1).strip() | |
| memory = load_memory() | |
| if key in memory: | |
| del memory[key] | |
| save_memory(memory) | |
| return f"I forgot {key}." | |
| return f"I don't have {key} saved." | |
| if update_memory(message): | |
| return "Noted. I'll remember that." | |
| # Task commands | |
| if lower.startswith("add task "): | |
| task = message[9:].strip() | |
| if not task: | |
| return "Please provide a task. Example: add task learn Mendix" | |
| return add_task(task) | |
| if lower == "show tasks": | |
| return show_tasks() | |
| if lower.startswith("delete task "): | |
| task = message[12:].strip() | |
| if not task: | |
| return "Please provide the exact task to delete." | |
| return delete_task(task) | |
| # Planner commands | |
| if lower == "plan my day" or lower == "what should i do today?": | |
| return plan_day() | |
| # Weather commands | |
| if lower.startswith("weather:"): | |
| location = message.split(":", 1)[1].strip() | |
| if not location: | |
| return "Please provide a location. Example: weather: Hyderabad" | |
| return get_weather_forecast(location) | |
| if "weather in " in lower: | |
| location = lower.split("weather in ", 1)[1].strip(" ?.") | |
| if location: | |
| return get_weather_forecast(location.title()) | |
| # Normal model chat | |
| messages = [{"role": "system", "content": build_system_prompt()}] | |
| messages.extend(history_to_messages(history)) | |
| messages.append({"role": "user", "content": message}) | |
| try: | |
| response = client.chat.completions.create( | |
| model=MODEL_NAME, | |
| messages=messages, | |
| ) | |
| return response.choices[0].message.content or "I could not generate a reply." | |
| except Exception as e: | |
| return f"Error: {str(e)}" | |
| demo = gr.ChatInterface( | |
| fn=chat_fn, | |
| title="Madhu Personal Assistant", | |
| description="Try: remember my name is Madhu, add task learn Mendix, show tasks, plan my day, what is the weather in Hyderabad?", | |
| ) | |
| demo.launch() |