# app.py
"""
Task Phase Dashboard: a small FastAPI app backed by a CSV file.

Run:
    uvicorn app:app --reload --port 8000

This app reads data.csv (expected columns below), computes metrics and serves:
- /             : HTML dashboard (JS + Chart.js)
- /api/tasks    : JSON data for charts
- /report/pdf   : PDF export
- /report/excel : Excel export

Expected CSV columns (case sensitive):
Task ID,Task Name,Client,Phase,Value,Start Time,End Time,Duration (mins)

If Duration (mins) is missing, the app will compute it using Start Time and
End Time. Date format is flexible (values are parsed with pandas.to_datetime).
"""

import io
import math
import re
from datetime import datetime

import chardet
import numpy as np
import pandas as pd
import uvicorn
from fastapi import FastAPI, Query
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from openpyxl import Workbook
from reportlab.lib import colors
from reportlab.lib.pagesizes import A4
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer

app = FastAPI(title="Task Phase Dashboard (CSV)")
CSV_PATH = "data.csv"  # file should be in the same folder as app.py

# Simple HTML template (English labels). Uses Chart.js from CDN.
# NOTE: the template text below is runtime data served by `/`; it is kept
# exactly as it appears in the file.
HTML_TEMPLATE = """ Task Phase Dashboard

Task Phase Dashboard (TASH)

Total tasks
Completed
Avg Duration (mins)

Tasks per Client

Avg Duration per Client (mins)

Avg Duration per Phase (mins)

Avg Duration per Task (mins)

Value Distribution

Tasks Over Time (by start date)

Task List

Task Name Client Phase Creative Stage Value Duration (mins) Start End Delayed
"""

########### Data processing utilities ###########

# Columns that are created (as empty strings) when the CSV does not have them.
_EXPECTED_COLUMNS = [
    "Task ID", "Task Name", "Client", "Phase", "Value",
    "Start Time", "End Time", "Duration (mins)", "creative stage",
]

# A task counts as done when its upper-cased Value contains any of these.
_DONE_KEYWORDS = [
    "APPROVED", "DONE", "COMPLETED", "DELIVERED", "PO APPROVAL", "CLIENT APPROVAL",
]

# Columns exported for every task row in the JSON / PDF / Excel outputs.
_TASK_COLUMNS = [
    "Task ID", "Task Name", "Client", "Phase", "creative stage", "Value",
    "Start Time", "End Time", "Duration (mins)", "delayed",
]


def _parse_datetime(text):
    """Parse one cell into a pandas Timestamp; return NaT when it cannot be parsed."""
    try:
        return pd.to_datetime(text, errors="coerce")
    except (ValueError, TypeError, OverflowError):
        return pd.NaT


def _duration_minutes(row):
    """Return the duration of one task row in minutes (NaN when unknown).

    The explicit "Duration (mins)" cell wins when it is numeric; otherwise the
    duration is derived from the parsed start and end timestamps.
    """
    raw = row.get("Duration (mins)")
    text = str(raw).strip() if raw else ""
    if text:
        try:
            return float(text)
        except ValueError:
            pass  # not numeric: fall back to the start/end timestamps
    start, end = row.get("Start_dt"), row.get("End_dt")
    if pd.notna(start) and pd.notna(end):
        return (end - start).total_seconds() / 60.0
    return float("nan")


def read_and_prepare(csv_path=CSV_PATH):
    """
    Read the CSV and return a cleaned DataFrame.

    Steps:
    - Read every column as text; if UTF-8 decoding fails, detect the encoding
      with chardet from the first 50,000 bytes and retry.
    - Create any missing expected column as empty strings.
    - Strip a leading "Emotion" from Phase.
    - Set "creative stage" to "CREATIVE" for phases mentioning image/video and
      keep only those rows.
    - Add helper columns: Start_dt, End_dt, numeric "Duration (mins)",
      is_done, delayed and start_date ("YYYY-MM-DD" or "unknown").
    """
    # Try UTF-8 first and fall back to a detected encoding.
    try:
        df = pd.read_csv(csv_path, dtype=str, encoding="utf-8-sig").fillna("")
    except UnicodeDecodeError:
        with open(csv_path, "rb") as f:
            raw_data = f.read(50000)  # only the first 50,000 bytes are sampled
        detected = chardet.detect(raw_data)
        # chardet may report {"encoding": None}; UTF-8 has already failed, so
        # fall back to latin-1, which can decode any byte sequence.
        encoding_used = detected.get("encoding") or "latin-1"
        print("🔎 Detected encoding:", encoding_used)
        df = pd.read_csv(csv_path, dtype=str, encoding=encoding_used).fillna("")

    # Make sure the basic columns exist.
    for column in _EXPECTED_COLUMNS:
        if column not in df.columns:
            df[column] = ""

    # Clean the Phase column: drop the word "Emotion" when it comes first.
    df["Phase"] = (
        df["Phase"].str.replace(r"^\s*Emotion\s*", "", regex=True).str.strip()
    )

    # Tag image/video phases, then keep only those rows. Copy the filtered
    # frame so the column assignments below do not write into a slice.
    is_creative = df["Phase"].str.lower().str.contains("image|video", na=False)
    df["creative stage"] = np.where(is_creative, "CREATIVE", "")
    df = df[is_creative].copy()

    # Parse Start/End as datetimes, cell by cell, so mixed formats are accepted.
    df["Start_dt"] = df["Start Time"].apply(_parse_datetime)
    df["End_dt"] = df["End Time"].apply(_parse_datetime)

    # Duration (mins). Built as an explicit float Series because
    # DataFrame.apply(axis=1) returns a DataFrame for an empty frame.
    df["Duration (mins)"] = pd.Series(
        [_duration_minutes(row) for _, row in df.iterrows()],
        index=df.index,
        dtype="float64",
    )

    # Done flag.
    df["is_done"] = df["Value"].str.upper().apply(
        lambda value: any(keyword in value for keyword in _DONE_KEYWORDS)
    )

    # Delayed detection: duration > mean + std over all rows (per Phase could
    # be better). Comparisons against NaN are False, so nothing is flagged
    # when the mean or the std is undefined, and rows with an unknown
    # duration are never flagged.
    durations = df["Duration (mins)"]
    threshold = durations.mean(skipna=True) + durations.std(skipna=True)
    df["delayed"] = durations > threshold

    # Start date string for timeline grouping (YYYY-MM-DD). Built per value so
    # a missing date becomes "unknown" instead of the string "NaT".
    df["start_date"] = [
        stamp.date().isoformat() if pd.notna(stamp) else "unknown"
        for stamp in df["Start_dt"]
    ]

    return df


