# sphere / app.py
# Waqasjan123's picture
# Update app.py
# b1e6745 verified
import hmac
import json
import os
import tempfile
import time
import warnings
from datetime import datetime

import gradio as gr
import openpyxl
import pandas as pd
import pytz
# Silence UserWarning messages that originate from the openpyxl module.
warnings.filterwarnings('ignore', category=UserWarning, module='openpyxl')
# Pakistan Standard Time (Asia/Karachi); used for the timestamps in the log
# lines and in the generated file name.
PKT = pytz.timezone('Asia/Karachi')
# --- CONFIGURATION ---
# If Excel gives a relative link (e.g. /sites/...), this prefix is added in
# front of it to build an absolute URL.
SHAREPOINT_DOMAIN = "https://sphereconsultingpk.sharepoint.com"
# Columns the converter reads unconditionally. They are checked up front so a
# missing header gives a clear message instead of a bare KeyError text.
_REQUIRED_COLUMNS = (
    'JV No.',
    'Date',
    'Paid To / Vendor',
    'COA Code Odoo',
    'Amount',
    'Detail',
    'Location',
)

# The sheet is read with header=3, i.e. the header is on Excel row 4, so
# DataFrame index 0 corresponds to Excel row 5.
_FIRST_DATA_ROW = 5


def _is_blank(value):
    """Return True when *value* is missing (None/NaN) or blank after stripping."""
    return pd.isna(value) or str(value).strip() == ''


def _validate_rows(df):
    """Return a list of human-readable validation errors for the rows of *df*.

    Each message is prefixed with the Excel row number of the offending row.
    An empty list means every row passed.
    """
    errors = []
    for index, row in df.iterrows():
        excel_row_num = index + _FIRST_DATA_ROW

        # 1. Check JV No
        if _is_blank(row.get('JV No.')):
            errors.append(f"Row {excel_row_num}: Missing 'JV No.'")
        # 2. Check Date
        if pd.isna(row.get('Date')):
            errors.append(f"Row {excel_row_num}: Missing 'Date'")
        # 3. Check Vendor
        if _is_blank(row.get('Paid To / Vendor')):
            errors.append(f"Row {excel_row_num}: Missing 'Paid To / Vendor'")
        # 4. Check Account Code (COA)
        if _is_blank(row.get('COA Code Odoo')):
            errors.append(f"Row {excel_row_num}: Missing 'COA Code Odoo' (Account Code)")
        # 5. Check Amount
        if pd.isna(row.get('Amount')):
            errors.append(f"Row {excel_row_num}: Missing 'Amount'")

        # 6. Project vs analytic match: only enforced when Project has a value.
        project_val = row.get('Project')
        if _is_blank(project_val):
            continue
        analytic_val = row.get('Analytical Account Odoo')
        if _is_blank(analytic_val):
            errors.append(
                f"Row {excel_row_num}: Project '{project_val}' is set, "
                f"but 'Analytical Account Odoo' is empty."
            )
        elif str(project_val).strip().lower() not in str(analytic_val).strip().lower():
            # The project name must appear (case-insensitively) inside the
            # analytic account text.
            errors.append(
                f"Row {excel_row_num}: Mismatch! Project '{project_val}' "
                f"not found in Analytic '{analytic_val}'"
            )
    return errors


def _absolute_url(raw_url):
    """Return *raw_url* as an absolute URL, or None when it is empty.

    Targets that do not start with http:// or https:// are treated as
    relative paths and prefixed with SHAREPOINT_DOMAIN.
    """
    if not raw_url:
        return None
    if raw_url.startswith(('http://', 'https://')):
        return raw_url
    # Drop leading slashes so the join does not produce a double slash.
    return f"{SHAREPOINT_DOMAIN}/{raw_url.lstrip('/')}"


def _extract_urls(workbook_path, row_indexes):
    """Read the hyperlink targets of the sheet's "URL" column.

    Args:
        workbook_path: Path of the uploaded workbook.
        row_indexes: DataFrame index labels of the rows to look up.

    Returns:
        A list with one URL (or None) per entry of *row_indexes*, or None
        when header row 4 contains no cell whose text is "URL".
    """
    wb = openpyxl.load_workbook(workbook_path, data_only=False)
    try:
        ws = wb["Transaction List"]
        url_col_letter = None
        for cell in ws[4]:
            if cell.value and str(cell.value).strip() == "URL":
                url_col_letter = cell.column_letter
                break
        if url_col_letter is None:
            return None

        urls = []
        for index in row_indexes:
            try:
                cell = ws[f"{url_col_letter}{index + _FIRST_DATA_ROW}"]
                raw_url = cell.hyperlink.target if cell.hyperlink else None
                urls.append(_absolute_url(raw_url))
            except Exception:
                # Best effort: a link that cannot be read leaves the row
                # without a URL rather than aborting the whole conversion.
                urls.append(None)
        return urls
    finally:
        wb.close()


def _extract_id_part(val):
    """Return the text before the first '*' in *val* (stripped), or None for NaN."""
    if pd.isna(val):
        return None
    return str(val).partition('*')[0].strip()


