# Finder / app.py
# gauthamnairy's picture
# Update app.py
# e41038b verified
import os
import io
import base64
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
import google.generativeai as genai
from werkzeug.utils import secure_filename
import PyPDF2
import docx
import pandas as pd
import json
import numpy as np
import logging
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain.docstore.document import Document
from sklearn.metrics.pairwise import cosine_similarity
import plotly.express as px
import plotly
from newsapi import NewsApiClient
import certifi
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, urljoin
import time
import random
from functools import wraps
from pathlib import Path
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Flask application object; the route decorators further down register
# their handlers on it.
app = Flask(__name__)
# Enable CORS on every route ("/*"): any origin may issue GET/POST/OPTIONS
# requests, and Content-Type is the only allow-listed request header.
CORS(app, resources={
    r"/*": {
        "origins": "*",
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type"]
    }
})
# Set up logging
logging.basicConfig(level=logging.INFO)
# Get the directory containing app.py
BASE_DIR = Path(__file__).resolve().parent
# Set your Google API key securely using an environment variable.
# os.getenv returns None when GOOGLE_API_KEY is unset; the value is handed
# to genai.configure unchanged.
google_api_key = os.getenv('GOOGLE_API_KEY')
genai.configure(api_key=google_api_key)
# NewsAPI key, also read from the environment (None when unset).
newsapi_key = os.getenv('NEWSAPI_KEY')
NEWSAPI_KEY = newsapi_key
# HTTP session whose TLS verification uses certifi's CA bundle; it is
# attached to the NewsAPI client created below.
session = requests.Session()
session.verify = certifi.where()
newsapi = NewsApiClient(api_key=NEWSAPI_KEY)
newsapi.session = session
# Initialize the model
model = genai.GenerativeModel('gemini-2.0-flash-exp')
# Upload extensions accepted by allowed_file().
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'docx', 'xlsx', 'csv'}
# In-memory storage
# files_storage maps a file id to the stored upload record;
# chunks_storage is a flat list of embedded text chunks, each tagged with its file id.
files_storage = {}
chunks_storage = []
# List of energy company websites to scrape.
# A company may appear under more than one category below (for example
# Iberdrola and EDF). dict.fromkeys() drops the repeated URLs while keeping
# first-seen order, so no site is scraped twice.
ENERGY_COMPANIES = list(dict.fromkeys([
    # Oil and Gas Companies
    "https://corporate.exxonmobil.com/",
    "https://www.chevron.com/",
    "https://www.bp.com/",
    "https://www.shell.com/",
    "https://totalenergies.com/",
    "https://www.aramco.com/",
    "http://www.petrochina.com.cn/ptr/",
    "https://www.gazprom.com/",
    "https://www.lukoil.com/",
    "https://www.rosneft.com/",
    # Renewable Energy Companies
    "https://www.nexteraenergy.com/",
    "https://www.iberdrola.com/",
    "https://www.siemensgamesa.com/",
    "https://orsted.com/",
    "https://www.enelgreenpower.com/",
    "https://www.firstsolar.com/",
    "https://bep.brookfield.com/",
    "https://www.canadiansolar.com/",
    "https://us.sunpower.com/",
    # Electricity Generation and Utility Companies
    "https://www.duke-energy.com/",
    "https://www.edf.fr/",
    "https://www.eon.com/",
    "https://www.enel.com/",
    "https://www.nationalgrid.com/",
    "https://www.southerncompany.com/",
    "https://www.aep.com/",
    "https://www.iberdrola.com/",
    "https://www.engie.com/",
    "https://www.xcelenergy.com/",
    # Nuclear Energy Companies
    "https://www.edf.fr/",
    "https://www.rosatom.ru/",
    "https://www.exeloncorp.com/",
    "https://www.westinghousenuclear.com/",
    "https://www.orano.group/en/",
]))
def allowed_file(filename):
    """Return whether *filename* carries an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot and is compared in lower
    case. A name without any dot is rejected.
    """
    if '.' not in filename:
        return False
    extension = filename.rsplit('.', 1)[1]
    return extension.lower() in ALLOWED_EXTENSIONS
