# Hugging Face Spaces page header ("Spaces: Sleeping") -- extraction residue, not code.
| import streamlit as st | |
| import os | |
| import pandas as pd | |
| from io import StringIO | |
| import tempfile | |
| import sqlite3 | |
| import re | |
| from datetime import datetime | |
| import time | |
# Import for LLM (using OpenAI as example - you can replace with your preferred LLM)
# OPENAI_AVAILABLE gates between the real client and mock processing below.
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    # NOTE(review): this emits a Streamlit element at import time, before
    # st.set_page_config() runs further down; some Streamlit versions reject
    # elements rendered before set_page_config -- confirm against the target version.
    st.warning("OpenAI library not available. Using mock LLM processing.")
# Create necessary directories
# upload/ holds raw SQL uploads, output/ the generated CSVs, logs/ the workflow log.
UPLOAD_FOLDER = "upload"
OUTPUT_FOLDER = "output"
LOGS_FOLDER = "logs"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
os.makedirs(LOGS_FOLDER, exist_ok=True)
| # Mock LLM function if OpenAI not available | |
| def mock_llm_process(sql_query, prompt_instruction): | |
| """Mock LLM processing for demo purposes""" | |
| try: | |
| # Extract table name | |
| table_match = re.search(r'CREATE\s+TABLE\s+(\w+)', sql_query, re.IGNORECASE) | |
| table_name = table_match.group(1) if table_match else "results" | |
| # Extract column names | |
| columns = [] | |
| lines = sql_query.split('\n') | |
| for line in lines: | |
| if '(' in line and ')' not in line: | |
| continue # Skip the CREATE TABLE line | |
| if any(keyword in line.upper() for keyword in ['PRIMARY KEY', 'FOREIGN KEY', 'CONSTRAINT']): | |
| continue | |
| if ')' in line and '(' not in line: | |
| break | |
| column_match = re.match(r'\s*(\w+)\s+\w+', line) | |
| if column_match: | |
| columns.append(column_match.group(1)) | |
| if not columns: | |
| # Fallback: extract from SELECT statement | |
| select_match = re.search(r'SELECT\s+(.*?)\s+FROM', sql_query, re.IGNORECASE | re.DOTALL) | |
| if select_match: | |
| select_part = select_match.group(1) | |
| if '*' not in select_part: | |
| columns = [col.strip().split(' ')[-1].split('.')[-1] | |
| for col in select_part.split(',')] | |
| else: | |
| columns = ["id", "name", "value"] # Default columns | |
| if not columns: | |
| columns = ["column1", "column2", "column3"] | |
| # Generate mock data | |
| import random | |
| data = [] | |
| for i in range(5): # Generate 5 rows of mock data | |
| row = {} | |
| for col in columns: | |
| if 'id' in col.lower() or 'num' in col.lower(): | |
| row[col] = i + 1 | |
| elif 'name' in col.lower(): | |
| row[col] = f"Name_{i+1}" | |
| elif 'date' in col.lower(): | |
| row[col] = f"2023-01-0{i+1}" | |
| else: | |
| row[col] = f"Value_{random.randint(1, 100)}" | |
| data.append(row) | |
| return pd.DataFrame(data, columns=columns) | |
| except Exception as e: | |
| # Fallback dataframe | |
| return pd.DataFrame({ | |
| "result_id": [1, 2, 3], | |
| "processed_data": ["Sample data 1", "Sample data 2", "Sample data 3"], | |
| "note": [f"Mock LLM processed SQL (Error: {str(e)})", | |
| "This is demo data", | |
| "Replace with actual LLM integration"] | |
| }) | |
# LLM Processing function
def process_sql_with_llm(sql_content, custom_prompt=""):
    """
    Process SQL query with LLM and return a DataFrame.

    Falls back to mock processing when the OpenAI client is unavailable or
    when the API call fails. Replace this with your actual LLM implementation.
    """
    default_prompt = """
    Analyze the following SQL query and generate appropriate sample data that
    would result from executing this query. Return the data in a structured format
    that can be easily converted to CSV. Consider the table structure, column names,
    and data types implied by the SQL.
    """
    full_prompt = default_prompt + "\n" + custom_prompt + "\n\nSQL Query:\n" + sql_content
    if not OPENAI_AVAILABLE:
        # Use mock processing if OpenAI not available
        return mock_llm_process(sql_content, custom_prompt)
    try:
        # The client reads OPENAI_API_KEY from the environment.
        client = OpenAI()
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a SQL and data expert."},
                {"role": "user", "content": full_prompt}
            ]
        )
        # Parse LLM response to DataFrame (this will depend on your LLM's output format)
        # Placeholder: the raw completion is fetched but the mock result is
        # returned until response parsing is implemented.
        llm_output = response.choices[0].message.content
        return mock_llm_process(sql_content, custom_prompt)
    except Exception as e:
        st.error(f"LLM API Error: {e}")
        return mock_llm_process(sql_content, custom_prompt)
