import gradio as gr
import threading
import time
from datetime import datetime
from report_generator import (
generate_csv_report,
generate_xlsx_report,
generate_pdf_report,
generate_diagram_report,
)
import cohere
# Initialize the Cohere client using your provided API key.
COHERE_API_KEY = "ffwtfyfLwqSOtw65psZardfqhUxMr1h7u5vzYotI"
co = cohere.Client(COHERE_API_KEY)
# Define labeled examples for classification as dictionaries.
cohere_examples = [
{"text": "generate a report on unemployment", "label": "report"},
{"text": "create a report on market trends", "label": "report"},
{"text": "generate diagram of sales data", "label": "diagram"},
{"text": "create a diagram of quarterly revenue", "label": "diagram"}
]
def cohere_parse_command(command):
"""Uses Cohere's classify endpoint to determine the intent of the command."""
try:
response = co.classify(
model="large",
inputs=[command],
examples=cohere_examples
)
prediction = response.classifications[0].prediction
return prediction
except Exception as e:
print("Cohere classification error, falling back to keyword search:", e)
lower = command.lower()
if "diagram" in lower:
return "diagram"
elif "report" in lower:
return "report"
else:
return "unknown"
# Global containers for pending tasks and generated files.
task_bucket = [] # Queue of tasks waiting to be processed.
generated_files = [] # Global list of filenames generated.
bucket_lock = threading.Lock()
def process_single_task(task):
"""Process one task with a countdown timer and update its details."""
print(f"[{datetime.now()}] Processing task: {task['command']}")
# Simulate processing delay.
for remaining in range(5, 0, -1):
task["timer"] = remaining
time.sleep(1)
intent = cohere_parse_command(task["command"])
cmd = task["command"].lower()
# Pass the entire command as "subject" for dynamic content.
if intent == "report":
if "csv" in cmd:
filename = generate_csv_report(subject=task["command"])
task["result"] = f"CSV report generated for '{task['command']}'"
elif "xlsx" in cmd:
filename = generate_xlsx_report(subject=task["command"])
task["result"] = f"XLSX report generated for '{task['command']}'"
else:
filename = generate_pdf_report(subject=task["command"])
task["result"] = f"PDF report generated for '{task['command']}'"
elif intent == "diagram":
filename = generate_diagram_report()
task["result"] = f"Diagram generated for '{task['command']}'"
else:
filename = None
task["result"] = f"Unrecognized command: {task['command']}"
if filename:
task["file"] = filename
generated_files.append(filename)
task["completed_at"] = datetime.now().strftime("%H:%M:%S")
task["status"] = "Complete"
task["details"] = f"Task '{task['command']}' processed with result: {task['result']}"
print(f"[{datetime.now()}] Task completed: {task}")
def background_task_processor():
"""Continuously monitors the task bucket and spawns a new thread per task."""
while True:
time.sleep(1)
task = None
with bucket_lock:
if task_bucket:
task = task_bucket.pop(0)
if task:
task["status"] = "In Progress"
threading.Thread(target=process_single_task, args=(task,), daemon=True).start()
def submit_task(command, current_tasks):
"""Adds a new task when the user clicks Submit (command remains visible)."""
if command.strip() == "":
return current_tasks, generated_files, command
new_task = {
"command": command,
"submitted_at": datetime.now().strftime("%H:%M:%S"),
"timer": "",
"result": "",
"file": "",
"completed_at": "",
"status": "Pending",
"details": ""
}
with bucket_lock:
task_bucket.append(new_task)
current_tasks.append(new_task)
return current_tasks, generated_files, command
def build_tasks_html(tasks):
"""Builds an HTML table showing task details."""
html = """
"
return html
def refresh_ui(tasks, files):
"""Refreshes the UI by rebuilding the task list and updating the files state."""
# Update files state from the global generated_files list.
files = generated_files[:]
tasks_html = build_tasks_html(tasks)
return tasks, files, tasks_html, files
def update_dropdown(files):
"""Updates the dropdown choices for preview from the list of generated files."""
return files
def preview_file(file_path):
"""Returns HTML for previewing the file based on its type."""
if not file_path:
return "No file selected."
if file_path.endswith(".png"):
return f'
'
elif file_path.endswith(".pdf"):
return f'