def sanitize_for_json(obj):
    """Recursively replace NaN/inf floats with 0 so the payload is valid JSON.

    Dicts, lists and tuples are walked (tuples come back as lists, which is
    how JSON encodes them); NumPy floating scalars are converted to plain
    floats. Any other value is returned unchanged.
    """
    if isinstance(obj, (float, np.floating)):
        number = float(obj)
        if math.isnan(number) or math.isinf(number):
            return 0
        return number
    if isinstance(obj, dict):
        return {key: sanitize_for_json(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [sanitize_for_json(item) for item in obj]
    return obj


def build_response(df, filter_client=None, filter_phase=None, filter_task=None,
                   filter_value=None, filter_delayed=None):
    """Apply the optional filters to `df` and build the dashboard payload.

    Parameters:
        df: DataFrame produced by `read_and_prepare`.
        filter_client, filter_phase, filter_task, filter_value: exact-match
            filters on Client, Phase, Task Name and Value; falsy = no filter.
        filter_delayed: "yes" or "no" (case-insensitive) to keep only delayed
            or only non-delayed rows; any other value applies no filter.

    Returns:
        A dict with counts, per-client aggregates, the Value distribution, a
        timeline, the task rows, and the filter options ("clients", "phases",
        "tasks_names"), which are always taken from the unfiltered `df`.
        When nothing matches, the same keys are returned empty/zero plus
        "message": "Not Found".
    """
    dff = df
    exact_filters = (
        ("Client", filter_client),
        ("Phase", filter_phase),
        ("Task Name", filter_task),
        ("Value", filter_value),
    )
    for column, wanted in exact_filters:
        if wanted:
            dff = dff[dff[column] == wanted]

    if filter_delayed:
        choice = filter_delayed.lower()
        if choice == "yes":
            dff = dff[dff["delayed"] == True]  # noqa: E712 (element-wise compare)
        elif choice == "no":
            dff = dff[dff["delayed"] == False]  # noqa: E712

    # Filter options always reflect the whole data set, not the filtered view.
    clients_all = sorted(df["Client"].unique().tolist())
    phases_all = sorted(df["Phase"].unique().tolist())
    tasks_names = sorted(df["Task Name"].unique().tolist())

    total_count = len(dff)

    # No results.
    if total_count == 0:
        return {
            "total_count": 0,
            "done_count": 0,
            "progress_percent": 0,
            "avg_duration": 0,
            "clients": clients_all,
            "phases": phases_all,
            "tasks_names": tasks_names,
            "clients_filtered": [],
            "client_counts": [],
            "client_avg_duration": [],
            "values_labels": [],
            "values_counts": [],
            "timeline": {"labels": [], "counts": []},
            "tasks": [],
            "message": "Not Found",  # the message shown for an empty result
        }

    done_count = int(dff["is_done"].sum())
    progress_percent = done_count / total_count * 100

    # NaN is truthy, so `mean or 0` would keep it; test for it explicitly.
    mean_duration = dff["Duration (mins)"].mean(skipna=True)
    avg_duration = 0.0 if pd.isna(mean_duration) else float(mean_duration)

    client_counts_series = dff["Client"].value_counts()
    clients = list(client_counts_series.index)
    client_counts = [int(count) for count in client_counts_series.values]
    client_avg = (
        dff.groupby("Client")["Duration (mins)"]
        .mean()
        .reindex(clients)  # same order as the counts above
        .fillna(0)
        .tolist()
    )

    values_counts_series = dff["Value"].value_counts()
    values_labels = list(values_counts_series.index)
    values_counts = [int(count) for count in values_counts_series.values]

    timeline_series = dff.groupby("start_date").size().sort_index()
    timeline_labels = list(timeline_series.index)
    timeline_counts = [int(count) for count in timeline_series.values]

    tasks = dff[_TASK_COLUMNS].fillna("").to_dict(orient="records")

    return {
        "total_count": total_count,
        "done_count": done_count,
        "progress_percent": progress_percent,
        "avg_duration": avg_duration,
        "clients": clients_all,
        "phases": phases_all,
        "tasks_names": tasks_names,
        "clients_filtered": clients,
        "client_counts": client_counts,
        "client_avg_duration": [round(value or 0, 1) for value in client_avg],
        "values_labels": values_labels,
        "values_counts": values_counts,
        "timeline": {"labels": timeline_labels, "counts": timeline_counts},
        "tasks": tasks,
    }

########### End utilities ###########


@app.get("/", response_class=HTMLResponse)
def index():
    """Serve the dashboard page."""
    return HTML_TEMPLATE


@app.get("/api/tasks")
def api_tasks(
    client: str = Query(None),
    phase: str = Query(None),
    task: str = Query(None),
    value: str = Query(None),
    delayed: str = Query(None)
):
    """Return the dashboard payload as JSON, filtered by the query parameters.

    The CSV is re-read on every request.
    """
    df = read_and_prepare()
    resp = build_response(
        df,
        filter_client=client,
        filter_phase=phase,
        filter_task=task,
        filter_value=value,
        filter_delayed=delayed
    )
    # NaN / inf are not valid JSON; replace them before serializing.
    resp = sanitize_for_json(resp)
    return JSONResponse(resp)


def _format_duration(value):
    """Format a numeric duration with one decimal; pass other values through."""
    if isinstance(value, (int, float)):
        return f"{value:.1f}"
    return value


@app.get("/report/pdf")
def report_pdf():
    """Build an A4 PDF report (summary lines plus a task table) and stream it."""
    df = read_and_prepare()
    resp = build_response(df)

    buffer = io.BytesIO()
    doc = SimpleDocTemplate(buffer, pagesize=A4)
    styles = getSampleStyleSheet()

    elements = [Paragraph('Task Phase Report', styles['Title']), Spacer(1, 8)]
    elements.append(Paragraph(f"Total tasks: {resp['total_count']}", styles['Normal']))
    elements.append(Paragraph(
        f"Completed: {resp['done_count']} ({resp['progress_percent']:.1f}%)",
        styles['Normal'],
    ))
    elements.append(Paragraph(
        f"Average duration (mins): {resp['avg_duration']:.1f}",
        styles['Normal'],
    ))
    elements.append(Spacer(1, 12))

    # Table header row, then one row per task.
    data_table = [["Task Name", "Client", "Phase", "Value", "Duration (mins)",
                   "Start", "End", "Delayed"]]
    for row in resp['tasks'][:200]:  # limit rows in pdf
        data_table.append([
            row.get("Task Name", ""),
            row.get("Client", ""),
            row.get("Phase", ""),
            row.get("Value", ""),
            _format_duration(row.get("Duration (mins)", "")),
            row.get("Start Time", ""),
            row.get("End Time", ""),
            "Yes" if row.get("delayed") else "No",
        ])

    table = Table(data_table, repeatRows=1, colWidths=[80, 70, 60, 70, 60, 80, 80, 45])
    table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.lightgrey),
        ('GRID', (0, 0), (-1, -1), 0.25, colors.grey),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('FONT', (0, 0), (-1, 0), 'Helvetica-Bold'),
    ]))
    elements.append(table)

    doc.build(elements)
    buffer.seek(0)  # rewind so the response streams from the first byte
    return StreamingResponse(
        buffer,
        media_type='application/pdf',
        headers={'Content-Disposition': 'attachment; filename="task_phase_report.pdf"'},
    )


