# src/langgraph_workflow.py
"""LangGraph workflow for the training-evaluation report pipeline.

Pipeline: load data -> preprocess -> generate charts (PNG) -> correlation
analysis -> LLM analysis text -> assemble a standalone HTML report with
base64-embedded chart images.

NOTE(review): the HTML markup emitted by assemble_html_report_node was
garbled in the received source (angle-bracket tags stripped from string
literals); the tags below are a reconstruction of the documented layout
(flex rows, chart cards, categorized "All Charts" section) — verify the
rendered output against the original report template.
"""
from typing import TypedDict, List, Dict
import pandas as pd
from langgraph.graph import StateGraph, END
import os
import time
import re
from pathlib import Path
import base64

# Import your custom modules
from src.data_loader import load_data
from src.preprocessor import (
    preprocess_data,
    get_feedback_distribution,
    get_instructor_rating_distribution,
    get_average_scores,
)
from src.chart_generator import (
    plot_feedback_distribution_per_subject,
    plot_instructor_rating_distribution_per_subject,
    plot_avg_scores_per_subject,
    plot_avg_scores_per_department,
    plot_correlation_heatmap,
    plot_radar_chart_subject_department,  # Ensure this is imported
)
from src.analysis import calculate_correlations
from src.report_generator_llm import generate_analysis_text


class WorkflowState(TypedDict):
    """Shared state threaded through every LangGraph node."""
    file_path: str
    raw_df: pd.DataFrame | None
    processed_df: pd.DataFrame | None
    feedback_distribution: dict
    instructor_rating_distribution: dict
    average_scores: dict             # dict of DataFrames keyed by aggregation level
    correlations: dict
    charts_output_dir: str           # Directory where charts are saved
    chart_filepaths: Dict[str, str]  # Map logical name to filepath
    analysis_text: str               # Raw text analysis from LLM
    report_path: str                 # Path to the final assembled HTML report
    error_message: str | None


def load_data_node(state: WorkflowState) -> WorkflowState:
    """Load the raw DataFrame from state['file_path'] into state['raw_df']."""
    print("---NODE: LOAD DATA---")
    df = load_data(state["file_path"])
    if df is None:
        return {**state, "error_message": "Failed to load data."}
    return {**state, "raw_df": df, "error_message": None}


def preprocess_data_node(state: WorkflowState) -> WorkflowState:
    """Clean the raw data and derive distribution / average-score summaries."""
    print("---NODE: PREPROCESS DATA---")
    if state["raw_df"] is None:
        return {**state, "error_message": "Preprocessing skipped: No raw data."}
    # .copy() keeps raw_df pristine in case a later node needs the original.
    processed_df = preprocess_data(state["raw_df"].copy())
    return {
        **state,
        "processed_df": processed_df,
        "feedback_distribution": get_feedback_distribution(processed_df),
        "instructor_rating_distribution": get_instructor_rating_distribution(processed_df),
        # get_average_scores returns a dict of DataFrames.
        "average_scores": get_average_scores(processed_df),
    }


def analyze_data_node(state: WorkflowState) -> WorkflowState:
    """Compute feedback/instructor-rating correlations on the processed data."""
    print("---NODE: ANALYZE DATA (CORRELATIONS)---")
    if state["processed_df"] is None:
        return {**state, "error_message": "Analysis skipped: Missing processed data."}
    return {**state, "correlations": calculate_correlations(state["processed_df"])}