# Persist a processed result to the shared output directory.
def save_as_csv(df, filename):
    """Write *df* (without its index) to OUTPUT_FOLDER/filename; return the path."""
    destination = os.path.join(OUTPUT_FOLDER, filename)
    df.to_csv(destination, index=False)
    return destination
# Append an audit-trail entry for every workflow action.
def log_workflow_event(event_type, description, file_name=""):
    """Append one pipe-delimited line to logs/workflow.log and return it.

    Format: "YYYY-mm-dd HH:MM:SS | EVENT | filename | description".
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"{stamp} | {event_type} | {file_name} | {description}\n"
    with open(os.path.join(LOGS_FOLDER, "workflow.log"), "a") as log:
        log.write(entry)
    return entry
# Read back the full audit trail written by log_workflow_event.
def get_workflow_logs():
    """Return every line of logs/workflow.log, or [] if no log exists yet."""
    log_path = os.path.join(LOGS_FOLDER, "workflow.log")
    if not os.path.exists(log_path):
        return []
    with open(log_path, "r") as log:
        return log.readlines()
# Main App
# set_page_config must run before any other Streamlit element in this script.
st.set_page_config(page_title="Agentic AI Agents", layout="wide")
st.title("Agentic AI Agents")
# Create tabs
# One tab per agent role, plus an end-to-end workflow overview.
tab1, tab2, tab3, tab4 = st.tabs(["Data Engineer Agent", "QA Agent", "DevOps Agent", "Workflow"])
# Data Engineer Agent Tab
# Upload a SQL file, run it through the (mock) LLM, save the result as CSV.
with tab1:
    st.header("Data Engineer Agent")
    st.write("Upload SQL files for processing")
    # File uploader
    uploaded_file = st.file_uploader("Choose a SQL file", type=['sql', 'txt'])
    if uploaded_file is not None:
        # Display file details
        st.write(f"Filename: {uploaded_file.name}")
        st.write(f"File size: {uploaded_file.size} bytes")
        # Read file content
        stringio = StringIO(uploaded_file.getvalue().decode("utf-8"))
        sql_content = stringio.read()
        # Display SQL content
        st.subheader("SQL Content")
        st.code(sql_content, language="sql")
        # Save uploaded file.
        # BUG FIX: write as utf-8 to match the decode above -- the platform
        # default encoding could reject or corrupt non-ASCII SQL (e.g. on Windows).
        upload_path = os.path.join(UPLOAD_FOLDER, uploaded_file.name)
        with open(upload_path, "w", encoding="utf-8") as f:
            f.write(sql_content)
        # Log the upload event.
        # BUG FIX: Streamlit reruns this whole script on every interaction, so
        # the original appended a duplicate UPLOAD log row each rerun; remember
        # the last logged file in session_state and log each upload only once.
        if st.session_state.get("last_uploaded") != uploaded_file.name:
            log_workflow_event("UPLOAD", "File uploaded successfully", uploaded_file.name)
            st.session_state["last_uploaded"] = uploaded_file.name
        st.success(f"File saved to {upload_path}")
        # Custom prompt input
        st.subheader("Custom Processing Instructions")
        custom_prompt = st.text_area(
            "Add custom instructions for the LLM (optional):",
            placeholder="E.g., Focus on specific columns, apply certain transformations, etc."
        )
        # Process button
        if st.button("Process SQL with LLM"):
            with st.spinner("Processing with LLM..."):
                try:
                    # Process SQL with LLM (falls back to the mock implementation)
                    result_df = process_sql_with_llm(sql_content, custom_prompt)
                    # Display results
                    st.subheader("Processed Results")
                    st.dataframe(result_df)
                    # Save as CSV named after the uploaded file
                    csv_filename = os.path.splitext(uploaded_file.name)[0] + ".csv"
                    csv_path = save_as_csv(result_df, csv_filename)
                    # Log the processing event
                    log_workflow_event("PROCESS", "SQL processed and CSV generated", csv_filename)
                    st.success(f"Results saved to {csv_path}")
                    # Provide download button
                    csv_data = result_df.to_csv(index=False)
                    st.download_button(
                        label="Download CSV",
                        data=csv_data,
                        file_name=csv_filename,
                        mime="text/csv"
                    )
                except Exception as e:
                    st.error(f"Error processing file: {e}")
                    log_workflow_event("ERROR", f"Processing error: {str(e)}", uploaded_file.name)
# QA Agent Tab
# Review generated CSVs, run simple quality checks, approve or reject them.
with tab2:
    st.header("QA Agent")
    st.write("Quality Assurance operations and testing")
    # List processed files
    output_files = [f for f in os.listdir(OUTPUT_FOLDER) if f.endswith('.csv')]
    if output_files:
        st.subheader("Available Processed Files")
        selected_file = st.selectbox("Select a file to review:", output_files)
        if selected_file:
            file_path = os.path.join(OUTPUT_FOLDER, selected_file)
            df = pd.read_csv(file_path)
            st.write(f"### {selected_file}")
            st.dataframe(df)
            # Basic QA metrics
            st.subheader("QA Metrics")
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("Rows", len(df))
            with col2:
                st.metric("Columns", len(df.columns))
            with col3:
                missing_values = df.isnull().sum().sum()
                st.metric("Missing Values", missing_values)
            # Data quality checks
            st.subheader("Data Quality Checks")
            issues_found = 0
            for col in df.columns:
                if df[col].isnull().any():
                    st.warning(f"Column '{col}' contains {df[col].isnull().sum()} null values")
                    issues_found += 1
            # Duplicate rows check
            duplicate_rows = df.duplicated().sum()
            if duplicate_rows > 0:
                st.warning(f"Found {duplicate_rows} duplicate rows")
                issues_found += 1
            # Approval flow.
            # BUG FIX: the original rendered the "Approve despite issues"
            # checkbox *inside* the approve button's if-block. A Streamlit
            # button is only True for the single rerun in which it was clicked,
            # so the checkbox vanished on the next rerun and toggling it reset
            # the button -- the with-issues approval could never complete.
            # Render the checkbox up front and read it when the button fires.
            acknowledged = True
            if issues_found > 0:
                acknowledged = st.checkbox("Approve despite issues")
            if st.button("Approve File for Deployment"):
                if issues_found == 0:
                    log_workflow_event("QA_APPROVE", "File approved with no issues", selected_file)
                    st.success(f"β {selected_file} approved for deployment!")
                elif acknowledged:
                    log_workflow_event("QA_APPROVE", f"File approved with {issues_found} known issues", selected_file)
                    st.success(f"β {selected_file} approved for deployment (with known issues)!")
                else:
                    st.info("Please acknowledge issues before approving")
            if st.button("Flag File for Re-processing"):
                log_workflow_event("QA_REJECT", f"File rejected due to {issues_found} issues", selected_file)
                st.warning(f"β οΈ {selected_file} flagged for re-processing")
    else:
        st.info("No processed files available. Upload and process SQL files in the Data Engineer Agent tab.")
# DevOps Agent Tab
# Show folder status, deploy/archive files, and clear working directories.
with tab3:
    st.header("DevOps Agent")
    st.write("System monitoring and operations")
    # System status
    st.subheader("System Status")
    col1, col2 = st.columns(2)
    with col1:
        st.metric("Upload Folder", UPLOAD_FOLDER)
        upload_files = len([f for f in os.listdir(UPLOAD_FOLDER) if f.endswith(('.sql', '.txt'))])
        st.metric("Uploaded SQL Files", upload_files)
    with col2:
        st.metric("Output Folder", OUTPUT_FOLDER)
        output_files_count = len([f for f in os.listdir(OUTPUT_FOLDER) if f.endswith('.csv')])
        st.metric("Generated CSV Files", output_files_count)
    # Folder contents
    st.subheader("Upload Folder Contents")
    upload_files_list = os.listdir(UPLOAD_FOLDER)
    if upload_files_list:
        for file in upload_files_list:
            st.text(f"π {file}")
    else:
        st.info("No files in upload folder")
    st.subheader("Output Folder Contents")
    output_files_list = os.listdir(OUTPUT_FOLDER)
    if output_files_list:
        for file in output_files_list:
            col1, col2, col3 = st.columns([3, 1, 1])
            with col1:
                st.text(f"π {file}")
            with col2:
                # Per-file keys keep the repeated buttons' widget IDs unique.
                if st.button("Deploy", key=f"deploy_{file}"):
                    log_workflow_event("DEPLOY", "File deployed to production", file)
                    st.success(f"{file} deployed!")
            with col3:
                if st.button("Archive", key=f"archive_{file}"):
                    log_workflow_event("ARCHIVE", "File archived", file)
                    st.info(f"{file} archived")
    # System health
    st.subheader("System Health")
    st.progress(95)
    st.caption("System operational: 95%")
    # Cleanup options
    st.subheader("Maintenance")
    # BUG FIX: st.experimental_rerun() was renamed st.rerun() in Streamlit 1.27
    # and removed in later releases; resolve whichever the installed version has.
    _rerun = st.rerun if hasattr(st, "rerun") else st.experimental_rerun
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Clear Upload Folder"):
            for file in os.listdir(UPLOAD_FOLDER):
                os.remove(os.path.join(UPLOAD_FOLDER, file))
            log_workflow_event("MAINTENANCE", "Upload folder cleared")
            st.success("Upload folder cleared!")
            _rerun()
    with col2:
        if st.button("Clear Output Folder"):
            for file in os.listdir(OUTPUT_FOLDER):
                os.remove(os.path.join(OUTPUT_FOLDER, file))
            log_workflow_event("MAINTENANCE", "Output folder cleared")
            st.success("Output folder cleared!")
            _rerun()
# Workflow Tab
# Reconstructs each file's lifecycle (upload -> process -> QA -> deploy) from
# the pipe-delimited workflow log and renders status indicators and stats.
with tab4:
    st.header("End-to-End Workflow")
    st.write("Visualizing the workflow from Data Engineering β QA β DevOps")
    # Workflow explanation
    st.markdown("""
    ### Workflow Overview
    This system follows a structured workflow to ensure data quality and proper deployment:
    1. **Data Engineer Agent**: Uploads and processes SQL files using LLM
    2. **QA Agent**: Reviews processed data for quality and approves for deployment
    3. **DevOps Agent**: Deploys approved files and maintains system health
    Each step must be completed before moving to the next, ensuring proper governance.
    """)
    # Create a visual workflow
    st.subheader("Workflow Visualization")
    # Get recent workflow for a specific file (if any)
    logs = get_workflow_logs()
    files_in_workflow = set()
    # Extract unique file names from logs.
    # Log format (see log_workflow_event): "timestamp | EVENT | filename | description",
    # so parts[2] is the file name; blank names (e.g. MAINTENANCE rows) are skipped.
    for log in logs:
        parts = log.split(" | ")
        if len(parts) >= 4 and parts[2].strip():
            files_in_workflow.add(parts[2].strip())
    if files_in_workflow:
        selected_workflow_file = st.selectbox("Select a file to view its workflow:", list(files_in_workflow))
        if selected_workflow_file:
            # Filter logs for this file.
            # NOTE(review): plain substring matching (here and in the stage checks
            # below) over-matches when one file name is contained in another
            # (e.g. "a.csv" vs "data.csv") -- consider comparing parts[2] exactly.
            file_logs = [log for log in logs if selected_workflow_file in log]
            st.write(f"### Workflow for: {selected_workflow_file}")
            # Create workflow status indicators
            col1, col2, col3 = st.columns(3)
            # Check status for each stage (event-type substrings in the log lines)
            uploaded = any("UPLOAD" in log and selected_workflow_file in log for log in logs)
            processed = any("PROCESS" in log and selected_workflow_file in log for log in logs)
            qa_approved = any("QA_APPROVE" in log and selected_workflow_file in log for log in logs)
            deployed = any("DEPLOY" in log and selected_workflow_file in log for log in logs)
            archived = any("ARCHIVE" in log and selected_workflow_file in log for log in logs)
            with col1:
                st.markdown("### 1. Data Engineering")
                if uploaded:
                    st.success("β File Uploaded")
                else:
                    st.error("β Not Uploaded")
                if processed:
                    st.success("β SQL Processed")
                else:
                    st.warning("β³ Pending Processing")
                if uploaded and not processed:
                    st.info("Next: Process the SQL file with LLM")
            with col2:
                st.markdown("### 2. QA Review")
                if processed and not qa_approved:
                    st.warning("β³ Pending QA Approval")
                elif qa_approved:
                    st.success("β QA Approved")
                    # Surface whether approval carried acknowledged issues.
                    qa_log = next((log for log in file_logs if "QA_APPROVE" in log), "")
                    if "known issues" in qa_log:
                        st.warning("Approved with known issues")
                else:
                    st.error("β Cannot start QA")
                    st.caption("Complete Data Engineering first")
                if processed and not qa_approved:
                    st.info("Next: Review in QA Agent tab")
            with col3:
                st.markdown("### 3. DevOps Deployment")
                if qa_approved and not deployed:
                    st.warning("β³ Ready for Deployment")
                elif deployed:
                    st.success("β Deployed to Production")
                else:
                    st.error("β Cannot Deploy")
                    st.caption("Complete QA Approval first")
                if archived:
                    st.info("ποΈ File Archived")
                if qa_approved and not deployed:
                    st.info("Next: Deploy in DevOps Agent tab")
            # Show detailed timeline
            st.subheader("Detailed Timeline")
            for log in reversed(file_logs):  # Show newest first
                parts = log.strip().split(" | ")
                if len(parts) >= 4:
                    timestamp, event_type, filename, description = parts[:4]
                    # Color code based on event type
                    if event_type in ["UPLOAD", "PROCESS"]:
                        st.info(f"**{timestamp}** - π οΈ **{event_type}**: {description}")
                    elif event_type in ["QA_APPROVE", "QA_REJECT"]:
                        if "QA_APPROVE" in event_type:
                            st.success(f"**{timestamp}** - β **{event_type}**: {description}")
                        else:
                            st.error(f"**{timestamp}** - β **{event_type}**: {description}")
                    elif event_type in ["DEPLOY", "ARCHIVE"]:
                        st.success(f"**{timestamp}** - π **{event_type}**: {description}")
                    elif event_type == "ERROR":
                        st.error(f"**{timestamp}** - β **{event_type}**: {description}")
                    else:
                        st.write(f"**{timestamp}** - **{event_type}**: {description}")
        # Show workflow statistics (aggregated over every file seen in the log)
        st.subheader("Workflow Statistics")
        total_files = len(files_in_workflow)
        fully_processed = sum(1 for f in files_in_workflow if
                              any("DEPLOY" in log and f in log for log in logs))
        qa_pending = sum(1 for f in files_in_workflow if
                         any("PROCESS" in log and f in log for log in logs) and
                         not any("QA_APPROVE" in log and f in log for log in logs))
        col1, col2, col3 = st.columns(3)
        col1.metric("Total Files in Workflow", total_files)
        col2.metric("Successfully Deployed", fully_processed)
        col3.metric("Pending QA Approval", qa_pending)
        # Show completion rate (deployed files / all files in the log)
        if total_files > 0:
            completion_rate = (fully_processed / total_files) * 100
            st.progress(completion_rate / 100)
            st.caption(f"Workflow Completion Rate: {completion_rate:.1f}%")
    else:
        st.info("No workflow data available yet. Start by uploading a file in the Data Engineer Agent tab.")
        # Show sample workflow
        st.subheader("Sample Workflow")
        st.markdown("""
        When you process a file, you'll see a workflow like this:
        **1. Data Engineering Phase**
        - β File Uploaded
        - β SQL Processed
        **2. QA Phase**
        - β QA Approved
        **3. DevOps Phase**
        - β Deployed to Production
        - ποΈ File Archived
        """)
# Footer
# Rendered after the tab context managers, so it appears below all tabs.
st.markdown("---")
st.caption("Agentic AI Agents System - Built with Streamlit")