# Agentic_AI / src/streamlit_app.py
# (Originally published by Chang-Gore2025, revision 63ba67c — header text
# converted to comments so the module parses as valid Python.)
import streamlit as st
import os
import pandas as pd
from io import StringIO
import tempfile
import sqlite3
import re
from datetime import datetime
import time
# Import for LLM (using OpenAI as example - you can replace with your preferred LLM)
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    # BUG FIX: do not call st.warning() here. st.set_page_config() further
    # down must be the first Streamlit command executed, and emitting a
    # widget at import-failure time would make that call raise a
    # StreamlitAPIException. The mock fallback in process_sql_with_llm()
    # already covers the missing dependency.
    OPENAI_AVAILABLE = False

# Create necessary working directories (idempotent thanks to exist_ok)
UPLOAD_FOLDER = "upload"
OUTPUT_FOLDER = "output"
LOGS_FOLDER = "logs"
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(OUTPUT_FOLDER, exist_ok=True)
os.makedirs(LOGS_FOLDER, exist_ok=True)
# Mock LLM function if OpenAI not available
def mock_llm_process(sql_query, prompt_instruction):
    """Produce a small synthetic DataFrame mimicking LLM output for *sql_query*.

    The schema is guessed heuristically: first from a CREATE TABLE body,
    then from a SELECT column list, finally from hard-coded defaults.
    *prompt_instruction* is accepted for interface parity but unused here.
    Always returns a DataFrame — a canned fallback frame on any error.
    """
    import random

    try:
        # Guess a table name from a CREATE TABLE statement (parsed but unused).
        created = re.search(r'CREATE\s+TABLE\s+(\w+)', sql_query, re.IGNORECASE)
        table_name = created.group(1) if created else "results"

        # Walk the statement line by line, harvesting column identifiers.
        detected = []
        for raw in sql_query.split('\n'):
            if '(' in raw and ')' not in raw:
                continue  # the CREATE TABLE header line itself
            if any(kw in raw.upper() for kw in ('PRIMARY KEY', 'FOREIGN KEY', 'CONSTRAINT')):
                continue
            if ')' in raw and '(' not in raw:
                break  # closing parenthesis of the column list
            hit = re.match(r'\s*(\w+)\s+\w+', raw)
            if hit:
                detected.append(hit.group(1))

        if not detected:
            # Fallback: pull names out of the SELECT clause instead.
            selected = re.search(r'SELECT\s+(.*?)\s+FROM', sql_query, re.IGNORECASE | re.DOTALL)
            if selected:
                clause = selected.group(1)
                if '*' not in clause:
                    detected = [item.strip().split(' ')[-1].split('.')[-1]
                                for item in clause.split(',')]
                else:
                    detected = ["id", "name", "value"]  # Default columns
        if not detected:
            detected = ["column1", "column2", "column3"]

        # Fabricate five rows of plausible-looking values keyed off column names.
        rows = []
        for idx in range(5):
            record = {}
            for col in detected:
                lowered = col.lower()
                if 'id' in lowered or 'num' in lowered:
                    record[col] = idx + 1
                elif 'name' in lowered:
                    record[col] = f"Name_{idx+1}"
                elif 'date' in lowered:
                    record[col] = f"2023-01-0{idx+1}"
                else:
                    record[col] = f"Value_{random.randint(1, 100)}"
            rows.append(record)
        return pd.DataFrame(rows, columns=detected)
    except Exception as e:
        # Last-resort canned frame so the caller always gets data back.
        return pd.DataFrame({
            "result_id": [1, 2, 3],
            "processed_data": ["Sample data 1", "Sample data 2", "Sample data 3"],
            "note": [f"Mock LLM processed SQL (Error: {str(e)})",
                     "This is demo data",
                     "Replace with actual LLM integration"]
        })
