import os
import io
import base64
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
import google.generativeai as genai
from werkzeug.utils import secure_filename
import PyPDF2
import docx
import pandas as pd
import json
import numpy as np
import logging
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain.docstore.document import Document
from sklearn.metrics.pairwise import cosine_similarity
import plotly.express as px
import plotly
from newsapi import NewsApiClient
import certifi
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, urljoin
import time
import random
from functools import wraps
from pathlib import Path
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

app = Flask(__name__)
# NOTE(review): origins="*" allows any site to call this API — confirm this
# is intentional before deploying publicly.
CORS(app, resources={
    r"/*": {
        "origins": "*",
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type"]
    }
})

# Set up logging
logging.basicConfig(level=logging.INFO)

# Get the directory containing app.py
BASE_DIR = Path(__file__).resolve().parent

# API keys come from environment variables only; never hard-code credentials.
google_api_key = os.getenv('GOOGLE_API_KEY')
genai.configure(api_key=google_api_key)

newsapi_key = os.getenv('NEWSAPI_KEY')
NEWSAPI_KEY = newsapi_key

# Route NewsAPI traffic through a session pinned to certifi's CA bundle.
session = requests.Session()
session.verify = certifi.where()
newsapi = NewsApiClient(api_key=NEWSAPI_KEY)
newsapi.session = session

# Shared generative model instance used throughout the app.
model = genai.GenerativeModel('gemini-2.0-flash-exp')

ALLOWED_EXTENSIONS = {'txt', 'pdf', 'docx', 'xlsx', 'csv'}

# In-memory storage — process-local, lost on restart.
# files_storage: file_id -> {'filename', 'file_data' (base64), 'analysis'}
# chunks_storage: list of {'file_id', 'content', 'embedding'}
files_storage = {}
chunks_storage = []

# List of energy company websites to scrape.
# NOTE(review): a few URLs appear more than once (iberdrola.com, edf.fr),
# which causes redundant scraping — left as-is to preserve behavior.
ENERGY_COMPANIES = [
    # Oil and Gas Companies
    "https://corporate.exxonmobil.com/",
    "https://www.chevron.com/",
    "https://www.bp.com/",
    "https://www.shell.com/",
    "https://totalenergies.com/",
    "https://www.aramco.com/",
    "http://www.petrochina.com.cn/ptr/",
    "https://www.gazprom.com/",
    "https://www.lukoil.com/",
    "https://www.rosneft.com/",
    # Renewable Energy Companies
    "https://www.nexteraenergy.com/",
    "https://www.iberdrola.com/",
    "https://www.siemensgamesa.com/",
    "https://orsted.com/",
    "https://www.enelgreenpower.com/",
    "https://www.firstsolar.com/",
    "https://bep.brookfield.com/",
    "https://www.canadiansolar.com/",
    "https://us.sunpower.com/",
    # Electricity Generation and Utility Companies
    "https://www.duke-energy.com/",
    "https://www.edf.fr/",
    "https://www.eon.com/",
    "https://www.enel.com/",
    "https://www.nationalgrid.com/",
    "https://www.southerncompany.com/",
    "https://www.aep.com/",
    "https://www.iberdrola.com/",
    "https://www.engie.com/",
    "https://www.xcelenergy.com/",
    # Nuclear Energy Companies
    "https://www.edf.fr/",
    "https://www.rosatom.ru/",
    "https://www.exeloncorp.com/",
    "https://www.westinghousenuclear.com/",
    "https://www.orano.group/en/"
]


def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def convert_excel_to_csv(file_content):
    """Convert raw .xlsx bytes to a CSV string entirely in memory."""
    logging.info("Converting Excel to CSV in memory")
    excel_file = io.BytesIO(file_content)
    df = pd.read_excel(excel_file)
    csv_buffer = io.StringIO()
    df.to_csv(csv_buffer, index=False)
    return csv_buffer.getvalue()


