Spaces:
Sleeping
Sleeping
| import os | |
| import io | |
| import base64 | |
| from flask import Flask, request, jsonify, render_template | |
| from flask_cors import CORS | |
| import google.generativeai as genai | |
| from werkzeug.utils import secure_filename | |
| import PyPDF2 | |
| import docx | |
| import pandas as pd | |
| import json | |
| import numpy as np | |
| import logging | |
| from langchain.text_splitter import RecursiveCharacterTextSplitter | |
| from langchain_google_genai import GoogleGenerativeAIEmbeddings | |
| from langchain_google_genai import ChatGoogleGenerativeAI | |
| from langchain.chains.question_answering import load_qa_chain | |
| from langchain.prompts import PromptTemplate | |
| from langchain.docstore.document import Document | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import plotly.express as px | |
| import plotly | |
| from newsapi import NewsApiClient | |
| import certifi | |
| import requests | |
| from bs4 import BeautifulSoup | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| from urllib.parse import urlparse, urljoin | |
| import time | |
| import random | |
| from functools import wraps | |
| from pathlib import Path | |
| from requests.adapters import HTTPAdapter | |
| from urllib3.util.retry import Retry | |
app = Flask(__name__)
# CORS: wide-open ("*") for every route — acceptable for a demo/Space, but
# NOTE(review): lock "origins" down before exposing this beyond a sandbox.
CORS(app, resources={
    r"/*": {
        "origins": "*",
        "methods": ["GET", "POST", "OPTIONS"],
        "allow_headers": ["Content-Type"]
    }
})
# Set up logging
# NOTE(review): basicConfig is called again (with a format string) in the
# __main__ block; only this first call takes effect.
logging.basicConfig(level=logging.INFO)
# Get the directory containing app.py
BASE_DIR = Path(__file__).resolve().parent
# Set your Google API key securely using an environment variable
# (genai.configure is also called again in __main__ after validation).
google_api_key = os.getenv('GOOGLE_API_KEY')
genai.configure(api_key=google_api_key)
newsapi_key = os.getenv('NEWSAPI_KEY')
NEWSAPI_KEY = newsapi_key
# Shared HTTP session using certifi's CA bundle for TLS verification,
# attached to the NewsAPI client so its requests verify certificates.
session = requests.Session()
session.verify = certifi.where()
newsapi = NewsApiClient(api_key=NEWSAPI_KEY)
newsapi.session = session
# Initialize the model
model = genai.GenerativeModel('gemini-2.0-flash-exp')
# File extensions accepted by the upload endpoint.
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'docx', 'xlsx', 'csv'}
# In-memory storage (lost on restart): uploaded files keyed by id, and the
# flat list of embedded text chunks used as the vector store.
files_storage = {}
chunks_storage = []
# List of energy company websites to scrape.
# Fix: removed duplicate entries (iberdrola.com and edf.fr each appeared
# twice) that caused the same site to be scraped twice per run.
ENERGY_COMPANIES = [
    # Oil and Gas Companies
    "https://corporate.exxonmobil.com/",
    "https://www.chevron.com/",
    "https://www.bp.com/",
    "https://www.shell.com/",
    "https://totalenergies.com/",
    "https://www.aramco.com/",
    "http://www.petrochina.com.cn/ptr/",
    "https://www.gazprom.com/",
    "https://www.lukoil.com/",
    "https://www.rosneft.com/",
    # Renewable Energy Companies
    "https://www.nexteraenergy.com/",
    "https://www.iberdrola.com/",
    "https://www.siemensgamesa.com/",
    "https://orsted.com/",
    "https://www.enelgreenpower.com/",
    "https://www.firstsolar.com/",
    "https://bep.brookfield.com/",
    "https://www.canadiansolar.com/",
    "https://us.sunpower.com/",
    # Electricity Generation and Utility Companies
    "https://www.duke-energy.com/",
    "https://www.edf.fr/",
    "https://www.eon.com/",
    "https://www.enel.com/",
    "https://www.nationalgrid.com/",
    "https://www.southerncompany.com/",
    "https://www.aep.com/",
    "https://www.engie.com/",
    "https://www.xcelenergy.com/",
    # Nuclear Energy Companies
    "https://www.rosatom.ru/",
    "https://www.exeloncorp.com/",
    "https://www.westinghousenuclear.com/",
    "https://www.orano.group/en/"
]
def allowed_file(filename, allowed_extensions=None):
    """Return True if *filename* has an extension in the allowed set.

    Args:
        filename: Name of the uploaded file (case-insensitive match).
        allowed_extensions: Optional set of lowercase extensions; defaults
            to the module-level ALLOWED_EXTENSIONS (generalized so callers
            and tests can supply their own set).

    Returns:
        bool: True when the name contains a dot and its final extension,
        lowercased, is in the allowed set.
    """
    if allowed_extensions is None:
        allowed_extensions = ALLOWED_EXTENSIONS
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in allowed_extensions
def convert_excel_to_csv(file_content):
    """Convert raw .xlsx bytes to CSV text entirely in memory.

    Args:
        file_content: The Excel workbook as bytes.

    Returns:
        str: The first sheet rendered as CSV (no index column).
    """
    logging.info("Converting Excel to CSV in memory")
    frame = pd.read_excel(io.BytesIO(file_content))
    out = io.StringIO()
    frame.to_csv(out, index=False)
    return out.getvalue()
