# Mr-Thop's picture
# Update rm.py
# 6596042
from socket import timeout
from serpapi import GoogleSearch
import os
from firecrawl import FirecrawlApp
from flask import Flask, request, jsonify
from flask_cors import CORS
from google import genai
import json
import logging
# Firecrawl client used by get_abstract() and scrape_web(); the key is read
# from the FIRECRAWL_API_KEY environment variable.
f_app = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
# Flask application object; the routes below are registered on it.
app = Flask(__name__)
# Enable flask_cors on the app with its default settings.
CORS(app)
# Gemini client used by chat() to open chat sessions; the key is read from
# the GEMINI_API_KEY environment variable.
client = genai.Client(api_key=os.getenv("GEMINI_API_KEY"))
# Prompt text that chat() sends as the first message of every session.
# os.getenv returns None when the variable is not set.
SYSTEM_PROMPT = os.getenv("SYSTEM_PROMPT")
# Root logging configuration: INFO level, "time - level - logger - message".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(name)s - %(message)s')
def get_google_scholar_results(key_params: dict):
    """Run a Google Scholar search through SerpApi.

    Args:
        key_params: Search parameters, e.g. ``{"q": "graph neural networks"}``.
            The mapping is copied before the request settings are added, so
            the caller's dict is left untouched.

    Returns:
        A ``(profiles, organic_results)`` tuple. Each element is the matching
        section of the response dict, or ``None`` when that section is absent.
    """
    # Build a fresh dict so the API key never leaks into the caller's object.
    # The three fixed settings come last and therefore override any
    # same-named entries supplied by the caller.
    search_params = {
        **key_params,
        "api_key": os.getenv("SERPAPI_API_KEY"),
        "engine": "google_scholar",
        "hl": "en",
    }
    results = GoogleSearch(search_params).get_dict()
    return results.get("profiles"), results.get("organic_results")
def get_results(query: str):
    """Search Google Scholar for *query* and keep the useful fields per hit.

    Args:
        query: Free-text search string, sent as the ``q`` parameter.

    Returns:
        A ``(profiles, answer, keys)`` tuple:

        * ``profiles`` - the "profiles" value returned by
          ``get_google_scholar_results`` (``None`` when absent).
        * ``answer`` - a list with one dict per organic result. Each dict
          holds whichever of ``title``, ``result_id``, ``link``, ``snippet``,
          ``publication_info`` and ``resources`` the raw result contains,
          plus ``abstract`` when the link contains
          ``https://www.annualreviews``.
        * ``keys`` - a list of the field names of the first raw result, e.g.
          ``['position', 'title', 'result_id', 'link', 'snippet',
          'publication_info', 'resources', 'inline_links']``; empty when
          there are no results.
    """
    # Fields copied verbatim from each raw result, in output order.
    copied_fields = (
        "title",
        "result_id",
        "link",
        "snippet",
        "publication_info",
        "resources",
    )

    profiles, results = get_google_scholar_results({"q": query})
    if not results:
        return profiles, [], []

    # Always hand back a plain list so the type does not depend on whether
    # the search found anything (a dict view was returned before).
    keys = list(results[0].keys())

    answer = []
    for entry in results:
        output = {}
        for field in copied_fields:
            if field not in entry:
                continue
            output[field] = entry[field]
            # Annual Reviews pages are additionally scraped for an abstract;
            # the check sits here so it only runs when a link exists.
            if field == "link" and "https://www.annualreviews" in entry["link"]:
                output["abstract"] = get_abstract(entry["link"])
        answer.append(output)
    return profiles, answer, keys
def get_abstract(url: str):
    """Scrape *url* and return the first paragraph after the word "Abstract".

    Args:
        url: Address of the page to scrape with Firecrawl.

    Returns:
        The slice of the page HTML that starts at the first ``<p>`` following
        the first occurrence of "Abstract" and stops just before the next
        ``</p>`` (the opening tag is part of the returned text). Returns the
        string "Abstract not found" when the page has no HTML, no "Abstract"
        marker, or no complete ``<p>...</p>`` pair after the marker.
    """
    not_found = "Abstract not found"

    html = f_app.scrape(url, formats=['markdown', 'html']).html
    if not html:
        return not_found

    heading = html.find("Abstract")
    if heading == -1:
        return not_found

    # str.find returns -1 on a miss; using that value as an offset would
    # silently produce a wrong slice, so every miss is handled explicitly.
    para_start = html.find("<p>", heading)
    if para_start == -1:
        return not_found
    para_end = html.find("</p>", para_start)
    if para_end == -1:
        return not_found

    return html[para_start:para_end]
def scrape_web(url:str):
    """Scrape the page at *url* with Firecrawl and return its HTML.

    The scrape requests both the 'markdown' and 'html' formats; only the
    ``html`` attribute of the result is returned.
    """
    return f_app.scrape(url, formats=['markdown', 'html']).html
def get_response(chat_client,user):
    """Send *user* to the chat session and return the reply text.

    Only the text of the first part of the first candidate is returned.
    """
    reply = chat_client.send_message(user)
    first_candidate = reply.candidates[0]
    first_part = first_candidate.content.parts[0]
    return first_part.text