def process_document(file_content, filename):
    """Extract plain text from an uploaded document based on its extension.

    Unknown extensions are assumed to be UTF-8 text.
    """
    # FIX: the log line previously contained no placeholder; log the real filename.
    logging.info(f"Processing document: {filename}")
    if filename.endswith('.pdf'):
        return extract_text_from_pdf(file_content)
    elif filename.endswith('.docx'):
        return extract_text_from_docx(file_content)
    elif filename.endswith('.xlsx'):
        csv_content = convert_excel_to_csv(file_content)
        return extract_text_from_csv(csv_content)
    elif filename.endswith('.csv'):
        return extract_text_from_csv(file_content.decode('utf-8'))
    else:
        return file_content.decode('utf-8')


def extract_text_from_pdf(file_content):
    """Extract and concatenate the text of every page of a PDF (bytes in, str out)."""
    logging.info("Extracting text from PDF in memory")
    text = ""
    pdf_file = io.BytesIO(file_content)
    reader = PyPDF2.PdfReader(pdf_file)
    for page in reader.pages:
        text += page.extract_text() + "\n"
    return text


def extract_text_from_docx(file_content):
    """Extract paragraph text from a .docx file (bytes in, str out)."""
    logging.info("Extracting text from DOCX in memory")
    docx_file = io.BytesIO(file_content)
    doc = docx.Document(docx_file)
    return "\n".join([para.text for para in doc.paragraphs])


def extract_text_from_csv(file_content):
    """Render CSV text as a pandas table string for downstream analysis."""
    logging.info("Extracting text from CSV in memory")
    df = pd.read_csv(io.StringIO(file_content))
    return df.to_string()


def get_text_chunks(text):
    """Split *text* into overlapping chunks suitable for embedding."""
    logging.info("Chunking text for vector store...")
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    chunks = text_splitter.split_text(text)
    return chunks


def create_vector_store(text_chunks, file_id):
    """Embed each chunk and append it to the in-memory chunks_storage list."""
    logging.info("Creating vector store in memory...")
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    for chunk in text_chunks:
        embedding = embeddings.embed_query(chunk)
        chunks_storage.append({
            'file_id': file_id,
            'content': chunk,
            'embedding': embedding
        })
    logging.info("Vector store created in memory.")


def get_conversational_chain():
    """Build a stuff-type QA chain over Gemini with a grounded-answer prompt."""
    prompt_template = """
    Answer the question as detailed as possible from the provided context.
    If the answer is not directly available in the provided context, use your
    knowledge to infer a reasonable answer based on the given information.
    If you're unsure or the question is completely unrelated to the context,
    state that you don't have enough information to answer accurately.

    Context:\n{context}\n
    Question:\n{question}\n

    Answer:
    """
    model = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
    prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
    chain = load_qa_chain(model, chain_type="stuff", prompt=prompt)
    return chain


def answer_query_from_document(user_question, file_id):
    """Answer *user_question* via retrieval over the chunks stored for *file_id*.

    Embeds the question, ranks the file's chunks by cosine similarity, and
    feeds the top 5 as context to the QA chain.
    """
    logging.info("Answering query from in-memory storage...")
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    query_embedding = np.array(embeddings.embed_query(user_question))
    file_chunks = [chunk for chunk in chunks_storage if chunk['file_id'] == file_id]
    # FIX: guard against an unknown file_id or empty store — cosine_similarity
    # raises on an empty matrix, which previously surfaced as a 500.
    if not file_chunks:
        return "No document content is available for this file; please upload it again."
    chunk_embeddings = [np.array(chunk['embedding']) for chunk in file_chunks]
    similarities = cosine_similarity([query_embedding], chunk_embeddings)[0]
    # Sort chunks by similarity (descending) and keep the top 5 as context.
    sorted_chunks = sorted(zip(file_chunks, similarities), key=lambda x: x[1], reverse=True)
    top_chunks = sorted_chunks[:5]
    context = ' '.join([chunk[0]['content'] for chunk in top_chunks])
    chain = get_conversational_chain()
    response = chain.invoke({"input_documents": [Document(page_content=context)], "question": user_question})
    return response["output_text"]


