Spaces:
Runtime error
Runtime error
| from flask import Flask, jsonify, request, send_from_directory | |
| import boto3 | |
| import json | |
| from datetime import datetime, timedelta | |
| import random | |
| import requests | |
| from bs4 import BeautifulSoup | |
| import os | |
| app = Flask(__name__, static_folder='static') | |
| # AWS Credentials from environment variables | |
| AWS_ACCESS_KEY_ID = os.environ.get("AKIATFBMPLLTCCPHI56U") | |
| AWS_SECRET_ACCESS_KEY = os.environ.get("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") | |
| AWS_REGION = os.environ.get("AWS_REGION", "ap-south-1") | |
| if not AWS_ACCESS_KEY_ID or not AWS_SECRET_ACCESS_KEY: | |
| raise ValueError("AWS credentials not provided in environment variables.") | |
| # Bedrock client setup | |
| bedrock = boto3.client( | |
| service_name='bedrock-runtime', | |
| region_name=AWS_REGION, | |
| aws_access_key_id=AWS_ACCESS_KEY_ID, | |
| aws_secret_access_key=AWS_SECRET_ACCESS_KEY | |
| ) | |
| TOPICS = [ | |
| "Market trends", "IPO", "Mutual funds", "Cryptocurrency", | |
| "Economy", "Banking", "Government policies and budget", "Gold and Silver" | |
| ] | |
| WEBSITES = [ | |
| "https://economictimes.indiatimes.com/markets/stocks/news", | |
| "https://www.moneycontrol.com/news/business/markets/" | |
| ] | |
| MODELS = [...] # Your full MODELS list here | |
| def fetch_from_websites(topic, current_time, time_24h_ago): | |
| articles = [] | |
| headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'} | |
| used_headlines = set() | |
| for url in WEBSITES: | |
| try: | |
| response = requests.get(url, headers=headers, timeout=10) | |
| soup = BeautifulSoup(response.text, 'html.parser') | |
| news_items = soup.select('h1, h2, h3, .story, .article') | |
| for item in news_items: | |
| if len(articles) >= 2: | |
| break | |
| title = item.get_text(strip=True)[:30] | |
| if title and title not in used_headlines and topic.lower() in title.lower(): | |
| used_headlines.add(title) | |
| pub_date = (current_time - timedelta(seconds=random.randint(0, 24*60*60))).strftime("%Y-%m-%dT%H:%M:%SZ") | |
| desc_elem = item.find_next('p') | |
| raw_desc = desc_elem.get_text(strip=True) if desc_elem else f"Sample {topic} news from {url.split('/')[2]}." | |
| articles.append({"title": title, "publishedAt": pub_date, "raw_description": raw_desc}) | |
| except Exception as e: | |
| print(f"Failed to fetch from {url}: {e}") | |
| return articles | |
| def generate_bedrock_response(model, title, raw_desc, topic, current_date_str): | |
| prompt = ( | |
| f"Generate a 60-80 word news article description for '{topic}' based on: " | |
| f"Headline: {title}. Raw data: {raw_desc}. Ensure it’s unique, concise, " | |
| f"reflects trends as of {current_date_str}, and focuses on {topic} relevance." | |
| ) | |
| body_dict = model["body_template"].copy() | |
| body_dict["prompt"] = body_dict["prompt"].format(prompt=prompt) | |
| try: | |
| response = bedrock.invoke_model( | |
| modelId=model["modelId"], | |
| contentType=model["contentType"], | |
| accept=model["accept"], | |
| body=json.dumps(body_dict) | |
| ) | |
| response_body = json.loads(response['body'].read().decode('utf-8')) | |
| return (response_body.get('outputs', [{}])[0].get('text', raw_desc) if "mistral" in model["modelId"] | |
| else response_body.get('generation', raw_desc)).strip() | |
| except Exception as e: | |
| print(f"Bedrock model {model['modelId']} failed: {e}") | |
| return raw_desc[:100] | |
| def fetch_news(topics, total_articles=2, model_index=0): | |
| current_time = datetime.utcnow() | |
| time_24h_ago = current_time - timedelta(hours=24) | |
| current_date_str = current_time.strftime("%Y-%m-%d") | |
| all_articles = [] | |
| model = MODELS[model_index % len(MODELS)] | |
| for topic in topics: | |
| raw_articles = fetch_from_websites(topic, current_time, time_24h_ago) | |
| for article in raw_articles[:total_articles]: | |
| article["description"] = generate_bedrock_response(model, article["title"], article["raw_description"], topic, current_date_str) | |
| del article["raw_description"] | |
| all_articles.append(article) | |
| return all_articles | |
| # Serve frontend | |
| def serve_frontend(): | |
| return send_from_directory(app.static_folder, 'index.html') | |
| # API endpoint | |
| def get_news(): | |
| try: | |
| topics = request.args.get('topics', 'all') | |
| if topics == 'all': | |
| selected_topics = TOPICS | |
| else: | |
| selected_topics = [t.strip() for t in topics.split(',') if t.strip() in TOPICS] | |
| if not selected_topics: | |
| return jsonify({"error": "Invalid topics"}), 400 | |
| articles = fetch_news(selected_topics, total_articles=2, model_index=0) | |
| return jsonify({"articles": articles, "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}) | |
| except Exception as e: | |
| print(f"Error in get_news: {str(e)}") | |
| return jsonify({"error": f"Server error: {str(e)}"}), 500 | |
| if __name__ == "__main__": | |
| app.run(host="0.0.0.0", port=7860) |