micyn-api / main.py
rairo's picture
Update main.py
47eb0bc verified
import os
import io
import json
import hashlib
from datetime import datetime, timedelta
from PIL import Image
import pytz
from flask import Flask, request, jsonify, send_file
from flask_cors import CORS
import google.generativeai as genai
import firebase_admin
from firebase_admin import credentials, db, storage, auth
import pandas as pd
import requests
from urllib.parse import urlparse, unquote
app = Flask(__name__)
CORS(app)
# Firebase initialization
cred = credentials.Certificate('datingi-firebase-adminsdk-obe4r-d63a25b54e.json')
firebase_admin.initialize_app(cred, {
'databaseURL': 'https://datingi.firebaseio.com',
'storageBucket': 'datingi.firebasestorage.app'
})
bucket = storage.bucket()
api_key = os.environ['Gemini']
FIREBASE_API_KEY = os.environ.get('FIREBASE_API_KEY')
# Add test account creation function
def create_test_accounts():
try:
# Create/admin2@test.com (Admin)
try:
admin_user = auth.create_user(
email='rurumukamuri@gmail.com',
password='ruvimbo1',
name='Ruru',
phone='0776076957'
)
except auth.EmailAlreadyExistsError:
admin_user = auth.get_user_by_email('rurumukamuri@gmail.com')
db.reference(f'users/{admin_user.uid}').set({
'is_admin': True
})
print("Test accounts created/updated successfully")
except Exception as e:
print(f"Error creating test accounts: {str(e)}")
# Create test accounts when server starts
create_test_accounts()
# Helper functions
def configure_gemini():
genai.configure(api_key=api_key)
return genai.GenerativeModel('gemini-2.0-flash-thinking-exp')
def verify_token(token):
try:
decoded_token = auth.verify_id_token(token)
return decoded_token['uid']
except Exception:
return None
def verify_admin(auth_header):
if not auth_header or not auth_header.startswith('Bearer '):
raise ValueError('Invalid token')
token = auth_header.split(' ')[1]
uid = verify_token(token)
if not uid:
raise PermissionError('Invalid user')
user_ref = db.reference(f'users/{uid}')
user_data = user_ref.get()
if not user_data or not user_data.get('is_admin', False):
raise PermissionError('Admin access required')
return uid
# Product Categories
CATEGORIES = [
'Masks',
'Gloves',
'Face Shields',
'Protective Suits',
'Sanitizers',
'Disinfectants'
]
# Authentication Endpoints
@app.route('/api/auth/register', methods=['POST'])
def register():
try:
data = request.get_json()
email = data.get('email')
password = data.get('password')
name = data.get('name')
if not email or not password:
return jsonify({'error': 'Email and password required'}), 400
# Create user in Firebase Auth
user = auth.create_user(
email=email,
password=password,
display_name=name
)
# Create user profile in Realtime Database
user_ref = db.reference(f'users/{user.uid}')
user_data = {
'email': email,
'name': name,
'created_at': datetime.now(pytz.UTC).isoformat(),
'is_admin': False,
'shipping_addresses': [],
'phone': data.get('phone', '')
}
user_ref.set(user_data)
# If there's a guest cart, transfer it to the user
session_id = data.get('session_id')
if session_id:
guest_cart_ref = db.reference(f'carts/session_{session_id}')
guest_cart = guest_cart_ref.get()
if guest_cart:
user_cart_ref = db.reference(f'carts/{user.uid}')
user_cart_ref.set(guest_cart)
guest_cart_ref.delete()
return jsonify({
'success': True,
'uid': user.uid,
'email': user.email,
'name': user.display_name
}), 201
except Exception as e:
return jsonify({'error': str(e)}), 400
@app.route('/api/auth/login', methods=['POST'])
def login():
try:
data = request.get_json()
email = data.get('email')
password = data.get('password')
if not email or not password:
return jsonify({'error': 'Email and password required'}), 400
# Sign in with Firebase Admin SDK
user = auth.get_user_by_email(email)
# Get custom token for client authentication
custom_token = auth.create_custom_token(user.uid)
# Get user profile data
user_ref = db.reference(f'users/{user.uid}')
user_data = user_ref.get()
# Transfer guest cart if exists
session_id = data.get('session_id')
if session_id:
guest_cart_ref = db.reference(f'carts/session_{session_id}')
guest_cart = guest_cart_ref.get()
if guest_cart:
user_cart_ref = db.reference(f'carts/{user.uid}')
user_cart_ref.set(guest_cart)
guest_cart_ref.delete()
return jsonify({
'success': True,
'token': custom_token,
'uid': user.uid,
'email': user.email,
'name': user.display_name,
'is_admin': user_data.get('is_admin', False)
})
except Exception as e:
return jsonify({'error': 'Invalid email or password'}), 401
@app.route('/api/auth/password-reset', methods=['POST'])
def request_password_reset():
try:
data = request.get_json()
email = data.get('email')
if not email:
return jsonify({'error': 'Email required'}), 400
# Generate password reset link
link = auth.generate_password_reset_link(email)
# In a production environment, you would typically send this link via email
# For this example, we'll just return it
return jsonify({
'success': True,
'reset_link': link
})
except Exception as e:
return jsonify({'error': str(e)}), 400
@app.route('/api/user/profile', methods=['GET', 'PUT'])
def user_profile():
try:
auth_header = request.headers.get('Authorization')
if not auth_header:
return jsonify({'error': 'Authorization required'}), 401
uid = verify_token(auth_header.split(' ')[1])
if not uid:
return jsonify({'error': 'Invalid token'}), 401
user_ref = db.reference(f'users/{uid}')
if request.method == 'GET':
user_data = user_ref.get()
return jsonify(user_data)
elif request.method == 'PUT':
data = request.get_json()
updates = {}
# Allow updating specific fields
if 'name' in data:
updates['name'] = data['name']
auth.update_user(uid, display_name=data['name'])
if 'phone' in data:
updates['phone'] = data['phone']
if 'shipping_addresses' in data:
updates['shipping_addresses'] = data['shipping_addresses']
if updates:
user_ref.update(updates)
return jsonify({
'success': True,
'updated_fields': list(updates.keys())
})
except Exception as e:
return jsonify({'error': str(e)}),
# Product Management Endpoints
@app.route('/api/admin/products', methods=['POST'])
def create_product():
try:
verify_admin(request.headers.get('Authorization', ''))
if 'image' not in request.files:
return jsonify({'error': 'No image file uploaded'}), 400
file = request.files['image']
name = request.form.get('name')
category = request.form.get('category')
price = float(request.form.get('price'))
description = request.form.get('description')
stock = int(request.form.get('stock', 0))
if category not in CATEGORIES:
return jsonify({'error': 'Invalid category'}), 400
# Upload image
image_bytes = file.read()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
file_hash = hashlib.md5(image_bytes).hexdigest()
blob = bucket.blob(f'products/{timestamp}_{file_hash}.jpg')
blob.upload_from_string(image_bytes, content_type='image/jpeg')
image_url = blob.public_url
# Create product
product_ref = db.reference('products').push({
'name': name,
'category': category,
'price': price,
'description': description,
'stock': stock,
'image_url': image_url,
'created_at': datetime.now(pytz.UTC).isoformat(),
'active': True
})
return jsonify({
'success': True,
'product_id': product_ref.key
}), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/admin/products/<string:product_id>', methods=['PUT'])
def update_product(product_id):
try:
verify_admin(request.headers.get('Authorization', ''))
updates = {}
if 'name' in request.form:
updates['name'] = request.form['name']
if 'category' in request.form and request.form['category'] in CATEGORIES:
updates['category'] = request.form['category']
if 'price' in request.form:
updates['price'] = float(request.form['price'])
if 'description' in request.form:
updates['description'] = request.form['description']
if 'stock' in request.form:
updates['stock'] = int(request.form['stock'])
if 'active' in request.form:
updates['active'] = request.form['active'].lower() == 'true'
if 'image' in request.files:
file = request.files['image']
image_bytes = file.read()
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
file_hash = hashlib.md5(image_bytes).hexdigest()
blob = bucket.blob(f'products/{timestamp}_{file_hash}.jpg')
blob.upload_from_string(image_bytes, content_type='image/jpeg')
updates['image_url'] = blob.public_url
db.reference(f'products/{product_id}').update(updates)
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Shopping Cart and Order Management
@app.route('/api/cart/add', methods=['POST'])
def add_to_cart():
try:
data = request.get_json()
product_id = data['product_id']
quantity = int(data['quantity'])
# Get user ID if logged in
auth_header = request.headers.get('Authorization')
uid = verify_token(auth_header.split(' ')[1]) if auth_header else None
# Use session ID for non-logged in users
session_id = data.get('session_id')
if not uid and not session_id:
return jsonify({'error': 'Either login or provide session_id'}), 400
cart_id = uid if uid else f'session_{session_id}'
cart_ref = db.reference(f'carts/{cart_id}')
# Check product availability
product = db.reference(f'products/{product_id}').get()
if not product:
return jsonify({'error': 'Product not found'}), 404
if not product.get('active', False):
return jsonify({'error': 'Product not available'}), 400
if product.get('stock', 0) < quantity:
return jsonify({'error': 'Insufficient stock'}), 400
# Update cart
cart_item = cart_ref.child(product_id).get() or {'quantity': 0}
new_quantity = cart_item['quantity'] + quantity
cart_ref.child(product_id).set({
'quantity': new_quantity,
'price': product['price'],
'name': product['name']
})
return jsonify({'success': True})
except Exception as e:
return jsonify({'error': str(e)}), 500
@app.route('/api/orders', methods=['POST'])
def create_order():
try:
data = request.get_json()
auth_header = request.headers.get('Authorization')
uid = verify_token(auth_header.split(' ')[1]) if auth_header else None
cart_id = uid if uid else f'session_{data["session_id"]}'
cart_ref = db.reference(f'carts/{cart_id}')
cart = cart_ref.get()
if not cart:
return jsonify({'error': 'Cart is empty'}), 400
# Validate stock and calculate total
total = 0
items = []
for product_id, item in cart.items():
product = db.reference(f'products/{product_id}').get()
if not product or not product.get('active', False):
return jsonify({'error': f'Product {product_id} not available'}), 400
if product['stock'] < item['quantity']:
return jsonify({'error': f'Insufficient stock for {product["name"]}'}), 400
total += item['quantity'] * product['price']
items.append({
'product_id': product_id,
'quantity': item['quantity'],
'price': product['price'],
'name': product['name']
})
# Update stock
db.reference(f'products/{product_id}').update({
'stock': product['stock'] - item['quantity']
})
# Create order
order_ref = db.reference('orders').push({
'user_id': uid,
'session_id': None if uid else data.get('session_id'),
'items': items,
'total': total,
'status': 'pending',
'shipping_address': data.get('shipping_address'),
'contact_email': data.get('contact_email'),
'created_at': datetime.now(pytz.UTC).isoformat()
})
# Clear cart
cart_ref.delete()
return jsonify({
'success': True,
'order_id': order_ref.key
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# AI Reporting
@app.route('/api/admin/reports', methods=['POST'])
def generate_report():
try:
verify_admin(request.headers.get('Authorization', ''))
data = request.get_json()
start_date = datetime.fromisoformat(data['start_date'])
end_date = datetime.fromisoformat(data['end_date'])
# Get orders within date range
orders_ref = db.reference('orders')
orders = orders_ref.get() or {}
filtered_orders = {
k: v for k, v in orders.items()
if start_date <= datetime.fromisoformat(v['created_at']) <= end_date
}
prompt = f"""Analyze this e-commerce data for medical PPE products and generate a comprehensive business report.
Focus on:
1. Total sales and revenue
2. Popular products and categories
3. Customer behavior patterns
4. Stock management recommendations
5. Market trends and insights
Data period: {start_date.date()} to {end_date.date()}"""
model = configure_gemini()
response = model.generate_content([prompt, json.dumps(filtered_orders)])
return jsonify({
'report': response.text,
'data': {
'total_orders': len(filtered_orders),
'total_revenue': sum(order['total'] for order in filtered_orders.values())
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
# Product Browsing
@app.route('/api/products', methods=['GET'])
def get_products():
try:
category = request.args.get('category')
search = request.args.get('search', '').lower()
products_ref = db.reference('products')
products = products_ref.get() or {}
filtered_products = []
for product_id, product in products.items():
if not product.get('active', False):
continue
if category and product['category'] != category:
continue
if search and search not in product['name'].lower():
continue
filtered_products.append({
'id': product_id,
**product
})
return jsonify(filtered_products)
except Exception as e:
return jsonify({'error': str(e)}), 500
# User Order History
@app.route('/api/user/orders', methods=['GET'])
def get_user_orders():
try:
auth_header = request.headers.get('Authorization')
if not auth_header:
return jsonify({'error': 'Authorization required'}), 401
uid = verify_token(auth_header.split(' ')[1])
if not uid:
return jsonify({'error': 'Invalid token'}), 401
orders_ref = db.reference('orders')
orders = orders_ref.order_by_child('user_id').equal_to(uid).get() or {}
return jsonify([{
'id': order_id,
**order_data
} for order_id, order_data in orders.items()])
except Exception as e:
return jsonify({'error': str(e)}), 500
# Admin Dashboard Data
@app.route('/api/admin/dashboard', methods=['GET'])
def get_admin_dashboard():
try:
verify_admin(request.headers.get('Authorization', ''))
# Get recent orders
orders_ref = db.reference('orders')
recent_orders = orders_ref.order_by_child('created_at').limit_to_last(10).get() or {}
# Get low stock products
products_ref = db.reference('products')
products = products_ref.get() or {}
low_stock = [
{'id': pid, **p}
for pid, p in products.items()
if p.get('active', False) and p.get('stock', 0) < 10
]
# Calculate statistics
total_revenue = sum(order['total'] for order in recent_orders.values())
total_orders = len(recent_orders)
return jsonify({
'recent_orders': recent_orders,
'low_stock_products': low_stock,
'statistics': {
'total_revenue': total_revenue,
'total_orders': total_orders,
'total_products': len(products),
'low_stock_count': len(low_stock)
}
})
except Exception as e:
return jsonify({'error': str(e)}), 500
if __name__ == '__main__':
app.run(debug=True, host="0.0.0.0", port=7860)