# app.py
"""
Run:
uvicorn app:app --reload --port 8000
This app reads data.csv (expected columns below), computes metrics and serves:
- / : HTML dashboard (JS + Chart.js)
- /api/tasks : JSON data for charts
- /report/pdf : PDF export
- /report/excel : Excel export
Expected CSV columns (case sensitive):
Task ID,Task Name,Client,Phase,Value,Start Time,End Time,Duration (mins)
If Duration (mins) is missing, the app will compute it using Start Time and End Time.
Date format is flexible (parsed with pandas.to_datetime).
"""
#
import io
import math
from fastapi import FastAPI, Query
from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse
from reportlab.lib.pagesizes import A4
from reportlab.platypus import SimpleDocTemplate, Table, TableStyle, Paragraph, Spacer
from reportlab.lib import colors
from reportlab.lib.styles import getSampleStyleSheet
from openpyxl import Workbook
from datetime import datetime
import pandas as pd
import uvicorn
app = FastAPI(title="Task Phase Dashboard (CSV)")
CSV_PATH = "data.csv"  # file should be in the same folder as app.py
# Simple HTML template (English labels). Uses Chart.js from CDN.
# NOTE(review): this template contains no HTML tags or <script> blocks — it is
# plain text only, so "/" will not actually render the charts described in the
# module docstring. The markup appears to have been lost; confirm against the
# original template before shipping.
HTML_TEMPLATE = """
Task Phase Dashboard
Task Phase Dashboard (TASH)
Client:
Phase:
Task Name:
Value:
Delayed:
All
Yes
No
Reset
Tasks per Client
Avg Duration per Client (mins)
Avg Duration per Phase (mins)
Avg Duration per Task (mins)
Value Distribution
Tasks Over Time (by start date)
Task List
Task Name
Client
Phase
Creative Stage
Value
Duration (mins)
Start
End
Delayed
"""
########### Data processing utilities ###########
import pandas as pd
import re
import chardet
import math
def read_and_prepare(csv_path=CSV_PATH):
    """
    Read the task CSV and return a cleaned, enriched DataFrame.

    Steps:
    - Reads as UTF-8 (with BOM); on decode failure, retries with a
      chardet-detected encoding.
    - Guarantees every expected column exists (missing ones added empty).
    - Strips a leading "Emotion" token from Phase and keeps only rows whose
      Phase mentions "image" or "video".
    - Tags creative rows, parses Start/End timestamps, fills in
      Duration (mins), and derives is_done / delayed / start_date columns.

    Parameters
    ----------
    csv_path : str
        Path to the CSV file (defaults to CSV_PATH).

    Returns
    -------
    pandas.DataFrame
    """
    # Try UTF-8 (BOM-tolerant) first; fall back to a detected encoding.
    try:
        df = pd.read_csv(csv_path, dtype=str, encoding="utf-8-sig").fillna("")
    except UnicodeDecodeError:
        with open(csv_path, "rb") as f:
            raw_data = f.read(50000)  # sample only the first 50 KB
        detected = chardet.detect(raw_data)
        # chardet can return {"encoding": None}; `or` covers that case too
        # (dict.get's default only applies when the key is missing).
        encoding_used = detected.get("encoding") or "utf-8"
        print("🔎 Detected encoding:", encoding_used)
        df = pd.read_csv(csv_path, dtype=str, encoding=encoding_used).fillna("")

    # Guarantee every expected column exists so later code never KeyErrors.
    expected_cols = ["Task ID", "Task Name", "Client", "Phase", "Value",
                     "Start Time", "End Time", "Duration (mins)", "creative stage"]
    for c in expected_cols:
        if c not in df.columns:
            df[c] = ""

    # Clean the Phase column: drop a leading "Emotion" token if present.
    df["Phase"] = df["Phase"].str.replace(r"^\s*Emotion\s*", "", regex=True).str.strip()

    # Tag creative rows (Phase mentions image or video).
    df["creative stage"] = df["Phase"].apply(
        lambda x: "CREATIVE" if re.search(r"(image|video)", str(x).lower()) else ""
    )

    # Keep only rows whose Phase mentions image or video.
    df = df[df["Phase"].str.lower().str.contains("image|video", na=False)]

    # Parse Start/End as datetimes, vectorized; unparseable values become NaT.
    # (The original wrapped pd.to_datetime in a per-element apply, which was
    # both slower and redundant — errors="coerce" never raises.)
    df["Start_dt"] = pd.to_datetime(df["Start Time"], errors="coerce")
    df["End_dt"] = pd.to_datetime(df["End Time"], errors="coerce")

    def duration_minutes(row):
        """Prefer the CSV's own value; fall back to End - Start; else NaN."""
        val = row.get("Duration (mins)")
        try:
            if val and str(val).strip() != "":
                return float(str(val).strip())
        except (TypeError, ValueError):
            pass  # non-numeric CSV value: fall through to the timestamps
        s, e = row.get("Start_dt"), row.get("End_dt")
        if pd.notna(s) and pd.notna(e):
            return (e - s).total_seconds() / 60.0
        return float("nan")

    df["Duration (mins)"] = df.apply(duration_minutes, axis=1)

    # Done flag: Value contains any completion keyword.
    done_keywords = ["APPROVED", "DONE", "COMPLETED", "DELIVERED",
                     "PO APPROVAL", "CLIENT APPROVAL"]
    df["is_done"] = df["Value"].str.upper().apply(
        lambda v: any(k in v for k in done_keywords)
    )

    # Delayed detection: duration beyond mean + std across all rows
    # (a per-Phase threshold could be more accurate).
    mean = df["Duration (mins)"].mean(skipna=True)
    std = df["Duration (mins)"].std(skipna=True) if not math.isnan(mean) else 0
    threshold = (mean + std) if not math.isnan(mean) else None

    def detect_delayed(dur):
        if dur is None or pd.isna(dur):
            return False
        if threshold is None:
            return False
        return dur > threshold

    df["delayed"] = df["Duration (mins)"].apply(detect_delayed)

    # Timeline grouping key (YYYY-MM-DD). strftime yields NaN for NaT, so
    # fillna works here — BUG FIX: the original used .dt.date.astype(str),
    # which turned NaT into the literal string "NaT", so its fillna("unknown")
    # never replaced anything.
    df["start_date"] = df["Start_dt"].dt.strftime("%Y-%m-%d").fillna("unknown")
    return df