# LLM Processing function
def process_sql_with_llm(sql_content, custom_prompt=""):
    """
    Process SQL query with LLM and return a DataFrame
    Replace this with your actual LLM implementation
    """
    base_instruction = """
    Analyze the following SQL query and generate appropriate sample data that
    would result from executing this query. Return the data in a structured format
    that can be easily converted to CSV. Consider the table structure, column names,
    and data types implied by the SQL.
    """
    prompt = base_instruction + "\n" + custom_prompt + "\n\nSQL Query:\n" + sql_content

    if not OPENAI_AVAILABLE:
        # No OpenAI client installed — fall back to the mock generator.
        return mock_llm_process(sql_content, custom_prompt)

    try:
        client = OpenAI()  # Picks up OPENAI_API_KEY from the environment
        response = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[
                {"role": "system", "content": "You are a SQL and data expert."},
                {"role": "user", "content": prompt},
            ],
        )
        # Raw completion text. Parsing it into a DataFrame is still a
        # placeholder, so the mock generator supplies the actual result.
        llm_output = response.choices[0].message.content
        return mock_llm_process(sql_content, custom_prompt)
    except Exception as e:
        # Surface the API failure in the UI, then degrade to mock data.
        st.error(f"LLM API Error: {e}")
        return mock_llm_process(sql_content, custom_prompt)
# Function to save dataframe as CSV
def save_as_csv(df, filename):
    """Write *df* to OUTPUT_FOLDER/filename as CSV and return the full path."""
    destination = os.path.join(OUTPUT_FOLDER, filename)
    df.to_csv(destination, index=False)
    return destination
# Function to log workflow events
def log_workflow_event(event_type, description, file_name=""):
    """Append a pipe-delimited event record to logs/workflow.log and return it.

    Record layout: "timestamp | event_type | file_name | description".
    """
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"{stamp} | {event_type} | {file_name} | {description}\n"
    with open(os.path.join(LOGS_FOLDER, "workflow.log"), "a") as handle:
        handle.write(entry)
    return entry
# Function to get workflow logs
def get_workflow_logs():
    """Return every line of the workflow log, or an empty list if none exists."""
    path = os.path.join(LOGS_FOLDER, "workflow.log")
    if not os.path.exists(path):
        return []
    with open(path, "r") as handle:
        return handle.readlines()
# Main App
# NOTE: set_page_config must be the first Streamlit command executed in the script.
st.set_page_config(page_title="Agentic AI Agents", layout="wide")
st.title("Agentic AI Agents")
# Create tabs — one per agent role, plus an end-to-end workflow overview.
tab1, tab2, tab3, tab4 = st.tabs(["Data Engineer Agent", "QA Agent", "DevOps Agent", "Workflow"])
# Data Engineer Agent Tab
with tab1:
    st.header("Data Engineer Agent")
    st.write("Upload SQL files for processing")

    # File uploader
    uploaded_file = st.file_uploader("Choose a SQL file", type=['sql', 'txt'])

    if uploaded_file is not None:
        # Display file details
        st.write(f"Filename: {uploaded_file.name}")
        st.write(f"File size: {uploaded_file.size} bytes")

        # Read file content (uploads are treated as UTF-8 text)
        sql_content = uploaded_file.getvalue().decode("utf-8")

        # Display SQL content
        st.subheader("SQL Content")
        st.code(sql_content, language="sql")

        # Harden against path-style names coming from the browser
        # (e.g. "../../x.sql") before building a filesystem path.
        safe_name = os.path.basename(uploaded_file.name)

        # BUG FIX: write with the same UTF-8 encoding the content was decoded
        # with; the default locale encoding could corrupt or reject non-ASCII
        # SQL on some platforms.
        upload_path = os.path.join(UPLOAD_FOLDER, safe_name)
        with open(upload_path, "w", encoding="utf-8") as f:
            f.write(sql_content)

        # Log the upload event
        log_workflow_event("UPLOAD", "File uploaded successfully", safe_name)
        st.success(f"File saved to {upload_path}")

        # Custom prompt input
        st.subheader("Custom Processing Instructions")
        custom_prompt = st.text_area(
            "Add custom instructions for the LLM (optional):",
            placeholder="E.g., Focus on specific columns, apply certain transformations, etc."
        )

        # Process button
        if st.button("Process SQL with LLM"):
            with st.spinner("Processing with LLM..."):
                try:
                    # Process SQL with LLM (falls back to mock data if needed)
                    result_df = process_sql_with_llm(sql_content, custom_prompt)

                    # Display results
                    st.subheader("Processed Results")
                    st.dataframe(result_df)

                    # Save as CSV alongside the other processed outputs
                    csv_filename = os.path.splitext(safe_name)[0] + ".csv"
                    csv_path = save_as_csv(result_df, csv_filename)

                    # Log the processing event
                    log_workflow_event("PROCESS", "SQL processed and CSV generated", csv_filename)
                    st.success(f"Results saved to {csv_path}")

                    # Provide download button
                    st.download_button(
                        label="Download CSV",
                        data=result_df.to_csv(index=False),
                        file_name=csv_filename,
                        mime="text/csv"
                    )
                except Exception as e:
                    st.error(f"Error processing file: {e}")
                    log_workflow_event("ERROR", f"Processing error: {str(e)}", safe_name)
