Spaces:
Sleeping
Sleeping
| from socket import timeout | |
| from serpapi import GoogleSearch | |
| import os | |
| from firecrawl import FirecrawlApp | |
| from flask import Flask, request, jsonify | |
| from flask_cors import CORS | |
| from google import genai | |
| import json | |
| import logging | |
| # -------- App & clients -------- | |
# Service clients are created once, at import time. Each key is read from the
# environment; an unset variable is handed to the client constructor as None.
f_app = FirecrawlApp(api_key=os.environ.get("FIRECRAWL_API_KEY"))

app = Flask(__name__)
CORS(app)

client = genai.Client(api_key=os.environ.get("GEMINI_API_KEY"))

# Built-in prompt used when the SYSTEM_PROMPT environment variable is unset,
# so the value sent to the chat session is never None.
_DEFAULT_SYSTEM_PROMPT = (
    "You are a helpful research assistant. Respond using a JSON state machine with states PLAN, CALL, OBSERVATION, OUTPUT."
)
SYSTEM_PROMPT = os.environ.get("SYSTEM_PROMPT", _DEFAULT_SYSTEM_PROMPT)

_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
log = logging.getLogger("rm.py")
| # -------- Scholar search (location-aware) -------- | |
# Profile fields whose text is searched when filtering by location.
_PROFILE_TEXT_FIELDS = ("name", "affiliations", "description", "position", "link", "email")


def _profile_mentions(profile, loc):
    """Return True when any searched text field of *profile* contains *loc*."""
    if not isinstance(profile, dict):
        # Unexpected entry shape: it cannot be matched, so it is filtered out.
        return False
    haystack = " | ".join(
        str(profile.get(field, "")) for field in _PROFILE_TEXT_FIELDS
    ).lower()
    return loc in haystack


def get_google_scholar_results(key_params: dict, location: str | None = None):
    """Call SerpAPI's Google Scholar engine and optionally filter profiles.

    Args:
        key_params: Search parameters such as ``{"q": "..."}``. The mapping is
            copied before the API key, engine and language are added, so the
            caller's dict is never modified.
        location: Optional place name. When given (and not blank), only
            profiles whose name/affiliations/description/position/link/email
            text contains it (case-insensitive substring match) are kept.

    Returns:
        A ``(profiles, organic)`` tuple taken from the ``"profiles"`` and
        ``"organic_results"`` entries of the response. Either element is
        ``None`` when the response lacks that entry.
    """
    params = {
        **key_params,
        "api_key": os.getenv("SERPAPI_API_KEY"),
        "engine": "google_scholar",
        "hl": "en",
    }
    results = GoogleSearch(params).get_dict()
    profiles = results.get("profiles")
    organic = results.get("organic_results")

    # str() guards against a non-string location arriving from parsed JSON.
    loc = str(location).strip().lower() if location else ""
    if profiles and loc:
        if isinstance(profiles, dict):
            # NOTE(review): the Google Scholar engine appears to nest author
            # entries under profiles["authors"] -- confirm against a live
            # response. The wrapper dict is preserved, only the list shrinks.
            authors = profiles.get("authors")
            if isinstance(authors, list):
                profiles = {
                    **profiles,
                    "authors": [a for a in authors if _profile_mentions(a, loc)],
                }
        else:
            profiles = [p for p in profiles if _profile_mentions(p, loc)]
    return profiles, organic
def get_results(query):
    """Run a (possibly location-aware) Google Scholar search.

    ``query`` is either a plain value, which is converted with ``str()``, or a
    dict carrying the search text under ``"query"`` or ``"q"`` plus an optional
    ``"location"``. A truthy location is appended to the search text and also
    forwarded so that author profiles can be filtered by it.

    Returns a ``(profiles, answer, keys)`` tuple:
      - profiles: profiles as returned by ``get_google_scholar_results``
      - answer: one trimmed-down dict per organic result
      - keys: the keys of the first organic result, or ``[]`` if there is none
    """
    if isinstance(query, dict):
        q = query.get("query") or query.get("q") or ""
        location = query.get("location")
    else:
        q, location = str(query), None

    search_text = f"{q} {location}".strip() if location else q
    profiles, organic = get_google_scholar_results(
        {"q": search_text}, location=location
    )

    keys = organic[0].keys() if organic else []

    answer = []
    for item in organic or []:
        entry = {}
        for field in ("title", "result_id", "link"):
            if field in item:
                entry[field] = item[field]
        if "link" in item:
            log.info("Result link: %s", entry["link"])
            # Only Annual Reviews pages are scraped for an abstract.
            if "https://www.annualreviews" in item["link"]:
                entry["abstract"] = get_abstract(item["link"])
        for field in ("snippet", "publication_info", "resources"):
            if field in item:
                entry[field] = item[field]
        answer.append(entry)

    return profiles, answer, keys