def convert_to_json(text):
    """Extract and parse the JSON object embedded in *text*.

    The object is taken to span from the first "{" to the last "}" in the
    text, so surrounding prose or Markdown code fences are ignored.

    Args:
        text: Raw model reply that should contain one JSON object.

    Returns:
        The parsed JSON value on success. On failure, a string starting with
        "Json Parse Error due to " followed by the reason.
    """
    start = text.find("{")
    # rfind gives the index of the last "}" directly. Counting from the end
    # of the reversed text breaks when the text ends with "}": the offset is
    # 0 and a slice ending at -0 is empty.
    end = text.rfind("}")
    if start == -1 or end < start:
        return "Json Parse Error due to " + "no JSON object found in text"
    try:
        return json.loads(text[start:end + 1])
    except ValueError as e:  # json.JSONDecodeError is a ValueError subclass
        return "Json Parse Error due to " + str(e)
def get_observation(function,inp):
    """Run the tool named *function* on *inp* and wrap the result.

    Supported names are "get_results" and "scrape_web"; any other name
    produces a retry message instead of a tool result. The return value is
    always a dict with "state" set to "OBSERVATION" and the payload under
    "observation".
    """
    if function == "get_results":
        profiles, answer, keys = get_results(inp)
        observation = {
            "profiles": profiles,
            "answer": answer,
            "keys": keys,
        }
    elif function == "scrape_web":
        observation = {"html_text": scrape_web(inp)}
    else:
        observation = {"message": "Function Not found, Please Retry"}

    return {"state": "OBSERVATION", "observation": observation}
def get_output(chat_client,inp,max_parse_retries=3):
    """Drive the model/tool loop until the model emits an "OUTPUT" state.

    Each model reply is parsed with ``convert_to_json`` and handled by its
    "state" value:

    * "OUTPUT" - the loop ends and the parsed reply is returned.
    * "CALL"   - the tool named by "function_name" is run through
      ``get_observation`` and the observation is sent back to the model.
    * anything else (including "PLAN" and "OBSERVATION") - the parsed reply
      is echoed back to the model so it can continue.

    Args:
        chat_client: Chat session object accepted by ``get_response``.
        inp: First message for the model; it is sent as ``str(inp)``.
        max_parse_retries: How many consecutive unparseable replies are
            tolerated before giving up.

    Returns:
        The parsed reply dict whose "state" is "OUTPUT".

    Raises:
        ValueError: If more than ``max_parse_retries`` replies in a row
            cannot be parsed into a JSON object.
    """
    output = convert_to_json(get_response(chat_client, str(inp)))
    parse_failures = 0

    while True:
        if not isinstance(output, dict):
            # convert_to_json signals failure with an error string. Indexing
            # that string by "state" would raise TypeError, so the error is
            # fed back to the model instead, a bounded number of times.
            parse_failures += 1
            if parse_failures > max_parse_retries:
                raise ValueError(
                    f"Model reply could not be parsed as a JSON object: {output}"
                )
            next_message = output
        else:
            parse_failures = 0
            state = output.get("state")
            if state == "OUTPUT":
                return output
            if state == "CALL":
                params = output.get("params")
                if isinstance(params, dict) and params:
                    # Tools take a single argument: the last parameter value
                    # wins, as before. With no parameters the previous input
                    # value is reused.
                    inp = list(params.values())[-1]
                # A missing "function_name" yields the "Function Not found"
                # observation, which asks the model to retry.
                next_message = get_observation(output.get("function_name"), inp)
            else:
                next_message = output

        output = convert_to_json(get_response(chat_client, str(next_message)))
def chat(query: str):
    """Answer *query* in a fresh Gemini chat session.

    A new "gemini-2.5-flash" chat is created, SYSTEM_PROMPT is sent as its
    first message, and the query is then handed to ``get_output`` wrapped in
    a "START" state message. The "output" field of the final reply is
    returned.
    """
    session = client.chats.create(model="gemini-2.5-flash")
    # The reply to the system prompt is not used.
    get_response(session, SYSTEM_PROMPT)
    start_message = {"state": "START", "user": query}
    return get_output(session, start_message)["output"]
@app.route("/",methods=["GET"])
def default():
    """Root endpoint: reply with a fixed JSON status message."""
    status = {"message": "Backend Working Successfully"}
    return jsonify(status)
@app.route("/chat",methods=["POST","GET"])
def get_chat_results():
    """Chat endpoint: run the agent on the supplied query.

    The query is read from the "query" field of a JSON body. If there is no
    usable JSON body (typical for GET requests), it is read from the "query"
    URL parameter instead.

    Returns:
        ``{"output": ...}`` with the agent's answer, or ``{"error": ...}``
        with HTTP status 400 when no query was supplied.
    """
    # silent=True returns None for a missing or non-JSON body instead of
    # aborting the request, which matters because the route also allows GET.
    payload = request.get_json(silent=True)
    query = payload.get("query") if isinstance(payload, dict) else None
    if query is None:
        query = request.args.get("query")
    if not query:
        # Reject the request here rather than sending an empty query to the model.
        return jsonify({"error": "Missing 'query' in JSON body or query string"}), 400

    app.logger.info("Chat Initiated : %s", query)
    output = chat(query)
    app.logger.info("Output Parsed")
    return jsonify({"output":output})