def convert_excel_to_csv(file_content):
    """Turn the raw bytes of an Excel workbook into CSV text.

    The workbook is parsed with ``pandas.read_excel`` using its default
    arguments and written back out without the index column.

    Args:
        file_content: Bytes of the uploaded workbook.

    Returns:
        The CSV representation as a string.
    """
    logging.info("Converting Excel to CSV in memory")
    frame = pd.read_excel(io.BytesIO(file_content))
    with io.StringIO() as csv_out:
        frame.to_csv(csv_out, index=False)
        return csv_out.getvalue()
def process_document(file_content, filename):
    """Extract plain text from an uploaded document, dispatching on its extension.

    Args:
        file_content: Raw bytes of the uploaded file.
        filename: Name of the file; only its suffix is inspected.

    Returns:
        The extracted text. PDF, DOCX, XLSX and CSV files go through their
        dedicated extractors; anything else is decoded as UTF-8 text.

    Raises:
        UnicodeDecodeError: If a CSV or plain-text payload is not valid UTF-8.
    """
    logging.info(f"Processing document: {filename}")
    # allowed_file() accepts extensions case-insensitively, so the dispatch
    # must too; otherwise "REPORT.PDF" would fall through to the UTF-8 branch
    # and fail on binary content.
    lowered = filename.lower()
    if lowered.endswith('.pdf'):
        return extract_text_from_pdf(file_content)
    if lowered.endswith('.docx'):
        return extract_text_from_docx(file_content)
    if lowered.endswith('.xlsx'):
        csv_content = convert_excel_to_csv(file_content)
        return extract_text_from_csv(csv_content)
    if lowered.endswith('.csv'):
        return extract_text_from_csv(file_content.decode('utf-8'))
    return file_content.decode('utf-8')
def extract_text_from_pdf(file_content):
    """Extract the text of every page of a PDF held in memory.

    Args:
        file_content: Raw bytes of the PDF.

    Returns:
        The text of all pages, each page followed by a newline.
    """
    logging.info("Extracting text from PDF in memory")
    reader = PyPDF2.PdfReader(io.BytesIO(file_content))
    # Guard against extract_text() yielding None (e.g. a page without a text
    # layer on some PyPDF2 versions); such pages contribute an empty line
    # instead of raising TypeError. join() also avoids repeated string
    # concatenation.
    return "".join((page.extract_text() or "") + "\n" for page in reader.pages)
def extract_text_from_docx(file_content):
    """Return the paragraph texts of an in-memory DOCX file, one per line.

    Args:
        file_content: Raw bytes of the DOCX document.

    Returns:
        The text of each paragraph in ``doc.paragraphs`` joined by newlines.
    """
    logging.info("Extracting text from DOCX in memory")
    document = docx.Document(io.BytesIO(file_content))
    paragraph_texts = [paragraph.text for paragraph in document.paragraphs]
    return "\n".join(paragraph_texts)
def extract_text_from_csv(file_content):
    """Render CSV text as the string form of the parsed DataFrame.

    Args:
        file_content: The CSV data as a ``str`` (already decoded).

    Returns:
        The output of ``DataFrame.to_string()`` for the parsed table.
    """
    logging.info("Extracting text from CSV in memory")
    csv_stream = io.StringIO(file_content)
    return pd.read_csv(csv_stream).to_string()
def get_text_chunks(text):
    """Split *text* into overlapping chunks for embedding.

    Uses a ``RecursiveCharacterTextSplitter`` configured with
    ``chunk_size=1000`` and ``chunk_overlap=200``.

    Returns:
        The list produced by the splitter's ``split_text``.
    """
    logging.info("Chunking text for vector store...")
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    return splitter.split_text(text)
def create_vector_store(text_chunks, file_id):
    """Embed each chunk and append it to the module-level ``chunks_storage``.

    Args:
        text_chunks: Iterable of text chunks to embed.
        file_id: Identifier stored with every chunk so it can be filtered later.

    Each chunk is embedded and appended one at a time, so chunks processed
    before a failing embedding call remain in ``chunks_storage``.
    """
    logging.info("Creating vector store in memory...")
    embedder = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    for piece in text_chunks:
        vector = embedder.embed_query(piece)
        record = {'file_id': file_id, 'content': piece, 'embedding': vector}
        chunks_storage.append(record)
    logging.info("Vector store created in memory.")