| # -------- Scraping / LLM helpers -------- | |
def get_abstract(url: str):
    """Scrape *url* and return the first paragraph following "Abstract".

    The page HTML is searched for the literal text ``Abstract``; the content
    of the first ``<p>...</p>`` element after it is returned, without the
    surrounding tags.

    Args:
        url: Address of the page to scrape.

    Returns:
        The paragraph's inner HTML, or the string ``"Abstract not found"``
        when the page has no HTML, no ``Abstract`` marker, or no complete
        ``<p>...</p>`` pair after the marker.
    """
    not_found = "Abstract not found"
    scrape_result = f_app.scrape_url(url, formats=["markdown", "html"])
    # A scrape that yields no HTML is treated the same as an empty page.
    html = getattr(scrape_result, "html", None) or ""

    marker = html.find("Abstract")
    if marker == -1:
        return not_found

    # str.find returns -1 on a miss; each miss must be checked explicitly,
    # otherwise -1 would be used as an offset and produce a bogus slice.
    open_tag = html.find("<p>", marker)
    if open_tag == -1:
        return not_found
    body_start = open_tag + len("<p>")

    close_tag = html.find("</p>", body_start)
    if close_tag == -1:
        return not_found

    return html[body_start:close_tag]
def scrape_web(url: str):
    """Scrape *url* through the shared Firecrawl client and return its HTML.

    Both the ``markdown`` and ``html`` formats are requested; only the
    ``html`` attribute of the result is returned.
    """
    return f_app.scrape_url(url, formats=["markdown", "html"]).html
def get_response(chat_client, user):
    """Send one message to the chat session and return the reply text.

    ``user`` is forwarded to ``chat_client.send_message``; ``None`` is
    replaced by an empty string so the SDK never receives ``None``. The text
    of the first part of the first candidate is returned.
    """
    message = "" if user is None else user
    reply = chat_client.send_message(message)
    first_candidate = reply.candidates[0]
    return first_candidate.content.parts[0].text
def convert_to_json(text):
    """Extract and parse the JSON object embedded in *text*.

    The span from the first ``{`` to the last ``}`` (inclusive) is parsed, so
    surrounding prose or Markdown code fences are ignored.

    Args:
        text: Raw model reply.

    Returns:
        The parsed JSON value on success. On any failure (non-string input,
        no ``{...}`` span, invalid JSON) a string starting with
        ``"Json Parse Error due to "`` is returned instead of raising.
    """
    if not isinstance(text, str):
        return "Json Parse Error due to non-text input of type " + type(text).__name__

    start = text.find("{")
    # rfind gives the index of the last "}" directly. A reversed-string offset
    # of 0 (text ending in "}") must not be used as a negative slice bound:
    # text[start:-0] is the empty string.
    end = text.rfind("}")
    if start == -1 or end < start:
        return "Json Parse Error due to no JSON object found in text"

    try:
        return json.loads(text[start : end + 1])
    except (ValueError, RecursionError) as e:
        # json.JSONDecodeError is a ValueError subclass.
        return "Json Parse Error due to " + str(e)
def get_observation(function, inp):
    """Run the tool named by *function* and wrap its result as an OBSERVATION.

    Args:
        function: Tool name; ``"get_results"`` and ``"scrape_web"`` are known.
        inp: Tool input. For ``get_results`` either a dict with ``"query"`` or
            ``"q"`` and an optional ``"location"``, or any other value, which
            is passed through unchanged. For ``scrape_web`` either a URL
            string or a dict holding the URL under ``"url"`` or ``"link"``
            (otherwise its first non-empty string value is used).

    Returns:
        ``{"state": "OBSERVATION", "observation": {...}}``. The inner dict
        holds the tool result, or a ``"message"`` asking for a retry when the
        tool is unknown or no URL could be determined.
    """
    if function == "get_results":
        if isinstance(inp, dict):
            search_input = {
                "query": inp.get("query") or inp.get("q") or "",
                "location": inp.get("location"),
            }
        else:
            search_input = inp
        profiles, answer, keys = get_results(search_input)
        observation = {
            "profiles": profiles,
            "answer": answer,
            "keys": list(keys) if keys else [],
        }
    elif function == "scrape_web":
        # CALL params reach this function as a dict, while scrape_web needs
        # the URL string itself, so unwrap it here.
        url = inp
        if isinstance(inp, dict):
            url = (
                inp.get("url")
                or inp.get("link")
                or next((v for v in inp.values() if isinstance(v, str) and v), None)
            )
        if isinstance(url, str) and url:
            observation = {"html_text": scrape_web(url)}
        else:
            observation = {"message": "No URL provided for scrape_web, Please Retry"}
    else:
        observation = {"message": "Function Not found, Please Retry"}

    return {"state": "OBSERVATION", "observation": observation}