# QA Agent Tab
with tab2:
    st.header("QA Agent")
    st.write("Quality Assurance operations and testing")

    # List processed files
    output_files = [f for f in os.listdir(OUTPUT_FOLDER) if f.endswith('.csv')]

    if output_files:
        st.subheader("Available Processed Files")
        selected_file = st.selectbox("Select a file to review:", output_files)

        if selected_file:
            file_path = os.path.join(OUTPUT_FOLDER, selected_file)
            df = pd.read_csv(file_path)
            st.write(f"### {selected_file}")
            st.dataframe(df)

            # Basic QA metrics
            st.subheader("QA Metrics")
            col1, col2, col3 = st.columns(3)
            with col1:
                st.metric("Rows", len(df))
            with col2:
                st.metric("Columns", len(df.columns))
            with col3:
                missing_values = df.isnull().sum().sum()
                st.metric("Missing Values", missing_values)

            # Data quality checks
            st.subheader("Data Quality Checks")
            issues_found = 0
            for col in df.columns:
                null_count = df[col].isnull().sum()
                if null_count:
                    st.warning(f"Column '{col}' contains {null_count} null values")
                    issues_found += 1

            # Duplicate rows check
            duplicate_rows = df.duplicated().sum()
            if duplicate_rows > 0:
                st.warning(f"Found {duplicate_rows} duplicate rows")
                issues_found += 1

            # BUG FIX: the acknowledge checkbox must be rendered *before* the
            # approve button. A checkbox created inside the button branch only
            # appears after the click, and the button resets to False on the
            # next rerun — so approval-with-known-issues could never complete.
            acknowledge_issues = False
            if issues_found > 0:
                acknowledge_issues = st.checkbox("Approve despite issues")

            # Log QA review
            if st.button("Approve File for Deployment"):
                if issues_found == 0:
                    log_workflow_event("QA_APPROVE", "File approved with no issues", selected_file)
                    st.success(f"βœ… {selected_file} approved for deployment!")
                elif acknowledge_issues:
                    log_workflow_event("QA_APPROVE", f"File approved with {issues_found} known issues", selected_file)
                    st.success(f"βœ… {selected_file} approved for deployment (with known issues)!")
                else:
                    st.info("Please acknowledge issues before approving")

            if st.button("Flag File for Re-processing"):
                log_workflow_event("QA_REJECT", f"File rejected due to {issues_found} issues", selected_file)
                st.warning(f"⚠️ {selected_file} flagged for re-processing")
    else:
        st.info("No processed files available. Upload and process SQL files in the Data Engineer Agent tab.")