def get_conversational_chain():
    """Build the "stuff" question-answering chain used for document queries.

    Returns:
        The chain created by ``load_qa_chain`` from a ``gemini-pro`` chat
        model (temperature 0.3) and a prompt with ``context`` and
        ``question`` variables.
    """
    # The template stays flush-left: every character inside the literal,
    # including leading whitespace, is part of the prompt.
    qa_template = """
Answer the question as detailed as possible from the provided context. If the answer is not directly
available in the provided context, use your knowledge to infer a reasonable answer based on the given information.
If you're unsure or the question is completely unrelated to the context, state that you don't have enough information to answer accurately.
Context:\n{context}\n
Question:\n{question}\n
Answer:
"""
    chat_model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
    qa_prompt = PromptTemplate(
        template=qa_template,
        input_variables=["context", "question"],
    )
    return load_qa_chain(chat_model, chain_type="stuff", prompt=qa_prompt)
def answer_query_from_document(user_question, file_id):
    """Answer a question using the stored chunks of one uploaded file.

    The five chunks whose embeddings are most similar (cosine similarity)
    to the question embedding are joined into a single context document
    and passed to the QA chain.

    Args:
        user_question: The question text.
        file_id: Identifier of the file whose chunks should be searched.

    Returns:
        The chain's ``output_text``.

    Raises:
        ValueError: If no chunks are stored for *file_id*.
    """
    logging.info("Answering query from in-memory storage...")
    file_chunks = [chunk for chunk in chunks_storage if chunk['file_id'] == file_id]
    # Fail early with a clear message. Without this guard an unknown file_id
    # (or a document that produced no chunks) reaches cosine_similarity with
    # an empty matrix and surfaces as an opaque ValueError, after an
    # embedding call has already been spent on the question.
    if not file_chunks:
        raise ValueError(f"No indexed content found for file_id {file_id!r}")
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    query_embedding = np.array(embeddings.embed_query(user_question))
    chunk_embeddings = [np.array(chunk['embedding']) for chunk in file_chunks]
    similarities = cosine_similarity([query_embedding], chunk_embeddings)[0]
    # Sort chunks by similarity, most similar first (stable for ties).
    sorted_chunks = sorted(zip(file_chunks, similarities), key=lambda pair: pair[1], reverse=True)
    # Keep the 5 most similar chunks as context.
    context = ' '.join(chunk['content'] for chunk, _score in sorted_chunks[:5])
    chain = get_conversational_chain()
    response = chain.invoke({"input_documents": [Document(page_content=context)], "question": user_question})
    return response["output_text"]
def analyze_document(text):
    """Ask the generative model for a summary and insights about *text*.

    Only the first 4000 characters of the document are sent to the model;
    the first 200 characters are written to the log.

    Returns:
        The ``text`` attribute of the model response.
    """
    log_preview = text[:200]
    logging.info(f"Analyzing document with text: {log_preview}...")
    excerpt = text[:4000]
    prompt = (
        "Analyze the following document and provide a summary of its key points "
        "and any important insights:\n\n"
        f"{excerpt}"
    )
    result = model.generate_content(prompt)
    logging.info("Document analysis completed.")
    return result.text
def process_query(query, role=None, file_id=None):
    """Answer *query*, either from an uploaded document or with the general model.

    Args:
        query: The user's question.
        role: Optional speciality inserted into the system prompt.
        file_id: When truthy, the question is answered from that file's
            stored chunks and *role* is ignored.

    Returns:
        The answer text.

    Raises:
        Exception: Any error raised by the model call is logged and re-raised.
    """
    # Document-scoped questions take a different path entirely.
    if file_id:
        return answer_query_from_document(query, file_id)

    if role:
        system_prompt = f"You are an AI assistant specializing in {role}."
    else:
        system_prompt = "You are a helpful AI assistant."
    # Kept flush-left: the literal's content is sent verbatim to the model.
    prompt = f"""
{system_prompt}
Please format your response using markdown with proper structure:
- Use '##' for main headings
- Use '**' for bold text
- Use bullet points ('*') for lists
- Add proper spacing between sections
- Structure the content hierarchically
- Use proper paragraphs with line breaks
Query: "{query}"
Remember to:
- Format the response clearly and professionally
- Use headings for different sections
- Break down complex information into digestible parts
- Use bold text for emphasis on key terms
- Maintain consistent spacing
"""
    try:
        return model.generate_content(prompt).text
    except Exception as e:
        logging.error(f"Error generating content: {str(e)}", exc_info=True)
        raise e
