"""Agentic AI Agents demo app.

A three-agent Streamlit workflow:
  1. Data Engineer Agent - upload SQL files and process them with an LLM.
  2. QA Agent            - review generated CSVs and approve/reject them.
  3. DevOps Agent        - deploy/archive files and perform maintenance.
A fourth tab visualizes the end-to-end workflow from the event log.
"""

import os
import random
import re
import sqlite3  # noqa: F401  (kept from original; other parts of the project may rely on it)
import tempfile  # noqa: F401
import time  # noqa: F401
from datetime import datetime
from io import StringIO

import pandas as pd
import streamlit as st

# Import for LLM (using OpenAI as example - you can replace with your preferred LLM).
# NOTE: no Streamlit call may happen here -- st.set_page_config() below must be
# the first Streamlit command, so the "library missing" warning is deferred.
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False

# Working directories used by the agents.
UPLOAD_FOLDER = "upload"
OUTPUT_FOLDER = "output"
LOGS_FOLDER = "logs"
for _folder in (UPLOAD_FOLDER, OUTPUT_FOLDER, LOGS_FOLDER):
    os.makedirs(_folder, exist_ok=True)


def _extract_columns(sql_query):
    """Best-effort extraction of column names from a CREATE TABLE body or SELECT list.

    Returns a non-empty list; falls back to generic names when nothing parses.
    """
    columns = []
    for line in sql_query.split('\n'):
        if '(' in line and ')' not in line:
            continue  # skip the CREATE TABLE header line
        if any(keyword in line.upper() for keyword in ('PRIMARY KEY', 'FOREIGN KEY', 'CONSTRAINT')):
            continue
        if ')' in line and '(' not in line:
            break  # end of the column-definition block
        column_match = re.match(r'\s*(\w+)\s+\w+', line)
        if column_match:
            columns.append(column_match.group(1))

    if not columns:
        # Fallback: extract from a SELECT statement.
        select_match = re.search(r'SELECT\s+(.*?)\s+FROM', sql_query, re.IGNORECASE | re.DOTALL)
        if select_match:
            select_part = select_match.group(1)
            if '*' not in select_part:
                # Take the last token of each item (strips aliases and table prefixes).
                columns = [col.strip().split(' ')[-1].split('.')[-1] for col in select_part.split(',')]
            else:
                columns = ["id", "name", "value"]  # default columns for SELECT *

    return columns or ["column1", "column2", "column3"]


def mock_llm_process(sql_query, prompt_instruction):
    """Mock LLM processing for demo purposes.

    Heuristically derives a column list from *sql_query* and fabricates five
    rows of plausible sample data, picking value styles from the column name
    ('id'/'num' -> ints, 'name' -> labels, 'date' -> ISO-ish dates).
    Returns a canned fallback DataFrame if anything goes wrong.
    """
    try:
        columns = _extract_columns(sql_query)

        data = []
        for i in range(5):  # generate 5 rows of mock data
            row = {}
            for col in columns:
                lowered = col.lower()
                if 'id' in lowered or 'num' in lowered:
                    row[col] = i + 1
                elif 'name' in lowered:
                    row[col] = f"Name_{i+1}"
                elif 'date' in lowered:
                    row[col] = f"2023-01-0{i+1}"
                else:
                    row[col] = f"Value_{random.randint(1, 100)}"
            data.append(row)

        return pd.DataFrame(data, columns=columns)
    except Exception as e:
        # Fallback dataframe so the demo never hard-fails.
        return pd.DataFrame({
            "result_id": [1, 2, 3],
            "processed_data": ["Sample data 1", "Sample data 2", "Sample data 3"],
            "note": [
                f"Mock LLM processed SQL (Error: {str(e)})",
                "This is demo data",
                "Replace with actual LLM integration",
            ],
        })


