Spaces:
Sleeping
Sleeping
| # app.py | |
| import streamlit as st | |
| # Set page config | |
| st.set_page_config( | |
| page_title="Quantum Bank: Invented by James Burvel O'Callaghan III", | |
| page_icon="💳", | |
| layout="wide", | |
| ) | |
| # Import necessary libraries | |
| import requests | |
| import base64 | |
| import uuid | |
| import os | |
| import hashlib | |
| from datetime import datetime, date, timedelta | |
| import pandas as pd | |
| from sklearn.ensemble import IsolationForest # For anomaly detection | |
| from cryptography.fernet import Fernet # For encryption | |
| import jwt # For JWT authentication | |
| from sklearn.linear_model import LinearRegression # For predictive analytics | |
| import numpy as np | |
| import stripe | |
| import plotly.express as px | |
| from qiskit import Aer, QuantumCircuit, execute # For quantum computing simulation | |
| from web3 import Web3 # For blockchain interaction | |
| from web3.middleware import geth_poa_middleware # For Infura with PoA networks | |
| import openai # For OpenAI ChatGPT integration | |
| from github import Github # For GitHub API | |
| from huggingface_hub import HfApi, HfFolder, Repository # For Hugging Face API | |
| from requests.auth import HTTPBasicAuth | |
| from oauthlib.oauth1 import Client # For MasterCard OAuth1 | |
| import json | |
| # Optional imports for barcode scanning | |
| barcode_scanning_available = True | |
| try: | |
| import cv2 | |
| from pyzbar import pyzbar | |
| from streamlit_webrtc import webrtc_streamer, VideoTransformerBase | |
| except ImportError: | |
| cv2 = None | |
| pyzbar = None | |
| webrtc_streamer = None | |
| VideoTransformerBase = object | |
| barcode_scanning_available = False | |
| # Now, after st.set_page_config(), you can use Streamlit commands | |
| if not barcode_scanning_available: | |
| st.warning("OpenCV and pyzbar are not available. Barcode scanning will be disabled.") | |
| # Load environment variables securely | |
def load_env_vars():
    """Return the process environment mapping (``os.environ``) unchanged."""
    return os.environ


# Module-level handle on the environment mapping used by get_env_var().
env_vars = load_env_vars()


def get_env_var(key):
    """Look up ``key`` in ``env_vars``; the result is ``None`` when it is unset."""
    value = env_vars.get(key)
    return value
| # Initialize API keys and configurations | |
| stripe.api_key = get_env_var('STRIPE_API_KEY') | |
| plaid_client_id = get_env_var('PLAID_CLIENT_ID') | |
| plaid_secret = get_env_var('PLAID_SECRET') | |
| plaid_env = get_env_var('PLAID_ENV') or 'sandbox' | |
| citibank_client_id = get_env_var('CITIBANK_CLIENT_ID') | |
| citibank_client_secret = get_env_var('CITIBANK_CLIENT_SECRET') | |
| citibank_api_key = get_env_var('CITIBANK_API_KEY') | |
| mt_api_key = get_env_var('MT_API_KEY') | |
| mt_organization_id = get_env_var('MT_ORG_ID') | |
| github_token = get_env_var('GITHUB_TOKEN') | |
| huggingface_token = get_env_var('HUGGINGFACE_TOKEN') | |
| openai_api_key = get_env_var('OPENAI_API_KEY') | |
| infura_project_id = get_env_var('INFURA_PROJECT_ID') | |
| metamask_account_address = get_env_var('METAMASK_ACCOUNT_ADDRESS') | |
| metamask_private_key = get_env_var('METAMASK_PRIVATE_KEY') | |
| visa_api_key = get_env_var('VISA_API_KEY') | |
| visa_api_secret = get_env_var('VISA_API_SECRET') | |
| visa_cert_path = get_env_var('VISA_CERT_PATH') | |
| mastercard_consumer_key = get_env_var('MASTERCARD_CONSUMER_KEY') | |
| mastercard_private_key = get_env_var('MASTERCARD_PRIVATE_KEY') | |
| pipedream_url = get_env_var('PIPEDREAM_URL') | |
| # Set OpenAI API key | |
| openai.api_key = openai_api_key | |
| # Initialize Web3 | |
| if infura_project_id: | |
| infura_url = f"https://mainnet.infura.io/v3/{infura_project_id}" | |
| web3 = Web3(Web3.HTTPProvider(infura_url)) | |
| # For PoA networks like Ropsten or Rinkeby, use the following line | |
| # web3.middleware_onion.inject(geth_poa_middleware, layer=0) | |
| else: | |
| web3 = None | |
| # Initialize GitHub and Hugging Face clients | |
| github_client = Github(github_token) if github_token else None | |
| hf_api = HfApi() | |
| if huggingface_token: | |
| HfFolder.save_token(huggingface_token) | |
| else: | |
| huggingface_token = HfFolder.get_token() | |
| # Initialize Plaid client if credentials are provided | |
if plaid_client_id and plaid_secret:
    # `plaid.Environment` is looked up on the top-level package, so the package
    # itself has to be imported; `from plaid.api import plaid_api` does not
    # bind the name `plaid`.
    import plaid
    from plaid.api import plaid_api
    from plaid.model import *
    from plaid.api_client import ApiClient
    from plaid.configuration import Configuration
    # The star import above does not expose the request/enum classes used by
    # the Plaid pages of this app, so import each of them from its own module.
    from plaid.model.accounts_get_request import AccountsGetRequest
    from plaid.model.country_code import CountryCode
    from plaid.model.link_token_create_request import LinkTokenCreateRequest
    from plaid.model.link_token_create_request_user import LinkTokenCreateRequestUser
    from plaid.model.products import Products
    from plaid.model.transactions_get_request import TransactionsGetRequest

    # Map the PLAID_ENV setting onto the SDK's host constants.
    plaid_environment = {
        'sandbox': plaid.Environment.Sandbox,
        'production': plaid.Environment.Production,
    }
    # Newer SDK releases no longer define a Development host; register it only
    # when the installed version still has it.
    _plaid_development_host = getattr(plaid.Environment, 'Development', None)
    if _plaid_development_host is not None:
        plaid_environment['development'] = _plaid_development_host

    plaid_config = Configuration(
        # Unknown PLAID_ENV values fall back to the sandbox host.
        host=plaid_environment.get(plaid_env, plaid.Environment.Sandbox),
        api_key={
            'clientId': plaid_client_id,
            'secret': plaid_secret,
        },
    )
    plaid_api_client = ApiClient(plaid_config)
    plaid_client = plaid_api.PlaidApi(plaid_api_client)