def generate_charts_node(state: WorkflowState) -> WorkflowState:
    """Render every chart as a PNG file in a unique per-run output directory.

    Returns the directory path and a map of logical chart key -> PNG filepath.

    NOTE(review): the received source defined generate_charts_node TWICE;
    the first (base64-returning) definition was dead code shadowed by this
    one and also wrote a 'charts_b64' key absent from WorkflowState — it
    has been removed.
    """
    print("---NODE: GENERATE CHARTS (Save as PNG)---")
    if state["processed_df"] is None or state["average_scores"] is None:
        return {**state, "error_message": "Chart generation skipped: Missing processed data or average scores."}

    # Create a unique directory for this run's charts.
    timestamp = time.strftime("%Y%m%d_%H%M%S")
    charts_output_dir = os.path.join("outputs", f"charts_{timestamp}")
    os.makedirs(charts_output_dir, exist_ok=True)
    print(f"Charts will be saved in: {charts_output_dir}")

    chart_filepaths: Dict[str, str] = {}
    df = state["processed_df"]
    feedback_dist = state["feedback_distribution"]
    instructor_rating_dist = state["instructor_rating_distribution"]
    avg_scores_data = state["average_scores"]

    # Define base filenames (keys will be used in the report template).
    chart_defs = {
        "avg_scores_subject": lambda: plot_avg_scores_per_subject(
            avg_scores_data['avg_scores_subject'], charts_output_dir, "avg_scores_subject"),
        "avg_scores_dept": lambda: plot_avg_scores_per_department(
            avg_scores_data['avg_scores_dept'], charts_output_dir, "avg_scores_dept"),
        "corr_heatmap_overall": lambda: plot_correlation_heatmap(
            df, charts_output_dir, "corr_heatmap_overall", "Overall"),
    }

    # Dynamically add per-subject distribution charts.
    # Lambdas bind loop variables via default args to avoid late binding.
    for subject in df['Subject'].unique():
        safe_subj = re.sub(r'\W+', '', subject.lower().replace(" ", "_"))  # sanitize filename
        chart_defs[f"feedback_dist_{safe_subj}"] = (
            lambda s=subject, fn=f"feedback_dist_{safe_subj}":
                plot_feedback_distribution_per_subject(feedback_dist, s, charts_output_dir, fn))
        chart_defs[f"instructor_rating_dist_{safe_subj}"] = (
            lambda s=subject, fn=f"instructor_rating_dist_{safe_subj}":
                plot_instructor_rating_distribution_per_subject(instructor_rating_dist, s, charts_output_dir, fn))

    # Dynamically add per-department radar charts.
    for dept in df['Department'].unique():
        safe_dept = re.sub(r'\W+', '', dept.lower().replace(" ", "_"))
        chart_defs[f"radar_chart_{safe_dept}"] = (
            lambda d=dept, fn=f"radar_chart_{safe_dept}":
                plot_radar_chart_subject_department(avg_scores_data['avg_scores_subject_dept'], d, charts_output_dir, fn))

    # Generate charts and collect filepaths; one failure must not abort the run.
    for key, func in chart_defs.items():
        try:
            filepath = func()
            if filepath:
                chart_filepaths[key] = filepath
        except Exception as e:
            print(f"Chart Gen Node: Error generating chart '{key}': {e}")

    return {**state, "chart_filepaths": chart_filepaths, "charts_output_dir": charts_output_dir}


def generate_analysis_text_node(state: WorkflowState) -> WorkflowState:
    """Ask the LLM for a narrative analysis of the computed summaries."""
    print("---NODE: GENERATE ANALYSIS TEXT (LLM)---")
    required = ["average_scores", "feedback_distribution", "instructor_rating_distribution", "correlations"]
    if not all(k in state for k in required):
        return {**state, "error_message": "Analysis text generation skipped: Missing data components."}

    # Pass DataFrames through for markdown conversion inside the LLM function.
    processed_data_for_llm = {
        "avg_scores_subject": state["average_scores"]["avg_scores_subject"],
        "avg_scores_dept": state["average_scores"]["avg_scores_dept"],
        "feedback_distribution": state["feedback_distribution"],
        "instructor_rating_distribution": state["instructor_rating_distribution"],
    }
    analysis_text = generate_analysis_text(
        processed_data=processed_data_for_llm,
        correlations=state["correlations"],
    )
    # generate_analysis_text signals failure via a sentinel string prefix.
    if analysis_text.startswith("Error generating analysis:"):
        return {**state, "analysis_text": "", "error_message": analysis_text}
    return {**state, "analysis_text": analysis_text}