def _format_analytic(val):
    """Build the analytic-distribution JSON string for an "<id>*<name>" value.

    Returns None when *val* is missing, has no '*' separator, or has an empty
    id part; otherwise a string such as '{"12": 100.0}'.
    """
    if pd.isna(val):
        return None
    account_id, separator, _name = str(val).partition('*')
    # Strip the id the same way _extract_id_part does, so "12 * Name" does
    # not yield a key with a trailing space.
    account_id = account_id.strip()
    if not separator or not account_id:
        return None
    return json.dumps({account_id: 100.0}, ensure_ascii=False)


def process_excel_streaming(file_obj):
    """Convert the 'Transaction List' sheet of an uploaded workbook to an Odoo import file.

    This is a generator. Every step yields a ``(log_text, output_path)`` tuple
    where ``log_text`` is the full log accumulated so far and ``output_path``
    is None, except for the final successful step, which yields the path of
    the generated ``.xlsx`` file. Any unexpected exception is reported as a
    final log line instead of being raised.

    Args:
        file_obj: The uploaded file, either an object exposing its path as
            ``.name`` or a plain path string; None when nothing was uploaded.
    """
    logs = []

    def log(message):
        """Append a PKT-timestamped line and return the whole log as one string."""
        timestamp = datetime.now(PKT).strftime("%H:%M:%S")
        logs.append(f"[{timestamp}] {message}")
        return "\n".join(logs)

    if file_obj is None:
        yield log("❌ Error: Please upload a file first."), None
        return

    try:
        # Accept both an upload object with a .name path and a bare path string.
        file_path = getattr(file_obj, "name", file_obj)
        original_filename = os.path.basename(file_path)
        yield log("📂 Reading Excel file..."), None
        time.sleep(0.5)

        # --- 1. LOAD DATA ---
        try:
            df_all = pd.read_excel(file_path, sheet_name="Transaction List", header=3)
        except ValueError:
            yield log("❌ Error: Could not find sheet 'Transaction List'. Check your tab name."), None
            return
        yield log("✅ Successfully loaded sheet 'Transaction List'."), None

        # Convert headers to str first: .str.strip() would turn non-string
        # headers into NaN.
        df_all.columns = [str(col).strip() for col in df_all.columns]
        yield log(f"📊 Total Rows Detected: {len(df_all)}"), None

        missing_cols = [col for col in _REQUIRED_COLUMNS if col not in df_all.columns]
        if missing_cols:
            yield log(f"❌ Error: Missing required column(s): {', '.join(missing_cols)}."), None
            return

        # --- 2. FILTER STATUS ---
        col_status = 'Odoo import status'
        if col_status in df_all.columns:
            posted_mask = df_all[col_status].astype(str).str.strip().str.lower() == 'posted'
            df_to_process = df_all[~posted_mask].copy()
            yield log(f"🛑 Skipped: {posted_mask.sum()} rows (Already 'posted')."), None
        else:
            df_to_process = df_all.copy()

        if df_to_process.empty:
            yield log("✅ All rows are already posted! No file generated."), None
            return

        # --- 3. STRICT VALIDATION ---
        yield log("🔍 Running Strict Data Validation..."), None
        error_list = _validate_rows(df_to_process)

        # --- IF ERRORS FOUND, STOP IMMEDIATELY ---
        if error_list:
            yield log("❌ VALIDATION FAILED! Fix these errors in Excel and re-upload:"), None
            yield log("-" * 40), None
            # Only the first 25 errors are written to the log.
            for err in error_list[:25]:
                yield log(f"⛔ {err}"), None
            if len(error_list) > 25:
                yield log(f"... and {len(error_list) - 25} more errors."), None
            yield log("-" * 40), None
            yield log("🚫 Process Aborted. No file generated."), None
            return
        yield log("✅ All Data & Project Matches Validated Successfully!"), None

        # --- 4. FORMATTING DATES ---
        # Validation guarantees no Date is missing, so a NaT after coercion
        # means the cell held something that is not a date.
        parsed_dates = pd.to_datetime(df_to_process['Date'], errors='coerce')
        bad_date_rows = [
            index + _FIRST_DATA_ROW
            for index in parsed_dates.index[parsed_dates.isna()]
        ]
        if bad_date_rows:
            yield log(f"⛔ CRITICAL ERROR: Unreadable 'Date' in Excel rows: {bad_date_rows}"), None
            return
        df_to_process['Date'] = parsed_dates
        df_to_process['Date_Str'] = parsed_dates.dt.strftime('%Y-%m-%d')

        # All lines of one JV must share a single date.
        date_check = df_to_process.groupby('JV No.')['Date'].nunique()
        bad_jvs = date_check[date_check > 1]
        if not bad_jvs.empty:
            yield log(f"⛔ CRITICAL ERROR: JV has multiple dates: {bad_jvs.index.tolist()}"), None
            return

        # --- 5. EXTRACT HYPERLINKS & FIX PREFIX ---
        yield log("🔗 Extracting & Fixing Hyperlinks..."), None
        # A None result (no "URL" header) is assigned as-is, giving an empty column.
        df_to_process['Extracted_URL'] = _extract_urls(file_path, df_to_process.index)

        # --- 6. TRANSFORM ---
        yield log("🛠️ Mapping to Odoo Schema..."), None
        col_analytic = 'Analytical Account Odoo'
        if col_analytic in df_to_process.columns:
            df_to_process['Formatted_Analytic'] = df_to_process[col_analytic].apply(_format_analytic)
        else:
            df_to_process['Formatted_Analytic'] = None
        df_to_process['Vendor_ID_Only'] = df_to_process['Paid To / Vendor'].apply(_extract_id_part)

        odoo_df = pd.DataFrame()
        odoo_df['partner_id/.id'] = df_to_process['Vendor_ID_Only']
        odoo_df['invoice_date'] = df_to_process['Date_Str']
        odoo_df['invoice_date_due'] = df_to_process['Date_Str']
        odoo_df['ref'] = df_to_process['JV No.']
        odoo_df['x_studio_vouchers_link'] = df_to_process['Extracted_URL']
        odoo_df['Extract status'] = "Draft"
        odoo_df['move_type'] = "Vendor Bill"
        odoo_df['invoice_line_ids/name'] = df_to_process['Detail']
        odoo_df['invoice_line_ids/account_id'] = df_to_process['COA Code Odoo']
        odoo_df['invoice_line_ids/price_unit'] = df_to_process['Amount']
        odoo_df['invoice_line_ids/analytic_distribution'] = df_to_process['Formatted_Analytic']
        odoo_df['invoice_line_ids/x_studio_location'] = df_to_process['Location']

        # --- 7. GROUPING ---
        yield log("📦 Grouping lines..."), None
        odoo_df = odoo_df.sort_values(by=['partner_id/.id', 'ref'])
        # Only the first line of each (ref, partner) pair keeps the header
        # fields; they are blanked on the following lines of the same pair.
        is_duplicate = odoo_df.duplicated(subset=['ref', 'partner_id/.id'], keep='first')
        header_cols = [
            'partner_id/.id',
            'invoice_date',
            'invoice_date_due',
            'ref',
            'x_studio_vouchers_link',
            'Extract status',
            'move_type',
        ]
        odoo_df.loc[is_duplicate, header_cols] = ""
        yield log("✨ Formatting complete."), None

        # --- 8. SAVE ---
        yield log("💾 Generating file..."), None
        name_no_ext = os.path.splitext(original_filename)[0]
        short_name = name_no_ext[:33].strip()
        timestamp = datetime.now(PKT).strftime("%Y%m%d-%H%M%S")
        final_name = f"{short_name}-{timestamp}-odoo.xlsx"
        output_path = os.path.join(tempfile.gettempdir(), final_name)
        odoo_df.to_excel(output_path, index=False)
        yield log(f"🎉 DONE! File ready: {final_name}"), output_path
    except Exception as e:
        # Top-level boundary: report the failure in the log instead of raising.
        yield log(f"❌ ERROR: {e}"), None