else:
    # No credentials configured: the Plaid pages must check for None.
    plaid_client = None
| # Initialize user database (In-memory for demonstration purposes) | |
| if 'user_database' not in st.session_state: | |
| st.session_state['user_database'] = {} | |
| # JWT Authentication | |
def _hash_password(password, salt):
    """Return the hex PBKDF2-HMAC-SHA256 digest of ``password`` for ``salt`` (bytes)."""
    return hashlib.pbkdf2_hmac('sha256', password.encode(), salt, 200000).hex()


def _password_matches(record, password):
    """Check ``password`` against a stored user record using a constant-time compare.

    A record without a ``'salt'`` key is treated as holding a bare SHA-256 hex
    digest, so accounts stored in that older format can still log in.
    """
    import hmac  # local import: only needed for the constant-time comparison

    salt_hex = record.get('salt')
    if salt_hex is None:
        candidate = hashlib.sha256(password.encode()).hexdigest()
    else:
        candidate = _hash_password(password, bytes.fromhex(salt_hex))
    return hmac.compare_digest(candidate, record['password'])


def authenticate_user():
    """Render the sidebar login / sign-up form and handle a submission.

    On a successful login a JWT carrying ``{'user': username}`` is stored in
    ``st.session_state['auth_token']``. On a successful sign-up a salted
    password record is added to ``st.session_state['user_database']``.
    Returns nothing; all feedback goes through Streamlit messages.
    """
    st.sidebar.title("User Authentication")
    auth_option = st.sidebar.radio("Select an option", ["Login", "Create Account"])
    if auth_option == "Login":
        username = st.sidebar.text_input("Username")
        password = st.sidebar.text_input("Password", type="password")
        if st.sidebar.button("Login"):
            record = st.session_state['user_database'].get(username)
            if record is not None and _password_matches(record, password):
                # NOTE(review): the signing key is hard-coded and has to match
                # the key used wherever this token is decoded; move both to
                # configuration together.
                token = jwt.encode({'user': username}, 'secret', algorithm='HS256')
                st.session_state['auth_token'] = token
                st.success("Logged in successfully!")
            else:
                # Same message for unknown user and wrong password, so the
                # form does not reveal which usernames exist.
                st.error("Invalid username or password")
    elif auth_option == "Create Account":
        username = st.sidebar.text_input("Choose a Username")
        password = st.sidebar.text_input("Choose a Password", type="password")
        confirm_password = st.sidebar.text_input("Confirm Password", type="password")
        if st.sidebar.button("Create Account"):
            user_db = st.session_state['user_database']
            if not username.strip() or not password:
                # Without this guard an account with a blank name and an
                # empty password could be registered.
                st.error("Username and password are required")
            elif password != confirm_password:
                st.error("Passwords do not match")
            elif username in user_db:
                st.error("Username already exists")
            else:
                # A fresh random salt per user keeps identical passwords from
                # producing identical stored digests.
                salt = os.urandom(16)
                user_db[username] = {
                    'password': _hash_password(password, salt),
                    'salt': salt.hex(),
                }
                st.success("Account created successfully! Please login.")
| if 'auth_token' not in st.session_state: | |
| authenticate_user() | |
| else: | |
| # Proceed with the application | |
| # Decode JWT to get username | |
| token = st.session_state['auth_token'] | |
| decoded_token = jwt.decode(token, 'secret', algorithms=['HS256']) | |
| username = decoded_token['user'] | |
| # Title of the app | |
| st.title(f"Quantum Bank: Welcome, {username}") | |
| st.sidebar.header("Navigation") | |
| # Logout button | |
| if st.sidebar.button("Logout"): | |
| del st.session_state['auth_token'] | |
| st.success("Logged out successfully") | |
| st.experimental_rerun() | |
| # Main application navigation | |
| service_choice = st.sidebar.selectbox("Choose a Service", [ | |
| "Dashboard", | |
| "In-Store Scanner & Payment", | |
| "Citibank Services", | |
| "Plaid Services", | |
| "Stripe Services", | |
| "Modern Treasury Services", | |
| "GitHub Integration", | |
| "Hugging Face Integration", | |
| "Blockchain Integration", | |
| "Visa Services", | |
| "MasterCard Services", | |
| "Pipedream Integration", | |
| "Financial Analysis", | |
| "Budgeting Tools", | |
| "AI Chatbot", | |
| "Quantum Portfolio Optimization", | |
| ]) | |
| # In-Store Scanner & Payment | |
| if service_choice == "In-Store Scanner & Payment": | |
| st.header("In-Store Item Scanner and Payment") | |
| if barcode_scanning_available: | |
| # Mock database of items | |
# Mock catalogue rows: (barcode, product name, unit price).
_CATALOG_ROWS = (
    ("123456789012", "Product A", 9.99),
    ("987654321098", "Product B", 19.99),
    # Add more items as needed
)
ITEM_DATABASE = {
    code: {"name": name, "price": price}
    for code, name, price in _CATALOG_ROWS
}