def local_summarize(text, max_sentences=3):
    """Return a simple extractive summary: the leading sentences of *text*.

    Needs no model download. Sentences are delimited by '.', so decimals
    and abbreviations are split as well; this is a best-effort fallback.

    Args:
        text: Text to summarize. Empty or ``None`` yields an empty summary.
        max_sentences: Maximum number of non-empty sentences to keep.

    Returns:
        The kept sentences joined by '. ' with a trailing '.', or ``''``
        when *text* contains no sentence at all.
    """
    if not text:
        return ''
    # Drop blank fragments before slicing so that leading or doubled dots
    # do not use up the sentence budget.
    sentences = [part.strip() for part in text.split('.')]
    kept = [sentence for sentence in sentences if sentence][:max_sentences]
    if not kept:
        # Previously an empty input produced a lone "."
        return ''
    return '. '.join(kept) + '.'
def scrape_company_news(url):
    """Scrape up to five news items from the page at *url*.

    Candidate elements are ``<article>`` tags plus ``div.news-item`` and
    ``div.press-release``. For each of the first five candidates, the title
    is taken from the first ``h2``, ``h3`` or ``a`` found and the link from
    the first ``a`` with an ``href``.

    The request goes through a session that retries up to three times (with
    backoff) on 429/5xx responses, and uses a 10 second timeout.

    Args:
        url: Page to scrape; also the base for resolving relative links.

    Returns:
        A list of dicts with keys ``title``, ``url`` and ``source`` (the
        host of *url*). Any error is logged and results in an empty list.
    """
    try:
        retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retries)
        headers = {
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/91.0.4472.124 Safari/537.36')
        }
        # The context manager closes the session (and its pooled connections)
        # once the page has been read; previously a session leaked per call.
        with requests.Session() as http:
            http.mount('https://', adapter)
            http.mount('http://', adapter)
            response = http.get(url, headers=headers, timeout=10)
            page_html = response.content
        soup = BeautifulSoup(page_html, 'html.parser')
        # Combine results from multiple selectors
        articles = soup.find_all('article')
        articles.extend(soup.find_all('div', class_='news-item'))
        articles.extend(soup.find_all('div', class_='press-release'))
        news_items = []
        source = urlparse(url).netloc
        # Only take the first 5 items (adjust as needed)
        for article in articles[:5]:
            title_elem = article.find('h2') or article.find('h3') or article.find('a')
            link_elem = article.find('a')
            if title_elem and link_elem and link_elem.has_attr('href'):
                news_items.append({
                    'title': title_elem.get_text(strip=True),
                    'url': urljoin(url, link_elem['href']),
                    'source': source
                })
        return news_items
    except Exception as e:
        logging.error(f"Error scraping {url}: {str(e)}")
        return []
def get_energy_news(query):
    """Fetch news articles matching *query* from the NewsData.io API.

    The request is restricted to US, English-language, business-category
    news and uses a 10 second timeout. The API key is read from the
    ``NEWSDATA_API_KEY`` environment variable.

    Args:
        query: Search terms passed as the ``q`` parameter.

    Returns:
        The ``results`` list of a successful response. An empty list is
        returned when the key is missing, the API reports a non-success
        status, or the request fails (each case is logged).
    """
    logging.info(f"Starting news fetch for query: {query}")
    news_data_api_key = os.getenv('NEWSDATA_API_KEY')
    if not news_data_api_key:
        logging.error("NewsData API key not found in environment variables")
        return []
    endpoint = "https://newsdata.io/api/1/news"
    params = {
        'apikey': news_data_api_key,
        'q': query,
        'country': 'us',
        'language': 'en',
        'category': 'business'
    }
    logging.info(f"Making API request to: {endpoint}")
    # Never write the credential to the log: mask it in the logged copy.
    loggable_params = {**params, 'apikey': '***'}
    logging.info(f"With parameters: {loggable_params}")
    try:
        response = requests.get(endpoint, params=params, timeout=10)
        logging.info(f"API Response status code: {response.status_code}")
        response.raise_for_status()
        data = response.json()
        if data.get("status") == "success":
            articles = data.get("results", [])
            logging.info(f"Successfully fetched {len(articles)} articles")
            return articles
        logging.error(f"NewsData API error response: {data}")
        return []
    except Exception as e:
        logging.error(f"Error fetching news: {e}")
        return []