def process_document(file_content, filename):
    """Dispatch raw upload bytes to the right text extractor by extension.

    Args:
        file_content: Raw bytes of the uploaded file.
        filename: Original (sanitized) file name used to pick the parser.

    Returns:
        str: Extracted plain text. Unknown extensions fall through to a
        plain UTF-8 decode.
    """
    # Fix: the log message had lost its placeholder and always printed a
    # literal "(unknown)"; log the actual filename.
    logging.info(f"Processing document: {filename}")
    # Fix: compare against a lowercased name so e.g. 'REPORT.PDF' (which
    # allowed_file accepts, since it lowercases) is dispatched to the PDF
    # extractor instead of falling through to a UTF-8 decode of binary data.
    lowered = filename.lower()
    if lowered.endswith('.pdf'):
        return extract_text_from_pdf(file_content)
    elif lowered.endswith('.docx'):
        return extract_text_from_docx(file_content)
    elif lowered.endswith('.xlsx'):
        csv_content = convert_excel_to_csv(file_content)
        return extract_text_from_csv(csv_content)
    elif lowered.endswith('.csv'):
        return extract_text_from_csv(file_content.decode('utf-8'))
    else:
        return file_content.decode('utf-8')
def extract_text_from_pdf(file_content):
    """Extract the text of every page of a PDF held in memory.

    Args:
        file_content: The PDF document as bytes.

    Returns:
        str: Concatenated page texts, one newline after each page.
    """
    logging.info("Extracting text from PDF in memory")
    reader = PyPDF2.PdfReader(io.BytesIO(file_content))
    pages = [page.extract_text() + "\n" for page in reader.pages]
    return "".join(pages)
def extract_text_from_docx(file_content):
    """Extract paragraph text from a .docx document held in memory.

    Args:
        file_content: The Word document as bytes.

    Returns:
        str: All paragraph texts joined with newlines.
    """
    logging.info("Extracting text from DOCX in memory")
    document = docx.Document(io.BytesIO(file_content))
    lines = (paragraph.text for paragraph in document.paragraphs)
    return "\n".join(lines)
def extract_text_from_csv(file_content):
    """Render CSV text as pandas' default string table.

    Args:
        file_content: The CSV data as a str.

    Returns:
        str: The parsed DataFrame formatted via DataFrame.to_string().
    """
    logging.info("Extracting text from CSV in memory")
    frame = pd.read_csv(io.StringIO(file_content))
    return frame.to_string()