def auth_checker(username, password):
    """Login callback: accept any username whose password matches APP_PASSWORD.

    Args:
        username: Submitted user name; not inspected.
        password: Submitted password.

    Returns:
        True when the password equals the ``APP_PASSWORD`` environment
        variable, or when that variable is unset or empty; False otherwise.
    """
    correct_pass = os.environ.get("APP_PASSWORD")
    if not correct_pass:
        # NOTE(review): with APP_PASSWORD unset or empty every login is
        # accepted - confirm this open-access fallback is intended.
        return True
    if not isinstance(password, str):
        return False
    # Constant-time comparison; bytes are used because compare_digest
    # rejects non-ASCII str arguments.
    return hmac.compare_digest(
        password.encode("utf-8"), correct_pass.encode("utf-8")
    )
# --- USER INTERFACE ---
with gr.Blocks(title="Smart Odoo Converter for Sphere") as app:
    # Header row: page title (scale 8) beside a button linking to /logout (scale 2).
    with gr.Row():
        with gr.Column(scale=8):
            gr.Markdown("# Smart Excel to Odoo Converter for Sphere")
        with gr.Column(scale=2):
            gr.Button("Sign Out", link="/logout", size="sm")

    gr.Markdown("---")

    # Main row: upload + trigger in the first column, log + download in the second.
    with gr.Row():
        with gr.Column(scale=1):
            file_input = gr.File(
                label="1. Upload Excel File",
                file_types=[".xlsx"],
                height=150,
            )
            process_btn = gr.Button(
                "2. Analyze & Convert",
                variant="primary",
                size="lg",
            )
        with gr.Column(scale=1):
            log_output = gr.Textbox(
                label="Live Analysis Log",
                lines=12,
                max_lines=15,
                interactive=False,
                autoscroll=True,
            )
            file_output = gr.File(
                label="3. Download Result",
                interactive=False,
            )

    # Each (log, file) tuple yielded by the generator updates both outputs.
    process_btn.click(
        fn=process_excel_streaming,
        inputs=file_input,
        outputs=[log_output, file_output],
    )

if __name__ == "__main__":
    # Credentials are checked by auth_checker.
    app.launch(auth=auth_checker)