# DevOps Agent Tab
with tab3:
    st.header("DevOps Agent")
    st.write("System monitoring and operations")

    # System status
    st.subheader("System Status")
    col1, col2 = st.columns(2)
    with col1:
        st.metric("Upload Folder", UPLOAD_FOLDER)
        upload_files = len([f for f in os.listdir(UPLOAD_FOLDER) if f.endswith(('.sql', '.txt'))])
        st.metric("Uploaded SQL Files", upload_files)
    with col2:
        st.metric("Output Folder", OUTPUT_FOLDER)
        output_files_count = len([f for f in os.listdir(OUTPUT_FOLDER) if f.endswith('.csv')])
        st.metric("Generated CSV Files", output_files_count)

    # Folder contents
    st.subheader("Upload Folder Contents")
    upload_files_list = os.listdir(UPLOAD_FOLDER)
    if upload_files_list:
        for file in upload_files_list:
            st.text(f"πŸ“„ {file}")
    else:
        st.info("No files in upload folder")

    st.subheader("Output Folder Contents")
    output_files_list = os.listdir(OUTPUT_FOLDER)
    if output_files_list:
        for file in output_files_list:
            col1, col2, col3 = st.columns([3, 1, 1])
            with col1:
                st.text(f"πŸ“Š {file}")
            with col2:
                # Per-file unique keys keep the buttons independent across reruns
                if st.button("Deploy", key=f"deploy_{file}"):
                    log_workflow_event("DEPLOY", "File deployed to production", file)
                    st.success(f"{file} deployed!")
            with col3:
                if st.button("Archive", key=f"archive_{file}"):
                    log_workflow_event("ARCHIVE", "File archived", file)
                    st.info(f"{file} archived")

    # System health
    st.subheader("System Health")
    st.progress(95)
    st.caption("System operational: 95%")

    # Cleanup options
    st.subheader("Maintenance")
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Clear Upload Folder"):
            for file in os.listdir(UPLOAD_FOLDER):
                target = os.path.join(UPLOAD_FOLDER, file)
                # Guard against stray subdirectories, which os.remove rejects
                if os.path.isfile(target):
                    os.remove(target)
            log_workflow_event("MAINTENANCE", "Upload folder cleared")
            st.success("Upload folder cleared!")
            # BUG FIX: st.experimental_rerun() was deprecated and removed in
            # modern Streamlit; st.rerun() is the supported replacement.
            st.rerun()
    with col2:
        if st.button("Clear Output Folder"):
            for file in os.listdir(OUTPUT_FOLDER):
                target = os.path.join(OUTPUT_FOLDER, file)
                if os.path.isfile(target):
                    os.remove(target)
            log_workflow_event("MAINTENANCE", "Output folder cleared")
            st.success("Output folder cleared!")
            st.rerun()