def get_text_chunks(text):
    """Split *text* into overlapping ~1000-char chunks for embedding.

    Args:
        text: Full document text.

    Returns:
        list[str]: Chunks of up to 1000 characters with 200-char overlap.
    """
    logging.info("Chunking text for vector store...")
    splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    return splitter.split_text(text)
def create_vector_store(text_chunks, file_id):
    """Embed each chunk and append it to the global in-memory chunk store.

    Args:
        text_chunks: Iterable of text chunks to embed.
        file_id: Id of the owning file; stored alongside each chunk so
            queries can filter chunks_storage per file.
    """
    logging.info("Creating vector store in memory...")
    embedder = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    for piece in text_chunks:
        # One embedding API call per chunk; results live only in memory.
        chunks_storage.append({
            'file_id': file_id,
            'content': piece,
            'embedding': embedder.embed_query(piece)
        })
    logging.info("Vector store created in memory.")
def get_conversational_chain():
    """Build the question-answering chain used for document queries.

    Returns:
        A langchain 'stuff' QA chain wired to Gemini with a prompt that
        prefers the provided context but may fall back to inference.
    """
    prompt_template = """
    Answer the question as detailed as possible from the provided context. If the answer is not directly
    available in the provided context, use your knowledge to infer a reasonable answer based on the given information.
    If you're unsure or the question is completely unrelated to the context, state that you don't have enough information to answer accurately.
    Context:\n{context}\n
    Question:\n{question}\n
    Answer:
    """
    qa_prompt = PromptTemplate(template=prompt_template, input_variables=["context", "question"])
    # Low temperature keeps answers grounded in the supplied context.
    llm = ChatGoogleGenerativeAI(model="gemini-pro", temperature=0.3)
    return load_qa_chain(llm, chain_type="stuff", prompt=qa_prompt)
def answer_query_from_document(user_question, file_id):
    """Answer *user_question* using the stored chunks for *file_id*.

    Embeds the question, ranks the file's chunks by cosine similarity,
    and feeds the 5 most similar chunks as context into the QA chain.

    Args:
        user_question: The user's natural-language question.
        file_id: Id of a previously indexed file in chunks_storage.

    Returns:
        str: The QA chain's answer text, or an explanatory message when no
        chunks exist for the file.
    """
    logging.info("Answering query from in-memory storage...")
    embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
    query_embedding = np.array(embeddings.embed_query(user_question))
    file_chunks = [chunk for chunk in chunks_storage if chunk['file_id'] == file_id]
    # Fix: cosine_similarity raises on an empty matrix, so an unknown or
    # unindexed file_id previously crashed the request. Fail gracefully.
    if not file_chunks:
        return "No document content is available for this file. Please upload the document again."
    chunk_embeddings = [np.array(chunk['embedding']) for chunk in file_chunks]
    similarities = cosine_similarity([query_embedding], chunk_embeddings)[0]
    # Sort chunks by similarity, best match first.
    sorted_chunks = sorted(zip(file_chunks, similarities), key=lambda x: x[1], reverse=True)
    # Get top 5 most similar chunks as the QA context.
    top_chunks = sorted_chunks[:5]
    context = ' '.join([chunk[0]['content'] for chunk in top_chunks])
    chain = get_conversational_chain()
    response = chain.invoke({"input_documents": [Document(page_content=context)], "question": user_question})
    return response["output_text"]
def analyze_document(text):
    """Ask the generative model for a summary and key insights of *text*.

    Only the first 4000 characters are sent, keeping the prompt bounded.

    Args:
        text: Extracted document text.

    Returns:
        str: The model's analysis.
    """
    logging.info(f"Analyzing document with text: {text[:200]}...")  # log only the first 200 chars
    prompt = f"Analyze the following document and provide a summary of its key points and any important insights:\n\n{text[:4000]}"
    result = model.generate_content(prompt)
    logging.info("Document analysis completed.")
    return result.text