def process_sql_with_llm(sql_content, custom_prompt=""):
    """Process a SQL query with an LLM and return a DataFrame.

    When the OpenAI client is available the query is sent to the chat API;
    the response is currently a placeholder and the mock generator supplies
    the returned data either way.  Replace the parsing step with your actual
    LLM-output-to-DataFrame logic.
    """
    default_prompt = """
    Analyze the following SQL query and generate appropriate sample data
    that would result from executing this query.
    Return the data in a structured format that can be easily converted to CSV.
    Consider the table structure, column names, and data types implied by the SQL.
    """
    full_prompt = default_prompt + "\n" + custom_prompt + "\n\nSQL Query:\n" + sql_content

    if not OPENAI_AVAILABLE:
        # Use mock processing if OpenAI not available.
        return mock_llm_process(sql_content, custom_prompt)

    try:
        client = OpenAI()  # uses OPENAI_API_KEY from the environment
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a SQL and data expert."},
                {"role": "user", "content": full_prompt},
            ],
        )
        # TODO: parse the LLM response into a DataFrame based on your
        # model's actual output format.  For now the demo always falls
        # through to the mock generator.
        _llm_output = response.choices[0].message.content
        return mock_llm_process(sql_content, custom_prompt)
    except Exception as e:
        st.error(f"LLM API Error: {e}")
        return mock_llm_process(sql_content, custom_prompt)


def save_as_csv(df, filename):
    """Save *df* as CSV under OUTPUT_FOLDER and return the written path."""
    output_path = os.path.join(OUTPUT_FOLDER, filename)
    df.to_csv(output_path, index=False)
    return output_path


