# src/langgraph_workflow.py from typing import TypedDict, List, Dict import pandas as pd from langgraph.graph import StateGraph, END import os import time import re from pathlib import Path import base64 # Import your custom modules from src.data_loader import load_data from src.preprocessor import preprocess_data, get_feedback_distribution, get_instructor_rating_distribution, get_average_scores from src.chart_generator import ( plot_feedback_distribution_per_subject, plot_instructor_rating_distribution_per_subject, plot_avg_scores_per_subject, plot_avg_scores_per_department, plot_correlation_heatmap, plot_radar_chart_subject_department # Ensure this is imported ) from src.analysis import calculate_correlations from src.report_generator_llm import generate_analysis_text # Define the state class WorkflowState(TypedDict): file_path: str raw_df: pd.DataFrame | None processed_df: pd.DataFrame | None feedback_distribution: dict instructor_rating_distribution: dict average_scores: dict correlations: dict charts_output_dir: str # Directory where charts are saved chart_filepaths: Dict[str, str] # Map logical name to filepath analysis_text: str # Raw text analysis from LLM report_path: str # Path to the final assembled HTML report error_message: str | None # Define Node Functions def load_data_node(state: WorkflowState) -> WorkflowState: print("---NODE: LOAD DATA---") df = load_data(state["file_path"]) if df is None: return {**state, "error_message": "Failed to load data."} return {**state, "raw_df": df, "error_message": None} def preprocess_data_node(state: WorkflowState) -> WorkflowState: print("---NODE: PREPROCESS DATA---") if state["raw_df"] is None: return {**state, "error_message": "Preprocessing skipped: No raw data."} processed_df = preprocess_data(state["raw_df"].copy()) # Use .copy() feedback_dist = get_feedback_distribution(processed_df) instructor_rating_dist = get_instructor_rating_distribution(processed_df) avg_scores = get_average_scores(processed_df) # This returns a dict of DataFrames return { **state, "processed_df": processed_df, "feedback_distribution": feedback_dist, "instructor_rating_distribution": instructor_rating_dist, "average_scores": avg_scores } def generate_charts_node(state: WorkflowState) -> WorkflowState: print("---NODE: GENERATE CHARTS---") if state["processed_df"] is None or state["average_scores"] is None: return {**state, "error_message": "Chart generation skipped: Missing processed data or average scores."} charts = {} df = state["processed_df"] feedback_dist = state["feedback_distribution"] instructor_rating_dist = state["instructor_rating_distribution"] avg_scores_data = state["average_scores"] # This is a dict # Subject-wise distribution charts for subject in df['Subject'].unique(): fb_chart = plot_feedback_distribution_per_subject(feedback_dist, subject) if fb_chart: charts[f'feedback_dist_{subject.lower().replace(" ", "_")}'] = fb_chart ir_chart = plot_instructor_rating_distribution_per_subject(instructor_rating_dist, subject) if ir_chart: charts[f'instructor_rating_dist_{subject.lower().replace(" ", "_")}'] = ir_chart # Average scores charts charts['avg_scores_per_subject'] = plot_avg_scores_per_subject(avg_scores_data['avg_scores_subject']) charts['avg_scores_per_department'] = plot_avg_scores_per_department(avg_scores_data['avg_scores_dept']) # Correlation heatmap charts['correlation_heatmap_overall'] = plot_correlation_heatmap(df, "Overall") # for subject in df['Subject'].unique(): # corr_chart = plot_correlation_heatmap(df, subject) # if corr_chart: charts[f'correlation_heatmap_{subject.lower().replace(" ", "_")}'] = corr_chart # Radar charts per department for dept in df['Department'].unique(): radar_chart = plot_radar_chart_subject_department(avg_scores_data['avg_scores_subject_dept'], dept) if radar_chart: charts[f'radar_chart_{dept.lower().replace(" ", "_")}'] = radar_chart return {**state, "charts_b64": charts} def analyze_data_node(state: WorkflowState) -> WorkflowState: print("---NODE: ANALYZE DATA (CORRELATIONS)---") if state["processed_df"] is None: return {**state, "error_message": "Analysis skipped: Missing processed data."} correlations = calculate_correlations(state["processed_df"]) return {**state, "correlations": correlations} def generate_charts_node(state: WorkflowState) -> WorkflowState: print("---NODE: GENERATE CHARTS (Save as PNG)---") if state["processed_df"] is None or state["average_scores"] is None: return {**state, "error_message": "Chart generation skipped: Missing processed data or average scores."} # Create a unique directory for this run's charts timestamp = time.strftime("%Y%m%d_%H%M%S") charts_output_dir = os.path.join("outputs", f"charts_{timestamp}") os.makedirs(charts_output_dir, exist_ok=True) print(f"Charts will be saved in: {charts_output_dir}") chart_filepaths = {} df = state["processed_df"] feedback_dist = state["feedback_distribution"] instructor_rating_dist = state["instructor_rating_distribution"] avg_scores_data = state["average_scores"] # Define base filenames (keys will be used in template) chart_defs = { "avg_scores_subject": lambda: plot_avg_scores_per_subject(avg_scores_data['avg_scores_subject'], charts_output_dir, "avg_scores_subject"), "avg_scores_dept": lambda: plot_avg_scores_per_department(avg_scores_data['avg_scores_dept'], charts_output_dir, "avg_scores_dept"), "corr_heatmap_overall": lambda: plot_correlation_heatmap(df, charts_output_dir, "corr_heatmap_overall", "Overall"), } # Dynamically add subject/dept charts for subject in df['Subject'].unique(): safe_subj = re.sub(r'\W+', '', subject.lower().replace(" ", "_")) # Sanitize filename chart_defs[f"feedback_dist_{safe_subj}"] = lambda s=subject, fn=f"feedback_dist_{safe_subj}": plot_feedback_distribution_per_subject(feedback_dist, s, charts_output_dir, fn) chart_defs[f"instructor_rating_dist_{safe_subj}"] = lambda s=subject, fn=f"instructor_rating_dist_{safe_subj}": plot_instructor_rating_distribution_per_subject(instructor_rating_dist, s, charts_output_dir, fn) for dept in df['Department'].unique(): safe_dept = re.sub(r'\W+', '', dept.lower().replace(" ", "_")) chart_defs[f"radar_chart_{safe_dept}"] = lambda d=dept, fn=f"radar_chart_{safe_dept}": plot_radar_chart_subject_department(avg_scores_data['avg_scores_subject_dept'], d, charts_output_dir, fn) # Generate charts and collect filepaths for key, func in chart_defs.items(): try: filepath = func() if filepath: chart_filepaths[key] = filepath except Exception as e: print(f"Chart Gen Node: Error generating chart '{key}': {e}") return {**state, "chart_filepaths": chart_filepaths, "charts_output_dir": charts_output_dir} # analyze_data_node: No change needed def generate_analysis_text_node(state: WorkflowState) -> WorkflowState: # Renamed node print("---NODE: GENERATE ANALYSIS TEXT (LLM)---") if not all(k in state for k in ["average_scores", "feedback_distribution", "instructor_rating_distribution", "correlations"]): return {**state, "error_message": "Analysis text generation skipped: Missing data components."} # Prepare data for LLM (ensure it doesn't contain DataFrames directly if not needed by LLM) processed_data_for_llm = { "avg_scores_subject": state["average_scores"]["avg_scores_subject"], # Pass DF for markdown conversion inside LLM function "avg_scores_dept": state["average_scores"]["avg_scores_dept"], # Pass DF for markdown conversion inside LLM function "feedback_distribution": state["feedback_distribution"], "instructor_rating_distribution": state["instructor_rating_distribution"] } analysis_text = generate_analysis_text( # Call the NEW function processed_data=processed_data_for_llm, correlations=state["correlations"], ) if analysis_text.startswith("Error generating analysis:"): return {**state, "analysis_text": "", "error_message": analysis_text} # Propagate error else: return {**state, "analysis_text": analysis_text} def assemble_html_report_node(state: WorkflowState) -> WorkflowState: """Assembles the final HTML report with base64-encoded images from LLM text and chart paths.""" print("---NODE: ASSEMBLE HTML REPORT (with Base64 Images)---") if not state.get("analysis_text") or not state.get("chart_filepaths"): return {**state, "error_message": "HTML assembly skipped: Missing analysis text or chart paths."} # Determine output paths timestamp_suffix = os.path.basename(state.get("charts_output_dir", f"run_{time.strftime('%Y%m%d_%H%M%S')}")).replace('charts_', '') report_filename = f"evaluation_report_{timestamp_suffix}.html" report_filepath = os.path.join("outputs", report_filename) os.makedirs("outputs", exist_ok=True) # Ensure main 'outputs' directory exists # --- 1. Parse LLM Text (Same as before) --- analysis_sections = {} current_section_key = "preamble" # Default key current_section_title = "Introduction" # Default title current_content = [] lines = state["analysis_text"].replace('\r\n', '\n').split('\n') for line in lines: match = re.match(r'^##\s+(.+)', line.strip()) # Matches "## Heading Title" if match: if current_content: # Save previous section analysis_sections[current_section_key] = { "title": current_section_title, "content": "\n".join(current_content).strip() } current_section_title = match.group(1).strip() current_section_key = current_section_title.lower().replace(" ", "_").replace("&", "and") current_content = [] else: if line.strip(): current_content.append(f"
{line.strip()}
") # Wrap non-heading lines inif current_content: # Save the last section analysis_sections[current_section_key] = { "title": current_section_title, "content": "\n".join(current_content).strip() } if not analysis_sections and state["analysis_text"]: # Fallback if no "##" headings analysis_sections["full_analysis"] = {"title": "Full Analysis", "content": f"
{state['analysis_text']}
"} # --- 2. Load chart images and convert to base64 --- chart_base64_data = {} for key, full_chart_path_str in state["chart_filepaths"].items(): try: # Read the image file and convert to base64 with open(full_chart_path_str, "rb") as img_file: img_data = img_file.read() # Convert binary data to base64 string and make it ready for HTML embedding base64_encoded = base64.b64encode(img_data).decode('utf-8') # Store the base64 data with appropriate data URI prefix chart_base64_data[key] = f"data:image/png;base64,{base64_encoded}" except Exception as e: print(f"Warning: Failed to convert image to base64 for {key} ({full_chart_path_str}): {e}") # Skip this image - it won't be included in the report # --- 3. Build HTML Content --- html_parts = [] # --- HTML Head and Styles --- html_parts.append(f"""No analysis provided for this section.
")) # Embed specific charts for this section if configured if "charts" in config: html_parts.append("") html_parts.append("