def assemble_html_report_node(state: WorkflowState) -> WorkflowState:
    """Assemble the final HTML report with base64-encoded images from LLM text and chart paths."""
    print("---NODE: ASSEMBLE HTML REPORT (with Base64 Images)---")
    if not state.get("analysis_text") or not state.get("chart_filepaths"):
        return {**state, "error_message": "HTML assembly skipped: Missing analysis text or chart paths."}

    # Reuse the charts-dir timestamp so report and charts share one run id.
    timestamp_suffix = os.path.basename(
        state.get("charts_output_dir", f"run_{time.strftime('%Y%m%d_%H%M%S')}")
    ).replace('charts_', '')
    report_filepath = os.path.join("outputs", f"evaluation_report_{timestamp_suffix}.html")
    os.makedirs("outputs", exist_ok=True)  # ensure main 'outputs' directory exists

    # --- 1. Parse LLM text into sections keyed by "## Heading" lines ---
    analysis_sections: Dict[str, Dict[str, str]] = {}
    current_section_key = "preamble"        # default key before any heading
    current_section_title = "Introduction"  # default title
    current_content: List[str] = []
    for line in state["analysis_text"].replace('\r\n', '\n').split('\n'):
        match = re.match(r'^##\s+(.+)', line.strip())  # matches "## Heading Title"
        if match:
            if current_content:  # save previous section
                analysis_sections[current_section_key] = {
                    "title": current_section_title,
                    "content": "\n".join(current_content).strip(),
                }
            current_section_title = match.group(1).strip()
            current_section_key = current_section_title.lower().replace(" ", "_").replace("&", "and")
            current_content = []
        elif line.strip():
            current_content.append(f"<p>{line.strip()}</p>")  # wrap non-heading lines in <p>
    if current_content:  # save the last section
        analysis_sections[current_section_key] = {
            "title": current_section_title,
            "content": "\n".join(current_content).strip(),
        }
    if not analysis_sections and state["analysis_text"]:  # fallback if no "##" headings
        analysis_sections["full_analysis"] = {
            "title": "Full Analysis",
            "content": f"<p>{state['analysis_text']}</p>",
        }

    # --- 2. Load chart images and convert to base64 data URIs ---
    chart_base64_data: Dict[str, str] = {}
    for key, full_chart_path_str in state["chart_filepaths"].items():
        try:
            with open(full_chart_path_str, "rb") as img_file:
                base64_encoded = base64.b64encode(img_file.read()).decode('utf-8')
            chart_base64_data[key] = f"data:image/png;base64,{base64_encoded}"
        except Exception as e:
            print(f"Warning: Failed to convert image to base64 for {key} ({full_chart_path_str}): {e}")
            # Skip this image - it won't be included in the report.

    # --- 3. Build HTML Content ---
    html_parts: List[str] = []

    # HTML head and styles.
    html_parts.append(f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Training Evaluation Report - {timestamp_suffix}</title>
<style>
  body {{ font-family: Arial, sans-serif; margin: 24px; color: #222; }}
  h1 {{ border-bottom: 2px solid #444; padding-bottom: 8px; }}
  .section {{ margin-bottom: 32px; }}
  .chart-row {{ display: flex; flex-wrap: wrap; gap: 16px; }}
  .chart-card {{ flex: 1 1 45%; border: 1px solid #ddd; padding: 12px; }}
  .chart-card.small {{ flex: 1 1 30%; }}
  .chart-card img {{ max-width: 100%; height: auto; }}
</style>
</head>
<body>
<h1>Training Evaluation Report</h1>""")

    # Define the section order and which charts embed inside which section.
    section_order_and_config = {
        "executive_summary": {"title": "Executive Summary"},
        "overall_performance_analysis": {
            "title": "Overall Performance Analysis",
            "charts": ["avg_scores_subject", "avg_scores_dept"],
        },
        "subject_performance_analysis": {"title": "Subject Performance Analysis"},      # charts under 'All Charts'
        "department_performance_analysis": {"title": "Department Performance Analysis"},  # charts under 'All Charts'
        "correlation_insights": {
            "title": "Correlation Insights",
            "charts": ["corr_heatmap_overall"],
        },
        "key_insights_and_recommendations": {"title": "Key Insights and Recommendations"},
    }

    # User-friendly chart titles (dynamic ones are generated below).
    chart_display_titles = {
        'avg_scores_subject': "Average Scores per Subject",
        'avg_scores_dept': "Average Scores per Department",
        'corr_heatmap_overall': "Overall Correlation: Feedback vs. Instructor Rating",
    }

    for section_key, config in section_order_and_config.items():
        section_data = analysis_sections.get(section_key)
        if not section_data:
            continue
        html_parts.append('<div class="section">')
        html_parts.append(f"<h2>{section_data.get('title', config.get('title', 'Analysis Section'))}</h2>")
        html_parts.append(section_data.get('content', "<p>No analysis provided for this section.</p>"))
        # Embed specific charts for this section if configured.
        if "charts" in config:
            html_parts.append('<div class="chart-row">')  # flex container
            for chart_key in config["charts"]:
                if chart_key in chart_base64_data:
                    display_title = chart_display_titles.get(chart_key, chart_key.replace("_", " ").title())
                    html_parts.append('<div class="chart-card">')  # flex basis
                    html_parts.append(f"<h3>{display_title}</h3>")
                    html_parts.append(f'<img src="{chart_base64_data[chart_key]}" alt="{display_title}">')
                    html_parts.append('</div>')
            html_parts.append('</div>')
        html_parts.append('</div>')

    # --- "All Charts" section, grouped by category ---
    html_parts.append('<div class="section" id="all-charts">')
    html_parts.append("<h2>All Generated Charts</h2>")

    categorized_charts: Dict[str, List[tuple]] = {
        "Overview & Averages": [],
        "Subject Distributions": [],
        "Department Radars": [],
        "Correlations": [],
    }
    for key, base64_data in chart_base64_data.items():
        display_title = chart_display_titles.get(key)
        if not display_title:  # generate dynamic titles from the key prefix
            if key.startswith("avg_scores_"):
                display_title = key.replace("avg_scores_", "Average Scores ").replace("_", " ").title()
            elif key.startswith("feedback_dist_"):
                display_title = f"Feedback Distribution: {key.replace('feedback_dist_', '').replace('_', ' ').title()}"
            elif key.startswith("instructor_rating_dist_"):
                display_title = f"Instructor Rating: {key.replace('instructor_rating_dist_', '').replace('_', ' ').title()}"
            elif key.startswith("radar_chart_"):
                display_title = f"Radar: {key.replace('radar_chart_', '').replace('_', ' ').title()} Dept."
            elif key.startswith("corr_heatmap_"):
                display_title = f"Correlation: {key.replace('corr_heatmap_', '').replace('_', ' ').title()}"
            else:
                display_title = key.replace("_", " ").title()
            chart_display_titles[key] = display_title  # store for consistency

        # Assign each chart to a category by key prefix.
        if key.startswith("avg_scores_"):
            categorized_charts["Overview & Averages"].append((display_title, base64_data))
        elif key.startswith("feedback_dist_") or key.startswith("instructor_rating_dist_"):
            categorized_charts["Subject Distributions"].append((display_title, base64_data))
        elif key.startswith("radar_chart_"):
            categorized_charts["Department Radars"].append((display_title, base64_data))
        elif key.startswith("corr_heatmap_"):
            categorized_charts["Correlations"].append((display_title, base64_data))
        else:
            categorized_charts.setdefault("Other Charts", []).append((display_title, base64_data))

    for category, charts_in_category in categorized_charts.items():
        if not charts_in_category:
            continue
        html_parts.append(f"<h3>{category}</h3>")
        html_parts.append('<div class="chart-row">')
        for title, base64_data in charts_in_category:
            html_parts.append('<div class="chart-card small">')  # ~3 cards per row
            html_parts.append(f"<h4>{title}</h4>")
            html_parts.append(f'<img src="{base64_data}" alt="{title}">')
            html_parts.append('</div>')
        html_parts.append('</div>')
    html_parts.append('</div>')  # end all-charts section

    # HTML footer.
    html_parts.append("""</body>
</html>""")

    # --- Join and save ---
    final_html_content = "\n".join(html_parts)
    try:
        with open(report_filepath, "w", encoding="utf-8") as f:
            f.write(final_html_content)
        print(f"Assembled HTML report (with base64 images) saved to: {report_filepath}")
        return {**state, "report_path": report_filepath, "error_message": None}
    except Exception as e:
        print(f"Error saving assembled HTML report: {e}")
        return {**state, "report_path": None, "error_message": f"Failed to save HTML report: {e}"}


def create_workflow():
    """Build and compile the linear LangGraph pipeline."""
    workflow = StateGraph(WorkflowState)
    workflow.add_node("load_data", load_data_node)
    workflow.add_node("preprocess_data", preprocess_data_node)
    workflow.add_node("generate_charts", generate_charts_node)
    workflow.add_node("analyze_data", analyze_data_node)
    workflow.add_node("generate_analysis_text", generate_analysis_text_node)
    workflow.add_node("assemble_html_report", assemble_html_report_node)

    workflow.set_entry_point("load_data")
    workflow.add_edge("load_data", "preprocess_data")
    workflow.add_edge("preprocess_data", "generate_charts")
    workflow.add_edge("generate_charts", "analyze_data")
    workflow.add_edge("analyze_data", "generate_analysis_text")
    workflow.add_edge("generate_analysis_text", "assemble_html_report")
    workflow.add_edge("assemble_html_report", END)
    # Add conditional edges / error handling if needed later.
    return workflow.compile()