def process_query(query, role=None, file_id=None):
    """Route a user query to document QA or the general generative model.

    Args:
        query: The user's question.
        role: Optional specialty injected into the system prompt.
        file_id: Optional id of an uploaded document; when given, the query
            is answered from that document's indexed chunks instead.

    Returns:
        str: Markdown-formatted answer text.

    Raises:
        Exception: Re-raised from the model call after logging.
    """
    if file_id:
        return answer_query_from_document(query, file_id)
    else:
        system_prompt = f"You are an AI assistant specializing in {role}." if role else "You are a helpful AI assistant."
        prompt = f"""
        {system_prompt}
        Please format your response using markdown with proper structure:
        - Use '##' for main headings
        - Use '**' for bold text
        - Use bullet points ('*') for lists
        - Add proper spacing between sections
        - Structure the content hierarchically
        - Use proper paragraphs with line breaks
        Query: "{query}"
        Remember to:
        - Format the response clearly and professionally
        - Use headings for different sections
        - Break down complex information into digestible parts
        - Use bold text for emphasis on key terms
        - Maintain consistent spacing
        """
        try:
            response = model.generate_content(prompt)
            return response.text
        except Exception as e:
            logging.error(f"Error generating content: {str(e)}", exc_info=True)
            # Fix: bare `raise` re-raises with the original traceback intact
            # (was `raise e`, which is redundant and less idiomatic).
            raise
def local_summarize(text):
    """
    Simple extractive summarization that needs no model download.

    Takes the first up-to-3 sentences (split on '.') as the summary and
    ensures the result ends with a period.

    Args:
        text: Source text to summarize.

    Returns:
        str: The extractive summary, or "" for empty/whitespace input.
    """
    sentences = text.split('.')
    summary_sentences = sentences[:min(3, len(sentences))]
    summary = '. '.join(sentence.strip() for sentence in summary_sentences if sentence.strip())
    # Fix: empty input previously produced a lone "." because the
    # trailing-period logic ran on an empty summary.
    if not summary:
        return ""
    return summary if summary.endswith('.') else summary + '.'
def scrape_company_news(url):
    """
    Scrape the top news items from a company website.

    A retrying session mitigates timeouts and transient HTTP errors.

    Args:
        url: The company homepage to scrape.

    Returns:
        list[dict]: Up to 5 items with 'title', 'url' (absolute) and
        'source' (the site's netloc); [] on any failure.
    """
    try:
        retry_policy = Retry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
        http = requests.Session()
        for scheme in ('https://', 'http://'):
            http.mount(scheme, HTTPAdapter(max_retries=retry_policy))
        headers = {
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/91.0.4472.124 Safari/537.36')
        }
        page = http.get(url, headers=headers, timeout=10)
        soup = BeautifulSoup(page.content, 'html.parser')
        # Gather candidates from several common markup patterns.
        candidates = soup.find_all('article')
        candidates.extend(soup.find_all('div', class_='news-item'))
        candidates.extend(soup.find_all('div', class_='press-release'))
        results = []
        for entry in candidates[:5]:  # only the first 5 items (adjust as needed)
            heading = entry.find('h2') or entry.find('h3') or entry.find('a')
            anchor = entry.find('a')
            if heading and anchor and anchor.has_attr('href'):
                results.append({
                    'title': heading.get_text(strip=True),
                    'url': urljoin(url, anchor['href']),
                    'source': urlparse(url).netloc
                })
        return results
    except Exception as e:
        logging.error(f"Error scraping {url}: {str(e)}")
        return []