def robust_analyze_news_item(item, query):
    """Analyze one news item with the generative model, tolerating failures.

    Accepts both item shapes used in this file: API articles
    (``description``/``content``, ``link``, ``source_id``) and scraped
    items (``url``, ``source``).

    Args:
        item: Dict describing the article.
        query: The user's search query (currently not used in the prompt).

    Returns:
        A dict with keys ``title``, ``link``, ``source`` and ``analysis``,
        or ``None`` when the item has fewer than 10 characters of content
        or an unexpected error occurs. If the model call fails, the
        analysis falls back to ``local_summarize`` on the content.
    """
    try:
        # Extract article information. `or` (rather than a .get default)
        # also covers keys that are present but null in the API response.
        title = item.get('title') or ''
        content = item.get('description') or item.get('content') or ''
        source = item.get('source_id') or item.get('source') or 'Unknown Source'
        url = item.get('link') or item.get('url') or '#'
        # Skip if no meaningful content
        if not content or len(content.strip()) < 10:
            logging.warning(f"Skipping article due to insufficient content: {title}")
            return None
        prompt = (
            "Analyze this news article:\n"
            f"Title: {title}\n"
            f"Content: {content}\n"
            "\nProvide a brief analysis in the following format:\n"
            "1. Summary (2-3 sentences)\n"
            "2. Key Points (up to 3 bullet points)\n"
            "3. Market Impact (1-2 sentences about potential market implications)"
        )
        try:
            response = model.generate_content(prompt)
            analysis = response.text.strip()
        except Exception as e:
            logging.warning(f"Generative model failed: {str(e)}")
            logging.info("Falling back to local summarizer")
            analysis = local_summarize(content)
        return {
            'title': title,
            'link': url,
            'source': source,
            'analysis': analysis
        }
    except Exception as e:
        logging.error(f"Error in robust_analyze_news_item: {str(e)}")
        return None
def get_company_news():
    """Scrape every site in ENERGY_COMPANIES concurrently and merge the results.

    Returns:
        A flat list of the news items returned by ``scrape_company_news``
        for each distinct URL, in completion order.
    """
    # Fetch each site once even if the list names it under several categories.
    unique_urls = list(dict.fromkeys(ENERGY_COMPANIES))
    all_company_news = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(scrape_company_news, url) for url in unique_urls]
        # No sleep between results: every request has already been submitted
        # at this point, so pausing here only delayed collection (about a
        # second per site) without throttling anything. The pool size is
        # what bounds concurrency.
        for future in as_completed(futures):
            all_company_news.extend(future.result())
    return all_company_news
def filter_and_analyze_news(query, articles, company_news, max_results=20):
    """Analyze API articles and scraped company news concurrently.

    Each item is passed to ``robust_analyze_news_item``, which returns
    ``None`` for items it cannot analyze; those are dropped.

    Args:
        query: The user's search query, forwarded to the analyzer.
        articles: News items from the news API.
        company_news: News items scraped from company sites.
        max_results: Stop collecting once this many analyses are available
            (expected to be at least 1).

    Returns:
        Up to *max_results* analysis dicts, in completion order.
    """
    all_news = articles + company_news
    filtered_and_analyzed_news = []
    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = [
            executor.submit(robust_analyze_news_item, item, query)
            for item in all_news
        ]
        for future in as_completed(futures):
            result = future.result()
            if result:
                filtered_and_analyzed_news.append(result)
            if len(filtered_and_analyzed_news) >= max_results:
                # Leaving the `with` block waits for every submitted task, so
                # a bare `break` would still run (and pay for) all remaining
                # analyses. Cancel whatever has not started yet.
                for pending in futures:
                    pending.cancel()
                break
    return filtered_and_analyzed_news
