from flask import Flask, request, jsonify
import os
import json
import time # Not actively used
import base64 # Not actively used
import uuid
from flask_cors import CORS
from google import genai
from PIL import Image # Not actively used
import io # Not actively used
from typing import List, Dict, Any # Not actively used
import logging
import traceback
from datetime import datetime, timezone
import re
from firebase_admin import credentials, db, storage, auth, exceptions as firebase_exceptions
import firebase_admin
# --- NEW: Geolocation Libraries ---
import geohash as pgh
from haversine import haversine, Unit
app = Flask(__name__)
CORS(app)
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- NEW: Geolocation Configuration ---
GEOHASH_PRECISION = 7 # A good balance. ~153m x 153m area.
# Configure GenAI (Gemini)
GOOGLE_API_KEY = os.getenv('GOOGLE_API_KEY')
gemini_client = None
if not GOOGLE_API_KEY:
logger.warning("GOOGLE_API_KEY environment variable is not set. AI features will be disabled.")
else:
try:
gemini_client = genai.Client(api_key=GOOGLE_API_KEY)
logger.info("Gemini AI Client initialized successfully using genai.Client().")
except Exception as e:
logger.error(f"Failed to initialize Gemini AI Client with genai.Client(): {e}")
gemini_client = None
import resend
# NEW: Initialize the Resend client using environment variables
# This should be placed near your other initializations at the top of the file.
if 'RESEND_API_KEY' in os.environ:
resend.api_key = os.environ["RESEND_API_KEY"]
SENDER_EMAIL = os.environ.get("SENDER_EMAIL")
if not SENDER_EMAIL:
logger.warning("RESEND_API_KEY is set, but SENDER_EMAIL is not. Emails will not be sent.")
resend.api_key = None # Disable client if sender is not configured
else:
logger.info("RESEND_API_KEY environment variable not found. Email notifications will be disabled.")
resend.api_key = None
#--- Firebase Initialization ---
FIREBASE_CREDENTIALS_JSON_STRING = os.getenv("FIREBASE")
FIREBASE_DB_URL = os.getenv("Firebase_DB")
FIREBASE_STORAGE_BUCKET = os.getenv("Firebase_Storage")
FIREBASE_INITIALIZED = False
bucket = None
db_app = None
try:
if FIREBASE_CREDENTIALS_JSON_STRING and FIREBASE_DB_URL and FIREBASE_STORAGE_BUCKET:
credentials_json = json.loads(FIREBASE_CREDENTIALS_JSON_STRING)
cred = credentials.Certificate(credentials_json)
if not firebase_admin._apps:
db_app = firebase_admin.initialize_app(cred, {
'databaseURL': FIREBASE_DB_URL,
'storageBucket': FIREBASE_STORAGE_BUCKET
})
else:
db_app = firebase_admin.get_app()
FIREBASE_INITIALIZED = True
bucket = storage.bucket(app=db_app)
logger.info("Firebase Admin SDK initialized successfully.")
else:
logger.error("Firebase environment variables (FIREBASE, Firebase_DB, Firebase_Storage) not fully set. Firebase Admin SDK not initialized.")
except Exception as e:
logger.error(f"CRITICAL: Error initializing Firebase: {e}")
traceback.print_exc()
#--- END Firebase Initialization ---
# Paynow #
# --- Paynow Initialization ---
from paynow import Paynow
PAYNOW_INTEGRATION_ID = os.getenv("PAYNOW_INTEGRATION_ID")
PAYNOW_INTEGRATION_KEY = os.getenv("PAYNOW_INTEGRATION_KEY")
# IMPORTANT: This should be the publicly accessible URL of your deployed application
APP_BASE_URL = os.getenv("APP_BASE_URL", "https://tunasongaagri.co.zw")
paynow_client = None
if PAYNOW_INTEGRATION_ID and PAYNOW_INTEGRATION_KEY:
try:
paynow_client = Paynow(
PAYNOW_INTEGRATION_ID,
PAYNOW_INTEGRATION_KEY,
return_url=f"{APP_BASE_URL}/my-deals", # A page for the user to land on after payment
result_url=f"{APP_BASE_URL}/api/payment/webhook/paynow" # The webhook URL for server-to-server updates
)
logger.info("Paynow client initialized successfully.")
except Exception as e:
logger.error(f"Failed to initialize Paynow client: {e}")
else:
logger.warning("Paynow environment variables not set. Payment features will be disabled.")
#--- NEW: Geolocation Helpers ---
def _calculate_geohash(lat, lon):
"""Calculates the geohash for a given latitude and longitude."""
return pgh.encode(lat, lon, precision=GEOHASH_PRECISION)
def _get_geohash_query_area(lat, lon):
"""Calculates the geohash for a point and its 8 neighbors."""
center_geohash = _calculate_geohash(lat, lon)
neighbors = pgh.neighbors(center_geohash)
return [center_geohash] + neighbors
def _calculate_distance(point1_coords, point2_coords):
"""Calculates Haversine distance between two coordinate dictionaries."""
point1 = (point1_coords['latitude'], point1_coords['longitude'])
point2 = (point2_coords['latitude'], point2_coords['longitude'])
return haversine(point1, point2, unit=Unit.KILOMETERS)
#--- Helper Functions ---
def verify_token(auth_header):
if not FIREBASE_INITIALIZED:
logger.error("verify_token: Firebase not initialized.")
raise ConnectionError('Server configuration error: Firebase not ready.')
if not auth_header or not auth_header.startswith('Bearer '):
raise ValueError('Invalid token format')
token = auth_header.split(' ')[1]
try:
decoded_token = auth.verify_id_token(token, app=db_app)
return decoded_token['uid']
except auth.AuthError as ae: # More specific Firebase Auth errors
logger.error(f"Token verification failed (AuthError): {ae}")
raise PermissionError(f"Token verification failed: {str(ae)}")
except Exception as e: # Generic catch-all
logger.error(f"Token verification failed (Generic Exception): {e}\n{traceback.format_exc()}")
raise PermissionError(f"Token verification failed: {str(e)}")
def get_user_roles(uid):
if not FIREBASE_INITIALIZED: return {}
try:
user_ref = db.reference(f'users/{uid}', app=db_app)
user_data = user_ref.get()
if user_data and isinstance(user_data.get('roles'), dict):
return user_data.get('roles', {})
return {}
except Exception as e:
logger.error(f"Error in get_user_roles for UID {uid}: {e}\n{traceback.format_exc()}")
return {}
def verify_role(auth_header, required_role):
uid = verify_token(auth_header) # Can raise PermissionError, ValueError, ConnectionError
user_roles = get_user_roles(uid)
if not user_roles.get(required_role, False):
raise PermissionError(f'{required_role} access required')
return uid
def verify_admin_or_facilitator(auth_header):
uid = verify_token(auth_header) # Can raise
if not FIREBASE_INITIALIZED: raise ConnectionError('Server configuration error: Firebase not ready.')
try:
user_ref = db.reference(f'users/{uid}', app=db_app)
user_data = user_ref.get()
if not user_data: raise PermissionError('User profile not found.')
if not user_data.get('is_admin', False) and not user_data.get('is_facilitator', False):
raise PermissionError('Admin or Facilitator access required')
return uid, user_data.get('is_admin', False), user_data.get('is_facilitator', False)
except firebase_exceptions.FirebaseError as fe:
logger.error(f"Firebase error in verify_admin_or_facilitator for UID {uid}: {fe}\n{traceback.format_exc()}")
raise ConnectionError(f"Database access error during admin/facilitator check: {fe}")
# PermissionError from verify_token will be caught by the route handler
def verify_admin(auth_header):
uid = verify_token(auth_header) # Can raise
if not FIREBASE_INITIALIZED: raise ConnectionError('Server configuration error: Firebase not ready.')
try:
user_ref = db.reference(f'users/{uid}', app=db_app)
user_data = user_ref.get()
if not user_data:
logger.warning(f"User {uid} (from token) not found in Realtime DB. Cannot verify admin status.")
raise PermissionError('User profile not found in database. Admin access denied.')
if not user_data.get('is_admin', False):
raise PermissionError('Admin access required')
return uid
except firebase_exceptions.FirebaseError as fe:
logger.error(f"Firebase error in verify_admin for UID {uid}: {fe}\n{traceback.format_exc()}")
raise ConnectionError(f"Database access error during admin check: {fe}")
# PermissionError from verify_token will be caught by the route handler
def ensure_user_profile_exists(uid, email=None, name=None, phone_number=None):
if not FIREBASE_INITIALIZED:
logger.error(f"ensure_user_profile_exists: Firebase not initialized. Cannot ensure profile for UID: {uid}")
return None # Or raise ConnectionError
try:
user_ref = db.reference(f'users/{uid}', app=db_app)
user_data = user_ref.get()
if not user_data:
logger.info(f"Creating missing Tunasonga profile for UID: {uid}")
if not email:
try:
user_record = auth.get_user(uid, app=db_app)
email = user_record.email
except Exception as e_auth_get:
logger.error(f"Failed to get email for UID {uid} from Firebase Auth: {e_auth_get}")
new_profile = {
'email': email, 'name': name or "", 'phone_number': phone_number or "",
'location': "", # Legacy text field
'location_coords': None, # NEW: Geo object
'geohash': None, # NEW: Geohash for indexing
'roles': {'farmer': False, 'buyer': False, 'transporter': False},
'role_applications': {}, 'document_references': {},
'is_admin': False, 'is_facilitator': False,
'created_at': datetime.now(timezone.utc).isoformat(), 'suspended': False,
'account_type': 'webapp_registered' # Default for web app signups
}
user_ref.set(new_profile)
verification = user_ref.get()
if verification and verification.get('email') == email:
logger.info(f"Successfully created Tunasonga profile for UID: {uid}")
return verification
else:
logger.error(f"Profile creation verification failed for UID: {uid} after set.")
return None
return user_data
except firebase_exceptions.FirebaseError as fe:
logger.error(f"Firebase error in ensure_user_profile_exists for UID {uid}: {fe}\n{traceback.format_exc()}")
return None # Or raise
except Exception as e:
logger.error(f"Error ensuring Tunasonga profile exists for UID {uid}: {e}\n{traceback.format_exc()}")
return None # Or raise
#--- Universal Route Error Handler for Auth/Verification & Firebase ---
def handle_route_errors(e, uid_context="unknown"): # Added uid_context for better logging
if isinstance(e, PermissionError):
return jsonify({'error': str(e), 'type': 'PermissionError'}), 403
elif isinstance(e, ValueError): # e.g. invalid token format, or bad input to int()/float()
return jsonify({'error': str(e), 'type': 'ValueError'}), 400
elif isinstance(e, ConnectionError): # Firebase not ready or DB access issue from helpers
return jsonify({'error': str(e), 'type': 'ConnectionError'}), 503
elif isinstance(e, firebase_exceptions.FirebaseError): # Catch specific Firebase SDK errors
logger.error(f"Firebase SDK Error in route (UID context: {uid_context}): {e}\n{traceback.format_exc()}")
return jsonify({'error': f'A Firebase error occurred: {str(e)}', 'type': 'FirebaseSDKError'}), 500
else: # Generic catch-all for other unexpected errors
logger.error(f"Unexpected Error in route (UID context: {uid_context}): {e}\n{traceback.format_exc()}")
return jsonify({'error': f'An unexpected error occurred: {str(e)}', 'type': 'GenericError'}), 500
#--- system notifications ---
def _send_system_notification(user_id, message_content, notif_type, link=None, send_email=False, email_subject=None, email_body=None):
if not FIREBASE_INITIALIZED:
logger.error("_send_system_notification: Firebase not ready.")
return False
if not user_id or not message_content:
logger.warning(f"_send_system_notification: Called with missing user_id or message_content.")
return False
# --- Primary Channel: Firebase In-App Notification ---
firebase_success = False
notif_id = str(uuid.uuid4())
notif_data = {
"message": message_content,
"type": notif_type,
"link": link,
"created_at": datetime.now(timezone.utc).isoformat(),
"read": False
}
try:
db.reference(f'notifications/{user_id}/{notif_id}', app=db_app).set(notif_data)
logger.info(f"Firebase notification sent to {user_id}: {message_content[:50]}...")
firebase_success = True
except firebase_exceptions.FirebaseError as fe:
logger.error(f"Failed to send Firebase notification to {user_id} due to Firebase error: {fe}")
except Exception as e:
logger.error(f"Failed to send Firebase notification to {user_id} due to generic error: {e}")
# --- Secondary Channel: Email via Resend ---
if send_email:
if not resend.api_key or not SENDER_EMAIL:
logger.warning(f"Skipping email for user {user_id} because Resend is not configured.")
return firebase_success # Return status of primary channel
try:
user_profile = db.reference(f'users/{user_id}', app=db_app).get()
if not user_profile or not user_profile.get('email'):
logger.warning(f"Cannot send email to user {user_id}: no profile or email address found.")
return firebase_success
recipient_email = user_profile['email']
# Basic email validation
if '@' not in recipient_email or '.' not in recipient_email.split('@')[1]:
logger.warning(f"Cannot send email to user {user_id}: invalid email format ('{recipient_email}').")
return firebase_success
# Use message_content as fallback for email body if not provided
html_content = email_body if email_body else f"
{message_content}
"
params = {
"from": SENDER_EMAIL,
"to": [recipient_email],
"subject": email_subject or "New Notification from Tunasonga Agri",
"html": html_content,
}
email_response = resend.Emails.send(params)
logger.info(f"Email dispatched to {recipient_email} via Resend. ID: {email_response['id']}")
except firebase_exceptions.FirebaseError as fe_db:
logger.error(f"Email dispatch failed for {user_id}: Could not fetch user profile from Firebase. Error: {fe_db}")
except Exception as e_resend:
# This catches errors from the resend.Emails.send() call
logger.error(f"Email dispatch failed for {user_id} ({recipient_email}). Resend API Error: {e_resend}")
return firebase_success
#--- Authentication Endpoints ---
@app.route('/api/auth/signup', methods=['POST'])
def signup():
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
uid = None # For error context
data_req = {} # For error context
try:
data_req = request.get_json()
email, password, name = data_req.get('email'), data_req.get('password'), data_req.get('name')
phone_number = data_req.get('phone_number')
if not email or not password or not name: return jsonify({'error': 'Email, password, and name are required'}), 400
user_record = auth.create_user(email=email, password=password, display_name=name, app=db_app)
uid = user_record.uid
user_data = ensure_user_profile_exists(uid, email, name, phone_number)
if not user_data:
try: auth.delete_user(uid, app=db_app)
except Exception as e_del: logger.error(f"Failed to rollback auth user {uid} after DB profile error: {e_del}")
return jsonify({'error': 'Failed to create user profile in database.'}), 500
return jsonify({'success': True, 'user': {'uid': uid, **user_data}}), 201
except auth.EmailAlreadyExistsError: return jsonify({'error': 'Email already exists.'}), 409
except Exception as e: return handle_route_errors(e, uid_context=f"signup attempt for {data_req.get('email', 'N/A')}")
@app.route('/api/auth/google-signin', methods=['POST'])
def google_signin():
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
uid = None # For error context
try:
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '): return jsonify({'error': 'Missing or invalid token format for Google Sign-In'}), 401
id_token = auth_header.split(' ')[1]
decoded_token = auth.verify_id_token(id_token, app=db_app)
uid, email, name = decoded_token['uid'], decoded_token.get('email'), decoded_token.get('name')
user_data = ensure_user_profile_exists(uid, email, name)
if not user_data: return jsonify({'error': 'Failed to create or retrieve user profile in database.'}), 500
return jsonify({'success': True, 'user': {'uid': uid, **user_data}}), 200
except auth.InvalidIdTokenError: return jsonify({'error': 'Invalid ID token from Google.'}), 401
except Exception as e: return handle_route_errors(e, uid_context=uid)
#--- User Profile Endpoint ---
@app.route('/api/user/profile', methods=['GET', 'PUT'])
def user_profile():
auth_header = request.headers.get('Authorization')
uid = None
try:
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
uid = verify_token(auth_header)
user_ref = db.reference(f'users/{uid}', app=db_app)
if request.method == 'GET':
user_data = user_ref.get()
if not user_data: return jsonify({'error': 'User profile not found in database.'}), 404
return jsonify({'uid': uid, **user_data}), 200
if request.method == 'PUT':
data = request.get_json()
update_data = {}
if 'name' in data: update_data['name'] = data['name']
if 'location' in data: update_data['location'] = data['location'] # Keep updating legacy text field
if 'phone_number' in data: update_data['phone_number'] = data['phone_number']
# --- MODIFIED: Handle new location_coords object ---
if 'location_coords' in data and isinstance(data['location_coords'], dict):
lat = data['location_coords'].get('latitude')
lon = data['location_coords'].get('longitude')
if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
# Validate coordinate ranges
if -90 <= lat <= 90 and -180 <= lon <= 180:
update_data['location_coords'] = {'latitude': lat, 'longitude': lon}
update_data['geohash'] = _calculate_geohash(lat, lon)
else:
return jsonify({'error': 'Invalid latitude or longitude values.'}), 400
else:
return jsonify({'error': 'location_coords must contain valid latitude and longitude numbers.'}), 400
if not update_data: return jsonify({'error': 'No updateable fields provided'}), 400
user_ref.update(update_data)
updated_profile = user_ref.get()
return jsonify({'success': True, 'user': {'uid': uid, **updated_profile}}), 200
except Exception as e: return handle_route_errors(e, uid_context=uid)
#--- Role Management Endpoints ---
@app.route('/api/user/roles/apply', methods=['POST'])
def apply_for_role():
auth_header = request.headers.get('Authorization')
uid = None
try:
if not FIREBASE_INITIALIZED or not bucket: return jsonify({'error': 'Server configuration error (Firebase/Storage).'}), 503
uid = verify_token(auth_header)
role_applied_for = request.form.get('role')
if not role_applied_for in ['farmer', 'buyer', 'transporter']: return jsonify({'error': 'Invalid role specified'}), 400
user_data = db.reference(f'users/{uid}', app=db_app).get()
if not user_data: return jsonify({'error': 'User profile not found to apply for role.'}), 404
if user_data.get('roles', {}).get(role_applied_for, False): return jsonify({'error': f'You already have the {role_applied_for} role.'}), 400
if user_data.get('role_applications', {}).get(role_applied_for) == 'pending': return jsonify({'error': f'Your application for {role_applied_for} is already pending.'}), 400
application_id = str(uuid.uuid4())
application_data = {'user_id': uid, 'role': role_applied_for, 'status': 'pending', 'submitted_at': datetime.now(timezone.utc).isoformat(), 'documents': []}
uploaded_doc_references = {}
for doc_type_key in request.files:
file = request.files[doc_type_key]
if file and file.filename:
safe_filename = os.path.basename(file.filename)
filename_in_storage = f"user_documents/{uid}/{role_applied_for}{doc_type_key}{str(uuid.uuid4())}_{safe_filename}"
blob = bucket.blob(filename_in_storage)
blob.upload_from_file(file.stream)
blob.make_public()
doc_type_clean = doc_type_key.replace('_upload', '')
application_data['documents'].append({'type': doc_type_clean, 'path': blob.public_url, 'filename': safe_filename})
uploaded_doc_references[doc_type_clean] = blob.public_url
db.reference(f'role_applications/{application_id}', app=db_app).set(application_data)
user_ref = db.reference(f'users/{uid}', app=db_app)
user_ref.child('role_applications').child(role_applied_for).set('pending')
if uploaded_doc_references: user_ref.child('document_references').update(uploaded_doc_references)
return jsonify({'success': True, 'message': f'Application for {role_applied_for} submitted.', 'application_id': application_id}), 201
except Exception as e: return handle_route_errors(e, uid_context=uid)
#--- Admin Endpoints ---
@app.route('/api/admin/dashboard-stats', methods=['GET'])
def admin_dashboard_stats():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
stats = {
'total_users': 0, 'users_by_role': {'farmer': 0, 'buyer': 0, 'transporter': 0},
'pending_role_applications': 0, 'active_listings_produce': 0, 'active_listings_demand': 0,
'pending_listings': 0, 'active_deals': 0, 'pending_deals_admin_approval': 0,
'admin_profile': db.reference(f'users/{admin_uid}', app=db_app).get()
}
all_users = db.reference('users', app=db_app).get() or {}
stats['total_users'] = len(all_users)
for uid_loop, u_data in all_users.items():
if u_data and u_data.get('roles', {}).get('farmer'): stats['users_by_role']['farmer'] += 1
if u_data and u_data.get('roles', {}).get('buyer'): stats['users_by_role']['buyer'] += 1
if u_data and u_data.get('roles', {}).get('transporter'): stats['users_by_role']['transporter'] += 1
pending_roles_apps = db.reference('role_applications', app=db_app).order_by_child('status').equal_to('pending').get() or {}
stats['pending_role_applications'] = len(pending_roles_apps)
all_listings = db.reference('listings', app=db_app).get() or {}
for lid, ldata in all_listings.items():
if ldata and ldata.get('status') == 'active':
if ldata.get('listing_type') == 'produce': stats['active_listings_produce'] += 1
elif ldata.get('listing_type') == 'demand': stats['active_listings_demand'] += 1
elif ldata and ldata.get('status') == 'pending_approval': stats['pending_listings'] += 1
all_deals = db.reference('deals', app=db_app).get() or {}
for did, ddata in all_deals.items():
if ddata and ddata.get('status') == 'active': stats['active_deals'] += 1
elif ddata and ddata.get('status') == 'accepted_by_farmer': stats['pending_deals_admin_approval'] += 1
return jsonify(stats), 200
except Exception as e: return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/users', methods=['GET'])
def admin_list_users():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
all_users = db.reference('users', app=db_app).get() or {}
return jsonify(all_users), 200
except Exception as e: return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/users/', methods=['GET'])
def admin_get_user(target_uid):
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
user_data = db.reference(f'users/{target_uid}', app=db_app).get()
if not user_data: return jsonify({'error': 'User not found'}), 404
return jsonify(user_data), 200
except Exception as e: return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/users//update', methods=['POST'])
def admin_update_user(target_uid):
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
update_payload = {}
if 'suspended' in data and isinstance(data['suspended'], bool): update_payload['suspended'] = data['suspended']
if 'is_admin' in data and isinstance(data['is_admin'], bool):
if target_uid == admin_uid and not data['is_admin']: return jsonify({'error': "Admin cannot remove own admin status."}), 400
update_payload['is_admin'] = data['is_admin']
if 'is_facilitator' in data and isinstance(data['is_facilitator'], bool): update_payload['is_facilitator'] = data['is_facilitator']
if 'roles' in data and isinstance(data['roles'], dict):
valid_roles = {r: v for r, v in data['roles'].items() if r in ['farmer', 'buyer', 'transporter'] and isinstance(v, bool)}
if valid_roles: update_payload['roles'] = valid_roles
if not update_payload: return jsonify({'error': 'No valid fields to update provided'}), 400
db.reference(f'users/{target_uid}', app=db_app).update(update_payload)
updated_user = db.reference(f'users/{target_uid}', app=db_app).get()
return jsonify({'success': True, 'message': f'User {target_uid} updated.', 'user': updated_user}), 200
except Exception as e: return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/facilitators', methods=['GET'])
def admin_list_facilitators():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
all_users = db.reference('users', app=db_app).order_by_child('is_facilitator').equal_to(True).get() or {}
return jsonify(all_users), 200
except Exception as e: return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/users/create', methods=['POST'])
def admin_create_user():
auth_header = request.headers.get('Authorization')
admin_uid = None
uid_new_user = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
email, password, name = data.get('email'), data.get('password'), data.get('name')
phone_number, is_facilitator_val = data.get('phone_number'), data.get('is_facilitator', False)
if not email or not password or not name: return jsonify({'error': 'Email, password, and name are required'}), 400
user_record = auth.create_user(email=email, password=password, display_name=name, app=db_app)
uid_new_user = user_record.uid
user_data = ensure_user_profile_exists(uid_new_user, email, name, phone_number)
if not user_data:
try: auth.delete_user(uid_new_user, app=db_app)
except Exception as e_del: logger.error(f"Rollback auth user failed: {e_del}")
return jsonify({'error': 'Failed to create DB profile.'}), 500
if is_facilitator_val:
db.reference(f'users/{uid_new_user}', app=db_app).update({'is_facilitator': True})
user_data['is_facilitator'] = True
return jsonify({'success': True, 'message': 'User created by admin.', 'user': {'uid': uid_new_user, **user_data}}), 201
except auth.EmailAlreadyExistsError: return jsonify({'error': 'Email already exists.'}), 409
except Exception as e: return handle_route_errors(e, uid_context=admin_uid or uid_new_user)
@app.route('/api/admin/roles/pending', methods=['GET'])
def admin_get_pending_roles():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
apps_ref = db.reference('role_applications', app=db_app).order_by_child('status').equal_to('pending')
pending_apps = apps_ref.get()
return jsonify(pending_apps or {}), 200
except Exception as e: return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/roles//', methods=['POST'])
def admin_action_on_role(application_id, action):
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
if action not in ['approve', 'reject']:
return jsonify({'error': 'Invalid action.'}), 400
app_ref = db.reference(f'role_applications/{application_id}', app=db_app)
application = app_ref.get()
if not application or application.get('status') != 'pending':
return jsonify({'error': 'App not found or not pending.'}), 404
user_id = application['user_id']
role = application['role']
user_profile_ref = db.reference(f'users/{user_id}', app=db_app)
update_time = datetime.now(timezone.utc).isoformat()
message_text = ""
if action == 'approve':
app_ref.update({'status': 'approved', 'reviewed_by': admin_uid, 'reviewed_at': update_time})
user_profile_ref.child('roles').update({role: True})
user_profile_ref.child('role_applications').update({role: 'approved'})
message_text = f"Role {role} for user {user_id} approved."
action_past_tense = "approved"
else: # reject
app_ref.update({'status': 'rejected', 'reviewed_by': admin_uid, 'reviewed_at': update_time})
user_profile_ref.child('role_applications').update({role: 'rejected'})
message_text = f"Role {role} for user {user_id} rejected."
action_past_tense = "rejected"
# --- MODIFIED: Send a rich HTML email for this critical event ---
email_subject = f"Your Tunasonga Agri Application for the '{role.capitalize()}' Role"
email_body = f"""
Application Status Update
Hello,
This is an update regarding your application for the {role.capitalize()} role on the Tunasonga platform.
Your application has been {action_past_tense} by an administrator.
View My Profile
If you have any questions, please contact our support team.
Thank you,
The Tunasonga Agri Team
"""
_send_system_notification(
user_id=user_id,
message_content=f"Your application for the '{role}' role has been {action_past_tense}.",
notif_type="role_status",
link="/profile/roles",
send_email=True,
email_subject=email_subject,
email_body=email_body
)
return jsonify({'success': True, 'message': message_text}), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
#--- Marketplace Endpoints ---
# --- MODIFIED: Listing creation now requires coordinates ---
def _create_listing(uid_lister, listing_type, data):
if not FIREBASE_INITIALIZED: raise ConnectionError("Firebase not ready for listing creation.")
# NEW: location_coords is now a required field, 'location' (text) is optional.
required_fields = ['crop_type', 'quantity', 'location_coords']
if listing_type == "produce": required_fields.extend(['asking_price', 'harvest_date'])
else: required_fields.extend(['quality_specs', 'price_range'])
if not all(field in data for field in required_fields): raise ValueError(f'Missing fields for {listing_type} listing. Required: {required_fields}')
# Validate and process coordinates
location_coords = data.get('location_coords')
if not isinstance(location_coords, dict) or 'latitude' not in location_coords or 'longitude' not in location_coords:
raise ValueError('Invalid location_coords object. Must contain latitude and longitude.')
lat = location_coords['latitude']
lon = location_coords['longitude']
if not (isinstance(lat, (int, float)) and isinstance(lon, (int, float)) and -90 <= lat <= 90 and -180 <= lon <= 180):
raise ValueError('Invalid latitude or longitude values in location_coords.')
listing_id = str(uuid.uuid4())
listing_data = {
'lister_id': uid_lister,
'listing_type': listing_type,
'status': 'pending_approval',
'created_at': datetime.now(timezone.utc).isoformat(),
'geohash': _calculate_geohash(lat, lon), # NEW: Store geohash
**data
}
db.reference(f'listings/{listing_id}', app=db_app).set(listing_data)
return listing_id, listing_data
@app.route('/api/listings/produce', methods=['POST'])
def create_produce_listing():
auth_header = request.headers.get('Authorization')
uid = None
try:
uid = verify_role(auth_header, 'farmer')
listing_id, listing_data = _create_listing(uid, "produce", request.get_json())
return jsonify({'success': True, 'message': 'Produce listing created, pending approval.', 'listing_id': listing_id}), 201
except Exception as e: return handle_route_errors(e, uid_context=uid)
@app.route('/api/listings/demand', methods=['POST'])
def create_demand_listing():
auth_header = request.headers.get('Authorization')
uid = None
try:
uid = verify_role(auth_header, 'buyer')
listing_id, listing_data = _create_listing(uid, "demand", request.get_json())
return jsonify({'success': True, 'message': 'Demand listing created, pending approval.', 'listing_id': listing_id}), 201
except Exception as e: return handle_route_errors(e, uid_context=uid)
@app.route('/api/listings/my', methods=['GET'])
def get_my_listings():
auth_header = request.headers.get('Authorization')
uid = None
try:
uid = verify_token(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
# Get all listings from Firebase
listings_ref = db.reference('listings', app=db_app)
all_listings = listings_ref.get()
if not all_listings or not isinstance(all_listings, dict):
return jsonify({'success': True, 'listings': []}), 200
# Filter listings by the authenticated user's UID
user_listings = []
for listing_id, listing_data in all_listings.items():
if isinstance(listing_data, dict) and listing_data.get('lister_id') == uid:
# Add the listing_id to the listing data for frontend convenience
listing_data['id'] = listing_id
user_listings.append(listing_data)
# Sort by creation date (most recent first) if available
user_listings.sort(key=lambda x: x.get('created_at', ''), reverse=True)
return jsonify({
'success': True,
'listings': user_listings,
'count': len(user_listings)
}), 200
except Exception as e:
return handle_route_errors(e, uid_context=uid)
# MODIFIED: User Update/Remove Own Listing to handle coordinates
@app.route('/api/listings/', methods=['PUT', 'DELETE'])
def user_manage_listing(listing_id):
auth_header = request.headers.get('Authorization')
uid = None
try:
uid = verify_token(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
listing_ref = db.reference(f'listings/{listing_id}', app=db_app)
listing_data = listing_ref.get()
if not listing_data or not isinstance(listing_data, dict):
return jsonify({'error': 'Listing not found.'}), 404
if listing_data.get('lister_id') != uid:
return jsonify({'error': 'Not authorized to manage this listing.'}), 403
if request.method == 'PUT':
data = request.get_json()
update_payload = {}
requires_reapproval = False
# Fields that, if changed, require re-approval
# MODIFIED: 'location_coords' is now a critical field.
critical_fields = ['crop_type', 'quantity', 'location_coords', 'asking_price', 'harvest_date', 'quality_specs', 'price_range']
for field in critical_fields:
if field in data and data[field] != listing_data.get(field):
update_payload[field] = data[field]
requires_reapproval = True
# --- MODIFIED: Handle location_coords update and geohash recalculation ---
if 'location_coords' in update_payload:
coords = update_payload['location_coords']
if isinstance(coords, dict) and 'latitude' in coords and 'longitude' in coords:
lat, lon = coords['latitude'], coords['longitude']
if isinstance(lat, (int, float)) and isinstance(lon, (int, float)):
update_payload['geohash'] = _calculate_geohash(lat, lon)
else:
return jsonify({'error': 'Invalid latitude/longitude in location_coords.'}), 400
else:
return jsonify({'error': 'Invalid location_coords object.'}), 400
# Allow user to change status to inactive/closed without re-approval
if 'status' in data and data['status'] in ['inactive', 'closed']:
update_payload['status'] = data['status']
elif 'status' in data and data['status'] not in ['inactive', 'closed'] and data['status'] != listing_data.get('status'):
# If user tries to set to active or pending, force pending_approval if critical fields changed
if requires_reapproval:
update_payload['status'] = 'pending_approval'
else: # If no critical fields changed, allow status update if valid
update_payload['status'] = data['status'] # e.g., from inactive back to active if no critical changes
if requires_reapproval and update_payload.get('status') != 'pending_approval':
update_payload['status'] = 'pending_approval' # Ensure pending approval if critical fields changed
if not update_payload:
return jsonify({'error': 'No valid fields to update provided or no changes detected.'}), 400
update_payload['last_updated_at'] = datetime.now(timezone.utc).isoformat()
listing_ref.update(update_payload)
message = 'Listing updated successfully.'
if requires_reapproval:
message += ' Listing status set to pending_approval due to significant changes.'
_send_system_notification(uid, f"Your listing for '{listing_data.get('crop_type')}' has been updated and is now pending admin approval.", "listing_status", f"/listings/{listing_id}")
else:
_send_system_notification(uid, f"Your listing for '{listing_data.get('crop_type')}' has been updated.", "listing_status", f"/listings/{listing_id}")
updated_listing = listing_ref.get()
return jsonify({'success': True, 'message': message, 'listing': updated_listing}), 200
elif request.method == 'DELETE':
listing_ref.delete()
_send_system_notification(uid, f"Your {listing_data.get('listing_type')} listing for '{listing_data.get('crop_type')}' has been removed.", "listing_removed", f"/my-listings")
return jsonify({'success': True, 'message': 'Listing removed successfully.'}), 200
except Exception as e:
return handle_route_errors(e, uid_context=uid)
@app.route('/api/market/', methods=['GET'])
def get_active_listings(listing_type):
uid_context = "public_market_listings"
try:
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
if listing_type not in ["produce", "demand"]: return jsonify({'error': 'Invalid listing type'}), 400
listings_ref = db.reference('listings', app=db_app).order_by_child('status').equal_to('active')
all_active = listings_ref.get() or {}
type_filtered = {lid: ldata for lid, ldata in all_active.items() if ldata and ldata.get('listing_type') == listing_type}
# NOTE: The text-based location filter below will only work for legacy data that has the 'location' text field.
# A proper distance-based filter for all listings would require a more complex implementation.
crop_filter, loc_filter = request.args.get('crop_type'), request.args.get('location')
final_listings = []
for lid, ldata in type_filtered.items():
match = True
if crop_filter and ldata.get('crop_type', '').lower() != crop_filter.lower(): match = False
if loc_filter and ldata.get('location', '').lower() != loc_filter.lower(): match = False
if match: final_listings.append({'id': lid, **ldata})
return jsonify(final_listings), 200
except Exception as e: return handle_route_errors(e, uid_context=uid_context)
@app.route('/api/admin/listings/pending', methods=['GET'])
def admin_get_pending_listings():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid, _, _ = verify_admin_or_facilitator(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
listings_ref = db.reference('listings', app=db_app).order_by_child('status').equal_to('pending_approval')
return jsonify(listings_ref.get() or {}), 200
except Exception as e: return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/listings/action/', methods=['POST'])
def admin_action_on_listing(listing_id):
auth_header = request.headers.get('Authorization')
reviewer_uid = None
try:
reviewer_uid, _, _ = verify_admin_or_facilitator(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json(); action = data.get('action')
if action not in ['approve', 'reject']: return jsonify({'error': 'Invalid action.'}), 400
listing_ref = db.reference(f'listings/{listing_id}', app=db_app); listing = listing_ref.get()
if not listing or listing.get('status') != 'pending_approval': return jsonify({'error': 'Listing not found or not pending.'}), 404
new_status = 'active' if action == 'approve' else 'rejected'
listing_ref.update({'status': new_status, 'reviewed_by': reviewer_uid, 'reviewed_at': datetime.now(timezone.utc).isoformat()})
_send_system_notification(listing.get('lister_id'), f"Your {listing.get('listing_type')} listing for '{listing.get('crop_type')}' has been {new_status}d.", "listing_status", f"/listings/{listing_id}")
return jsonify({'success': True, 'message': f"Listing {listing_id} {new_status}."}), 200
except Exception as e: return handle_route_errors(e, uid_context=reviewer_uid)
# NEW: Admin Remove Any Listing
@app.route('/api/admin/listings//remove', methods=['DELETE'])
def admin_remove_listing(listing_id):
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid, _, _ = verify_admin_or_facilitator(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
listing_ref = db.reference(f'listings/{listing_id}', app=db_app)
listing_data = listing_ref.get()
if not listing_data or not isinstance(listing_data, dict):
return jsonify({'error': 'Listing not found.'}), 404
lister_id = listing_data.get('lister_id')
listing_type = listing_data.get('listing_type', 'item')
crop_type = listing_data.get('crop_type', 'N/A')
listing_ref.delete()
if lister_id:
_send_system_notification(lister_id, f"Your {listing_type} listing for '{crop_type}' (ID: {listing_id}) has been removed by an administrator.", "listing_removed_by_admin", f"/my-listings")
return jsonify({'success': True, 'message': f'Listing {listing_id} removed by admin/facilitator.'}), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
#--- Deal Management (MODIFIED SECTION) ---
@app.route('/api/deals/propose', methods=['POST'])
def propose_deal():
auth_header = request.headers.get('Authorization')
requester_uid = None # UID of the user making the API call (admin/user)
acting_uid = None # UID of the user on whose behalf the action is taken
try:
logger.info(f"Backend /api/deals/propose received headers: {request.headers}")
logger.info(f"Backend /api/deals/propose received raw data: {request.get_data(as_text=True)}")
requester_uid = verify_token(auth_header) # Can raise exceptions
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
if not data:
logger.error("Backend /api/deals/propose: No JSON data received or failed to parse.")
return jsonify({'error': 'Invalid JSON data received.'}), 400
on_behalf_of_uid = data.get('on_behalf_of_uid')
if on_behalf_of_uid:
# If acting on behalf of someone, verify the requester is an admin/facilitator
admin_or_facilitator_uid, _, _ = verify_admin_or_facilitator(auth_header) # This will raise if not admin/facilitator
acting_uid = on_behalf_of_uid
# Verify the user on whose behalf we are acting exists and is a farmer (for WhatsApp farmers)
acting_user_profile = db.reference(f'users/{acting_uid}', app=db_app).get()
if not acting_user_profile:
return jsonify({'error': f'User {acting_uid} (on_behalf_of) not found.'}), 404
# Constraint: WhatsApp farmers can only list produce, so they can only *propose* against demand listings.
# This means the acting_uid must be a farmer.
if not acting_user_profile.get('roles', {}).get('farmer') and acting_user_profile.get('account_type') != 'whatsapp_managed':
return jsonify({'error': f'User {acting_uid} is not a farmer or WhatsApp-managed farmer, cannot propose on their behalf.'}), 403
logger.info(f"Admin/Facilitator {admin_or_facilitator_uid} proposing deal on behalf of {acting_uid}.")
else:
acting_uid = requester_uid # Regular user proposing
listing_id = data.get('listing_id')
proposed_quantity_by_user = data.get('quantity')
proposed_price_per_unit_by_user = data.get('price')
notes = data.get('notes', "")
logger.info(f"Backend /api/deals/propose: Parsed data - listing_id='{listing_id}', quantity='{proposed_quantity_by_user}', price='{proposed_price_per_unit_by_user}'")
if not listing_id or proposed_quantity_by_user is None or proposed_price_per_unit_by_user is None:
logger.error(f"Backend /api/deals/propose: Missing required fields. listing_id: {listing_id}, quantity: {proposed_quantity_by_user}, price: {proposed_price_per_unit_by_user}")
return jsonify({'error': 'listing_id, quantity, and price are required.'}), 400
try:
proposed_quantity_by_user = int(proposed_quantity_by_user)
proposed_price_per_unit_by_user = float(proposed_price_per_unit_by_user)
if proposed_quantity_by_user <= 0 or proposed_price_per_unit_by_user <= 0:
raise ValueError("Quantity and price must be positive numbers.")
except ValueError as ve:
logger.error(f"Backend /api/deals/propose: Invalid quantity or price. Error: {str(ve)}")
return jsonify({'error': f'Invalid quantity or price: {str(ve)}'}), 400
listing_ref = db.reference(f'listings/{listing_id}', app=db_app)
listing_data = listing_ref.get()
if not listing_data or not isinstance(listing_data, dict) or listing_data.get('status') != 'active':
logger.error(f"Backend /api/deals/propose: Target listing {listing_id} not found, not active, or invalid.")
return jsonify({'error': 'Target listing not found, not active, or invalid.'}), 404
original_lister_id = listing_data.get('lister_id')
listing_type = listing_data.get('listing_type') # 'produce' or 'demand'
if original_lister_id == acting_uid:
logger.error(f"Backend /api/deals/propose: User {acting_uid} attempting to deal with own listing {listing_id}.")
return jsonify({'error': 'Cannot propose a deal to your own listing/demand.'}), 400
deal_farmer_id = None
deal_buyer_id = None
deal_notes_prefix = ""
notification_recipient_id = original_lister_id # The other party
if listing_type == 'produce':
# Current user (proposer acting_uid) is a BUYER, proposing to a FARMER's produce listing.
# If acting on behalf of a WhatsApp farmer, they cannot be the buyer here.
if on_behalf_of_uid and acting_user_profile.get('account_type') == 'whatsapp_managed':
return jsonify({'error': 'WhatsApp farmers can only propose deals as a farmer (against demand listings).'}), 403
deal_farmer_id = original_lister_id
deal_buyer_id = acting_uid
deal_notes_prefix = "Buyer's proposal: "
available_quantity = listing_data.get('quantity', 0)
if proposed_quantity_by_user > available_quantity:
logger.error(f"Backend /api/deals/propose: Proposed quantity {proposed_quantity_by_user} exceeds available {available_quantity} for produce listing {listing_id}.")
return jsonify({'error': f'Proposed quantity ({proposed_quantity_by_user}) exceeds available quantity ({available_quantity}) for the listing.'}), 400
elif listing_type == 'demand':
# Current user (proposer acting_uid) is a FARMER, making an offer against a BUYER's demand listing.
deal_farmer_id = acting_uid
deal_buyer_id = original_lister_id
deal_notes_prefix = "Farmer's offer against demand: "
# Optional: Check if farmer's offered quantity matches/is within demand's quantity
# demand_quantity_needed = listing_data.get('quantity', float('inf')) # Assuming demand listing also has 'quantity'
# if proposed_quantity_by_user > demand_quantity_needed:
# logger.error(f"Backend /api/deals/propose: Farmer's offered quantity {proposed_quantity_by_user} exceeds demanded quantity {demand_quantity_needed} for demand {listing_id}.")
# return jsonify({'error': f'Your offered quantity ({proposed_quantity_by_user}) exceeds the demanded quantity ({demand_quantity_needed}).'}), 400
else:
logger.error(f"Backend /api/deals/propose: Invalid target listing type '{listing_type}' for listing {listing_id}.")
return jsonify({'error': 'Invalid target listing type for creating a deal.'}), 400
deal_id = str(uuid.uuid4())
deal_data_to_set = {
'deal_id': deal_id,
'proposer_id': acting_uid, # Use acting_uid here
'listing_id': listing_id,
'farmer_id': deal_farmer_id,
'buyer_id': deal_buyer_id,
'proposed_quantity': proposed_quantity_by_user,
'proposed_price': proposed_price_per_unit_by_user, # This is price_per_unit
'deal_notes': deal_notes_prefix + notes,
'status': 'proposed',
'created_at': datetime.now(timezone.utc).isoformat(),
'chat_room_id': f"deal_{deal_id}"
}
# If an admin acted on behalf, record that
if on_behalf_of_uid:
deal_data_to_set['proxied_by_admin_uid'] = requester_uid
db.reference(f'deals/{deal_id}', app=db_app).set(deal_data_to_set)
_send_system_notification(
notification_recipient_id,
f"You have a new proposal/offer from {acting_uid[:6]}... regarding your {listing_type} for '{listing_data.get('crop_type')}'. Qty: {proposed_quantity_by_user}, Price/Unit: {proposed_price_per_unit_by_user}",
"new_deal_proposal",
f"/deals/{deal_id}"
)
logger.info(f"Backend /api/deals/propose: Deal {deal_id} created successfully by UID {acting_uid} for listing {listing_id}.")
return jsonify({'success': True, 'message': 'Proposal/Offer submitted successfully.', 'deal': deal_data_to_set}), 201
except Exception as e:
# Log the original error before passing to generic handler
logger.error(f"Backend /api/deals/propose: Unhandled exception for UID {acting_uid or requester_uid or 'unknown'}. Error: {str(e)}\n{traceback.format_exc()}")
return handle_route_errors(e, uid_context=acting_uid or requester_uid or "propose_deal_unknown_user")
@app.route('/api/deals//respond', methods=['POST'])
def respond_to_deal(deal_id):
auth_header = request.headers.get('Authorization')
requester_uid = None # UID of the user making the API call (admin/user)
acting_uid = None # UID of the user on whose behalf the action is taken
try:
requester_uid = verify_token(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error'}), 503
data = request.get_json()
response_action = data.get('action') # 'accept' or 'reject'
on_behalf_of_uid = data.get('on_behalf_of_uid')
if on_behalf_of_uid:
# If acting on behalf of someone, verify the requester is an admin/facilitator
admin_or_facilitator_uid, _, _ = verify_admin_or_facilitator(auth_header) # This will raise if not admin/facilitator
acting_uid = on_behalf_of_uid
# Verify the user on whose behalf we are acting exists and is a farmer (for WhatsApp farmers)
acting_user_profile = db.reference(f'users/{acting_uid}', app=db_app).get()
if not acting_user_profile:
return jsonify({'error': f'User {acting_uid} (on_behalf_of) not found.'}), 404
# Constraint: WhatsApp farmers can only list produce, so they would only *respond* as a farmer.
if not acting_user_profile.get('roles', {}).get('farmer') and acting_user_profile.get('account_type') != 'whatsapp_managed':
return jsonify({'error': f'User {acting_uid} is not a farmer or WhatsApp-managed farmer, cannot respond on their behalf.'}), 403
logger.info(f"Admin/Facilitator {admin_or_facilitator_uid} responding to deal {deal_id} on behalf of {acting_uid}.")
else:
acting_uid = requester_uid # Regular user responding
if response_action not in ['accept', 'reject']:
return jsonify({'error': 'Invalid action. Must be "accept" or "reject".'}), 400
deal_ref = db.reference(f'deals/{deal_id}', app=db_app)
deal_data = deal_ref.get()
if not deal_data or not isinstance(deal_data, dict):
return jsonify({'error': 'Deal not found.'}), 404
current_deal_status = deal_data.get('status')
proposer_id = deal_data.get('proposer_id')
farmer_id_in_deal = deal_data.get('farmer_id')
buyer_id_in_deal = deal_data.get('buyer_id')
# Authorization Check: Who is allowed to respond?
can_respond = False
if current_deal_status == 'proposed':
if proposer_id == buyer_id_in_deal and acting_uid == farmer_id_in_deal: # Buyer proposed, Farmer responds
can_respond = True
elif proposer_id == farmer_id_in_deal and acting_uid == buyer_id_in_deal: # Farmer proposed/countered, Buyer responds
can_respond = True
# Add other statuses if a different party needs to respond (e.g., after admin action)
if not can_respond:
logger.warning(f"UID {acting_uid} unauthorized to respond to deal {deal_id}. Deal status: {current_deal_status}, Proposer: {proposer_id}, Farmer: {farmer_id_in_deal}, Buyer: {buyer_id_in_deal}")
return jsonify({'error': 'Not authorized to respond to this deal at its current state.'}), 403
update_time = datetime.now(timezone.utc).isoformat()
# Determine the other party for notification
other_party_id = None
if acting_uid == farmer_id_in_deal:
other_party_id = buyer_id_in_deal
elif acting_uid == buyer_id_in_deal:
other_party_id = farmer_id_in_deal
listing_id = deal_data.get('listing_id')
listing_data_for_notif = {}
if listing_id:
listing_data_for_notif = db.reference(f'listings/{listing_id}', app=db_app).get() or {}
crop_type_for_notif = listing_data_for_notif.get('crop_type', 'your listing/demand')
update_payload = {}
if on_behalf_of_uid:
update_payload['proxied_by_admin_uid'] = requester_uid # Record who proxied the action
if response_action == 'accept':
# Quantity check (if applicable, e.g., if responding to a proposal against a produce listing)
if proposer_id == buyer_id_in_deal and acting_uid == farmer_id_in_deal: # Farmer accepting buyer's proposal
if listing_id and isinstance(listing_data_for_notif, dict):
available_quantity = listing_data_for_notif.get('quantity', 0)
proposed_quantity = deal_data.get('proposed_quantity', 0)
if proposed_quantity > available_quantity:
deal_ref.update({
'status': 'rejected_by_system_insufficient_qty',
'system_rejection_at': update_time,
'system_rejection_reason': f'Listing quantity ({available_quantity}) insufficient for deal quantity ({proposed_quantity}) at time of acceptance.'
})
if other_party_id:
_send_system_notification(other_party_id, f"Your deal proposal for '{crop_type_for_notif}' could not be accepted due to insufficient stock.", "deal_status_update", f"/deals/{deal_id}")
return jsonify({'success': False, 'error': 'Deal could not be accepted. Listing quantity is no longer sufficient.'}), 409
accepted_by_field = ""
new_status = ""
if acting_uid == farmer_id_in_deal:
accepted_by_field = "farmer_accepted_at"
new_status = "accepted_by_farmer" # Farmer accepts buyer's proposal
notification_message_to_other_party = f"Your deal proposal for '{crop_type_for_notif}' has been ACCEPTED by the farmer. It is now pending admin approval."
elif acting_uid == buyer_id_in_deal:
accepted_by_field = "buyer_accepted_at"
new_status = "accepted_by_buyer" # Buyer accepts farmer's offer/counter
notification_message_to_other_party = f"Your offer/counter-offer for '{crop_type_for_notif}' has been ACCEPTED by the buyer. It is now pending admin approval."
else: # Should not happen due to auth check
return jsonify({'error': 'Internal error determining accepter role.'}), 500
update_payload.update({'status': new_status, accepted_by_field: update_time, 'last_responder_id': acting_uid})
deal_ref.update(update_payload)
if other_party_id:
_send_system_notification(other_party_id, notification_message_to_other_party, "deal_status_update", f"/deals/{deal_id}")
# Notify admins
admins_ref = db.reference('users', app=db_app).order_by_child('is_admin').equal_to(True).get()
if admins_ref:
for admin_id_loop, _ in admins_ref.items():
_send_system_notification(admin_id_loop, f"Deal ID {deal_id} for '{crop_type_for_notif}' has been accepted by {acting_uid[:6]}... and needs your approval.", "admin_deal_approval_needed", f"/admin/deals/pending") # Path for admin to see pending deals
return jsonify({'success': True, 'message': f'Deal accepted by {("farmer" if acting_uid == farmer_id_in_deal else "buyer")}, pending admin approval.'}), 200
elif response_action == 'reject':
rejected_by_field = ""
new_status = ""
if acting_uid == farmer_id_in_deal:
rejected_by_field = "farmer_rejected_at" # Or just 'responded_at'
new_status = "rejected_by_farmer"
notification_message_to_other_party = f"Your deal proposal for '{crop_type_for_notif}' has been REJECTED by the farmer."
elif acting_uid == buyer_id_in_deal:
rejected_by_field = "buyer_rejected_at"
new_status = "rejected_by_buyer"
notification_message_to_other_party = f"Your offer/counter-offer for '{crop_type_for_notif}' has been REJECTED by the buyer."
else: # Should not happen
return jsonify({'error': 'Internal error determining rejector role.'}), 500
update_payload.update({'status': new_status, rejected_by_field: update_time, 'last_responder_id': acting_uid})
deal_ref.update(update_payload)
if other_party_id:
_send_system_notification(other_party_id, notification_message_to_other_party, "deal_status_update", f"/deals/{deal_id}")
return jsonify({'success': True, 'message': f'Deal rejected by {("farmer" if acting_uid == farmer_id_in_deal else "buyer")}.'}), 200
except Exception as e:
return handle_route_errors(e, uid_context=acting_uid or requester_uid)
@app.route('/api/deals//complete', methods=['POST'])
def complete_deal_by_admin(deal_id):
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error'}), 503
deal_ref = db.reference(f'deals/{deal_id}', app=db_app)
deal_data = deal_ref.get()
if not deal_data or not isinstance(deal_data, dict):
return jsonify({'error': 'Deal not found.'}), 404
if deal_data.get('status') != 'active':
if not (deal_data.get('status') == 'active' and deal_data.get('transport_status') in ['transporter_accepted', 'in_transit', 'transport_completed', None]):
return jsonify({'error': f"Deal cannot be completed by admin from current status: '{deal_data.get('status')}' / transport: '{deal_data.get('transport_status')}'."}), 400
listing_id = deal_data.get('listing_id')
deal_quantity = deal_data.get('proposed_quantity', 0)
if not listing_id or not isinstance(deal_quantity, (int, float)) or deal_quantity <= 0:
return jsonify({'error': 'Deal data is incomplete (missing listing_id or invalid quantity). Cannot complete.'}), 400
listing_ref = db.reference(f'listings/{listing_id}', app=db_app)
def update_listing_transaction(current_listing_data_tx):
if not current_listing_data_tx or not isinstance(current_listing_data_tx, dict):
logger.error(f"CompleteDealByAdmin: Listing {listing_id} not found or malformed during transaction for deal {deal_id}.")
return current_listing_data_tx
current_listing_quantity = current_listing_data_tx.get('quantity', 0)
new_listing_quantity = current_listing_quantity - deal_quantity
updates = {}
if new_listing_quantity <= 0:
updates['quantity'] = 0
updates['status'] = 'closed'
else:
updates['quantity'] = new_listing_quantity
current_listing_data_tx.update(updates)
return current_listing_data_tx
transaction_result = listing_ref.transaction(update_listing_transaction)
if transaction_result is None and listing_ref.get() is not None :
logger.warning(f"CompleteDealByAdmin: Transaction to update listing {listing_id} for deal {deal_id} was aborted or listing became null. Deal will be marked complete, but listing quantity may not be updated.")
elif listing_ref.get() is None:
logger.warning(f"CompleteDealByAdmin: Listing {listing_id} not found for deal {deal_id}. Deal will be marked complete, but listing quantity not updated.")
else:
logger.info(f"Deal {deal_id} completion: Listing {listing_id} quantity updated via transaction.")
deal_updates = {
'status': 'completed',
'completed_at': datetime.now(timezone.utc).isoformat(),
'completed_by': admin_uid
}
if deal_data.get('assigned_transporter_id'):
deal_updates['transport_status'] = 'transport_completed'
deal_ref.update(deal_updates)
farmer_id = deal_data.get('farmer_id')
buyer_id = deal_data.get('buyer_id')
transporter_id = deal_data.get('assigned_transporter_id')
listing_data_for_notif = db.reference(f'listings/{listing_id}', app=db_app).get() or {}
crop_type_for_notif = listing_data_for_notif.get('crop_type', 'your item')
if farmer_id:
_send_system_notification(farmer_id, f"Admin has marked deal {deal_id} for '{crop_type_for_notif}' as COMPLETED.", "deal_completed_by_admin", f"/deals/{deal_id}")
if buyer_id:
_send_system_notification(buyer_id, f"Admin has marked deal {deal_id} for '{crop_type_for_notif}' as COMPLETED.", "deal_completed_by_admin", f"/deals/{deal_id}")
if transporter_id and deal_data.get('assigned_transporter_id'):
_send_system_notification(transporter_id, f"Admin has marked transport job for deal {deal_id} as COMPLETED.", "transport_job_completed_by_admin", f"/transporter/jobs/{deal_id}")
return jsonify({'success': True, 'message': f'Deal {deal_id} marked as completed by admin. Listing quantity updated (if applicable).'}), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/deals/my', methods=['GET'])
def get_my_deals():
auth_header = request.headers.get('Authorization')
uid = None
try:
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
uid = verify_token(auth_header)
if not uid:
return jsonify({'error': 'Invalid or expired token / verification failed.'}), 401
all_deals_ref = db.reference('deals', app=db_app)
all_deals = all_deals_ref.get()
my_deals = {}
if all_deals:
for deal_id, deal_data_loop in all_deals.items():
if isinstance(deal_data_loop, dict) and \
(deal_data_loop.get('buyer_id') == uid or \
deal_data_loop.get('farmer_id') == uid or \
deal_data_loop.get('assigned_transporter_id') == uid):
deal_data_with_id = deal_data_loop.copy()
deal_data_with_id['deal_id'] = deal_id
my_deals[deal_id] = deal_data_with_id
return jsonify(my_deals), 200
except Exception as e_auth_related:
return handle_route_errors(e_auth_related, uid_context=uid)
# NEW: Admin Remove Any Deal
@app.route('/api/admin/deals//remove', methods=['DELETE'])
def admin_remove_deal(deal_id):
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid, _, _ = verify_admin_or_facilitator(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
deal_ref = db.reference(f'deals/{deal_id}', app=db_app)
deal_data = deal_ref.get()
if not deal_data or not isinstance(deal_data, dict):
return jsonify({'error': 'Deal not found.'}), 404
farmer_id = deal_data.get('farmer_id')
buyer_id = deal_data.get('buyer_id')
transporter_id = deal_data.get('assigned_transporter_id')
listing_id = deal_data.get('listing_id')
crop_type = "N/A"
if listing_id:
listing_details = db.reference(f'listings/{listing_id}', app=db_app).get()
if listing_details:
crop_type = listing_details.get('crop_type', 'N/A')
deal_ref.delete()
message_to_parties = f"Deal ID {deal_id} for '{crop_type}' has been removed by an administrator."
if farmer_id:
_send_system_notification(farmer_id, message_to_parties, "deal_removed_by_admin", f"/my-deals")
if buyer_id:
_send_system_notification(buyer_id, message_to_parties, "deal_removed_by_admin", f"/my-deals")
if transporter_id:
_send_system_notification(transporter_id, f"Your transport job for deal {deal_id} has been cancelled by an administrator.", "transport_job_cancelled_by_admin", f"/transporter/jobs")
return jsonify({'success': True, 'message': f'Deal {deal_id} removed by admin/facilitator.'}), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
# Removed the admin_create_manual_deal endpoint as its functionality is now integrated into propose_deal and respond_to_deal.
#--- END OF MODIFIED DEAL MANAGEMENT SECTION ---
#--- TRANSPORTER ENDPOINTS (NEW BLOCK) ---
# --- NEW: Find Nearest Transporters Endpoint ---
@app.route('/api/transporters/find-nearest', methods=['POST'])
def find_nearest_transporters():
auth_header = request.headers.get('Authorization')
uid = None
try:
# Any authenticated user can find transporters
uid = verify_token(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
lat = data.get('latitude')
lon = data.get('longitude')
radius_km = data.get('radius_km', 50) # Default search radius of 50km
if not isinstance(lat, (int, float)) or not isinstance(lon, (int, float)):
return jsonify({'error': 'Valid latitude and longitude are required.'}), 400
if not (isinstance(radius_km, (int, float)) and radius_km > 0):
return jsonify({'error': 'A valid, positive search radius (radius_km) is required.'}), 400
origin_coords = {'latitude': lat, 'longitude': lon}
# 1. Get the 9-box geohash query area
geohash_area_to_query = _get_geohash_query_area(lat, lon)
candidate_transporters = {} # Use dict to avoid duplicates
# 2. Query Firebase for each geohash box
# NOTE: This performs 9 separate queries. For very high traffic,
# a more advanced data structure/service might be needed.
for gh in geohash_area_to_query:
# Query users by geohash. Indexing '.indexOn": ["geohash"]' in Firebase rules for 'users' is critical for performance.
query_results = db.reference('users', app=db_app).order_by_child('geohash').equal_to(gh).get()
if query_results:
for user_id, user_data in query_results.items():
# 3. Filter for actual transporters with valid coordinates
if user_data and user_data.get('roles', {}).get('transporter') and user_data.get('location_coords'):
candidate_transporters[user_id] = user_data
# 4. Perform precise distance calculation and filtering
transporters_in_radius = []
for user_id, user_data in candidate_transporters.items():
distance = _calculate_distance(origin_coords, user_data['location_coords'])
if distance <= radius_km:
# Add distance to the user data before returning
user_data_with_dist = user_data.copy()
user_data_with_dist['uid'] = user_id
user_data_with_dist['distance_km'] = round(distance, 2)
transporters_in_radius.append(user_data_with_dist)
# 5. Sort the final list by distance
transporters_in_radius.sort(key=lambda x: x['distance_km'])
return jsonify(transporters_in_radius), 200
except Exception as e:
return handle_route_errors(e, uid_context=uid)
@app.route('/api/admin/deals//assign-transporter', methods=['POST'])
def admin_assign_transporter(deal_id):
auth_header = request.headers.get('Authorization')
reviewer_uid = None
try:
reviewer_uid, _, _ = verify_admin_or_facilitator(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
transporter_id = data.get('transporter_id')
initial_offer_amount = data.get('initial_offer_amount')
initial_currency = data.get('initial_currency', 'USD')
proposed_pickup_date = data.get('proposed_pickup_date')
notes_for_transporter = data.get('notes_for_transporter', "")
if not transporter_id or initial_offer_amount is None or not proposed_pickup_date:
return jsonify({'error': 'transporter_id, initial_offer_amount, and proposed_pickup_date are required.'}), 400
try:
initial_offer_amount = float(initial_offer_amount)
except ValueError:
return jsonify({'error': 'initial_offer_amount must be a valid number.'}), 400
deal_ref = db.reference(f'deals/{deal_id}', app=db_app)
deal_data = deal_ref.get()
if not deal_data:
return jsonify({'error': 'Deal not found.'}), 404
if deal_data.get('status') != 'active': # Only assign transport to active deals
return jsonify({'error': f"Deal is not active. Current status: {deal_data.get('status')}"}), 400
transporter_profile_ref = db.reference(f'users/{transporter_id}', app=db_app)
transporter_profile = transporter_profile_ref.get()
if not transporter_profile or not transporter_profile.get('roles', {}).get('transporter'):
return jsonify({'error': 'Invalid transporter ID or user is not a transporter.'}), 400
transport_offer_details = {
"amount": initial_offer_amount,
"currency": initial_currency,
"pickup_date_proposed": proposed_pickup_date,
"delivery_date_proposed": data.get("proposed_delivery_date", ""),
"notes": notes_for_transporter,
"offered_by": "admin", # Or reviewer_uid for more specific tracking
"offer_timestamp": datetime.now(timezone.utc).isoformat()
}
update_payload = {
'assigned_transporter_id': transporter_id,
'transport_status': 'assigned', # Job is now assigned to the transporter
'transport_offer': transport_offer_details,
'last_transport_update_by': reviewer_uid,
'last_transport_update_at': datetime.now(timezone.utc).isoformat()
}
history_entry_key = db.reference(f'deals/{deal_id}/transport_job_history', app=db_app).push().key
history_entry = {
"action": "assigned_to_transporter",
"user_id": reviewer_uid, # Admin/Facilitator who assigned
"transporter_id": transporter_id,
"offer": transport_offer_details,
"timestamp": datetime.now(timezone.utc).isoformat()
}
update_payload[f'transport_job_history/{history_entry_key}'] = history_entry
deal_ref.update(update_payload)
# Denormalize for transporter's easy query
transporter_job_summary = {
'deal_id': deal_id,
'farmer_id': deal_data.get('farmer_id'),
'buyer_id': deal_data.get('buyer_id'),
'listing_id': deal_data.get('listing_id'),
'transport_status': 'assigned',
'assigned_at': datetime.now(timezone.utc).isoformat(),
'offer': transport_offer_details
# Add other relevant summary fields from the deal if needed
}
db.reference(f'transporter_jobs/{transporter_id}/{deal_id}', app=db_app).set(transporter_job_summary)
_send_system_notification(
transporter_id,
f"You have been assigned a new transport job for deal ID: {deal_id}. Please review and respond.",
"new_transport_job",
f"/transporter/jobs/{deal_id}" # Example link for frontend
)
return jsonify({'success': True, 'message': f'Transport job for deal {deal_id} assigned to transporter {transporter_id}.'}), 200
except Exception as e:
return handle_route_errors(e, uid_context=reviewer_uid)
@app.route('/api/transporter/jobs/pending', methods=['GET'])
def transporter_get_pending_jobs():
auth_header = request.headers.get('Authorization')
transporter_uid = None
try:
transporter_uid = verify_role(auth_header, 'transporter')
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
# Querying denormalized transporter_jobs for efficiency
pending_jobs_ref = db.reference(f'transporter_jobs/{transporter_uid}', app=db_app)\
.order_by_child('transport_status')\
.equal_to('assigned')
pending_jobs_data = pending_jobs_ref.get() or {}
# If you need full deal details, you might fetch them based on deal_id from pending_jobs_data
# For now, returning the summary stored in transporter_jobs
return jsonify(pending_jobs_data), 200
except Exception as e:
return handle_route_errors(e, uid_context=transporter_uid)
@app.route('/api/transporter/deals//job-action', methods=['POST'])
def transporter_job_action(deal_id):
auth_header = request.headers.get('Authorization')
transporter_uid = None
try:
transporter_uid = verify_role(auth_header, 'transporter')
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
action = data.get('action') # "accept", "reject", "counter_offer"
deal_ref = db.reference(f'deals/{deal_id}', app=db_app)
deal_data = deal_ref.get()
if not deal_data or not isinstance(deal_data, dict):
return jsonify({'error': 'Deal (transport job) not found.'}), 404
if deal_data.get('assigned_transporter_id') != transporter_uid:
return jsonify({'error': 'You are not assigned to this transport job.'}), 403
current_transport_status = deal_data.get('transport_status')
update_payload = {
'last_transport_update_by': transporter_uid,
'last_transport_update_at': datetime.now(timezone.utc).isoformat()
}
history_action_type = ""
notification_message_to_admin = ""
new_transport_status = "" # Will hold the new status for the deal and transporter_jobs
if action == "accept":
if current_transport_status != 'assigned' and current_transport_status != 'admin_reoffered': # Assuming admin can re-offer
return jsonify({'error': f'Job cannot be accepted from current status: {current_transport_status}'}), 400
new_transport_status = 'transporter_accepted'
update_payload['transport_status'] = new_transport_status
update_payload['transport_offer_accepted_at'] = datetime.now(timezone.utc).isoformat()
if data.get("acceptance_notes"): # Store notes if provided
update_payload['transport_acceptance_notes'] = data.get("acceptance_notes")
history_action_type = "job_accepted"
notification_message_to_admin = f"Transporter {transporter_uid[:6]}... has ACCEPTED the transport job for deal {deal_id}."
elif action == "reject":
if current_transport_status not in ['assigned', 'admin_reoffered', 'admin_rejected_counter']: # Transporter can reject if assigned, or if admin rejected their counter
return jsonify({'error': f'Job cannot be rejected from current status: {current_transport_status}'}), 400
rejection_reason = data.get('rejection_reason', 'No reason provided.')
new_transport_status = 'transporter_rejected'
update_payload['transport_status'] = new_transport_status
update_payload['transport_rejection_reason'] = rejection_reason
# Admin will need to re-assign. Consider if assigned_transporter_id should be nulled here or by admin.
history_action_type = "job_rejected"
notification_message_to_admin = f"Transporter {transporter_uid[:6]}... has REJECTED the transport job for deal {deal_id}. Reason: {rejection_reason}"
elif action == "counter_offer":
# Can counter if initially 'assigned' or if admin 'admin_rejected_counter' to their previous counter
if current_transport_status not in ['assigned', 'admin_rejected_counter']:
return jsonify({'error': f'Cannot make counter-offer from current status: {current_transport_status}'}), 400
counter_amount = data.get('counter_offer_amount')
counter_currency = data.get('currency', deal_data.get('transport_offer', {}).get('currency', 'USD'))
counter_pickup_date = data.get('proposed_pickup_date', deal_data.get('transport_offer', {}).get('pickup_date_proposed'))
counter_delivery_date = data.get('proposed_delivery_date', deal_data.get('transport_offer', {}).get('delivery_date_proposed', ""))
counter_notes = data.get('notes', "")
if counter_amount is None:
return jsonify({'error': 'counter_offer_amount is required for a counter-offer.'}), 400
try: counter_amount = float(counter_amount)
except ValueError: return jsonify({'error': 'counter_offer_amount must be a number.'}), 400
new_transport_status = 'transporter_counter_offer'
update_payload['transport_status'] = new_transport_status
update_payload['transport_offer'] = { # This becomes the new current offer
"amount": counter_amount,
"currency": counter_currency,
"pickup_date_proposed": counter_pickup_date,
"delivery_date_proposed": counter_delivery_date,
"notes": counter_notes,
"offered_by": "transporter",
"offer_timestamp": datetime.now(timezone.utc).isoformat()
}
history_action_type = "job_counter_offered"
notification_message_to_admin = f"Transporter {transporter_uid[:6]}... has made a COUNTER-OFFER for the transport job on deal {deal_id}."
else:
return jsonify({'error': 'Invalid action specified. Must be "accept", "reject", or "counter_offer".'}), 400
# Log history within the deal
history_entry_key = db.reference(f'deals/{deal_id}/transport_job_history', app=db_app).push().key
history_entry_data = {
"action": history_action_type,
"user_id": transporter_uid, # The transporter performing the action
"timestamp": datetime.now(timezone.utc).isoformat(),
}
if action == "counter_offer": history_entry_data["offer_details"] = update_payload['transport_offer']
if action == "reject": history_entry_data["reason"] = rejection_reason
if action == "accept" and data.get("acceptance_notes"): history_entry_data["notes"] = data.get("acceptance_notes")
update_payload[f'transport_job_history/{history_entry_key}'] = history_entry_data
deal_ref.update(update_payload)
# Notify Admin/Facilitator
admins_ref = db.reference('users', app=db_app).order_by_child('is_admin').equal_to(True).get()
if admins_ref:
for admin_id_loop, _ in admins_ref.items(): # Renamed admin_id
_send_system_notification(admin_id_loop, notification_message_to_admin, "transport_job_update", f"/admin/deals/{deal_id}")
# Update denormalized transporter_jobs summary
transporter_job_ref = db.reference(f'transporter_jobs/{transporter_uid}/{deal_id}', app=db_app)
if transporter_job_ref.get(): # Check if summary exists
transporter_job_ref.update({
'transport_status': new_transport_status,
'last_action_at': datetime.now(timezone.utc).isoformat(),
'offer': update_payload.get('transport_offer', deal_data.get('transport_offer'))
})
return jsonify({'success': True, 'message': f'Transport job action "{action}" processed successfully.'}), 200
except Exception as e:
return handle_route_errors(e, uid_context=transporter_uid)
@app.route('/api/admin/deals//respond-to-counter-offer', methods=['POST'])
def admin_respond_to_transporter_counter(deal_id):
auth_header = request.headers.get('Authorization')
reviewer_uid = None
try:
reviewer_uid, _, _ = verify_admin_or_facilitator(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
action = data.get('action') # "accept_counter" or "reject_counter"
notes = data.get('notes', "")
if action not in ["accept_counter", "reject_counter"]:
return jsonify({'error': 'Invalid action. Must be "accept_counter" or "reject_counter".'}), 400
deal_ref = db.reference(f'deals/{deal_id}', app=db_app)
deal_data = deal_ref.get()
if not deal_data or not isinstance(deal_data, dict):
return jsonify({'error': 'Deal not found.'}), 404
if deal_data.get('transport_status') != 'transporter_counter_offer':
return jsonify({'error': 'Deal is not in "transporter_counter_offer" status.'}), 400
transporter_id = deal_data.get('assigned_transporter_id')
if not transporter_id: # Should not happen if status is transporter_counter_offer
return jsonify({'error': 'No transporter assigned to this deal for counter-offer response.'}), 400
update_payload = {
'last_transport_update_by': reviewer_uid,
'last_transport_update_at': datetime.now(timezone.utc).isoformat()
}
history_action_type = ""
notification_message_to_transporter = ""
new_transport_status = ""
if action == "accept_counter":
new_transport_status = 'transporter_accepted' # The counter offer becomes the accepted offer
update_payload['transport_status'] = new_transport_status
update_payload['transport_offer_finalized_at'] = datetime.now(timezone.utc).isoformat()
# The current deal_data['transport_offer'] is the transporter's counter, which is now accepted.
history_action_type = "admin_accepted_counter"
notification_message_to_transporter = f"Your counter-offer for transport on deal {deal_id} has been ACCEPTED by admin."
elif action == "reject_counter":
new_transport_status = 'admin_rejected_counter'
update_payload['transport_status'] = new_transport_status
if notes: update_payload['admin_rejection_notes_to_transporter'] = notes # Store admin notes
history_action_type = "admin_rejected_counter"
notification_message_to_transporter = f"Your counter-offer for transport on deal {deal_id} has been REJECTED by admin. {notes}"
# Transporter might be able to make another counter, or admin might re-assign.
# Log history
history_entry_key = db.reference(f'deals/{deal_id}/transport_job_history', app=db_app).push().key
history_entry_data = {
"action": history_action_type,
"user_id": reviewer_uid, # Admin/Facilitator
"timestamp": datetime.now(timezone.utc).isoformat(),
"notes": notes
}
update_payload[f'transport_job_history/{history_entry_key}'] = history_entry_data
deal_ref.update(update_payload)
_send_system_notification(transporter_id, notification_message_to_transporter, "transport_job_update", f"/transporter/jobs/{deal_id}")
# Update denormalized transporter_jobs summary
transporter_job_ref = db.reference(f'transporter_jobs/{transporter_id}/{deal_id}', app=db_app)
if transporter_job_ref.get():
transporter_job_ref.update({
'transport_status': new_transport_status,
'last_action_at': datetime.now(timezone.utc).isoformat()
# Offer details remain as the transporter's last counter offer, which was accepted/rejected
})
return jsonify({'success': True, 'message': f'Response to transporter counter-offer processed: {action}.'}), 200
except Exception as e:
return handle_route_errors(e, uid_context=reviewer_uid)
#--- END OF TRANSPORTER ENDPOINTS ---
#--- ADMIN ENDPOINTS FOR ALL LISTINGS/DEALS (NEW BLOCK) ---
@app.route('/api/admin/listings/all', methods=['GET'])
def admin_get_all_listings():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
listings_ref = db.reference('listings', app=db_app)
all_listings = listings_ref.get() or {}
return jsonify(all_listings), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/deals/all', methods=['GET'])
def admin_get_all_deals():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
deals_ref = db.reference('deals', app=db_app)
all_deals = deals_ref.get() or {}
return jsonify(all_deals), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
#--- END OF ADMIN ALL LISTINGS/DEALS ---
#--- ADMIN ENDPOINTS FOR WHATSAPP-MANAGED FARMER ACCOUNTS ---
@app.route('/api/admin/whatsapp-farmers/create', methods=['POST'])
def admin_create_whatsapp_farmer():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
name = data.get('name')
phone_number = data.get('phone_number') # Expecting E.164 format ideally
location = data.get('location', "")
initial_notes = data.get('initial_notes', "")
if not name or not phone_number:
return jsonify({'error': 'Farmer name and phone number are required.'}), 400
# Basic validation for phone number format (can be enhanced)
if not re.match(r"^\+?[1-9]\d{1,14}$", phone_number):
return jsonify({'error': 'Invalid phone number format. Please use E.164 like +263771234567.'}), 400
# Check if phone number is already in use by a fully registered webapp user
# This query can be slow if you have many users. Consider indexing phone_number.
users_ref = db.reference('users', app=db_app)
existing_user_query = users_ref.order_by_child('phone_number').equal_to(phone_number).limit_to_first(1).get()
if existing_user_query:
for existing_uid, existing_user_data in existing_user_query.items():
if existing_user_data.get('account_type') == 'webapp_registered':
return jsonify({'error': f'This phone number is already registered to a web app user (UID: {existing_uid}).'}), 409
# Optionally, handle if already a whatsapp_managed account with this number
# if existing_user_data.get('account_type') == 'whatsapp_managed':
# return jsonify({'error': f'This phone number is already managed for another WhatsApp farmer (UID: {existing_uid}).'}), 409
# Generate a unique ID for this WhatsApp-managed farmer
# This UID will be used in the 'users' node. No Firebase Auth user created at this stage.
farmer_uid = f"wpfarmer_{str(uuid.uuid4())}"
profile_data = {
'uid': farmer_uid, # Store the UID within the profile too for convenience
'name': name,
'phone_number': phone_number,
'email': f"{farmer_uid}@whatsapp.tunasonga.internal", # Placeholder email
'location': location,
'roles': {'farmer': True, 'buyer': False, 'transporter': False},
'account_type': "whatsapp_managed",
'is_placeholder_account': True, # Indicates this was admin-created before web registration
'can_login_webapp': False, # Cannot log in to web app yet
'managed_by_admin_uid': admin_uid,
'created_at': datetime.now(timezone.utc).isoformat(),
'created_by_admin_uid': admin_uid, # Explicitly track creator
'whatsapp_interaction_log': [{ # Start a log
'timestamp': datetime.now(timezone.utc).isoformat(),
'action': 'account_created_by_admin',
'admin_uid': admin_uid,
'notes': f"Initial notes: {initial_notes}" if initial_notes else "Account created."
}]
}
db.reference(f'users/{farmer_uid}', app=db_app).set(profile_data)
return jsonify({
'success': True,
'message': f'WhatsApp-managed farmer account created for {name} ({phone_number}).',
'farmer_uid': farmer_uid,
'profile': profile_data
}), 201
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/whatsapp-farmers', methods=['GET'])
def admin_list_whatsapp_farmers():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
users_ref = db.reference('users', app=db_app)
# Querying for account_type. This requires an index on 'account_type' in Firebase rules for performance.
# ".indexOn": ["account_type"] under your "users" rules.
whatsapp_farmers_query = users_ref.order_by_child('account_type').equal_to('whatsapp_managed').get()
whatsapp_farmers = whatsapp_farmers_query or {}
return jsonify(whatsapp_farmers), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/whatsapp-farmers/', methods=['GET'])
def admin_get_whatsapp_farmer(farmer_uid):
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
farmer_ref = db.reference(f'users/{farmer_uid}', app=db_app)
farmer_data = farmer_ref.get()
if not farmer_data or farmer_data.get('account_type') != 'whatsapp_managed':
return jsonify({'error': 'WhatsApp-managed farmer not found or not of correct type.'}), 404
return jsonify(farmer_data), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/whatsapp-farmers//update', methods=['PUT'])
def admin_update_whatsapp_farmer(farmer_uid):
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED:
return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
farmer_ref = db.reference(f'users/{farmer_uid}', app=db_app)
farmer_data = farmer_ref.get()
if not farmer_data or farmer_data.get('account_type') != 'whatsapp_managed':
return jsonify({'error': 'WhatsApp-managed farmer not found or not of correct type.'}), 404
data = request.get_json()
updates = {}
allowed_fields_to_update = ['name', 'location', 'phone_number'] # Define what admin can change
for field in allowed_fields_to_update:
if field in data:
updates[field] = data[field]
if not updates and 'additional_notes' not in data : # Check if any valid fields or notes are provided
return jsonify({'error': 'No valid fields to update or notes provided.'}), 400
# Log the update action
if data.get('additional_notes'):
log_entry_key = db.reference(f'users/{farmer_uid}/whatsapp_interaction_log', app=db_app).push().key
log_entry = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'action': 'admin_update_notes',
'admin_uid': admin_uid,
'notes': data.get('additional_notes')
}
# To add to the log, we need to fetch existing logs or use a transaction if it's critical
# For simplicity here, we'll just push a new entry.
# If you want to update the main profile and add a log entry atomically,
# you'd construct a single update payload for farmer_ref.update()
db.reference(f'users/{farmer_uid}/whatsapp_interaction_log/{log_entry_key}', app=db_app).set(log_entry)
if updates: # If there are profile fields to update
updates['last_updated_by_admin_uid'] = admin_uid
updates['last_updated_at'] = datetime.now(timezone.utc).isoformat()
farmer_ref.update(updates)
updated_farmer_data = farmer_ref.get() # Get the latest data
return jsonify({
'success': True,
'message': f'WhatsApp-managed farmer {farmer_uid} updated.',
'profile': updated_farmer_data
}), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
#--- END OF WHATSAPP-MANAGED FARMER ACCOUNT ENDPOINTS ---
@app.route('/api/admin/deals/pending', methods=['GET'])
def admin_get_pending_deals():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid, _, _ = verify_admin_or_facilitator(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
deals_ref = db.reference('deals', app=db_app).order_by_child('status').equal_to('accepted_by_farmer')
return jsonify(deals_ref.get() or {}), 200
except Exception as e: return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/admin/deals/action/', methods=['POST'])
def admin_action_on_deal(deal_id):
auth_header = request.headers.get('Authorization')
reviewer_uid = None
try:
reviewer_uid, _, _ = verify_admin_or_facilitator(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json(); action = data.get('action')
if action not in ['approve', 'reject']: return jsonify({'error': 'Invalid action.'}), 400
deal_ref = db.reference(f'deals/{deal_id}', app=db_app); deal_data = deal_ref.get()
if not deal_data or deal_data.get('status') != 'accepted_by_farmer': return jsonify({'error': 'Deal not found or not in correct state for admin action'}), 404
update_time = datetime.now(timezone.utc).isoformat()
farmer_id, buyer_id = deal_data.get('farmer_id'), deal_data.get('buyer_id')
message_text = "" # Renamed
if action == 'approve':
deal_ref.update({'status': 'active', 'admin_approved_by': reviewer_uid, 'admin_approved_at': update_time})
message_text = f"Your deal (ID: {deal_id}) has been approved by admin and is now active."
elif action == 'reject':
deal_ref.update({'status': 'rejected_by_admin', 'admin_rejected_by': reviewer_uid, 'admin_rejected_at': update_time})
message_text = f"Your deal (ID: {deal_id}) has been rejected by admin."
if message_text:
_send_system_notification(farmer_id, message_text, "deal_status_update", f"/deals/{deal_id}")
_send_system_notification(buyer_id, message_text, "deal_status_update", f"/deals/{deal_id}")
return jsonify({'success': True, 'message': message_text or "Action processed."}), 200
except Exception as e: return handle_route_errors(e, uid_context=reviewer_uid)
@app.route('/api/chat/send', methods=['POST'])
def send_chat_message():
auth_header = request.headers.get('Authorization');
uid = None
try:
uid = verify_token(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error'}), 503
data = request.get_json(); chat_room_id, msg_text = data.get('chat_room_id'), data.get('message_text')
if not chat_room_id or not msg_text: return jsonify({'error': 'chat_room_id and message_text required'}), 400
message_id = str(uuid.uuid4())
message_data = {'sender_id': uid, 'text': msg_text, 'timestamp': datetime.now(timezone.utc).isoformat()}
db.reference(f'chat_messages/{chat_room_id}/{message_id}', app=db_app).set(message_data)
return jsonify({'success': True, 'message_id': message_id}), 201
except Exception as e: return handle_route_errors(e, uid_context=uid)
@app.route('/api/chat/', methods=['GET'])
def get_chat_messages(chat_room_id):
auth_header = request.headers.get('Authorization');
uid = None
try:
uid = verify_token(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error'}), 503
messages_ref = db.reference(f'chat_messages/{chat_room_id}', app=db_app).order_by_child('timestamp')
return jsonify(messages_ref.get() or {}), 200
except Exception as e: return handle_route_errors(e, uid_context=uid)
@app.route('/api/admin/notifications/send', methods=['POST'])
def admin_send_notification():
auth_header = request.headers.get('Authorization')
admin_uid = None
try:
admin_uid = verify_admin(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
data = request.get_json()
message_content = data.get('message') # For the in-app notification
target_group = data.get('target_group', 'all')
target_users_list = data.get('target_users', [])
# New email-specific fields from frontend
send_as_email = data.get('send_as_email', False)
email_subject = data.get('email_subject')
email_body_html = data.get('email_body_html')
if not message_content:
return jsonify({'error': 'In-app notification message is required'}), 400
if send_as_email and (not email_subject or not email_body_html):
return jsonify({'error': 'If sending as email, email_subject and email_body_html are required.'}), 400
recipients_uids = set()
all_users_data = db.reference('users', app=db_app).get() or {}
if target_users_list and isinstance(target_users_list, list):
for uid_target in target_users_list:
if uid_target in all_users_data: recipients_uids.add(uid_target)
elif target_group == 'all':
recipients_uids.update(all_users_data.keys())
elif target_group in ['farmers', 'buyers', 'transporters']:
role_key = target_group[:-1] # 'farmers' -> 'farmer'
for uid_loop, u_data in all_users_data.items():
if u_data and u_data.get('roles', {}).get(role_key, False):
recipients_uids.add(uid_loop)
else:
return jsonify({'error': 'Invalid target_group or target_users not provided correctly'}), 400
sent_count = 0
for uid_recipient in recipients_uids:
# Call the upgraded notification function with all parameters
if _send_system_notification(
user_id=uid_recipient,
message_content=message_content,
notif_type="admin_broadcast",
send_email=send_as_email,
email_subject=email_subject,
email_body=email_body_html
):
sent_count += 1
return jsonify({'success': True, 'message': f"Broadcast notification dispatched for {sent_count} user(s)."}), 200
except Exception as e:
return handle_route_errors(e, uid_context=admin_uid)
@app.route('/api/user/notifications', methods=['GET'])
def get_user_notifications():
auth_header = request.headers.get('Authorization');
uid = None
try:
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error'}), 503
uid = verify_token(auth_header)
if not uid: return jsonify({'error': 'Authentication required.'}), 401
notifications_ref = db.reference(f'notifications/{uid}', app=db_app).order_by_child('created_at')
user_notifications = notifications_ref.get() or {}
sorted_notifications = sorted(user_notifications.items(), key=lambda item: item[1]['created_at'], reverse=True)
return jsonify(dict(sorted_notifications)), 200
except Exception as e: return handle_route_errors(e, uid_context=uid)
@app.route('/api/user/notifications//read', methods=['POST'])
def mark_notification_read(notification_id):
auth_header = request.headers.get('Authorization');
uid = None
try:
uid = verify_token(auth_header)
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error'}), 503
notif_ref = db.reference(f'notifications/{uid}/{notification_id}', app=db_app)
if not notif_ref.get(): return jsonify({'error': 'Notification not found'}), 404
notif_ref.update({'read': True, 'read_at': datetime.now(timezone.utc).isoformat()})
return jsonify({'success': True, 'message': 'Notification marked as read.'}), 200
except Exception as e: return handle_route_errors(e, uid_context=uid)
# Helper for AI chat to fetch platform data
def _fetch_platform_data_for_chat(query):
if not FIREBASE_INITIALIZED: return "Firebase not ready.", False
listings_ref = db.reference('listings', app=db_app).order_by_child('status').equal_to('active')
all_active_listings = listings_ref.get() or {}
relevant_listings = []
query_lower = query.lower()
for lid, ldata in all_active_listings.items():
if not ldata: continue
crop_type = ldata.get('crop_type', '').lower()
location = ldata.get('location', '').lower()
listing_type = ldata.get('listing_type', '')
# Simple keyword matching for demonstration
if any(keyword in query_lower for keyword in [crop_type, location, listing_type, 'maize', 'beans', 'harare', 'bulawayo']):
relevant_listings.append(ldata)
if not relevant_listings:
return "No active listings found matching your query.", False
summary = "Active Listings:\n"
for l in relevant_listings[:5]: # Limit to top 5 for brevity
summary += f"- {l.get('crop_type')} ({l.get('listing_type')}) in {l.get('location')}, Qty: {l.get('quantity')}, Price: {l.get('asking_price') or l.get('price_range')}\n"
return summary, True
# Helper for AI chat to fetch price trends
def _get_price_trend_analysis_for_chat(crop_type=None, location=None):
if not FIREBASE_INITIALIZED: return "Firebase not ready.", False
if not gemini_client: return "AI service not available for trend analysis.", False
all_deals = db.reference('deals', app=db_app).order_by_child('status').equal_to('completed').get() or {}
price_data_points = []
for deal_id, deal in all_deals.items():
if not deal: continue
listing_id = deal.get('listing_id')
listing_details = db.reference(f'listings/{listing_id}', app=db_app).get() if listing_id else None
if listing_details:
deal_crop_type, deal_location = listing_details.get('crop_type'), listing_details.get('location')
if crop_type and deal_crop_type and deal_crop_type.lower() != crop_type.lower(): continue
if location and deal_location and deal_location.lower() != location.lower(): continue
price_data_points.append({
'price': deal.get('agreed_price') or deal.get('proposed_price'),
'quantity': deal.get('agreed_quantity') or deal.get('proposed_quantity'),
'date': deal.get('admin_approved_at') or deal.get('created_at'),
'crop': deal_crop_type, 'location': deal_location
})
if not price_data_points:
return "Not enough historical data to generate price trends for the specified criteria.", False
data_summary_for_gemini = f"Recent transactions for {crop_type or 'various crops'} in {location or 'various locations'}:\n"
for point in price_data_points[:10]: # Limit data sent to Gemini
data_summary_for_gemini += f"- Crop: {point.get('crop')}, Price: {point.get('price')}, Qty: {point.get('quantity')}, Date: {point.get('date')}, Loc: {point.get('location')}\n"
prompt = f"""
Analyze agricultural transaction data for Tunasonga Agri. Provide brief price trend analysis (increasing, decreasing, stable? patterns?).
Focus on {crop_type if crop_type else 'common crops'}. Be concise for farmers/buyers. State if data is sparse.
Data: {data_summary_for_gemini}
Analysis:
"""
try:
response_obj = gemini_client.models.generate_content(model='gemini-2.0-flash', contents=[{'parts': [{'text': prompt}]}])
return response_obj.text.strip(), True
except Exception as e:
logger.error(f"Gemini error during price trend analysis: {e}")
return "Could not generate price trend analysis at this time due to an AI service error.", False
@app.route('/api/market/prices/trends', methods=['GET'])
def get_price_trends():
uid_context = "public_price_trends"
response_obj = None
try:
if not gemini_client: return jsonify({'error': 'AI service not available.'}), 503
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error: Firebase not ready.'}), 503
crop_type, location = request.args.get('crop_type'), request.args.get('location')
trend_analysis, success = _get_price_trend_analysis_for_chat(crop_type, location)
if not success:
return jsonify({'message': trend_analysis}), 200 # trend_analysis contains the error message
return jsonify({'crop_type': crop_type, 'location': location, 'trend_analysis': trend_analysis}), 200
except AttributeError as ae:
logger.error(f"Gemini Response Attribute Error in get_price_trends: {ae}. Response object: {response_obj}")
try: trend_analysis = response_obj.candidates[0].content.parts[0].text if response_obj and response_obj.candidates else "Error processing AI structure."
except Exception: trend_analysis = "Error processing AI response structure."
return jsonify({'crop_type': crop_type, 'location': location, 'trend_analysis': trend_analysis, 'error_detail': str(ae)}), 200
except Exception as e: return handle_route_errors(e, uid_context=uid_context)
@app.route("/api/ai/chat", methods=["POST"])
def ai_chat():
auth_header = request.headers.get("Authorization", "")
uid = None
intent = "general_agri_info" # start with a valid default rather than "unknown"
try:
if not FIREBASE_INITIALIZED or not gemini_client:
return jsonify({'error': 'Server or AI service not ready.'}), 503
uid = verify_token(auth_header)
data = request.get_json()
if not data:
return jsonify({"error": "Invalid request data."}), 400
user_message = data.get("message", "").strip()
if not user_message:
return jsonify({"error": "Message cannot be empty."}), 400
# 1) Build a more constrained classification prompt
classify_prompt = f"""
Classify the user’s query into exactly one of these categories (and nothing else):
platform_data_query
price_trend_query
general_agri_info
other
User Query: "{user_message}"
Return exactly one of the four category names above, with no extra words or punctuation.
"""
# 2) Call Gemini for classification, with explicit logging
try:
response_obj_gemini = gemini_client.models.generate_content(
model='gemini-2.0-flash',
contents=[{'parts': [{'text': classify_prompt}]}]
)
raw_intent = response_obj_gemini.text or ""
except Exception as classify_ex:
logger.error(f"Classification call failed: {classify_ex}")
raw_intent = ""
# 3) Normalize and validate
normalized = raw_intent.strip().replace('"', '').lower()
valid_intents = ["platform_data_query", "price_trend_query", "general_agri_info", "other"]
if normalized in valid_intents:
intent = normalized
else:
logger.warning(f"Unexpected or empty classification '{raw_intent}'. Defaulting intent to general_agri_info.")
intent = "general_agri_info"
# 4) Build context based on intent...
context_for_gemini = ""
if intent == "platform_data_query":
platform_data_summary, _ = _fetch_platform_data_for_chat(user_message)
if platform_data_summary:
context_for_gemini += f"Current Platform Data Context:\n{platform_data_summary}\n\n"
elif intent == "price_trend_query":
# … trend logic …
trend_analysis_summary, _ = _get_price_trend_analysis_for_chat(...)
if trend_analysis_summary:
context_for_gemini += f"Price Trend Analysis Context:\n{trend_analysis_summary}\n\n"
# 5) Main answer prompt
main_prompt = f"""
You are Tunasonga Agri Assistant, an AI for an agricultural marketplace in Zimbabwe and SADC.
Your goal is to provide helpful, concise, and accurate information. Your persona is professional, friendly, and supportive of farmers and agri-businesses.
The user's original query intent was classified as: {intent}
{context_for_gemini}
Based on the user's query and any provided context above, please formulate your answer to the user.
User Query: "{user_message}"
Specific Instructions based on intent:
- If the intent was 'platform_data_query': Use the "Current Platform Data Context" to answer. If the context says no items were found, relay that and suggest they browse the marketplace or refine their search. Do not invent listings.
- If the intent was 'price_trend_query': Use the "Price Trend Analysis Context". If the context says trends couldn't be generated, relay that. Do not invent trends.
- For 'general_agri_info': Use your broad agricultural knowledge. Focus on practices relevant to the SADC region, smallholder farmers, climate-smart agriculture, market access, and agri-business development. Provide actionable advice if possible.
- If the query is unclear, classified as "other", or if the context is insufficient for a specific query: Provide a polite general response, ask for clarification, or gently guide the user on how you can help (e.g., "I can help with finding produce, getting price trends, or general farming advice. What would you like to know?").
Keep your answers clear and easy to understand. Avoid overly technical jargon unless necessary and explain it.
Answer:
"""
try:
response_obj_gemini = gemini_client.models.generate_content(
model='gemini-2.0-flash',
contents=[{'parts': [{'text': main_prompt}]}]
)
ai_response_text = response_obj_gemini.text.strip() or "I’m having trouble generating a response right now."
except Exception as answer_ex:
logger.error(f"Answer generation failed: {answer_ex}")
ai_response_text = "I’m having trouble generating a response right now."
# 6) Save to Firebase history
try:
db.reference(f'ai_chat_history/{uid}/{str(uuid.uuid4())}', app=db_app).set({
'user_message': user_message,
'ai_response': ai_response_text,
'intent_classified': intent,
'timestamp': datetime.now(timezone.utc).isoformat()
})
except Exception as chat_history_error:
logger.error(f"Failed to store chat history for UID {uid}: {chat_history_error}")
# 7) Return JSON with valid intent
return jsonify({"response": ai_response_text, "intent": intent})
except AttributeError as ae:
# In this branch, always return a valid default intent
logger.error(f"AttributeError in ai_chat (UID: {uid}): {ae}")
ai_response_text = "I’m having a little trouble understanding that. Could you try rephrasing?"
return jsonify({"response": ai_response_text, "intent": "general_agri_info", "error_detail": "AI_RESPONSE_FORMAT_ISSUE"}), 200
except Exception as e:
# For any other exception, ensure we return a valid intent instead of "unknown"
logger.error(f"Unhandled exception in ai_chat (UID: {uid}): {e}")
return jsonify({"error": str(e), "intent": "general_agri_info"}), 500
@app.route('/api/user/ai-chat-history', methods=['GET'])
def get_ai_chat_history():
auth_header = request.headers.get('Authorization');
uid = None
try:
if not FIREBASE_INITIALIZED: return jsonify({'error': 'Server configuration error'}), 503
uid = verify_token(auth_header)
if not uid: return jsonify({'error': 'Authentication required.'}), 401
history_ref = db.reference(f'ai_chat_history/{uid}', app=db_app).order_by_child('timestamp')
chat_history = history_ref.get() or {}
sorted_history = sorted(chat_history.items(), key=lambda item: item[1]['timestamp'], reverse=True)
return jsonify(dict(sorted_history)), 200
except Exception as e: return handle_route_errors(e, uid_context=uid)
@app.route('/api/deals//payment/initiate', methods=['POST'])
def initiate_payment(deal_id):
auth_header = request.headers.get('Authorization')
buyer_uid = None
try:
if not paynow_client:
return jsonify({'error': 'Payment service is not configured.'}), 503
buyer_uid = verify_token(auth_header)
deal_ref = db.reference(f'deals/{deal_id}', app=db_app)
deal_data = deal_ref.get()
# --- SURGICAL VALIDATION ---
if not deal_data:
return jsonify({'error': 'Deal not found.'}), 404
if deal_data.get('buyer_id') != buyer_uid:
return jsonify({'error': 'You are not authorized to pay for this deal.'}), 403
if deal_data.get('status') != 'active':
return jsonify({'error': 'This deal is not active and cannot be paid for.'}), 400
if deal_data.get('payment', {}).get('status') == 'paid':
return jsonify({'error': 'This deal has already been paid for.'}), 400
# --- BACKEND CALCULATION ---
price = float(deal_data['proposed_price'])
quantity = int(deal_data['proposed_quantity'])
total_amount = round(price * quantity, 2)
# Create a new payment object
payment = paynow_client.create_payment(
f"TNSG-{deal_id}", # A unique reference for this transaction
deal_data.get('buyer_email', 'buyer@example.com') # Get buyer email if stored, otherwise a placeholder
)
payment.add(f"Deal {deal_id} for {deal_data.get('crop_type', 'produce')}", total_amount)
# Send the payment to Paynow
response = paynow_client.send(payment)
if not response.success:
logger.error(f"Paynow initiation failed for deal {deal_id}: {response.error}")
return jsonify({'error': 'Failed to initiate payment with the provider.', 'details': response.error}), 500
# --- UPDATE FIREBASE BEFORE REDIRECT ---
payment_update_payload = {
"status": "pending",
"paynow_reference": response.paynow_reference,
"poll_url": response.poll_url,
"amount": total_amount,
"currency": "USD", # Or make this dynamic if needed
"initiated_at": datetime.now(timezone.utc).isoformat()
}
deal_ref.child('payment').set(payment_update_payload)
# Return the redirect URL to the frontend
return jsonify({
'success': True,
'message': 'Payment initiated. Redirecting...',
'redirect_url': response.redirect_url
}), 200
except Exception as e:
return handle_route_errors(e, uid_context=buyer_uid)
@app.route('/api/payment/webhook/paynow', methods=['POST'])
def paynow_webhook():
try:
# The Paynow SDK does not have a built-in webhook handler, so we do it manually.
# This is a simplified representation. Refer to Paynow docs for the exact fields and hash method.
paynow_data = request.form.to_dict() # Paynow sends form data
logger.info(f"Received Paynow webhook: {paynow_data}")
paynow_reference = paynow_data.get('paynowreference')
merchant_reference = paynow_data.get('reference') # This is our "TNSG-deal_id"
status = paynow_data.get('status', '').lower()
received_hash = paynow_data.get('hash')
# --- CRITICAL: HASH VERIFICATION ---
# You MUST generate a hash from the received data using your Integration Key
# and compare it to the received_hash. If they don't match, discard the request.
# Example (pseudo-code, check Paynow docs for exact implementation):
# calculated_hash = generate_hash(paynow_data, PAYNOW_INTEGRATION_KEY)
# if calculated_hash != received_hash:
# logger.warning("Invalid hash on Paynow webhook. Discarding.")
# return jsonify({'error': 'Invalid hash'}), 403
if not merchant_reference or not merchant_reference.startswith('TNSG-'):
return jsonify({'error': 'Invalid reference format'}), 400
deal_id = merchant_reference.split('TNSG-')[1]
deal_ref = db.reference(f'deals/{deal_id}', app=db_app)
deal_data = deal_ref.get()
if not deal_data:
logger.error(f"Webhook for non-existent deal {deal_id} received.")
return jsonify({'error': 'Deal not found'}), 404
# Prevent processing old webhooks if deal is already paid
if deal_data.get('payment', {}).get('status') == 'paid':
logger.info(f"Webhook for already paid deal {deal_id} received. Ignoring.")
return jsonify({'status': 'ok'}), 200
payment_update_payload = {
"status": status,
"completed_at": datetime.now(timezone.utc).isoformat()
}
deal_ref.child('payment').update(payment_update_payload)
# --- NOTIFY USERS BASED ON STATUS ---
if status == 'paid':
farmer_id = deal_data.get('farmer_id')
buyer_id = deal_data.get('buyer_id')
# Find admins to notify
admins_ref = db.reference('users', app=db_app).order_by_child('is_admin').equal_to(True).get()
success_message = f"Payment for deal {deal_id} has been successfully received."
_send_system_notification(buyer_id, f"Your payment for deal {deal_id} was successful.", "payment_success", f"/deals/{deal_id}")
_send_system_notification(farmer_id, success_message, "payment_received", f"/deals/{deal_id}")
if admins_ref:
for admin_id, _ in admins_ref.items():
_send_system_notification(admin_id, success_message, "admin_payment_notification", f"/admin/deals/{deal_id}")
# Acknowledge receipt to Paynow
return jsonify({'status': 'ok'}), 200
except Exception as e:
logger.error(f"Error in Paynow webhook: {e}\n{traceback.format_exc()}")
return jsonify({'error': 'Internal server error'}), 500
if __name__ == '__main__':
app.run(debug=True, host="0.0.0.0", port=int(os.getenv("PORT", 7860)))