def analyze_document(text):
    """Summarize the first 4000 characters of *text* with the generative model."""
    logging.info(f"Analyzing document with text: {text[:200]}...")  # Log first 200 chars
    prompt = f"Analyze the following document and provide a summary of its key points and any important insights:\n\n{text[:4000]}"
    response = model.generate_content(prompt)
    logging.info("Document analysis completed.")
    return response.text
def process_query(query, role=None, file_id=None):
    """Answer a user query.

    If *file_id* is given, delegate to document Q&A; otherwise build a
    role-conditioned markdown-formatting prompt for the generative model.
    """
    if file_id:
        return answer_query_from_document(query, file_id)
    system_prompt = f"You are an AI assistant specializing in {role}." if role else "You are a helpful AI assistant."
    prompt = f"""
    {system_prompt}

    Please format your response using markdown with proper structure:
    - Use '##' for main headings
    - Use '**' for bold text
    - Use bullet points ('*') for lists
    - Add proper spacing between sections
    - Structure the content hierarchically
    - Use proper paragraphs with line breaks

    Query: "{query}"

    Remember to:
    - Format the response clearly and professionally
    - Use headings for different sections
    - Break down complex information into digestible parts
    - Use bold text for emphasis on key terms
    - Maintain consistent spacing
    """
    try:
        response = model.generate_content(prompt)
        return response.text
    except Exception as e:
        logging.error(f"Error generating content: {str(e)}", exc_info=True)
        raise  # FIX: bare raise preserves the original traceback (was `raise e`)


def local_summarize(text):
    """
    A simple extractive summarization function that doesn't require downloading models.

    Takes the first up-to-3 sentences of *text* as the summary.
    """
    sentences = text.split('.')
    summary_sentences = sentences[:3]  # slicing already clamps to len(sentences)
    summary = '. '.join(sentence.strip() for sentence in summary_sentences if sentence.strip())
    # FIX: empty input previously returned "." — return an empty summary instead.
    if not summary:
        return ''
    return summary + ('.' if not summary.endswith('.') else '')


def scrape_company_news(url):
    """
    Scrapes the top company news items from the given URL.
    Uses a session with retries to mitigate timeouts or transient errors.

    Returns a list of {'title', 'url', 'source'} dicts (at most 5), or []
    on any error.
    """
    try:
        session = requests.Session()
        retries = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        adapter = HTTPAdapter(max_retries=retries)
        session.mount('https://', adapter)
        session.mount('http://', adapter)
        headers = {
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/91.0.4472.124 Safari/537.36')
        }
        response = session.get(url, headers=headers, timeout=10)
        soup = BeautifulSoup(response.content, 'html.parser')
        # Combine results from multiple selectors
        articles = soup.find_all('article')
        articles.extend(soup.find_all('div', class_='news-item'))
        articles.extend(soup.find_all('div', class_='press-release'))
        news_items = []
        # Only take the first 5 items (adjust as needed)
        for article in articles[:5]:
            title_elem = article.find('h2') or article.find('h3') or article.find('a')
            link_elem = article.find('a')
            if title_elem and link_elem and link_elem.has_attr('href'):
                news_items.append({
                    'title': title_elem.get_text(strip=True),
                    'url': urljoin(url, link_elem['href']),
                    'source': urlparse(url).netloc
                })
        return news_items
    except Exception as e:
        logging.error(f"Error scraping {url}: {str(e)}")
        return []