@app.get("/report/excel")
def report_excel():
    """Build an .xlsx workbook with one row per task and stream it."""
    df = read_and_prepare()
    resp = build_response(df)

    wb = Workbook()
    ws = wb.active
    ws.title = "Task Report"

    headers = ["Task ID", "Task Name", "Client", "Phase", "Value",
               "Duration (mins)", "Start Time", "End Time", "Delayed"]
    ws.append(headers)
    for row in resp['tasks']:
        ws.append([
            row.get("Task ID", ""),
            row.get("Task Name", ""),
            row.get("Client", ""),
            row.get("Phase", ""),
            row.get("Value", ""),
            row.get("Duration (mins)", ""),
            row.get("Start Time", ""),
            row.get("End Time", ""),
            "Yes" if row.get("delayed") else "No",
        ])

    buf = io.BytesIO()
    wb.save(buf)
    buf.seek(0)  # rewind so the response streams from the first byte
    return StreamingResponse(
        buf,
        media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
        headers={'Content-Disposition': 'attachment; filename="task_phase_report.xlsx"'},
    )


if __name__ == "__main__":
    # Direct execution listens on all interfaces, port 7860 (the uvicorn
    # command in the module docstring uses port 8000 instead).
    uvicorn.run("app:app", host="0.0.0.0", port=7860)