# Workflow Tab
with tab4:
    st.header("End-to-End Workflow")
    st.write("Visualizing the workflow from Data Engineering β†’ QA β†’ DevOps")

    # Workflow explanation
    st.markdown("""
    ### Workflow Overview
    This system follows a structured workflow to ensure data quality and proper deployment:
    1. **Data Engineer Agent**: Uploads and processes SQL files using LLM
    2. **QA Agent**: Reviews processed data for quality and approves for deployment
    3. **DevOps Agent**: Deploys approved files and maintains system health
    Each step must be completed before moving to the next, ensuring proper governance.
    """)

    # --- Log-line helpers ---------------------------------------------------
    # Log format (written by log_workflow_event):
    #   "timestamp | event_type | file_name | description"

    def _log_fields(log_line):
        """Return [timestamp, event, file, description, ...] or None if malformed."""
        parts = log_line.strip().split(" | ")
        return parts if len(parts) >= 4 else None

    def _log_matches(log_line, file_name, event_type=None):
        """True when the log line's file field equals *file_name* exactly.

        BUG FIX: the original code used substring tests (`file_name in log`,
        `"UPLOAD" in log`), so e.g. 'a.csv' matched every entry for
        'data.csv' and event names could match text in the description field.
        """
        parts = _log_fields(log_line)
        if parts is None:
            return False
        if parts[2].strip() != file_name:
            return False
        return event_type is None or parts[1].strip() == event_type

    # Create a visual workflow
    st.subheader("Workflow Visualization")

    # Collect every file name that appears in the workflow log
    logs = get_workflow_logs()
    files_in_workflow = set()
    for log in logs:
        parts = _log_fields(log)
        if parts and parts[2].strip():
            files_in_workflow.add(parts[2].strip())

    if files_in_workflow:
        selected_workflow_file = st.selectbox("Select a file to view its workflow:", list(files_in_workflow))

        if selected_workflow_file:
            # Logs belonging to exactly this file (field-exact match)
            file_logs = [log for log in logs if _log_matches(log, selected_workflow_file)]
            st.write(f"### Workflow for: {selected_workflow_file}")

            # Create workflow status indicators
            col1, col2, col3 = st.columns(3)

            # Stage flags derived from the event field of each matching entry
            uploaded = any(_log_matches(log, selected_workflow_file, "UPLOAD") for log in logs)
            processed = any(_log_matches(log, selected_workflow_file, "PROCESS") for log in logs)
            qa_approved = any(_log_matches(log, selected_workflow_file, "QA_APPROVE") for log in logs)
            deployed = any(_log_matches(log, selected_workflow_file, "DEPLOY") for log in logs)
            archived = any(_log_matches(log, selected_workflow_file, "ARCHIVE") for log in logs)

            with col1:
                st.markdown("### 1. Data Engineering")
                if uploaded:
                    st.success("βœ… File Uploaded")
                else:
                    st.error("❌ Not Uploaded")
                if processed:
                    st.success("βœ… SQL Processed")
                else:
                    st.warning("⏳ Pending Processing")
                if uploaded and not processed:
                    st.info("Next: Process the SQL file with LLM")

            with col2:
                st.markdown("### 2. QA Review")
                if processed and not qa_approved:
                    st.warning("⏳ Pending QA Approval")
                elif qa_approved:
                    st.success("βœ… QA Approved")
                    qa_log = next((log for log in file_logs
                                   if _log_matches(log, selected_workflow_file, "QA_APPROVE")), "")
                    if "known issues" in qa_log:
                        st.warning("Approved with known issues")
                else:
                    st.error("❌ Cannot start QA")
                    st.caption("Complete Data Engineering first")
                if processed and not qa_approved:
                    st.info("Next: Review in QA Agent tab")

            with col3:
                st.markdown("### 3. DevOps Deployment")
                if qa_approved and not deployed:
                    st.warning("⏳ Ready for Deployment")
                elif deployed:
                    st.success("βœ… Deployed to Production")
                else:
                    st.error("❌ Cannot Deploy")
                    st.caption("Complete QA Approval first")
                if archived:
                    st.info("πŸ—ƒοΈ File Archived")
                if qa_approved and not deployed:
                    st.info("Next: Deploy in DevOps Agent tab")

            # Show detailed timeline (newest first)
            st.subheader("Detailed Timeline")
            for log in reversed(file_logs):
                parts = _log_fields(log)
                if parts:
                    timestamp, event_type, filename, description = parts[:4]
                    # Color code based on event type
                    if event_type in ["UPLOAD", "PROCESS"]:
                        st.info(f"**{timestamp}** - πŸ› οΈ **{event_type}**: {description}")
                    elif event_type == "QA_APPROVE":
                        st.success(f"**{timestamp}** - βœ… **{event_type}**: {description}")
                    elif event_type == "QA_REJECT":
                        st.error(f"**{timestamp}** - ❌ **{event_type}**: {description}")
                    elif event_type in ["DEPLOY", "ARCHIVE"]:
                        st.success(f"**{timestamp}** - πŸš€ **{event_type}**: {description}")
                    elif event_type == "ERROR":
                        st.error(f"**{timestamp}** - ❌ **{event_type}**: {description}")
                    else:
                        st.write(f"**{timestamp}** - **{event_type}**: {description}")

        # Show workflow statistics
        st.subheader("Workflow Statistics")
        total_files = len(files_in_workflow)
        fully_processed = sum(
            1 for f in files_in_workflow
            if any(_log_matches(log, f, "DEPLOY") for log in logs))
        qa_pending = sum(
            1 for f in files_in_workflow
            if any(_log_matches(log, f, "PROCESS") for log in logs)
            and not any(_log_matches(log, f, "QA_APPROVE") for log in logs))

        col1, col2, col3 = st.columns(3)
        col1.metric("Total Files in Workflow", total_files)
        col2.metric("Successfully Deployed", fully_processed)
        col3.metric("Pending QA Approval", qa_pending)

        # Show completion rate
        if total_files > 0:
            completion_rate = (fully_processed / total_files) * 100
            st.progress(completion_rate / 100)
            st.caption(f"Workflow Completion Rate: {completion_rate:.1f}%")
    else:
        st.info("No workflow data available yet. Start by uploading a file in the Data Engineer Agent tab.")

        # Show sample workflow
        st.subheader("Sample Workflow")
        st.markdown("""
        When you process a file, you'll see a workflow like this:
        **1. Data Engineering Phase**
        - βœ… File Uploaded
        - βœ… SQL Processed
        **2. QA Phase**
        - βœ… QA Approved
        **3. DevOps Phase**
        - βœ… Deployed to Production
        - πŸ—ƒοΈ File Archived
        """)
# Footer — rendered beneath every tab
st.markdown("---")
st.caption("Agentic AI Agents System - Built with Streamlit")