def get_output(chat_client, inp, *, max_steps=25):
    """Drive the model's JSON state machine until it reaches OUTPUT.

    The first message is ``str(inp)``. After that, each parsed reply is
    handled by state: a ``CALL`` runs the named tool via ``get_observation``
    and sends the observation back; every other non-OUTPUT state is echoed
    back to the model unchanged so it can continue.

    Args:
        chat_client: Chat session passed to ``get_response``.
        inp: Initial message object; it is stringified before sending.
        max_steps: Upper bound on follow-up turns, so a model that never
            emits OUTPUT cannot keep the request (and API usage) running
            forever.

    Returns:
        The parsed reply whose ``"state"`` is ``"OUTPUT"``; or the error
        string from ``convert_to_json`` when a reply cannot be parsed; or a
        synthetic OUTPUT dict explaining that the step limit was reached.
    """
    output = convert_to_json(get_response(chat_client, str(inp)))
    steps = 0
    while isinstance(output, dict) and output.get("state") != "OUTPUT":
        if steps >= max_steps:
            log.warning("Agent loop stopped after %d steps without OUTPUT", steps)
            return {
                "state": "OUTPUT",
                "output": f"Stopped after {steps} steps without reaching a final answer.",
            }
        steps += 1

        if output.get("state") == "CALL":
            params = output.get("params")
            # Non-empty params (dict or scalar) are forwarded as-is; empty or
            # missing params become None. Params are not assumed to be a dict,
            # because the model may send a bare string.
            fn_input = params if params else None
            next_message = get_observation(output.get("function_name"), fn_input)
        else:
            # PLAN, OBSERVATION and unknown states are all echoed back.
            next_message = output

        output = convert_to_json(get_response(chat_client, str(next_message)))
    return output
def chat(query: str):
    """Answer *query* with a fresh Gemini chat session.

    A new chat is created, primed with ``SYSTEM_PROMPT`` (sent as the first
    message; its reply is discarded), and then driven by ``get_output``
    starting from a ``START`` state that carries the user's query.

    Returns:
        The ``"output"`` field of the final reply. If the agent ends without
        one (for example, the reply could not be parsed and ``get_output``
        returned its error string), that result is logged and returned as
        text instead of raising ``TypeError``/``KeyError``.
    """
    chat_client = client.chats.create(model="gemini-2.5-flash")
    get_response(chat_client, SYSTEM_PROMPT)

    output = get_output(chat_client, {"state": "START", "user": query})
    if isinstance(output, dict) and "output" in output:
        return output["output"]

    log.error("Agent finished without an OUTPUT payload: %r", output)
    return output if isinstance(output, str) else str(output)
| # -------- Routes -------- | |
def default():
    """Return a fixed JSON message saying the backend is working."""
    # NOTE(review): no @app.route decorator is visible on this view here;
    # confirm that it is registered with `app`.
    payload = {"message": "Backend Working Successfully"}
    return jsonify(payload)
def get_chat_results():
    """Handle a chat request and return the agent's answer as JSON.

    The query is read from the ``"query"`` field of the JSON body for POST
    requests, and from the ``query`` URL parameter for any other method.

    Returns:
        ``{"output": ...}`` with the result of ``chat(query)``, or
        ``({"error": "No query provided"}, 400)`` when the query is missing
        or empty, including when the POST body is not a JSON object.
    """
    # NOTE(review): no @app.route decorator is visible on this view here;
    # confirm that it is registered with `app` for both GET and POST.
    if request.method == "POST":
        data = request.get_json(silent=True)
        # A valid JSON body can be a list or scalar, which has no .get().
        query = data.get("query") if isinstance(data, dict) else None
    else:  # GET
        query = request.args.get("query")

    if not query:
        return jsonify({"error": "No query provided"}), 400

    return jsonify({"output": chat(query)})