# QuantGrantsList / app.py
import streamlit as st
import pandas as pd
import base64
import json
from scrapegraphai.graphs import SearchGraph
import nest_asyncio
import os
import subprocess
import io
import time
import urllib.parse
import asyncio
from langchain_google_genai import ChatGoogleGenerativeAI
from google import genai
from langchain_community.document_loaders import PlaywrightURLLoader
import requests
# Import Supadata and initialize the client
from supadata import Supadata, SupadataError
# Import Crawl4AI
from crawl4ai import AsyncWebCrawler
SUPADATA_API_KEY = os.getenv("SUPADATA")
supadata = Supadata(api_key=SUPADATA_API_KEY)
# Ensure Playwright downloads the browser binaries it needs (run once at startup)
subprocess.run(["playwright", "install"])
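# Streamlit may already be running inside an asyncio event loop; nest_asyncio
# allows the nested run_until_complete() calls used by the Crawl4AI path below.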
nest_asyncio.apply()
GOOGLE_API_KEY = os.environ["GOOGLE_API_KEY"]
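# Configuration for ScrapeGraphAI's SearchGraph: which LLM to use, how many
# search results to consider, and whether the scraping browser runs headless.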
graph_config = {
"llm": {
"api_key": GOOGLE_API_KEY,
"model": "google_genai/gemini-2.0-flash-lite",
},
"max_results": 8,
"verbose": True,
"headless": True
}
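# Expected shape of a successful SearchGraph run (an assumption based on the
# extraction prompt below): {"grants": [{"Grant name/title": ..., ...}]}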
def get_data(search_term):
"""
Run the SearchGraph for a given search term.
If a rate-limit error (202) occurs, wait 10 seconds and retry.
If no results are returned or an error persists, notify the user.
"""
full_prompt = (
f"search for {search_term} grants\n\n"
"List me all grants or funds with:\n"
"- Grant name/title\n"
"- Short summary \n"
"- Funding organization\n"
"- Grant value (numeric only)\n"
"- Application deadline\n"
"- Eligible countries\n"
"- Sector/field\n"
"- Eligibility criteria\n"
"Return in JSON format."
)
    # Run the graph, retrying once after a 10-second pause if a 202
    # rate-limit error is raised.
    for attempt in range(2):
        try:
            search_graph = SearchGraph(
                prompt=full_prompt,
                config=graph_config,
            )
            result = search_graph.run()
            if not result or not result.get("grants"):
                st.error(f"No results returned for {search_term}. Please try again with a different search term.")
                return {}
            return result
        except Exception as e:
            if attempt == 1:
                st.error(f"Retry failed: {e}. Please try again later.")
            elif "202" in str(e):
                st.warning("Rate limit reached (202). Waiting 10 seconds before retrying...")
                time.sleep(10)
                continue
            else:
                st.error(f"An error occurred for search term: {search_term}, error: {e}. Please try again.")
            return {}
def get_data_from_url(url, scraping_tool="supadata"):
"""
Scrape the provided URL using the selected scraping tool.
Args:
url: The URL to scrape
scraping_tool: Either "supadata", "crawl4ai", or "playwright"
Returns:
Dictionary containing the extracted grant data
"""
page_content = None # Placeholder for storing scraped page content
# Choose the scraping method based on the selected tool
if scraping_tool == "crawl4ai":
try:
# Use Crawl4AI for scraping
async def run_crawler():
async with AsyncWebCrawler() as crawler:
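                    # arun() returns a CrawlResult; .markdown is the page text
                    # converted to markdown.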
result = await crawler.arun(url=url)
return result.markdown
# Run the async crawler in a synchronous context
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
page_content = loop.run_until_complete(run_crawler())
loop.close()
st.success("Successfully scraped using Crawl4AI")
except Exception as e:
st.error(f"Error using Crawl4AI: {e}")
# Fall back to Supadata if Crawl4AI fails
st.warning("Falling back to Supadata scraper...")
scraping_tool = "supadata"
if scraping_tool == "playwright":
try:
            loader = PlaywrightURLLoader(urls=[url], remove_selectors=["header", "footer"])
            # Use the synchronous load(); aload() is a coroutine and calling it
            # without awaiting would return no documents.
            data = loader.load()
            page_content = data[0].page_content if data else ""
st.success("Successfully scraped using Playwright")
except Exception as e:
st.error(f"Error using Playwright: {e}")
# Fall back to Supadata if Playwright fails
st.warning("Falling back to Supadata scraper...")
scraping_tool = "supadata"
if scraping_tool == "supadata":
# **Step 1: Attempt Supadata's Built-in Scraper**
try:
web_content = supadata.web.scrape(url)
page_content = web_content.content
st.success("Successfully scraped using Supadata built-in scraper")
        except TypeError as te:
            if "unexpected keyword argument 'type'" in str(te):
                st.warning("Falling back to Supadata API due to unexpected keyword 'type' error.")
            else:
                st.error(f"Unexpected error in Supadata scrape: {te}")
        except Exception as e:
            # Any other SDK failure should also fall through to the REST API below.
            st.error(f"Supadata built-in scraper failed: {e}")
# **Step 2: If Supadata's Built-in Scraper Fails, Use Supadata API**
if not page_content:
try:
api_url = "https://api.supadata.ai/v1/web/scrape"
headers = {"X-API-Key": SUPADATA_API_KEY}
response = requests.get(api_url, headers=headers, params={"url": url})
if response.status_code == 200:
page_content = response.json().get("content", "")
st.success("Successfully scraped using Supadata API")
else:
st.error(f"Supadata API failed with status {response.status_code}")
except Exception as e:
st.error(f"Error calling Supadata API: {e}")
# **Step 3: If Supadata API Fails, Use Direct Web Request**
if not page_content:
try:
r = requests.get(url, timeout=10)
if r.status_code == 200:
page_content = r.text
st.success("Successfully retrieved content with direct request")
else:
st.error(f"Manual scraping failed with status code {r.status_code}")
return {}
except Exception as e:
st.error(f"Manual scraping error: {e}")
return {}
# If we still don't have content after all attempts
if not page_content:
st.error("Failed to retrieve content from the URL with all available methods")
return {}
# **Pass Content to Gemini AI**
full_prompt = (
"Extract the following grant data from the provided web content. "
"- Grant name/title\n"
"- Short summary\n"
"- Funding organization\n"
"- Grant value (numeric only)\n"
"- Application deadline\n"
"- Eligible countries\n"
"- Sector/field\n"
"- Eligibility criteria\n"
"Return in JSON format.\n\n"
f"Web content: {page_content}"
)
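    # Note: the entire page is embedded in the prompt; very long pages may
    # exceed the model's context window, so truncating page_content first is a
    # possible mitigation.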
client = genai.Client(api_key=GOOGLE_API_KEY)
new_answer = client.models.generate_content(
model="models/gemini-2.0-flash-lite",
contents=f"{full_prompt}, return the json string and nothing else"
)
response = new_answer.text
    # **Extract JSON Output from Gemini**
    # Slice from the first '[' to the last ']' to strip any markdown fences or
    # prose the model may wrap around the JSON array.
    try:
        start_index = response.find('[')
        end_index = response.rfind(']') + 1
        json_string = response[start_index:end_index]
        result = json.loads(json_string)
    except Exception as parse_error:
        st.error(f"Error parsing JSON from Gemini model response: {parse_error}. Response: {response}")
return {}
# **Ensure JSON is Wrapped Correctly**
if isinstance(result, list):
result = {"grants": result}
if not result.get("grants"):
st.error("No grant opportunities found in the scraped URL.")
return {}
st.success(f"First grant opportunity: {result['grants'][0]}")
return result
def process_multiple_search_terms(search_terms):
"""
Process multiple search terms with progress tracking.
Returns a dictionary with a 'grants' key containing combined results.
"""
all_data = {"grants": []}
progress_bar = st.progress(0)
status_container = st.empty()
total_terms = len(search_terms)
for index, term in enumerate(search_terms):
term = term.strip()
if not term:
continue
progress = (index + 1) / total_terms
progress_bar.progress(progress)
status_container.markdown(
f"""
**Processing Grant Opportunities** πŸš€
Searching term {index+1} of {total_terms}: `{term}`
<br>
<p style='font-size: 0.9em; color: #6699CC;'>Completed: {index}/{total_terms} | Remaining: {total_terms - index - 1}</p>
""",
unsafe_allow_html=True,
)
result = get_data(term)
if result and result.get("grants"):
all_data["grants"].extend(result["grants"])
progress_bar.empty()
status_container.empty()
if not all_data["grants"]:
st.error("No grant opportunities were found. Please try again with different search terms.")
return all_data
def convert_to_csv(data):
df = pd.DataFrame(data["grants"])
return df.to_csv(index=False).encode("utf-8")
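# Excel export uses the xlsxwriter engine, which must be installed alongside pandas.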
def convert_to_excel(data):
df = pd.DataFrame(data["grants"])
buffer = io.BytesIO()
with pd.ExcelWriter(buffer, engine="xlsxwriter") as writer:
df.to_excel(writer, sheet_name="Grants", index=False)
return buffer.getvalue()
def create_knowledge_base(data):
# Store JSON representation of data in session state
st.session_state.knowledge_base_json = json.dumps(data, indent=2)
def chat_with_knowledge_base(query):
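    """
    Answer a question by passing the full grant JSON to Gemini as context.
    This prompt-stuffing approach is simple and adequate for the small result
    sets this app produces; a vector store would scale better for large data.
    """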
if "knowledge_base_json" not in st.session_state:
return "Knowledge base not initialized. Please load grant data first."
context = st.session_state.knowledge_base_json
prompt = f"""
You are an AI assistant that helps users analyze grant opportunities.
Here is the extracted grant data in JSON format:
{context}
User's question: {query}
Answer the question based on the provided grant data.
"""
llm = ChatGoogleGenerativeAI(
model="gemini-2.0-flash-thinking-exp", google_api_key=GOOGLE_API_KEY, temperature=0
)
response = llm.invoke(prompt)
return response.content
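# Encode the file as a data: URI so it can be embedded directly in share links.
# Caveat: data URIs grow with file size, and mail clients or messaging apps may
# truncate very long URLs.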
def get_shareable_link(file_data, file_name, file_type):
b64 = base64.b64encode(file_data).decode()
return f"data:{file_type};base64,{b64}"
def main():
st.set_page_config(page_title="Quantilytix Grant Finder", page_icon="πŸ’°", layout="wide")
st.title("πŸ’° Quantilytix Grant Finder")
st.markdown("""
<div style="text-align: justify;">
<p>
Welcome to <b>Quantilytix Grant Finder</b>, an AI-powered platform designed to streamline the grant discovery process, especially for academics and researchers across the globe.
</p>
</div>
""", unsafe_allow_html=True)
# Sidebar controls
st.sidebar.image("logoqb.jpeg", use_container_width=True)
st.sidebar.header("Scrape & Configure")
if "scraped_data" not in st.session_state:
st.session_state.scraped_data = None
if "chat_history" not in st.session_state:
st.session_state.chat_history = []
if "chat_interface_active" not in st.session_state:
st.session_state.chat_interface_active = False
# Sidebar: Input Type Selection
input_type = st.sidebar.radio(
"Select Input Type:",
("Search Query", "URL"),
key="input_type_selector"
)
# Sidebar: Input field based on selection
if input_type == "Search Query":
search_input = st.sidebar.text_area(
"Enter Search Terms (one per line). Maximum 2",
height=150,
help="Input search terms to discover grant opportunities. Terms can be specific or generic.",
placeholder="e.g.,\nRenewable energy \nclimate change research\nAgriculture in Africa"
)
else:
url_input = st.sidebar.text_input(
"Enter URL to scrape for grant opportunities",
placeholder="https://example.com/grants"
)
# Scraping tool selector
scraping_tool = st.sidebar.radio(
"Select Scraping Tool:",
("Supadata", "Crawl4AI", "Playwright"),
key="scraping_tool_selector"
)
# Execute based on input type selection
if input_type == "Search Query":
if st.sidebar.button("πŸ” Get Grant Opportunities"):
if search_input:
                search_terms = [term.strip() for term in search_input.split("\n") if term.strip()][:2]  # enforce the two-term limit stated in the sidebar
if search_terms:
with st.spinner("Searching in progress... Please wait patiently."):
result = process_multiple_search_terms(search_terms)
st.session_state.scraped_data = result
if result.get("grants"):
st.sidebar.success(f"βœ… Found {len(result['grants'])} grant opportunities from {len(search_terms)} search terms!")
else:
st.sidebar.warning("⚠️ Please enter valid search terms.")
else:
st.sidebar.warning("⚠️ Please enter at least one search term to begin.")
else: # URL input
if st.sidebar.button("πŸ” Scrape URL for Grant Opportunities"):
if url_input:
with st.spinner(f"Scraping URL using {scraping_tool}... Please wait patiently."):
result = get_data_from_url(url_input, scraping_tool.lower())
st.session_state.scraped_data = result
if result.get("grants"):
st.sidebar.success(f"βœ… Found {len(result['grants'])} grant opportunities from the URL!")
else:
st.sidebar.warning("⚠️ Please enter a valid URL to scrape.")
# Sidebar: Download & Share Controls
if st.session_state.scraped_data and st.session_state.scraped_data.get('grants'):
st.sidebar.markdown("---")
st.sidebar.subheader("Download & Share")
selected_format = st.sidebar.selectbox("Download As:", ("CSV", "Excel"), key="download_format_selector")
if selected_format == "CSV":
file_data = convert_to_csv(st.session_state.scraped_data)
file_name = "grants_data.csv"
file_type = "text/csv"
else:
file_data = convert_to_excel(st.session_state.scraped_data)
file_name = "grants_data.xlsx"
file_type = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
download_link_html = f"<a href='data:{file_type};base64,{base64.b64encode(file_data).decode()}' download='{file_name}'><button style='background-color:#4CAF50;color:white;padding:10px 15px;border:none;border-radius:4px;'>⬇️ Download {selected_format}</button></a>"
st.sidebar.markdown(download_link_html, unsafe_allow_html=True)
shareable_link = get_shareable_link(file_data, file_name, file_type)
whatsapp_url = f"https://api.whatsapp.com/send?text={urllib.parse.quote(f'Check out these grant opportunities: {shareable_link}')}"
email_subject = urllib.parse.quote("Grant Opportunities File")
email_body = urllib.parse.quote(f"Download the grant opportunities file here: {shareable_link}")
email_url = f"mailto:?subject={email_subject}&body={email_body}"
st.sidebar.markdown("<div style='margin-top:10px;'>Share via:</div>", unsafe_allow_html=True)
st.sidebar.markdown(f"πŸ“± [WhatsApp]({whatsapp_url}) | πŸ“§ [Email]({email_url})", unsafe_allow_html=True)
# Sidebar: Load as Knowledge Base & Chat
if st.sidebar.button("🧠 Load as Knowledge Base & Chat"):
with st.spinner("Loading data into knowledge base..."):
create_knowledge_base(st.session_state.scraped_data)
st.session_state.chat_interface_active = True
st.session_state.chat_history = []
st.sidebar.success("Knowledge base loaded!")
# Main area: Data Preview
st.markdown("---")
if st.session_state.scraped_data and st.session_state.scraped_data.get('grants'):
st.header("πŸ“Š Found Grant Data")
with st.expander(f"πŸ“Š Preview Grant Data ({len(st.session_state.scraped_data['grants'])} grants)"):
st.dataframe(st.session_state.scraped_data["grants"])
# Main area: Chat UI (shown if knowledge base is loaded)
if st.session_state.get("chat_interface_active"):
st.header("πŸ’¬ Chat with Grants Bot")
query = st.text_input("Your question:", key="chat_input_main")
if query:
with st.spinner("Generating response..."):
response = chat_with_knowledge_base(query)
answer = response["answer"] if isinstance(response, dict) and "answer" in response else response
st.session_state.chat_history.append({"query": query, "response": answer})
if st.session_state.chat_history:
st.subheader("Chat History")
for chat in reversed(st.session_state.chat_history):
st.markdown(
f"<div style='padding: 10px; border-radius: 5px; margin-bottom: 5px; background-color:#444444; color: white;'><strong>You:</strong> {chat['query']}</div>",
unsafe_allow_html=True)
st.markdown(
f"<div style='padding: 10px; border-radius: 5px; margin-bottom: 10px; background-color:#007BFF; color: white;'><strong>Grants Bot:</strong> {chat['response']}</div>",
unsafe_allow_html=True)
else:
st.info("⬅️ Enter search terms or a URL in the sidebar and click the appropriate button to start searching.")
st.sidebar.markdown("---")
st.sidebar.markdown(
"""
<div style='text-align: center; font-size: 0.8em; color: grey;'>
Powered by <a href="https://quantilytix.com" style='color: grey;'>Quantilytix</a> | &copy; 2025
</div>
""",
unsafe_allow_html=True,
)
if __name__ == "__main__":
main()