def log_workflow_event(event_type, description, file_name=""):
    """Append a pipe-delimited workflow event to logs/workflow.log."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    log_entry = f"{timestamp} | {event_type} | {file_name} | {description}\n"
    log_file = os.path.join(LOGS_FOLDER, "workflow.log")
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(log_entry)
    return log_entry


def get_workflow_logs():
    """Return all workflow log lines, or an empty list if no log exists yet."""
    log_file = os.path.join(LOGS_FOLDER, "workflow.log")
    if os.path.exists(log_file):
        with open(log_file, "r", encoding="utf-8") as f:
            return f.readlines()
    return []


def _rerun():
    """Trigger a script rerun; st.experimental_rerun() was removed in newer Streamlit."""
    if hasattr(st, "rerun"):
        st.rerun()
    else:
        st.experimental_rerun()


def _clear_folder(folder):
    """Delete every file directly inside *folder* (non-recursive)."""
    for name in os.listdir(folder):
        os.remove(os.path.join(folder, name))


def _render_data_engineer_tab():
    """Tab 1: upload a SQL file, process it with the LLM, export CSV results."""
    st.header("Data Engineer Agent")
    st.write("Upload SQL files for processing")

    uploaded_file = st.file_uploader("Choose a SQL file", type=['sql', 'txt'])
    if uploaded_file is None:
        return

    st.write(f"Filename: {uploaded_file.name}")
    st.write(f"File size: {uploaded_file.size} bytes")

    sql_content = StringIO(uploaded_file.getvalue().decode("utf-8")).read()

    st.subheader("SQL Content")
    st.code(sql_content, language="sql")

    # basename() guards against path traversal via the uploaded filename.
    safe_name = os.path.basename(uploaded_file.name)
    upload_path = os.path.join(UPLOAD_FOLDER, safe_name)
    with open(upload_path, "w", encoding="utf-8") as f:
        f.write(sql_content)
    log_workflow_event("UPLOAD", "File uploaded successfully", safe_name)
    st.success(f"File saved to {upload_path}")

    st.subheader("Custom Processing Instructions")
    custom_prompt = st.text_area(
        "Add custom instructions for the LLM (optional):",
        placeholder="E.g., Focus on specific columns, apply certain transformations, etc."
    )

    if st.button("Process SQL with LLM"):
        with st.spinner("Processing with LLM..."):
            try:
                result_df = process_sql_with_llm(sql_content, custom_prompt)

                st.subheader("Processed Results")
                st.dataframe(result_df)

                csv_filename = os.path.splitext(safe_name)[0] + ".csv"
                csv_path = save_as_csv(result_df, csv_filename)
                log_workflow_event("PROCESS", "SQL processed and CSV generated", csv_filename)
                st.success(f"Results saved to {csv_path}")

                st.download_button(
                    label="Download CSV",
                    data=result_df.to_csv(index=False),
                    file_name=csv_filename,
                    mime="text/csv",
                )
            except Exception as e:
                st.error(f"Error processing file: {e}")
                log_workflow_event("ERROR", f"Processing error: {str(e)}", safe_name)


def _render_qa_tab():
    """Tab 2: review a generated CSV, run basic quality checks, approve/reject."""
    st.header("QA Agent")
    st.write("Quality Assurance operations and testing")

    output_files = [f for f in os.listdir(OUTPUT_FOLDER) if f.endswith('.csv')]
    if not output_files:
        st.info("No processed files available. Upload and process SQL files in the Data Engineer Agent tab.")
        return

    st.subheader("Available Processed Files")
    selected_file = st.selectbox("Select a file to review:", output_files)
    if not selected_file:
        return

    df = pd.read_csv(os.path.join(OUTPUT_FOLDER, selected_file))
    st.write(f"### {selected_file}")
    st.dataframe(df)

    st.subheader("QA Metrics")
    col1, col2, col3 = st.columns(3)
    with col1:
        st.metric("Rows", len(df))
    with col2:
        st.metric("Columns", len(df.columns))
    with col3:
        st.metric("Missing Values", df.isnull().sum().sum())

    st.subheader("Data Quality Checks")
    issues_found = 0
    for col in df.columns:
        if df[col].isnull().any():
            st.warning(f"Column '{col}' contains {df[col].isnull().sum()} null values")
            issues_found += 1

    duplicate_rows = df.duplicated().sum()
    if duplicate_rows > 0:
        st.warning(f"Found {duplicate_rows} duplicate rows")
        issues_found += 1

    # The acknowledgement checkbox must be rendered BEFORE the button: a widget
    # created inside a button handler only appears on the next rerun, by which
    # time the button's click state is already gone.
    acknowledged = st.checkbox("Approve despite issues") if issues_found else False

    if st.button("Approve File for Deployment"):
        if issues_found == 0:
            log_workflow_event("QA_APPROVE", "File approved with no issues", selected_file)
            st.success(f"✅ {selected_file} approved for deployment!")
        elif acknowledged:
            log_workflow_event("QA_APPROVE", f"File approved with {issues_found} known issues", selected_file)
            st.success(f"✅ {selected_file} approved for deployment (with known issues)!")
        else:
            st.info("Please acknowledge issues before approving")

    if st.button("Flag File for Re-processing"):
        log_workflow_event("QA_REJECT", f"File rejected due to {issues_found} issues", selected_file)
        st.warning(f"⚠️ {selected_file} flagged for re-processing")


def _render_devops_tab():
    """Tab 3: folder monitoring, deploy/archive actions, and maintenance."""
    st.header("DevOps Agent")
    st.write("System monitoring and operations")

    st.subheader("System Status")
    col1, col2 = st.columns(2)
    with col1:
        st.metric("Upload Folder", UPLOAD_FOLDER)
        upload_files = len([f for f in os.listdir(UPLOAD_FOLDER) if f.endswith(('.sql', '.txt'))])
        st.metric("Uploaded SQL Files", upload_files)
    with col2:
        st.metric("Output Folder", OUTPUT_FOLDER)
        output_files_count = len([f for f in os.listdir(OUTPUT_FOLDER) if f.endswith('.csv')])
        st.metric("Generated CSV Files", output_files_count)

    st.subheader("Upload Folder Contents")
    upload_files_list = os.listdir(UPLOAD_FOLDER)
    if upload_files_list:
        for file in upload_files_list:
            st.text(f"📄 {file}")
    else:
        st.info("No files in upload folder")

    st.subheader("Output Folder Contents")
    for file in os.listdir(OUTPUT_FOLDER):
        name_col, deploy_col, archive_col = st.columns([3, 1, 1])
        with name_col:
            st.text(f"📊 {file}")
        with deploy_col:
            if st.button("Deploy", key=f"deploy_{file}"):
                log_workflow_event("DEPLOY", "File deployed to production", file)
                st.success(f"{file} deployed!")
        with archive_col:
            if st.button("Archive", key=f"archive_{file}"):
                log_workflow_event("ARCHIVE", "File archived", file)
                st.info(f"{file} archived")

    st.subheader("System Health")
    st.progress(95)  # static demo value
    st.caption("System operational: 95%")

    st.subheader("Maintenance")
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Clear Upload Folder"):
            _clear_folder(UPLOAD_FOLDER)
            log_workflow_event("MAINTENANCE", "Upload folder cleared")
            st.success("Upload folder cleared!")
            _rerun()
    with col2:
        if st.button("Clear Output Folder"):
            _clear_folder(OUTPUT_FOLDER)
            log_workflow_event("MAINTENANCE", "Output folder cleared")
            st.success("Output folder cleared!")
            _rerun()


def _render_workflow_tab():
    """Tab 4: visualize per-file workflow state and overall statistics from the log."""
    st.header("End-to-End Workflow")
    st.write("Visualizing the workflow from Data Engineering → QA → DevOps")

    st.markdown("""
    ### Workflow Overview

    This system follows a structured workflow to ensure data quality and proper deployment:

    1. **Data Engineer Agent**: Uploads and processes SQL files using LLM
    2. **QA Agent**: Reviews processed data for quality and approves for deployment
    3. **DevOps Agent**: Deploys approved files and maintains system health

    Each step must be completed before moving to the next, ensuring proper governance.
    """)

    st.subheader("Workflow Visualization")

    logs = get_workflow_logs()
    # Extract unique file names from the pipe-delimited log entries.
    files_in_workflow = set()
    for log in logs:
        parts = log.split(" | ")
        if len(parts) >= 4 and parts[2].strip():
            files_in_workflow.add(parts[2].strip())

    if not files_in_workflow:
        st.info("No workflow data available yet. Start by uploading a file in the Data Engineer Agent tab.")
        st.subheader("Sample Workflow")
        st.markdown("""
        When you process a file, you'll see a workflow like this:

        **1. Data Engineering Phase**
        - ✅ File Uploaded
        - ✅ SQL Processed

        **2. QA Phase**
        - ✅ QA Approved

        **3. DevOps Phase**
        - ✅ Deployed to Production
        - 🗃️ File Archived
        """)
        return

    selected_workflow_file = st.selectbox("Select a file to view its workflow:", list(files_in_workflow))
    if not selected_workflow_file:
        return

    file_logs = [log for log in logs if selected_workflow_file in log]
    st.write(f"### Workflow for: {selected_workflow_file}")

    # Stage flags derived from substring matches against the raw log lines.
    uploaded = any("UPLOAD" in log and selected_workflow_file in log for log in logs)
    processed = any("PROCESS" in log and selected_workflow_file in log for log in logs)
    qa_approved = any("QA_APPROVE" in log and selected_workflow_file in log for log in logs)
    deployed = any("DEPLOY" in log and selected_workflow_file in log for log in logs)
    archived = any("ARCHIVE" in log and selected_workflow_file in log for log in logs)

    col1, col2, col3 = st.columns(3)
    with col1:
        st.markdown("### 1. Data Engineering")
        if uploaded:
            st.success("✅ File Uploaded")
        else:
            st.error("❌ Not Uploaded")
        if processed:
            st.success("✅ SQL Processed")
        else:
            st.warning("⏳ Pending Processing")
        if uploaded and not processed:
            st.info("Next: Process the SQL file with LLM")

    with col2:
        st.markdown("### 2. QA Review")
        if processed and not qa_approved:
            st.warning("⏳ Pending QA Approval")
        elif qa_approved:
            st.success("✅ QA Approved")
            qa_log = next((log for log in file_logs if "QA_APPROVE" in log), "")
            if "known issues" in qa_log:
                st.warning("Approved with known issues")
        else:
            st.error("❌ Cannot start QA")
            st.caption("Complete Data Engineering first")
        if processed and not qa_approved:
            st.info("Next: Review in QA Agent tab")

    with col3:
        st.markdown("### 3. DevOps Deployment")
        if qa_approved and not deployed:
            st.warning("⏳ Ready for Deployment")
        elif deployed:
            st.success("✅ Deployed to Production")
        else:
            st.error("❌ Cannot Deploy")
            st.caption("Complete QA Approval first")
        if archived:
            st.info("🗃️ File Archived")
        if qa_approved and not deployed:
            st.info("Next: Deploy in DevOps Agent tab")

    st.subheader("Detailed Timeline")
    for log in reversed(file_logs):  # show newest first
        parts = log.strip().split(" | ")
        if len(parts) < 4:
            continue
        timestamp, event_type, filename, description = parts[:4]
        # Color-code based on event type.
        if event_type in ("UPLOAD", "PROCESS"):
            st.info(f"**{timestamp}** - 🛠️ **{event_type}**: {description}")
        elif event_type in ("QA_APPROVE", "QA_REJECT"):
            if "QA_APPROVE" in event_type:
                st.success(f"**{timestamp}** - ✅ **{event_type}**: {description}")
            else:
                st.error(f"**{timestamp}** - ❌ **{event_type}**: {description}")
        elif event_type in ("DEPLOY", "ARCHIVE"):
            st.success(f"**{timestamp}** - 🚀 **{event_type}**: {description}")
        elif event_type == "ERROR":
            st.error(f"**{timestamp}** - ❌ **{event_type}**: {description}")
        else:
            st.write(f"**{timestamp}** - **{event_type}**: {description}")

    st.subheader("Workflow Statistics")
    total_files = len(files_in_workflow)
    fully_processed = sum(
        1 for f in files_in_workflow
        if any("DEPLOY" in log and f in log for log in logs)
    )
    qa_pending = sum(
        1 for f in files_in_workflow
        if any("PROCESS" in log and f in log for log in logs)
        and not any("QA_APPROVE" in log and f in log for log in logs)
    )

    col1, col2, col3 = st.columns(3)
    col1.metric("Total Files in Workflow", total_files)
    col2.metric("Successfully Deployed", fully_processed)
    col3.metric("Pending QA Approval", qa_pending)

    if total_files > 0:
        completion_rate = (fully_processed / total_files) * 100
        st.progress(completion_rate / 100)
        st.caption(f"Workflow Completion Rate: {completion_rate:.1f}%")


# ---------------------------------------------------------------------------
# Main App
# ---------------------------------------------------------------------------
# set_page_config() must be the very first Streamlit command of the script.
st.set_page_config(page_title="Agentic AI Agents", layout="wide")
if not OPENAI_AVAILABLE:
    st.warning("OpenAI library not available. Using mock LLM processing.")
st.title("Agentic AI Agents")

tab1, tab2, tab3, tab4 = st.tabs(["Data Engineer Agent", "QA Agent", "DevOps Agent", "Workflow"])
with tab1:
    _render_data_engineer_tab()
with tab2:
    _render_qa_tab()
with tab3:
    _render_devops_tab()
with tab4:
    _render_workflow_tab()

st.markdown("---")
st.caption("Agentic AI Agents System - Built with Streamlit")