def generate_market_summary(query, filtered_news):
    """Produce an overall market summary from the per-article analyses.

    Args:
        query: The topic the summary should focus on.
        filtered_news: Analysis dicts; their ``title`` and ``analysis``
            values are used.

    Returns:
        The model's summary text. If the model call fails, the result of
        ``local_summarize`` on the combined analyses. When *filtered_news*
        is empty, a fixed "no relevant news" message.
    """
    if not filtered_news:
        return f"No relevant news found for '{query}' in the energy market context."

    # One "Title / Analysis" entry per item, separated by blank lines.
    combined_summary = "\n\n".join(
        f"Title: {item.get('title', 'No title')}\nAnalysis: {item.get('analysis', '')}\n"
        for item in filtered_news
    )
    # Kept flush-left: the literal's content is sent verbatim to the model.
    prompt = f"""
Based on the following news analyses:
{combined_summary}
Provide a comprehensive market summary that:
- Highlights current trends related to {query} in the energy market.
- Identifies key insights and potential market impacts.
- Organizes the information into clearly defined sections.
"""
    try:
        return model.generate_content(prompt).text.strip()
    except Exception as e:
        logging.error(f"Error generating market summary using API: {e}. Falling back to local summarization.")
        return local_summarize(combined_summary)
@app.route('/')
def index():
    """Serve the application's landing page (the ``index.html`` template)."""
    landing_template = 'index.html'
    return render_template(landing_template)