def get_energy_news(query):
    """
    Fetches the latest news articles from NewsData.io API based on the query.

    Returns the raw article dicts from the API, or [] on any failure.
    """
    logging.info(f"Starting news fetch for query: {query}")
    news_data_api_key = os.getenv('NEWSDATA_API_KEY')
    if not news_data_api_key:
        logging.error("NewsData API key not found in environment variables")
        return []
    endpoint = "https://newsdata.io/api/1/news"
    params = {
        'apikey': news_data_api_key,
        'q': query,
        'country': 'us',
        'language': 'en',
        'category': 'business'
    }
    logging.info(f"Making API request to: {endpoint}")
    logging.info(f"With parameters: {params}")
    try:
        response = requests.get(endpoint, params=params, timeout=10)
        logging.info(f"API Response status code: {response.status_code}")
        response.raise_for_status()
        data = response.json()
        if data.get("status") == "success":
            articles = data.get("results", [])
            logging.info(f"Successfully fetched {len(articles)} articles")
            return articles
        else:
            logging.error(f"NewsData API error response: {data}")
            return []
    except Exception as e:
        logging.error(f"Error fetching news: {e}")
        return []


def robust_analyze_news_item(item, query):
    """
    Analyzes a news item using the generative model with better error handling.

    Falls back to local extractive summarization if the model call fails.
    Returns {'title', 'link', 'source', 'analysis'} or None when the item
    has no usable content.
    """
    try:
        # Extract article information
        title = item.get('title', '')
        content = item.get('description', '') or item.get('content', '')
        source = item.get('source_id', 'Unknown Source')
        url = item.get('link', '#')
        # Skip if no meaningful content
        if not content or len(content.strip()) < 10:
            logging.warning(f"Skipping article due to insufficient content: {title}")
            return None
        prompt = (
            "Analyze this news article:\n"
            f"Title: {title}\n"
            f"Content: {content}\n"
            "\nProvide a brief analysis in the following format:\n"
            "1. Summary (2-3 sentences)\n"
            "2. Key Points (up to 3 bullet points)\n"
            "3. Market Impact (1-2 sentences about potential market implications)"
        )
        try:
            response = model.generate_content(prompt)
            analysis = response.text.strip()
        except Exception as e:
            logging.warning(f"Generative model failed: {str(e)}")
            logging.info("Falling back to local summarizer")
            analysis = local_summarize(content)
        return {
            'title': title,
            'link': url,
            'source': source,
            'analysis': analysis
        }
    except Exception as e:
        logging.error(f"Error in robust_analyze_news_item: {str(e)}")
        return None


def get_company_news():
    """Scrape all ENERGY_COMPANIES concurrently and return the combined items."""
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_url = {executor.submit(scrape_company_news, url): url for url in ENERGY_COMPANIES}
        all_company_news = []
        for future in as_completed(future_to_url):
            all_company_news.extend(future.result())
            # FIX: removed the random sleep here — all requests are already
            # submitted to the pool, so sleeping between result collections
            # throttled nothing and only delayed completion.
    return all_company_news


def filter_and_analyze_news(query, articles, company_news):
    """
    Processes both News API results and scraped company news.
    Uses robust_analyze_news_item so that any API errors are handled gracefully.

    Stops once 20 analyzed items have been collected.
    """
    all_news = articles + company_news
    filtered_and_analyzed_news = []
    with ThreadPoolExecutor(max_workers=20) as executor:
        future_to_item = {
            executor.submit(robust_analyze_news_item, item, query): item
            for item in all_news
        }
        for future in as_completed(future_to_item):
            result = future.result()
            if result:
                filtered_and_analyzed_news.append(result)
                if len(filtered_and_analyzed_news) >= 20:
                    break
    return filtered_and_analyzed_news