def get_energy_news(query):
    """
    Fetch the latest news articles for *query* from the NewsData.io API.

    Args:
        query: Search term forwarded to the API.

    Returns:
        list[dict]: Article dicts from the API's 'results' field, or []
        when the key is missing or the request fails.
    """
    logging.info(f"Starting news fetch for query: {query}")
    api_key = os.getenv('NEWSDATA_API_KEY')
    if not api_key:
        logging.error("NewsData API key not found in environment variables")
        return []
    endpoint = "https://newsdata.io/api/1/news"
    params = {
        'apikey': api_key,
        'q': query,
        'country': 'us',
        'language': 'en',
        'category': 'business'
    }
    logging.info(f"Making API request to: {endpoint}")
    logging.info(f"With parameters: {params}")
    try:
        response = requests.get(endpoint, params=params, timeout=10)
        logging.info(f"API Response status code: {response.status_code}")
        response.raise_for_status()
        payload = response.json()
    except Exception as e:
        logging.error(f"Error fetching news: {e}")
        return []
    if payload.get("status") == "success":
        articles = payload.get("results", [])
        logging.info(f"Successfully fetched {len(articles)} articles")
        return articles
    logging.error(f"NewsData API error response: {payload}")
    return []
def robust_analyze_news_item(item, query):
    """
    Analyze a single news item with the generative model, degrading
    gracefully: model failures fall back to the local extractive
    summarizer, and any unexpected error yields None.

    Args:
        item: Article dict (NewsData.io shape: description/content/source_id/link).
        query: Original search query (currently unused in the prompt).

    Returns:
        dict | None: {'title', 'link', 'source', 'analysis'}, or None when
        the item lacks usable content or an error occurs.
    """
    try:
        title = item.get('title', '')
        content = item.get('description', '') or item.get('content', '')
        source = item.get('source_id', 'Unknown Source')
        url = item.get('link', '#')
        # Skip items with no meaningful text to analyze.
        if not content or len(content.strip()) < 10:
            logging.warning(f"Skipping article due to insufficient content: {title}")
            return None
        prompt = (
            "Analyze this news article:\n"
            f"Title: {title}\n"
            f"Content: {content}\n"
            "\nProvide a brief analysis in the following format:\n"
            "1. Summary (2-3 sentences)\n"
            "2. Key Points (up to 3 bullet points)\n"
            "3. Market Impact (1-2 sentences about potential market implications)"
        )
        try:
            analysis = model.generate_content(prompt).text.strip()
        except Exception as e:
            logging.warning(f"Generative model failed: {str(e)}")
            logging.info("Falling back to local summarizer")
            analysis = local_summarize(content)
        return {
            'title': title,
            'link': url,
            'source': source,
            'analysis': analysis
        }
    except Exception as e:
        logging.error(f"Error in robust_analyze_news_item: {str(e)}")
        return None
def get_company_news():
    """Scrape news from every company site concurrently and merge results.

    Returns:
        list[dict]: Concatenated news items from all ENERGY_COMPANIES sites
        (scrape_company_news already returns [] per failed site).
    """
    all_company_news = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        future_to_url = {executor.submit(scrape_company_news, url): url for url in ENERGY_COMPANIES}
        # Fix: the former random time.sleep() in this loop throttled nothing —
        # all requests were already submitted to the pool above, so the delay
        # only slowed down result collection. Per-request pacing would have to
        # live inside scrape_company_news itself.
        for future in as_completed(future_to_url):
            all_company_news.extend(future.result())
    return all_company_news
def filter_and_analyze_news(query, articles, company_news):
    """
    Analyze News API articles plus scraped company news concurrently.

    Uses robust_analyze_news_item so per-item API errors are handled
    gracefully; stops after 20 successful analyses.

    Args:
        query: Original search query (forwarded to the analyzer).
        articles: Articles from the news API.
        company_news: Items scraped from company sites.

    Returns:
        list[dict]: Up to 20 successful analyses.
    """
    all_news = articles + company_news
    filtered_and_analyzed_news = []
    with ThreadPoolExecutor(max_workers=20) as executor:
        future_to_item = {
            executor.submit(robust_analyze_news_item, item, query): item
            for item in all_news
        }
        for future in as_completed(future_to_item):
            result = future.result()
            if result:
                filtered_and_analyzed_news.append(result)
            if len(filtered_and_analyzed_news) >= 20:
                # Fix: cancel still-queued work before breaking; otherwise the
                # executor's __exit__ blocks until every submitted task has
                # run even though the results are discarded. (Futures already
                # running cannot be cancelled, but queued ones are skipped.)
                for pending in future_to_item:
                    pending.cancel()
                break
    return filtered_and_analyzed_news