# Add error handling decorator
def handle_errors(f):
    """Decorate a view so that uncaught exceptions become a JSON 500 response.

    The exception is logged with its traceback and the client receives
    ``{'error': <message>}`` with status 500. Successful results are
    returned untouched.
    """
    @wraps(f)
    def guarded(*args, **kwargs):
        try:
            outcome = f(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in {f.__name__}: {str(e)}", exc_info=True)
            return jsonify({'error': str(e)}), 500
        else:
            return outcome
    return guarded
@app.route('/query', methods=['POST'])
@handle_errors
def query():
    """Answer a user query, optionally scoped to an uploaded file.

    Expects a JSON object with ``query`` and ``role`` (both required) and
    an optional ``file_id``.

    Returns:
        ``{'response': ...}`` on success; a 400 JSON error when the body is
        missing, not a JSON object, or lacks a required field. Unexpected
        failures are turned into a JSON 500 by ``handle_errors``.
    """
    # silent=True yields None for a missing or malformed JSON body instead
    # of raising, so the client gets the 400 below rather than a 500.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({'error': 'No data provided'}), 400
    user_query = data.get('query')
    role = data.get('role')
    file_id = data.get('file_id')
    if not user_query:
        return jsonify({'error': 'No query provided'}), 400
    if not role:
        return jsonify({'error': 'No role provided'}), 400
    # No local try/except: handle_errors already logs the traceback and
    # returns the same {'error': ...} payload with status 500.
    response = process_query(user_query, role, file_id)
    return jsonify({'response': response})
@app.route('/upload', methods=['POST'])
@handle_errors
def upload_file():
    """Accept an uploaded document, analyze it and index it for later queries.

    The file is read into memory, its text is extracted and chunked, a
    model-generated analysis is produced, the raw bytes are stored
    base64-encoded in ``files_storage`` and the chunks are embedded into
    the in-memory vector store.

    Returns:
        JSON with ``file_id``, ``analysis`` and ``message`` on success; a
        400 JSON error for a missing file, empty filename or disallowed
        extension. Unexpected failures are turned into a JSON 500 by
        ``handle_errors``.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': 'Invalid file type'}), 400
    file_content = file.read()
    filename = secure_filename(file.filename)
    # secure_filename() keeps ASCII only, so a name made of non-ASCII
    # characters can collapse to just "pdf" (or to nothing). Restore a
    # usable name with the already validated extension so the type
    # dispatch in process_document still works.
    if not allowed_file(filename):
        extension = file.filename.rsplit('.', 1)[1].lower()
        filename = f"upload.{extension}"
    # Process the file
    extracted_text = process_document(file_content, filename)
    text_chunks = get_text_chunks(extracted_text)
    analysis = analyze_document(extracted_text)
    # Generate file ID and store
    file_id = len(files_storage) + 1
    files_storage[file_id] = {
        'filename': filename,
        'file_data': base64.b64encode(file_content).decode('utf-8'),
        'analysis': analysis
    }
    # Create vector store (after the file is registered, so a failure here
    # cannot lead to the same file_id being reused for another upload).
    create_vector_store(text_chunks, file_id)
    return jsonify({
        'file_id': file_id,
        'analysis': analysis,
        'message': 'File processed successfully'
    })
@app.route('/plot', methods=['POST'])
def plot():
    """Build a Plotly line chart from a previously uploaded spreadsheet.

    Expects a JSON object with ``file_id``. The first column of the table
    is used for the x axis and all remaining columns as y series.

    Returns:
        ``{'graph': <figure as JSON string>}`` on success; a 400 JSON error
        for a missing/invalid body, 404 for an unknown ``file_id`` and 500
        if the file cannot be parsed or plotted.
    """
    # silent=True: a missing or malformed body gives None instead of raising
    # outside the try block below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'No data provided'}), 400
    file_id = data.get('file_id')
    try:
        stored = files_storage[file_id]
    except (KeyError, TypeError):
        # TypeError covers unhashable ids such as a JSON list.
        return jsonify({'error': 'File not found'}), 404
    try:
        file_data = base64.b64decode(stored['file_data'])
        # Uploads may be CSV as well as Excel; pick the matching reader
        # instead of always assuming a workbook.
        if str(stored.get('filename', '')).lower().endswith('.csv'):
            df = pd.read_csv(io.BytesIO(file_data))
        else:
            df = pd.read_excel(io.BytesIO(file_data))
        fig = px.line(df, x=df.columns[0], y=df.columns[1:])
        graph_json = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
        return jsonify({'graph': graph_json})
    except Exception as e:
        logging.error(f'Error generating plot: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500
@app.route('/process_csv_query', methods=['POST'])
def process_csv_query():
    """Answer a question about a previously uploaded tabular file.

    Expects a JSON object with ``file_id`` and ``query``. The stored file
    is loaded into a DataFrame and handed to ``process_dataframe_query``.

    Returns:
        ``{'response': ...}`` on success; a 400 JSON error for a
        missing/invalid body or query, 404 for an unknown ``file_id`` and
        500 if the file cannot be parsed or the model call fails.
    """
    # silent=True: a missing or malformed body gives None instead of raising
    # outside the try block below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': 'No data provided'}), 400
    file_id = data.get('file_id')
    query = data.get('query')
    if not query:
        return jsonify({'error': 'No query provided'}), 400
    try:
        stored = files_storage[file_id]
    except (KeyError, TypeError):
        # TypeError covers unhashable ids such as a JSON list.
        return jsonify({'error': 'File not found'}), 404
    try:
        file_data = base64.b64decode(stored['file_data'])
        # Excel uploads are binary and cannot be decoded as UTF-8 CSV text;
        # read them with the Excel parser instead.
        if str(stored.get('filename', '')).lower().endswith('.xlsx'):
            df = pd.read_excel(io.BytesIO(file_data))
        else:
            # Create a DataFrame from the CSV data
            df = pd.read_csv(io.StringIO(file_data.decode('utf-8')))
        # Process the query using the DataFrame
        response = process_dataframe_query(df, query)
        return jsonify({'response': response})
    except Exception as e:
        logging.error(f'Error processing CSV query: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500
def process_dataframe_query(df, query):
    """Ask the generative model a question about a DataFrame.

    The prompt describes the frame by its column names, its shape and its
    first five rows; the full data is not sent.

    Args:
        df: The DataFrame the question refers to.
        query: The user's question.

    Returns:
        The ``text`` attribute of the model response.
    """
    # Column labels are not necessarily strings (e.g. integer headers), so
    # convert them before joining.
    columns = ', '.join(str(column) for column in df.columns)
    # A short preview gives the model actual values to reason about; the
    # prompt asks for an answer "based on the data", which names and shape
    # alone cannot support.
    preview = df.head(5).to_string()
    # Kept flush-left: the literal's content is sent verbatim to the model.
    prompt = f"""
Given the following DataFrame information:
Columns: {columns}
Shape: {df.shape}
First rows:
{preview}
Answer the following query about the data:
{query}
Provide a concise and informative response based on the data in the DataFrame.
"""
    response = model.generate_content(prompt)
    return response.text
@app.route('/fetch_news', methods=['POST'])
def fetch_news():
    """Fetch, analyze and summarize news for the query in the JSON body.

    Reads ``query`` from the request JSON (default ``''``), fetches
    articles with ``get_energy_news``, analyzes each one with
    ``robust_analyze_news_item`` and asks the model for a market summary
    based on the first five analyses.

    Returns:
        JSON with ``status``, ``articles`` and ``summary``. When nothing is
        found or nothing can be analyzed, a ``status: 'error'`` payload is
        returned (with the default status code); unexpected failures give
        the same payload shape with status 500.
    """
    def error_payload(message, summary):
        # Common shape of every failure response of this endpoint.
        return jsonify({
            'status': 'error',
            'message': message,
            'articles': [],
            'summary': summary
        })

    try:
        payload = request.json
        query = payload.get('query', '')

        # Fetch articles
        articles = get_energy_news(query)
        if not articles:
            return error_payload('No articles found', f"No relevant news found for '{query}'")

        # Analyze articles one after another, dropping the ones that fail.
        analyses = (robust_analyze_news_item(article, query) for article in articles)
        analyzed_articles = [analysis for analysis in analyses if analysis]
        if not analyzed_articles:
            return error_payload(
                'No articles could be analyzed',
                f"Could not analyze any articles for '{query}'"
            )

        # Generate market summary from the first five analyses.
        digest = "\n\n".join(
            f"Article: {a['title']}\nAnalysis: {a['analysis']}"
            for a in analyzed_articles[:5]
        )
        summary_prompt = (
            f"Based on the following analyzed news articles about '{query}':\n\n"
            f"{digest}"
            "\n\nProvide a comprehensive market summary that:\n"
            "1. Highlights the main trends and developments\n"
            "2. Identifies potential market impacts\n"
            "3. Suggests key takeaways for stakeholders"
        )
        try:
            market_summary = model.generate_content(summary_prompt).text.strip()
        except Exception as e:
            logging.error(f"Error generating market summary: {str(e)}")
            market_summary = "Unable to generate market summary due to an error."

        return jsonify({
            'status': 'success',
            'articles': analyzed_articles,
            'summary': market_summary
        })
    except Exception as e:
        logging.error(f"Error in fetch_news route: {str(e)}")
        return error_payload(str(e), "An error occurred while processing the news request."), 500
# Add health check endpoint
@app.route('/health', methods=['GET'])
def health_check():
    """Report that the service is up and whether a Google API key was loaded."""
    key_present = bool(google_api_key)
    return jsonify({'status': 'healthy', 'api_key_configured': key_present})
# Ensure all required environment variables are set
def check_environment():
    """Verify that every required environment variable has a non-empty value.

    Raises:
        EnvironmentError: Listing the names of the variables that are unset
            or empty.
    """
    required_vars = ['GOOGLE_API_KEY']
    missing_vars = [name for name in required_vars if not os.environ.get(name)]
    if not missing_vars:
        return
    raise EnvironmentError(f"Missing required environment variables: {', '.join(missing_vars)}")
if __name__ == '__main__':
    try:
        # Check environment variables. This also covers the Google API key,
        # which was read and passed to genai.configure at import time, so no
        # second key check or configure call is needed here.
        check_environment()
        # Configure logging. The root logger already has a handler from the
        # module-level basicConfig call, which makes a second basicConfig a
        # no-op; apply the detailed format to the existing handlers instead.
        log_format = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        root_logger = logging.getLogger()
        root_logger.setLevel(logging.INFO)
        for handler in root_logger.handlers:
            handler.setFormatter(log_format)
        # Start server
        port = int(os.environ.get('PORT', 7860))
        # The interactive debugger allows arbitrary code execution and must
        # not be exposed on a server bound to 0.0.0.0 by default; enable it
        # explicitly with FLASK_DEBUG=1 during local development.
        debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
        app.run(host='0.0.0.0', port=port, debug=debug_enabled)
    except Exception as e:
        logging.error(f"Failed to start server: {str(e)}", exc_info=True)
        raise