def generate_market_summary(query, filtered_news):
    """
    Generates an overall market summary using the individual news analyses.
    Uses the generative model but falls back to local summarization in case of errors.
    """
    if not filtered_news:
        return f"No relevant news found for '{query}' in the energy market context."
    # Combine the analyses from each news item for context
    summaries = [
        f"Title: {item.get('title', 'No title')}\nAnalysis: {item.get('analysis', '')}\n"
        for item in filtered_news
    ]
    combined_summary = "\n\n".join(summaries)
    prompt = f"""
    Based on the following news analyses:

    {combined_summary}

    Provide a comprehensive market summary that:
    - Highlights current trends related to {query} in the energy market.
    - Identifies key insights and potential market impacts.
    - Organizes the information into clearly defined sections.
    """
    try:
        response = model.generate_content(prompt)
        return response.text.strip()
    except Exception as e:
        logging.error(f"Error generating market summary using API: {e}. Falling back to local summarization.")
        return local_summarize(combined_summary)


@app.route('/')
def index():
    """Serve the single-page UI."""
    return render_template('index.html')


# Add error handling decorator
def handle_errors(f):
    """Wrap a route so any unhandled exception becomes a logged JSON 500."""
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in {f.__name__}: {str(e)}", exc_info=True)
            return jsonify({'error': str(e)}), 500
    return wrapper