def get_item_details(code):
    """Return the catalogue entry for barcode ``code``, or ``None`` if unknown."""
    try:
        return ITEM_DATABASE[code]
    except KeyError:
        return None
| # Video transformer for barcode scanning | |
class BarcodeScanner(VideoTransformerBase):
    """Video transformer that outlines decoded barcodes and keeps the last payload."""

    def __init__(self):
        # Text of the most recently decoded barcode; stays None until one is seen.
        self.result = None

    def transform(self, frame):
        """Decode barcodes in ``frame``, draw an overlay for each, return the image."""
        green = (0, 255, 0)
        image = frame.to_ndarray(format="bgr24")
        for detection in pyzbar.decode(image):
            left, top, width, height = detection.rect
            # Box around the detected code.
            cv2.rectangle(image, (left, top), (left + width, top + height), green, 2)
            payload = detection.data.decode("utf-8")
            kind = detection.type
            label = f"{payload} ({kind})"
            # Caption drawn just above the box.
            cv2.putText(image, label, (left, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, green, 2)
            # When several codes are in view, the last one decoded wins.
            self.result = payload
        return image
| st.write("Scan an item to view details and purchase.") | |
| # Start barcode scanner | |
| barcode_scanner = BarcodeScanner() | |
| webrtc_ctx = webrtc_streamer( | |
| key="barcode-scanner", | |
| video_transformer_factory=lambda: barcode_scanner, | |
| async_transform=True, | |
| ) | |
| if webrtc_ctx.video_transformer: | |
| barcode_data = barcode_scanner.result | |
| if barcode_data: | |
| item_details = get_item_details(barcode_data) | |
| if item_details: | |
| st.success(f"Item Scanned: {item_details['name']}") | |
| st.write(f"Price: ${item_details['price']:.2f}") | |
| if st.button("Purchase"): | |
| # Process payment | |
| try: | |
| # In a real application, collect payment details securely | |
| # For demonstration, we'll simulate a successful payment | |
| st.write("Payment processed successfully.") | |
| # Update Modern Treasury ledger | |
| mt_response = create_mt_ledger_transaction( | |
| amount=item_details['price'], | |
| description=f"Purchase of {item_details['name']}" | |
| ) | |
| if mt_response: | |
| st.write("Ledger updated successfully.") | |
| else: | |
| st.write("Failed to update ledger.") | |
| except Exception as e: | |
| st.error(f"Error processing payment: {e}") | |
| else: | |
| st.error("Item not found in database.") | |
def create_mt_ledger_transaction(amount, description):
    """Post a two-entry (debit + credit) ledger transaction to Modern Treasury.

    Args:
        amount: Transaction amount in major currency units (e.g. 19.99); it is
            converted to integer cents before being sent.
        description: Free-text description stored on the ledger transaction.

    Returns:
        The parsed JSON response on HTTP 200/201, otherwise ``None`` after the
        error has been shown with ``st.error``.

    NOTE(review): this definition appears to sit after the purchase handler
    that calls it; confirm it is bound before that call can run.
    """
    url = "https://app.moderntreasury.com/api/ledger_transactions"
    credentials = f"{mt_organization_id}:{mt_api_key}"
    base64_credentials = base64.b64encode(credentials.encode()).decode('utf-8')
    headers = {
        "Authorization": f"Basic {base64_credentials}",
        "Content-Type": "application/json",
        "Accept": "application/json"
    }
    # round(), not int(): 19.99 * 100 is 1998.9999999999998 in binary floating
    # point and int() would truncate it to 1998, under-posting by one cent.
    amount_cents = round(amount * 100)
    data = {
        "description": description,
        "status": "posted",
        "ledger_entries": [
            {
                "amount": amount_cents,
                "direction": "debit",
                "ledger_account_id": "ledger_account_id_debit",  # Replace with your ledger account ID
            },
            {
                "amount": amount_cents,
                "direction": "credit",
                "ledger_account_id": "ledger_account_id_credit",  # Replace with your ledger account ID
            },
        ]
    }
    try:
        # requests waits indefinitely unless a timeout is given.
        response = requests.post(url, headers=headers, json=data, timeout=30)
    except requests.RequestException as exc:
        # Report transport failures through the same path as HTTP errors.
        st.error(f"Modern Treasury Error: {exc}")
        return None
    if response.status_code in [200, 201]:
        return response.json()
    st.error(f"Modern Treasury Error: {response.text}")
    return None
| else: | |
| st.error("Barcode scanning features are not available in this environment.") | |
| # Citibank Services | |
| elif service_choice == "Citibank Services": | |
| st.header("Citibank Services") | |
| # Citibank API class | |
class CitibankAPI:
    """Small client for the Citi sandbox API using the client-credentials grant.

    The constructor requests an access token immediately; every other method
    sends that token as a Bearer credential and returns the decoded JSON body.
    """

    # Seconds to wait for Citi before giving up; requests has no default timeout.
    REQUEST_TIMEOUT = 30

    def __init__(self, client_id, client_secret, api_key):
        self.client_id = client_id
        self.client_secret = client_secret
        # Stored for callers; no method of this class sends it.
        self.api_key = api_key
        self.base_url = "https://sandbox.apihub.citi.com/gcb/api/v1"
        self.access_token = self.get_access_token()

    def get_access_token(self):
        """Fetch an OAuth2 access token; return ``None`` (after ``st.error``) on failure."""
        if not self.client_id or not self.client_secret:
            # Missing credentials would otherwise raise TypeError while the
            # Basic-auth string is being built.
            st.error("Failed to obtain Citibank access token.")
            return None
        url = "https://sandbox.apihub.citi.com/gcb/api/clientCredentials/oauth2/token/us/gcb"
        basic = base64.b64encode(f"{self.client_id}:{self.client_secret}".encode()).decode()
        headers = {
            'Authorization': 'Basic ' + basic,
            'Content-Type': 'application/x-www-form-urlencoded'
        }
        data = 'grant_type=client_credentials&scope=/api'
        try:
            response = requests.post(url, headers=headers, data=data, timeout=self.REQUEST_TIMEOUT)
        except requests.RequestException:
            st.error("Failed to obtain Citibank access token.")
            return None
        if response.status_code == 200:
            return response.json().get('access_token')
        st.error("Failed to obtain Citibank access token.")
        return None

    def _auth_headers(self, json_body=False):
        """Return Bearer-auth headers; add a JSON Content-Type when a body is sent."""
        headers = {
            'Authorization': f"Bearer {self.access_token}",
            'Accept': 'application/json'
        }
        if json_body:
            headers['Content-Type'] = 'application/json'
        return headers

    def get_accounts(self):
        """GET ``/accounts`` and return the decoded JSON response."""
        url = f"{self.base_url}/accounts"
        response = requests.get(url, headers=self._auth_headers(), timeout=self.REQUEST_TIMEOUT)
        return response.json()

    def get_account_details(self, account_id):
        """GET ``/accounts/{account_id}`` and return the decoded JSON response."""
        url = f"{self.base_url}/accounts/{account_id}"
        response = requests.get(url, headers=self._auth_headers(), timeout=self.REQUEST_TIMEOUT)
        return response.json()

    def create_account(self, account_data):
        """POST ``account_data`` as JSON to ``/accounts``; return the decoded response."""
        url = f"{self.base_url}/accounts"
        response = requests.post(
            url,
            headers=self._auth_headers(json_body=True),
            json=account_data,
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()

    def initiate_payment(self, payment_data):
        """POST ``payment_data`` as JSON to ``/payments``; return the decoded response."""
        url = f"{self.base_url}/payments"
        response = requests.post(
            url,
            headers=self._auth_headers(json_body=True),
            json=payment_data,
            timeout=self.REQUEST_TIMEOUT,
        )
        return response.json()
| citibank_api_instance = CitibankAPI(citibank_client_id, citibank_client_secret, citibank_api_key) | |
| citibank_action = st.selectbox("Choose an Action", [ | |
| "View Accounts", | |
| "View Account Details", | |
| "Create Account", | |
| "Initiate Payment", | |
| ]) | |
| if citibank_action == "View Accounts": | |
| accounts = citibank_api_instance.get_accounts() | |
| st.json(accounts) | |
| elif citibank_action == "View Account Details": | |
| account_id = st.text_input("Enter Account ID") | |
| if st.button("Get Account Details"): | |
| details = citibank_api_instance.get_account_details(account_id) | |
| st.json(details) | |
| elif citibank_action == "Create Account": | |
| st.subheader("Create a New Account") | |
| account_name = st.text_input("Account Name") | |
| account_type = st.selectbox("Account Type", ["Savings", "Checking"]) | |
| currency = st.text_input("Currency", value="USD") | |
| if st.button("Create Account"): | |
| account_data = { | |
| "accountName": account_name, | |
| "accountType": account_type, | |
| "currency": currency | |
| } | |
| result = citibank_api_instance.create_account(account_data) | |
| st.json(result) | |
| elif citibank_action == "Initiate Payment": | |
| st.subheader("Initiate a Payment") | |
| from_account = st.text_input("From Account ID") | |
| to_account = st.text_input("To Account Number") | |
| amount = st.number_input("Amount", min_value=0.0) | |
| currency = st.text_input("Currency", value="USD") | |
| description = st.text_input("Description") | |
| if st.button("Initiate Payment"): | |
| payment_data = { | |
| "sourceAccountId": from_account, | |
| "transactionAmount": { | |
| "amount": amount, | |
| "currency": currency | |
| }, | |
| "destinationAccountNumber": to_account, | |
| "paymentMethod": "InternalTransfer", | |
| "paymentCurrency": currency, | |
| "remarks": description | |
| } | |
| result = citibank_api_instance.initiate_payment(payment_data) | |
| st.json(result) | |
| # Plaid Services | |
| elif service_choice == "Plaid Services": | |
| st.header("Plaid Services") | |
| if not plaid_client_id or not plaid_secret: | |
| st.error("Please provide Plaid API credentials.") | |
| else: | |
| plaid_action = st.selectbox("Choose an Action", [ | |
| "Link Account", | |
| "Get Accounts", | |
| "Get Transactions", | |
| ]) | |
| if 'plaid_access_token' not in st.session_state: | |
| st.session_state['plaid_access_token'] = None | |
| if plaid_action == "Link Account": | |
| st.subheader("Link a Bank Account") | |
| # Create link token | |
| user_id = str(uuid.uuid4()) | |
| request = LinkTokenCreateRequest( | |
| products=[Products('transactions')], | |
| client_name='TransactPro', | |
| country_codes=[CountryCode('US')], | |
| language='en', | |
| user=LinkTokenCreateRequestUser(client_user_id=user_id) | |
| ) | |
| response = plaid_client.link_token_create(request) | |
| link_token = response['link_token'] | |
| st.write("Use this link token in your frontend to initialize Plaid Link.") | |
| st.code(link_token) | |
| st.session_state['plaid_link_token'] = link_token | |
| elif plaid_action == "Get Accounts": | |
| if st.session_state['plaid_access_token']: | |
| accounts_request = AccountsGetRequest(access_token=st.session_state['plaid_access_token']) | |
| accounts_response = plaid_client.accounts_get(accounts_request) | |
| st.json(accounts_response.to_dict()) | |
| else: | |
| st.error("No access token available. Please link an account first.") | |
| elif plaid_action == "Get Transactions": | |
| if st.session_state['plaid_access_token']: | |
| start_date = (date.today() - timedelta(days=30)).strftime('%Y-%m-%d') | |
| end_date = date.today().strftime('%Y-%m-%d') | |
| transactions_request = TransactionsGetRequest( | |
| access_token=st.session_state['plaid_access_token'], | |
| start_date=start_date, | |
| end_date=end_date | |
| ) | |
| transactions_response = plaid_client.transactions_get(transactions_request) | |
| st.json(transactions_response.to_dict()) | |
| else: | |
| st.error("No access token available. Please link an account first.") | |
| # Stripe Services | |
| elif service_choice == "Stripe Services": | |
| st.header("Stripe Services") | |
| stripe_action = st.selectbox("Choose an Action", [ | |
| "Create Customer", | |
| "Charge Customer", | |
| "View Charges", | |
| ]) | |
| if stripe_action == "Create Customer": | |
| st.subheader("Create a New Customer") | |
| email = st.text_input("Customer Email") | |
| description = st.text_input("Description") | |
| if st.button("Create Customer"): | |
| customer = stripe.Customer.create( | |
| email=email, | |
| description=description | |
| ) | |
| st.success("Customer created successfully.") | |
| st.json(customer) | |
| elif stripe_action == "Charge Customer": | |
| st.subheader("Charge a Customer") | |
| customer_id = st.text_input("Customer ID") | |
| amount = st.number_input("Amount (in cents)", min_value=50) | |
| currency = st.text_input("Currency", value="usd") | |
| description = st.text_input("Description") | |
| if st.button("Charge Customer"): | |
| charge = stripe.Charge.create( | |
| customer=customer_id, | |
| amount=int(amount), | |
| currency=currency, | |
| description=description | |
| ) | |
| st.success("Charge created successfully.") | |
| st.json(charge) | |
| elif stripe_action == "View Charges": | |
| st.subheader("List Charges") | |
| charges = stripe.Charge.list() | |
| st.json(charges) | |
| # Modern Treasury Services | |
| elif service_choice == "Modern Treasury Services": | |
| st.header("Modern Treasury Services") | |
| mt_action = st.selectbox("Choose an Action", [ | |
| "View Payment Orders", | |
| "Create Payment Order", | |
| ]) | |
def get_mt_headers():
    """Build the Basic-auth JSON request headers for Modern Treasury calls."""
    # Basic auth token is base64("<organization id>:<api key>").
    raw_pair = f"{mt_organization_id}:{mt_api_key}".encode()
    encoded_pair = base64.b64encode(raw_pair).decode('utf-8')
    headers = {}
    headers["Authorization"] = f"Basic {encoded_pair}"
    headers["Content-Type"] = "application/json"
    headers["Accept"] = "application/json"
    return headers
| if mt_action == "View Payment Orders": | |
| url = "https://app.moderntreasury.com/api/payment_orders" | |
| headers = get_mt_headers() | |
| response = requests.get(url, headers=headers) | |
| if response.status_code == 200: | |
| st.json(response.json()) | |
| else: | |
| st.error(f"Error: {response.text}") | |
| elif mt_action == "Create Payment Order": | |
| st.subheader("Create a New Payment Order") | |
| amount = st.number_input("Amount", min_value=0.0) | |
| direction = st.selectbox("Direction", ["credit", "debit"]) | |
| currency = st.text_input("Currency", value="USD") | |
| description = st.text_input("Description") | |
# Submit handler: send the form values to Modern Treasury as a payment order.
if st.button("Create Payment Order"):
    url = "https://app.moderntreasury.com/api/payment_orders"
    headers = get_mt_headers()
    data = {
        # round(), not int(): 19.99 * 100 is 1998.9999999999998 as a float and
        # int() would truncate it to 1998 cents.
        "amount": round(amount * 100),  # Convert to cents
        "direction": direction,
        "currency": currency,
        "description": description
    }
    try:
        # requests waits indefinitely unless a timeout is given.
        response = requests.post(url, headers=headers, json=data, timeout=30)
    except requests.RequestException as exc:
        # Show transport failures in the page instead of an unhandled traceback.
        st.error(f"Error: {exc}")
    else:
        if response.status_code == 201:
            st.success("Payment Order created successfully.")
            st.json(response.json())
        else:
            st.error(f"Error: {response.text}")
| # GitHub Integration | |
| elif service_choice == "GitHub Integration": | |
| st.header("GitHub Repository Management") | |
| if not github_client: | |
| st.error("Please provide your GitHub Personal Access Token in the environment variables.") | |
| else: | |
| github_action = st.selectbox("Choose an Action", [ | |
| "List Repositories", | |
| "Create Repository", | |
| "Add File to Repository", | |
| "List Repository Contents", | |
| ]) | |
| if github_action == "List Repositories": | |
| st.subheader("Your GitHub Repositories") | |
| try: | |
| repos = github_client.get_user().get_repos() | |
| repo_list = [repo.full_name for repo in repos] | |
| st.write(repo_list) | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| elif github_action == "Create Repository": | |
| st.subheader("Create a New Repository") | |
| repo_name = st.text_input("Repository Name") | |
| repo_description = st.text_area("Repository Description") | |
| private = st.checkbox("Private Repository") | |
| if st.button("Create Repository"): | |
| try: | |
| user = github_client.get_user() | |
| repo = user.create_repo( | |
| name=repo_name, | |
| description=repo_description, | |
| private=private | |
| ) | |
| st.success(f"Repository '{repo.full_name}' created successfully!") | |
| st.write(f"URL: {repo.html_url}") | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| elif github_action == "Add File to Repository": | |
| st.subheader("Add a File to a Repository") | |
| repo_name = st.text_input("Repository Name (e.g., username/repo)") | |
| file_path = st.text_input("File Path in Repository (e.g., folder/filename.py)") | |
| file_content = st.text_area("File Content") | |
| commit_message = st.text_input("Commit Message", value="Add new file via TransactPro app") | |
| if st.button("Add File"): | |
| try: | |
| repo = github_client.get_repo(repo_name) | |
| # Check if the file already exists | |
| try: | |
| contents = repo.get_contents(file_path) | |
| # If the file exists, update it | |
| repo.update_file( | |
| path=file_path, | |
| message=commit_message, | |
| content=file_content, | |
| sha=contents.sha, | |
| branch="main" | |
| ) | |
| st.success(f"File '{file_path}' updated in repository '{repo_name}' successfully!") | |
| except Exception: | |
| # If the file does not exist, create it | |
| repo.create_file( | |
| path=file_path, | |
| message=commit_message, | |
| content=file_content, | |
| branch="main" | |
| ) | |
| st.success(f"File '{file_path}' added to repository '{repo_name}' successfully!") | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| elif github_action == "List Repository Contents": | |
| st.subheader("List Contents of a Repository") | |
| repo_name = st.text_input("Repository Name (e.g., username/repo)") | |
| if st.button("List Contents"): | |
| try: | |
| repo = github_client.get_repo(repo_name) | |
| contents = repo.get_contents("") | |
| while contents: | |
| file_content = contents.pop(0) | |
| if file_content.type == "dir": | |
| contents.extend(repo.get_contents(file_content.path)) | |
| else: | |
| st.write(file_content.path) | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| # Hugging Face Integration | |
| elif service_choice == "Hugging Face Integration": | |
| st.header("Hugging Face Model Management") | |
| if not huggingface_token: | |
| st.error("Please provide your Hugging Face Token in the environment variables.") | |
| else: | |
| huggingface_action = st.selectbox("Choose an Action", [ | |
| "List Models", | |
| "Upload Model", | |
| "Create Space", | |
| "Download Model", | |
| ]) | |
| if huggingface_action == "List Models": | |
| st.subheader("Your Hugging Face Models") | |
| try: | |
| user_info = hf_api.whoami(token=huggingface_token) | |
| models = hf_api.list_models(author=user_info['name'], token=huggingface_token) | |
| model_list = [model.modelId for model in models] | |
| st.write(model_list) | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| elif huggingface_action == "Upload Model": | |
| st.subheader("Upload a Model to Hugging Face") | |
| model_name = st.text_input("Model Name (e.g., username/model_name)") | |
| model_files = st.file_uploader("Select Model Files", accept_multiple_files=True) | |
| if st.button("Upload Model"): | |
| try: | |
| repo_url = hf_api.create_repo(name=model_name, token=huggingface_token) | |
| repo = Repository(local_dir=model_name, clone_from=repo_url, use_auth_token=huggingface_token) | |
| repo.git_pull() | |
| for model_file in model_files: | |
| with open(os.path.join(model_name, model_file.name), "wb") as f: | |
| f.write(model_file.getbuffer()) | |
| repo.git_add() | |
| repo.git_commit("Upload model files") | |
| repo.git_push() | |
| st.success(f"Model '{model_name}' uploaded successfully!") | |
| st.write(f"Model URL: https://huggingface.co/{model_name}") | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| elif huggingface_action == "Create Space": | |
| st.subheader("Create a New Hugging Face Space") | |
| space_name = st.text_input("Space Name (e.g., username/space_name)") | |
| sdk = st.selectbox("Select SDK", ["streamlit", "gradio", "dash", "static"]) | |
| if st.button("Create Space"): | |
| try: | |
| repo_url = hf_api.create_repo(name=space_name, repo_type="space", space_sdk=sdk, token=huggingface_token) | |
| st.success(f"Space '{space_name}' created successfully!") | |
| st.write(f"Space URL: https://huggingface.co/spaces/{space_name}") | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| elif huggingface_action == "Download Model": | |
| st.subheader("Download a Model from Hugging Face") | |
| model_id = st.text_input("Model ID (e.g., username/model_name)") | |
| if st.button("Download Model"): | |
| try: | |
| model_dir = hf_api.download_repo(repo_id=model_id, repo_type='model', local_dir=model_id, token=huggingface_token) | |
| st.success(f"Model '{model_id}' downloaded successfully!") | |
| st.write(f"Model Directory: {model_dir}") | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| # Blockchain Integration | |
| elif service_choice == "Blockchain Integration": | |
| st.header("Blockchain Integration with Metamask and Infura") | |
| if not web3: | |
| st.error("Please provide your Infura Project ID in the environment variables.") | |
| else: | |
| blockchain_action = st.selectbox("Choose an Action", [ | |
| "View Account Balance", | |
| "Send Transaction", | |
| "View Transaction Details", | |
| ]) | |
| if blockchain_action == "View Account Balance": | |
| st.subheader("View Ethereum Account Balance") | |
| account_address = st.text_input("Enter Ethereum Account Address", value=metamask_account_address) | |
| if st.button("Get Balance"): | |
| try: | |
| balance_wei = web3.eth.get_balance(account_address) | |
| balance_eth = web3.fromWei(balance_wei, 'ether') | |
| st.write(f"Balance: {balance_eth} ETH") | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| elif blockchain_action == "Send Transaction": | |
| st.subheader("Send Ethereum Transaction") | |
| to_address = st.text_input("Recipient Ethereum Address") | |
| amount = st.number_input("Amount in ETH", min_value=0.0) | |
| gas_price = st.number_input("Gas Price (Gwei)", min_value=1.0, value=50.0) | |
| if st.button("Send Transaction"): | |
| if not metamask_private_key: | |
| st.error("Please provide your Metamask Private Key in the environment variables.") | |
| else: | |
| try: | |
| # Build transaction | |
| nonce = web3.eth.getTransactionCount(metamask_account_address) | |
| tx = { | |
| 'nonce': nonce, | |
| 'to': to_address, | |
| 'value': web3.toWei(amount, 'ether'), | |
| 'gas': 21000, | |
| 'gasPrice': web3.toWei(gas_price, 'gwei'), | |
| } | |
| # Sign transaction | |
| signed_tx = web3.eth.account.sign_transaction(tx, metamask_private_key) | |
| # Send transaction | |
| tx_hash = web3.eth.sendRawTransaction(signed_tx.rawTransaction) | |
| st.success(f"Transaction sent! TX Hash: {web3.toHex(tx_hash)}") | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| elif blockchain_action == "View Transaction Details": | |
| st.subheader("View Ethereum Transaction Details") | |
| tx_hash = st.text_input("Enter Transaction Hash") | |
| if st.button("Get Transaction Details"): | |
| try: | |
| tx = web3.eth.getTransaction(tx_hash) | |
| st.json(dict(tx)) | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| # Visa Services | |
| elif service_choice == "Visa Services": | |
| st.header("Visa API Integration") | |
| visa_action = st.selectbox("Choose an Action", [ | |
| "Get Transaction Details", | |
| "Initiate Payment", | |
| ]) | |
| if not visa_api_key or not visa_api_secret or not visa_cert_path: | |
| st.error("Please provide Visa API credentials and certificate path.") | |
| else: | |
| if visa_action == "Get Transaction Details": | |
| transaction_id = st.text_input("Enter Transaction ID") | |
| if st.button("Get Details"): | |
| url = f"https://sandbox.api.visa.com/visadirect/v1/transaction/{transaction_id}" | |
| headers = { | |
| "Accept": "application/json", | |
| "Content-Type": "application/json" | |
| } | |
| response = requests.get( | |
| url, | |
| headers=headers, | |
| auth=( | |
| visa_api_key, | |
| visa_api_secret | |
| ), | |
| cert=(visa_cert_path, visa_cert_path) | |
| ) | |
| if response.status_code == 200: | |
| st.json(response.json()) | |
| else: | |
| st.error(f"Error: {response.text}") | |
| elif visa_action == "Initiate Payment": | |
| st.subheader("Initiate a Payment") | |
| # Implement payment initiation with Visa API | |
| st.info("Payment initiation with Visa API is not implemented in this example.") | |
| # MasterCard Services | |
| elif service_choice == "MasterCard Services": | |
| st.header("MasterCard API Integration") | |
| mastercard_action = st.selectbox("Choose an Action", [ | |
| "Get Account Details", | |
| "Process Payment", | |
| ]) | |
| if not mastercard_consumer_key or not mastercard_private_key: | |
| st.error("Please provide MasterCard API credentials.") | |
| else: | |
| if mastercard_action == "Get Account Details": | |
| account_id = st.text_input("Enter Account ID") | |
| if st.button("Get Account Details"): | |
| url = f"https://sandbox.api.mastercard.com/account/{account_id}" | |
| headers = { | |
| "Accept": "application/json", | |
| "Content-Type": "application/json" | |
| } | |
| # Implement OAuth1 authentication for MasterCard API | |
| client = Client( | |
| client_key=mastercard_consumer_key, | |
| rsa_key=mastercard_private_key, | |
| signature_method='RSA-SHA1' | |
| ) | |
| uri, headers, body = client.sign( | |
| url, | |
| http_method='GET', | |
| body='', | |
| headers=headers | |
| ) | |
| response = requests.get(uri, headers=headers) | |
| if response.status_code == 200: | |
| st.json(response.json()) | |
| else: | |
| st.error(f"Error: {response.text}") | |
| elif mastercard_action == "Process Payment": | |
| st.subheader("Process a Payment") | |
| # Implement payment processing with MasterCard API | |
| st.info("Payment processing with MasterCard API is not implemented in this example.") | |
| # Pipedream Integration | |
| elif service_choice == "Pipedream Integration": | |
| st.header("Pipedream Workflow Trigger") | |
| if not pipedream_url: | |
| st.error("Please provide your Pipedream Workflow URL in the environment variables.") | |
| else: | |
| event_name = st.text_input("Event Name") | |
| data = st.text_area("Event Data (JSON format)") | |
| if st.button("Trigger Workflow"): | |
| try: | |
| event_data = json.loads(data) | |
| event_data['event_name'] = event_name | |
| response = requests.post(pipedream_url, json=event_data) | |
| if response.status_code == 200: | |
| st.success("Pipedream workflow triggered successfully.") | |
| else: | |
| st.error(f"Failed to trigger workflow: {response.text}") | |
| except json.JSONDecodeError: | |
| st.error("Invalid JSON data.") | |
| # Financial Analysis | |
| elif service_choice == "Financial Analysis": | |
| st.header("Financial Analysis and Insights") | |
| # Fetch transaction data from APIs or use sample data | |
| transactions = [ | |
| {'date': '2023-01-01', 'amount': -50, 'category': 'Groceries'}, | |
| {'date': '2023-01-02', 'amount': -200, 'category': 'Rent'}, | |
| {'date': '2023-01-03', 'amount': -15, 'category': 'Coffee'}, | |
| # ... more transactions | |
| ] | |
| df_transactions = pd.DataFrame(transactions) | |
| # Visualization | |
| fig = px.bar(df_transactions, x='date', y='amount', color='category', title="Spending Over Time") | |
| st.plotly_chart(fig) | |
| # Anomaly Detection | |
| model = IsolationForest(contamination=0.1) | |
| df_transactions['anomaly'] = model.fit_predict(df_transactions[['amount']]) | |
| anomalies = df_transactions[df_transactions['anomaly'] == -1] | |
| st.subheader("Detected Anomalies") | |
| st.write(anomalies) | |
| # Predictive Analytics | |
| st.subheader("Predictive Analytics") | |
| df_transactions['timestamp'] = pd.to_datetime(df_transactions['date']).astype(np.int64) // 10**9 | |
| X = df_transactions[['timestamp']] | |
| y = df_transactions['amount'] | |
| reg_model = LinearRegression() | |
| reg_model.fit(X, y) | |
| future_dates = pd.date_range(start=date.today(), periods=30) | |
| future_timestamps = future_dates.astype(np.int64) // 10**9 | |
| X_future = pd.DataFrame(future_timestamps, columns=['timestamp']) | |
| y_pred = reg_model.predict(X_future) | |
| pred_df = pd.DataFrame({'date': future_dates, 'predicted_amount': y_pred}) | |
| fig_pred = px.line(pred_df, x='date', y='predicted_amount', title="Predicted Future Spending") | |
| st.plotly_chart(fig_pred) | |
| # Budgeting Tools | |
| elif service_choice == "Budgeting Tools": | |
| st.header("Budgeting and Financial Planning") | |
| budget_categories = ['Rent', 'Groceries', 'Entertainment', 'Utilities', 'Miscellaneous'] | |
| budgets = {} | |
| for category in budget_categories: | |
| budgets[category] = st.number_input(f"{category} Budget", min_value=0.0, format="%0.2f") | |
| st.subheader("Your Budget Overview") | |
| budget_df = pd.DataFrame(list(budgets.items()), columns=['Category', 'Budget']) | |
| st.table(budget_df) | |
| # Compare with actual spending | |
| actual_spending = {'Rent': 1800, 'Groceries': 450, 'Entertainment': 300, 'Utilities': 200, 'Miscellaneous': 150} | |
| spending_df = pd.DataFrame(list(actual_spending.items()), columns=['Category', 'Spent']) | |
| comparison_df = budget_df.merge(spending_df, on='Category') | |
| comparison_df['Difference'] = comparison_df['Budget'] - comparison_df['Spent'] | |
| st.subheader("Budget vs Actual Spending") | |
| st.table(comparison_df) | |
| # AI Chatbot with OpenAI's ChatGPT API | |
| elif service_choice == "AI Chatbot": | |
| st.header("AI-Powered Financial Assistant with ChatGPT") | |
| if not openai_api_key: | |
| st.error("Please provide your OpenAI API Key in the environment variables.") | |
| else: | |
| user_input = st.text_input("You: ") | |
| if user_input: | |
| try: | |
| response = openai.ChatCompletion.create( | |
| model="gpt-3.5-turbo", | |
| messages=[ | |
| {"role": "system", "content": "You are a helpful financial assistant."}, | |
| {"role": "user", "content": user_input}, | |
| ], | |
| ) | |
| assistant_response = response['choices'][0]['message']['content'] | |
| st.text_area("Assistant:", value=assistant_response, height=200) | |
| except Exception as e: | |
| st.error(f"Error: {e}") | |
| # Quantum Portfolio Optimization | |
| elif service_choice == "Quantum Portfolio Optimization": | |
| st.header("Quantum Portfolio Optimization Simulation") | |
| num_qubits = 3 | |
| qc = QuantumCircuit(num_qubits) | |
| qc.h(range(num_qubits)) | |
| qc.measure_all() | |
| simulator = Aer.get_backend('qasm_simulator') | |
| result = execute(qc, simulator, shots=1024).result() | |
| counts = result.get_counts() | |
| st.write("Simulation Results:") | |
| st.write(counts) | |
| # Interpret the results for portfolio optimization | |
# Display footer
# Each entry is rendered as its own markdown element in the sidebar, in order.
for footer_line in (
    "Powered by Streamlit",
    "Made by Citibank Demo Business Inc.",
):
    st.sidebar.markdown(footer_line)