def generate_market_summary(query, filtered_news):
    """
    Build an overall market summary from the per-article analyses.

    Uses the generative model; falls back to local extractive
    summarization if the API call fails.

    Args:
        query: The topic the news was gathered for.
        filtered_news: Analyzed article dicts ('title', 'analysis').

    Returns:
        str: The market summary, or a "no news" message when the input
        list is empty.
    """
    if not filtered_news:
        return f"No relevant news found for '{query}' in the energy market context."
    # Stitch each item's title and analysis into one context blob.
    pieces = [
        f"Title: {item.get('title', 'No title')}\nAnalysis: {item.get('analysis', '')}\n"
        for item in filtered_news
    ]
    combined_summary = "\n\n".join(pieces)
    prompt = f"""
    Based on the following news analyses:
    {combined_summary}
    Provide a comprehensive market summary that:
    - Highlights current trends related to {query} in the energy market.
    - Identifies key insights and potential market impacts.
    - Organizes the information into clearly defined sections.
    """
    try:
        response = model.generate_content(prompt)
        return response.text.strip()
    except Exception as e:
        logging.error(f"Error generating market summary using API: {e}. Falling back to local summarization.")
        return local_summarize(combined_summary)
# NOTE(review): this view had no @app.route decorator and so was never
# registered with Flask; '/' is the conventional root path — confirm
# against the frontend's expectations.
@app.route('/')
def index():
    """Serve the single-page frontend (templates/index.html)."""
    return render_template('index.html')