import numpy as np
import math
def sanitize_for_json(obj):
    """
    Recursively replace non-finite floats (NaN, +/-inf) with 0 so the
    structure can be encoded by the standard JSON serializer.

    Handles floats, dicts, lists and tuples (tuples are returned as lists,
    matching how JSON encodes them); everything else passes through
    unchanged.
    """
    if isinstance(obj, float):
        # NaN/inf are not valid JSON numbers; collapse them to 0.
        return 0 if (math.isnan(obj) or math.isinf(obj)) else obj
    if isinstance(obj, dict):
        return {k: sanitize_for_json(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        # Tuples are serialized as JSON arrays too; the original skipped
        # them, letting NaN values hidden inside tuples reach the encoder.
        return [sanitize_for_json(v) for v in obj]
    return obj
def build_response(df, filter_client=None, filter_phase=None,
                   filter_task=None, filter_value=None, filter_delayed=None):
    """
    Apply the optional filters to *df* and build the JSON-ready payload
    consumed by the dashboard (summary numbers, chart series, task list).

    Parameters
    ----------
    df : pandas.DataFrame
        Prepared frame from read_and_prepare() — must contain the derived
        columns is_done, delayed, start_date and Duration (mins).
    filter_client, filter_phase, filter_task, filter_value : str, optional
        Exact-match filters on the corresponding columns.
    filter_delayed : str, optional
        "yes"/"no" (case-insensitive) to keep only delayed / on-time rows.

    Returns
    -------
    dict
        Counts, averages, chart series and task records. When no rows match
        the filters, a stub payload with message "Not Found" (dropdown
        options still come from the unfiltered frame).
    """
    dff = df.copy()
    if filter_client:
        dff = dff[dff["Client"] == filter_client]
    if filter_phase:
        dff = dff[dff["Phase"] == filter_phase]
    if filter_task:
        dff = dff[dff["Task Name"] == filter_task]
    if filter_value:
        dff = dff[dff["Value"] == filter_value]
    if filter_delayed:
        if filter_delayed.lower() == 'yes':
            dff = dff[dff["delayed"] == True]
        elif filter_delayed.lower() == 'no':
            dff = dff[dff["delayed"] == False]

    total_count = len(dff)
    # Nothing matched: return an empty payload so the UI can show a message.
    if total_count == 0:
        return {
            "total_count": 0,
            "done_count": 0,
            "progress_percent": 0,
            "avg_duration": 0,
            "clients": sorted(df["Client"].unique().tolist()),
            "phases": sorted(df["Phase"].unique().tolist()),
            "tasks_names": sorted(df["Task Name"].unique().tolist()),
            "clients_filtered": [],
            "client_counts": [],
            "client_avg_duration": [],
            "values_labels": [],
            "values_counts": [],
            "timeline": {"labels": [], "counts": []},
            "tasks": [],
            "message": "Not Found"
        }

    done_count = int(dff["is_done"].sum())
    progress_percent = (done_count / total_count * 100) if total_count else 0
    # BUG FIX: the original used `float(mean or 0)`, but NaN is truthy in
    # Python, so an all-NaN duration column leaked NaN into the payload
    # (which report_pdf then rendered literally as "nan"). Test with pd.isna.
    mean_dur = dff["Duration (mins)"].mean(skipna=True)
    avg_duration = 0.0 if pd.isna(mean_dur) else float(mean_dur)

    # Per-client task counts and average durations (same client order).
    client_counts_series = dff["Client"].value_counts()
    clients = list(client_counts_series.index)
    client_counts = [int(x) for x in client_counts_series.values]
    client_avg = dff.groupby("Client")["Duration (mins)"].mean().reindex(clients).fillna(0).tolist()

    # Value (status) distribution.
    values_counts_series = dff["Value"].value_counts()
    values_labels = list(values_counts_series.index)
    values_counts = [int(x) for x in values_counts_series.values]

    # Tasks-over-time series, keyed by start date string.
    timeline_series = dff.groupby("start_date").size().sort_index()
    timeline_labels = list(timeline_series.index)
    timeline_counts = [int(x) for x in timeline_series.values]

    # Task table records; NaNs blanked so the payload stays JSON-friendly.
    tasks = dff[["Task ID", "Task Name", "Client", "Phase", "creative stage",
                 "Value", "Start Time", "End Time", "Duration (mins)",
                 "delayed"]].fillna("").to_dict(orient="records")

    return {
        "total_count": total_count,
        "done_count": done_count,
        "progress_percent": progress_percent,
        "avg_duration": avg_duration,
        "clients": sorted(df["Client"].unique().tolist()),
        "phases": sorted(df["Phase"].unique().tolist()),
        "tasks_names": sorted(df["Task Name"].unique().tolist()),
        "clients_filtered": clients,
        "client_counts": client_counts,
        "client_avg_duration": [round(x or 0, 1) for x in client_avg],
        "values_labels": values_labels,
        "values_counts": values_counts,
        "timeline": {"labels": timeline_labels, "counts": timeline_counts},
        "tasks": tasks
    }
########### End utilities ###########
@app.get("/", response_class=HTMLResponse)
def index():
return HTML_TEMPLATE
######
@app.get("/api/tasks")
def api_tasks(
client: str = Query(None),
phase: str = Query(None),
task: str = Query(None),
value: str = Query(None),
delayed: str = Query(None)
):
df = read_and_prepare()
resp = build_response(
df,
filter_client=client,
filter_phase=phase,
filter_task=task,
filter_value=value,
filter_delayed=delayed
)
resp = sanitize_for_json(resp)
return JSONResponse(resp)
@app.get("/report/pdf")
def report_pdf():
df = read_and_prepare()
resp = build_response(df)
buffer = io.BytesIO()
doc = SimpleDocTemplate(buffer, pagesize=A4)
styles = getSampleStyleSheet()
elements = [Paragraph('Task Phase Report', styles['Title']), Spacer(1, 8)]
elements.append(Paragraph(f"Total tasks: {resp['total_count']}", styles['Normal']))
elements.append(Paragraph(f"Completed: {resp['done_count']} ({resp['progress_percent']:.1f}%)", styles['Normal']))
elements.append(Paragraph(f"Average duration (mins): {resp['avg_duration']:.1f}", styles['Normal']))
elements.append(Spacer(1, 12))
# add table head
data_table = [["Task Name", "Client", "Phase", "Value", "Duration (mins)", "Start", "End", "Delayed"]]
for row in resp['tasks'][:200]: # limit rows in pdf
data_table.append([
row.get("Task Name", ""),
row.get("Client", ""),
row.get("Phase", ""),
row.get("Value", ""),
f"{row.get('Duration (mins)', ''):.1f}" if isinstance(row.get('Duration (mins)'), (int, float)) else row.get('Duration (mins)', ''),
row.get("Start Time", ""),
row.get("End Time", ""),
"Yes" if row.get("delayed") else "No"
])
table = Table(data_table, repeatRows=1, colWidths=[80,70,60,70,60,80,80,45])
table.setStyle(TableStyle([
('BACKGROUND',(0,0),(-1,0),colors.lightgrey),
('GRID',(0,0),(-1,-1),0.25,colors.grey),
('ALIGN',(0,0),(-1,-1),'LEFT'),
('FONT',(0,0),(-1,0),'Helvetica-Bold')
]))
elements.append(table)
doc.build(elements)
buffer.seek(0)
return StreamingResponse(buffer, media_type='application/pdf',
headers={'Content-Disposition':'attachment; filename="task_phase_report.pdf"'})
@app.get("/report/excel")
def report_excel():
df = read_and_prepare()
resp = build_response(df)
wb = Workbook()
ws = wb.active
ws.title = "Task Report"
headers = ["Task ID","Task Name","Client","Phase","Value","Duration (mins)","Start Time","End Time","Delayed"]
ws.append(headers)
for row in resp['tasks']:
ws.append([
row.get("Task ID",""),
row.get("Task Name",""),
row.get("Client",""),
row.get("Phase",""),
row.get("Value",""),
row.get("Duration (mins)",""),
row.get("Start Time",""),
row.get("End Time",""),
"Yes" if row.get("delayed") else "No"
])
buf = io.BytesIO()
wb.save(buf)
buf.seek(0)
return StreamingResponse(buf, media_type='application/vnd.openxmlformats-officedocument.spreadsheetml.sheet',
headers={'Content-Disposition':'attachment; filename="task_phase_report.xlsx"'})
if __name__ == "__main__":
uvicorn.run("app:app", host="0.0.0.0", port=7860)