@app.route('/query', methods=['POST'])
@handle_errors
def query():
    """Answer a free-form or document-scoped query. Expects JSON with
    'query', 'role', and optionally 'file_id'."""
    data = request.json
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    query = data.get('query')
    role = data.get('role')
    file_id = data.get('file_id')
    if not query:
        return jsonify({'error': 'No query provided'}), 400
    if not role:
        return jsonify({'error': 'No role provided'}), 400
    try:
        response = process_query(query, role, file_id)
        return jsonify({'response': response})
    except Exception as e:
        logging.error(f"Error processing query: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500


@app.route('/upload', methods=['POST'])
@handle_errors
def upload_file():
    """Accept a document upload, extract/analyze its text, index it for RAG,
    and return the new file_id plus the analysis."""
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': 'Invalid file type'}), 400
    try:
        file_content = file.read()
        filename = secure_filename(file.filename)
        # Process the file
        extracted_text = process_document(file_content, filename)
        text_chunks = get_text_chunks(extracted_text)
        analysis = analyze_document(extracted_text)
        # Generate file ID and store (sequential ids; storage is append-only)
        file_id = len(files_storage) + 1
        files_storage[file_id] = {
            'filename': filename,
            'file_data': base64.b64encode(file_content).decode('utf-8'),
            'analysis': analysis
        }
        # Create vector store
        create_vector_store(text_chunks, file_id)
        return jsonify({
            'file_id': file_id,
            'analysis': analysis,
            'message': 'File processed successfully'
        })
    except Exception as e:
        logging.error(f"Error processing file: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500


@app.route('/plot', methods=['POST'])
def plot():
    """Render a stored Excel file as a Plotly line chart (first column = x,
    remaining columns = y) and return it as JSON."""
    data = request.json
    # FIX: guard against a missing/non-JSON body (previously AttributeError -> 500)
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    file_id = data.get('file_id')
    # FIX: unknown file_id previously surfaced as a raw KeyError-driven 500
    if file_id not in files_storage:
        return jsonify({'error': 'Unknown file_id'}), 404
    try:
        file_data_base64 = files_storage[file_id]['file_data']
        file_data = base64.b64decode(file_data_base64)
        df = pd.read_excel(io.BytesIO(file_data))
        fig = px.line(df, x=df.columns[0], y=df.columns[1:])
        graph_json = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
        return jsonify({'graph': graph_json})
    except Exception as e:
        logging.error(f'Error generating plot: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500


@app.route('/process_csv_query', methods=['POST'])
def process_csv_query():
    """Answer a natural-language query about a stored CSV file."""
    data = request.json
    # FIX: guard against a missing/non-JSON body (previously AttributeError -> 500)
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    file_id = data.get('file_id')
    query = data.get('query')
    # FIX: unknown file_id previously surfaced as a raw KeyError-driven 500
    if file_id not in files_storage:
        return jsonify({'error': 'Unknown file_id'}), 404
    try:
        file_data_base64 = files_storage[file_id]['file_data']
        file_data = base64.b64decode(file_data_base64)
        # Create a DataFrame from the CSV data
        df = pd.read_csv(io.StringIO(file_data.decode('utf-8')))
        # Process the query using the DataFrame
        response = process_dataframe_query(df, query)
        return jsonify({'response': response})
    except Exception as e:
        logging.error(f'Error processing CSV query: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500


def process_dataframe_query(df, query):
    """Simulate a CSV agent: describe the DataFrame to the model and ask it
    to answer *query*. (More sophisticated logic may be needed here.)"""
    prompt = f"""
    Given the following DataFrame information:
    Columns: {', '.join(df.columns)}
    Shape: {df.shape}

    Answer the following query about the data: {query}

    Provide a concise and informative response based on the data in the DataFrame.
    """
    response = model.generate_content(prompt)
    return response.text


@app.route('/fetch_news', methods=['POST'])
def fetch_news():
    """Fetch, analyze, and summarize news for a query. Returns analyzed
    articles plus an overall market summary."""
    try:
        data = request.json or {}  # FIX: tolerate a missing JSON body
        query = data.get('query', '')
        # Fetch articles
        articles = get_energy_news(query)
        if not articles:
            return jsonify({
                'status': 'error',
                'message': 'No articles found',
                'articles': [],
                'summary': f"No relevant news found for '{query}'"
            })
        # Analyze articles
        analyzed_articles = []
        for article in articles:
            analysis = robust_analyze_news_item(article, query)
            if analysis:
                analyzed_articles.append(analysis)
        if not analyzed_articles:
            return jsonify({
                'status': 'error',
                'message': 'No articles could be analyzed',
                'articles': [],
                'summary': f"Could not analyze any articles for '{query}'"
            })
        # Generate market summary from the top 5 analyses
        summary_prompt = (
            f"Based on the following analyzed news articles about '{query}':\n\n"
            + "\n\n".join([
                f"Article: {a['title']}\nAnalysis: {a['analysis']}"
                for a in analyzed_articles[:5]
            ])
            + "\n\nProvide a comprehensive market summary that:\n"
              "1. Highlights the main trends and developments\n"
              "2. Identifies potential market impacts\n"
              "3. Suggests key takeaways for stakeholders"
        )
        try:
            summary_response = model.generate_content(summary_prompt)
            market_summary = summary_response.text.strip()
        except Exception as e:
            logging.error(f"Error generating market summary: {str(e)}")
            market_summary = "Unable to generate market summary due to an error."
        return jsonify({
            'status': 'success',
            'articles': analyzed_articles,
            'summary': market_summary
        })
    except Exception as e:
        logging.error(f"Error in fetch_news route: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e),
            'articles': [],
            'summary': "An error occurred while processing the news request."
        }), 500


# Add health check endpoint
@app.route('/health', methods=['GET'])
def health_check():
    """Lightweight liveness probe; also reports whether the API key is set."""
    return jsonify({'status': 'healthy', 'api_key_configured': bool(google_api_key)})


# Ensure all required environment variables are set
def check_environment():
    """Raise EnvironmentError if any required environment variable is unset."""
    required_vars = ['GOOGLE_API_KEY']
    missing_vars = [var for var in required_vars if not os.getenv(var)]
    if missing_vars:
        raise EnvironmentError(f"Missing required environment variables: {', '.join(missing_vars)}")


if __name__ == '__main__':
    try:
        # Check environment variables
        check_environment()
        # Configure logging (no-op if handlers were already installed at import)
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        # Initialize Google AI
        if not google_api_key:
            raise ValueError("GOOGLE_API_KEY not configured")
        genai.configure(api_key=google_api_key)
        # Start server
        port = int(os.environ.get('PORT', 7860))
        # NOTE(review): debug=True enables the Werkzeug debugger/reloader —
        # must be disabled for any non-local deployment.
        app.run(host='0.0.0.0', port=port, debug=True)
    except Exception as e:
        logging.error(f"Failed to start server: {str(e)}", exc_info=True)
        raise