# Add error handling decorator
def handle_errors(f):
    """Decorator: log any unhandled exception from *f* and return a JSON
    500 error response instead of propagating it.

    Args:
        f: The view function to wrap.

    Returns:
        The wrapped function.
    """
    # Fix: @wraps preserves f's __name__/__doc__. Without it, decorating
    # more than one Flask view with this decorator raises an endpoint-name
    # collision, since every wrapped view would be named 'wrapper'.
    @wraps(f)
    def wrapper(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception as e:
            logging.error(f"Error in {f.__name__}: {str(e)}", exc_info=True)
            return jsonify({'error': str(e)}), 500
    return wrapper
# NOTE(review): route decorator was missing — this view was unreachable.
# '/query' mirrors the function name; confirm the path against the frontend.
@app.route('/query', methods=['POST'])
def query():
    """JSON endpoint: answer a user query, optionally scoped to a file.

    Expects {"query": str, "role": str, "file_id": optional}; returns
    {"response": str} on success or {"error": str} with 400/500 status.
    """
    data = request.json
    if not data:
        return jsonify({'error': 'No data provided'}), 400
    # Renamed local (was `query`) to stop shadowing the view function name.
    user_query = data.get('query')
    role = data.get('role')
    file_id = data.get('file_id')
    if not user_query:
        return jsonify({'error': 'No query provided'}), 400
    if not role:
        return jsonify({'error': 'No role provided'}), 400
    try:
        response = process_query(user_query, role, file_id)
        return jsonify({'response': response})
    except Exception as e:
        logging.error(f"Error processing query: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500
# NOTE(review): route decorator was missing — this view was unreachable.
# '/upload' is assumed; confirm against the frontend.
@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept a document upload, extract its text, analyze and index it.

    Returns JSON {"file_id", "analysis", "message"} on success, or
    {"error": str} with 400/500 status.
    """
    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400
    if not allowed_file(file.filename):
        return jsonify({'error': 'Invalid file type'}), 400
    try:
        file_content = file.read()
        filename = secure_filename(file.filename)
        # Extract text, chunk it, and produce a model-generated analysis.
        extracted_text = process_document(file_content, filename)
        text_chunks = get_text_chunks(extracted_text)
        analysis = analyze_document(extracted_text)
        # Sequential id; raw bytes kept (base64) so endpoints like /plot can
        # re-parse the original file later.
        file_id = len(files_storage) + 1
        files_storage[file_id] = {
            'filename': filename,
            'file_data': base64.b64encode(file_content).decode('utf-8'),
            'analysis': analysis
        }
        # Embed the chunks into the in-memory vector store.
        create_vector_store(text_chunks, file_id)
        return jsonify({
            'file_id': file_id,
            'analysis': analysis,
            'message': 'File processed successfully'
        })
    except Exception as e:
        logging.error(f"Error processing file: {str(e)}", exc_info=True)
        return jsonify({'error': str(e)}), 500
# NOTE(review): route decorator was missing — this view was unreachable.
# '/plot' is assumed; confirm against the frontend.
@app.route('/plot', methods=['POST'])
def plot():
    """Build a Plotly line chart from an uploaded Excel file.

    Uses the first column as x and the rest as y series; returns the
    figure serialized as Plotly JSON.
    """
    data = request.json or {}  # fix: request.json may be None for empty bodies
    file_id = data.get('file_id')
    # Fix: a missing/unknown file_id previously surfaced as a KeyError and a
    # generic 500; report it as a client error instead.
    if file_id not in files_storage:
        return jsonify({'error': f'Unknown file_id: {file_id}'}), 404
    try:
        file_data = base64.b64decode(files_storage[file_id]['file_data'])
        # NOTE(review): assumes the stored file is Excel — a txt/pdf upload
        # reaching this endpoint will fail into the 500 branch.
        df = pd.read_excel(io.BytesIO(file_data))
        fig = px.line(df, x=df.columns[0], y=df.columns[1:])
        graph_json = json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)
        return jsonify({'graph': graph_json})
    except Exception as e:
        logging.error(f'Error generating plot: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500
# NOTE(review): route decorator was missing — this view was unreachable.
# '/csv_query' is assumed; confirm against the frontend.
@app.route('/csv_query', methods=['POST'])
def process_csv_query():
    """Answer a natural-language query about a previously uploaded CSV.

    Expects {"file_id": int, "query": str}; returns {"response": str}.
    """
    data = request.json or {}  # fix: request.json may be None for empty bodies
    file_id = data.get('file_id')
    user_query = data.get('query')
    # Fix: unknown file_id previously raised KeyError -> 500; 404 instead.
    if file_id not in files_storage:
        return jsonify({'error': f'Unknown file_id: {file_id}'}), 404
    try:
        file_data = base64.b64decode(files_storage[file_id]['file_data'])
        # Create a DataFrame from the stored CSV bytes.
        df = pd.read_csv(io.StringIO(file_data.decode('utf-8')))
        # Delegate the actual question-answering to the DataFrame helper.
        response = process_dataframe_query(df, user_query)
        return jsonify({'response': response})
    except Exception as e:
        logging.error(f'Error processing CSV query: {str(e)}', exc_info=True)
        return jsonify({'error': str(e)}), 500
def process_dataframe_query(df, query):
    """Answer *query* about *df* by describing its schema to the model.

    Simulates a CSV agent: only the column names and shape are sent to the
    model, not the data itself, so answers are limited to what can be
    inferred from the structure.

    Args:
        df: The DataFrame parsed from the uploaded CSV.
        query: The user's natural-language question.

    Returns:
        str: The model's answer text.
    """
    prompt = f"""
    Given the following DataFrame information:
    Columns: {', '.join(df.columns)}
    Shape: {df.shape}
    Answer the following query about the data:
    {query}
    Provide a concise and informative response based on the data in the DataFrame.
    """
    reply = model.generate_content(prompt)
    return reply.text
# NOTE(review): route decorator was missing — this view was unreachable.
# '/fetch_news' mirrors the function name; confirm against the frontend.
@app.route('/fetch_news', methods=['POST'])
def fetch_news():
    """Fetch, analyze, and summarize energy-market news for a query.

    Returns JSON {"status", "articles", "summary", ...}; failures produce
    a structured error payload rather than an unhandled exception.
    """
    try:
        data = request.json or {}  # fix: request.json may be None for empty bodies
        query = data.get('query', '')
        # Fetch candidate articles from NewsData.io.
        articles = get_energy_news(query)
        if not articles:
            return jsonify({
                'status': 'error',
                'message': 'No articles found',
                'articles': [],
                'summary': f"No relevant news found for '{query}'"
            })
        # Analyze each article (model call with local-summary fallback).
        analyzed_articles = []
        for article in articles:
            analysis = robust_analyze_news_item(article, query)
            if analysis:
                analyzed_articles.append(analysis)
        if not analyzed_articles:
            return jsonify({
                'status': 'error',
                'message': 'No articles could be analyzed',
                'articles': [],
                'summary': f"Could not analyze any articles for '{query}'"
            })
        # Generate an overall market summary from the top 5 analyses.
        summary_prompt = (
            f"Based on the following analyzed news articles about '{query}':\n\n"
            + "\n\n".join([
                f"Article: {a['title']}\nAnalysis: {a['analysis']}"
                for a in analyzed_articles[:5]
            ])
            + "\n\nProvide a comprehensive market summary that:\n"
            "1. Highlights the main trends and developments\n"
            "2. Identifies potential market impacts\n"
            "3. Suggests key takeaways for stakeholders"
        )
        try:
            summary_response = model.generate_content(summary_prompt)
            market_summary = summary_response.text.strip()
        except Exception as e:
            logging.error(f"Error generating market summary: {str(e)}")
            market_summary = "Unable to generate market summary due to an error."
        return jsonify({
            'status': 'success',
            'articles': analyzed_articles,
            'summary': market_summary
        })
    except Exception as e:
        logging.error(f"Error in fetch_news route: {str(e)}")
        return jsonify({
            'status': 'error',
            'message': str(e),
            'articles': [],
            'summary': "An error occurred while processing the news request."
        }), 500
# Add health check endpoint
# NOTE(review): route decorator was missing — the probe was unreachable.
# '/health' is the conventional path; confirm against the deployment config.
@app.route('/health')
def health_check():
    """Liveness probe: reports whether the Google API key is configured."""
    return jsonify({'status': 'healthy', 'api_key_configured': bool(google_api_key)})
# Ensure all required environment variables are set
def check_environment():
    """Fail fast when required configuration is absent.

    Raises:
        EnvironmentError: If any variable in the required list is unset
        or empty, naming every missing variable.
    """
    required_vars = ['GOOGLE_API_KEY']
    missing_vars = []
    for name in required_vars:
        if not os.getenv(name):
            missing_vars.append(name)
    if missing_vars:
        raise EnvironmentError(f"Missing required environment variables: {', '.join(missing_vars)}")
if __name__ == '__main__':
    try:
        # Validate required environment variables before doing anything else.
        check_environment()
        # NOTE(review): basicConfig was already called at import time, so this
        # second call (and its format string) is a no-op unless the first one
        # is removed.
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        # Initialize Google AI (re-configures with the validated key).
        if not google_api_key:
            raise ValueError("GOOGLE_API_KEY not configured")
        genai.configure(api_key=google_api_key)
        # Start server. SECURITY fix: debug=True exposes the Werkzeug
        # interactive debugger (arbitrary code execution) and this server
        # binds 0.0.0.0 — debug is now off by default and must be opted
        # into via the FLASK_DEBUG environment variable.
        port = int(os.environ.get('PORT', 7860))
        debug_mode = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
        app.run(host='0.0.0.0', port=port, debug=debug_mode)
    except Exception as e:
        logging.error(f"Failed to start server: {str(e)}", exc_info=True)
        raise