Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python | |
| """ | |
| Combined Multimodal AI Suite | |
| - TorchTransformers-Diffusion-CV-SFT functionality (Camera, PDF, OCR, diffusion image gen, etc.) | |
| - GPT-4o Omni: Text, Audio, Image, Video processing with chat and paper search | |
| - Python Code Interpreter for code generation and execution | |
| This app integrates all modalities and adds an “Integrated Workflow” tab that enables you to: | |
| • Upload documents (e.g. double-page papers) | |
| • Extract text via OCR and image processing | |
| • Prompt GPT to generate Python code based on the extracted text | |
| • Display and execute the generated code | |
| Developed with Streamlit. | |
| """ | |
| import aiofiles | |
| import asyncio | |
| import base64 | |
| import fitz | |
| import glob | |
| import logging | |
| import os | |
| import pandas as pd | |
| import pytz | |
| import random | |
| import re | |
| import requests | |
| import shutil | |
| import streamlit as st | |
| import time | |
| import torch | |
| import zipfile | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from diffusers import StableDiffusionPipeline | |
| from io import BytesIO | |
| from openai import OpenAI | |
| from PIL import Image | |
| from transformers import AutoModelForCausalLM, AutoTokenizer, AutoModel | |
| from typing import Optional | |
| # --- Additional Imports from GPT-4o Omni --- | |
| import cv2 | |
| import json | |
| import streamlit.components.v1 as components | |
| import textract | |
| from audio_recorder_streamlit import audio_recorder | |
| from bs4 import BeautifulSoup | |
| from collections import deque | |
| from dotenv import load_dotenv | |
| from gradio_client import Client, handle_file | |
| from huggingface_hub import InferenceClient | |
| from moviepy import VideoFileClip | |
| from urllib.parse import quote | |
| from xml.etree import ElementTree as ET | |
| import openai | |
| # --- Code Interpreter Imports --- | |
| import io | |
| import sys | |
| from contextlib import redirect_stdout | |
| import mistune | |
# Load environment variables
load_dotenv()

# ------------------ Global Configuration ------------------
# Menu entries handed to the page configuration below.
_PAGE_MENU_ITEMS = {
    'Get Help': 'https://huggingface.co/awacke1',
    'Report a Bug': 'https://huggingface.co/spaces/awacke1',
    'About': "Combined Multimodal AI Suite: Camera, OCR, Chat, Code Generation & Execution",
}

st.set_page_config(
    page_title="Combined Multimodal AI Suite 🚀",
    page_icon="🤖",
    layout="wide",
    initial_sidebar_state="expanded",
    menu_items=_PAGE_MENU_ITEMS,
)
# Setup logging
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
log_records = []


class LogCaptureHandler(logging.Handler):
    """Logging handler that stores every record it receives in ``log_records``."""

    def emit(self, record):
        log_records.append(record)


# ``logging.getLogger`` hands back the same logger object for a given name for
# the life of the process.  If this module body runs more than once in that
# process, each run would otherwise stack another capture handler on it, so
# drop any handler of this kind left by an earlier run before attaching ours.
for _stale_handler in [h for h in logger.handlers if type(h).__name__ == "LogCaptureHandler"]:
    logger.removeHandler(_stale_handler)
logger.addHandler(LogCaptureHandler())
# ------------------ Session State Defaults ------------------
# Initial value for each session key; a key is only seeded when it is not
# already present in the session state.
_SESSION_DEFAULTS = {
    'history': [],
    'messages': [],
    'gallery_files': [],
    'builder': None,
    'model_loaded': False,
    'processing': {},
    'asset_checkboxes': {},
    'downloaded_pdfs': {},
    'unique_counter': 0,
}
for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
| # ------------------ Utility Functions ------------------ | |
def generate_filename(prompt, file_type):
    """Build ``<MMDD_HHMM>_<sanitized prompt>.<file_type>``.

    The timestamp is taken in the US/Central timezone.  Spaces and newlines in
    *prompt* become underscores, every other character that is neither
    alphanumeric nor an underscore is dropped, and the result is cut to 90
    characters.
    """
    stamp = datetime.now(pytz.timezone('US/Central')).strftime("%m%d_%H%M")
    underscored = prompt.replace(" ", "_").replace("\n", "_")
    kept_chars = [ch for ch in underscored if ch == "_" or ch.isalnum()]
    slug = "".join(kept_chars)[:90]
    return f"{stamp}_{slug}.{file_type}"
def get_download_link(file_path, mime_type="application/octet-stream", label="Download"):
    """Return an HTML ``<a>`` tag that embeds the file as a base64 data URI.

    The ``download`` attribute is set to the file's base name and *label* is
    used as the link text.
    """
    with open(file_path, "rb") as source:
        raw_bytes = source.read()
    encoded = base64.b64encode(raw_bytes).decode()
    download_name = os.path.basename(file_path)
    return f'<a href="data:{mime_type};base64,{encoded}" download="{download_name}">{label}</a>'
def zip_directory(directory_path, zip_path):
    """Write every file under *directory_path* into a deflated zip at *zip_path*.

    Archive member names are relative to the parent of *directory_path*, so
    they start with the directory's own name.  The archive being written is
    skipped when it lives inside *directory_path*; otherwise it would be
    picked up by the walk and added to itself.
    """
    archive_abs = os.path.abspath(zip_path)
    base_dir = os.path.dirname(directory_path)
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
        for root, _, files in os.walk(directory_path):
            for file in files:
                full_path = os.path.join(root, file)
                if os.path.abspath(full_path) == archive_abs:
                    continue
                zipf.write(full_path, os.path.relpath(full_path, base_dir))
def get_gallery_files(file_types=("png", "pdf", "md")):
    """Return the sorted, de-duplicated files in the current directory whose
    extension is one of *file_types*.

    The default is an immutable tuple so it cannot be mutated between calls;
    any iterable of extension strings (without the dot) is accepted.
    """
    matches = set()
    for ext in file_types:
        matches.update(glob.glob(f"*.{ext}"))
    return sorted(matches)
def download_pdf(url, output_path):
    """Stream *url* to *output_path*.

    Returns True when the server answered 200 and the body was written,
    False otherwise.  Request failures and non-200 answers are logged.
    """
    started_writing = False
    try:
        # The context manager releases the streamed connection on every path.
        with requests.get(url, stream=True, timeout=10) as response:
            if response.status_code != 200:
                logger.error(f"Failed to download {url}: HTTP {response.status_code}")
                return False
            with open(output_path, "wb") as f:
                started_writing = True
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        return True
    except requests.RequestException as e:
        logger.error(f"Failed to download {url}: {e}")
        # Only remove a file this call began writing, so an earlier complete
        # download at the same path is not lost to a connection error.
        if started_writing and os.path.exists(output_path):
            os.remove(output_path)
        return False
| # ------------------ Model & Diffusion Builders ------------------ | |
@dataclass
class ModelConfig:
    """Settings describing a causal language model.

    Declared as a dataclass so the annotated fields become constructor
    arguments; without the decorator the un-defaulted annotations create no
    attributes at all and ``model_path`` would fail on ``self.name``.
    """

    name: str
    base_model: str
    size: str
    domain: Optional[str] = None
    model_type: str = "causal_lm"

    def model_path(self):
        """Return the relative path ``models/<name>``."""
        return f"models/{self.name}"
@dataclass
class DiffusionConfig:
    """Settings describing a diffusion model.

    Declared as a dataclass so the annotated fields become constructor
    arguments; without the decorator the un-defaulted annotations create no
    attributes at all and ``model_path`` would fail on ``self.name``.
    """

    name: str
    base_model: str
    size: str
    domain: Optional[str] = None

    def model_path(self):
        """Return the relative path ``diffusion_models/<name>``."""
        return f"diffusion_models/{self.name}"
class ModelBuilder:
    """Loads and saves a causal language model together with its tokenizer."""

    def __init__(self):
        self.config = None
        self.model = None
        self.tokenizer = None
        # One of these is picked at random for the "model loaded" message.
        self.jokes = [
            "Why did the AI go to therapy? Too many layers to unpack! 😂",
            "Training complete! Time for a binary coffee break. ☕",
            "I told my neural network a joke; it couldn't stop dropping bits! 🤖"
        ]

    def load_model(self, model_path: str, config: Optional[ModelConfig] = None):
        """Load model and tokenizer from *model_path* and move the model to
        CUDA when available, otherwise CPU.  Returns ``self``."""
        with st.spinner(f"Loading model from {model_path}..."):
            self.model = AutoModelForCausalLM.from_pretrained(model_path)
            self.tokenizer = AutoTokenizer.from_pretrained(model_path)
            # Fall back to the EOS token when the tokenizer defines no pad token.
            if self.tokenizer.pad_token is None:
                self.tokenizer.pad_token = self.tokenizer.eos_token
            if config:
                self.config = config
            device = "cuda" if torch.cuda.is_available() else "cpu"
            self.model.to(device)
        st.success(f"Model loaded! {random.choice(self.jokes)}")
        return self

    def save_model(self, path: str):
        """Save the loaded model and tokenizer under *path*."""
        with st.spinner("Saving model..."):
            # ``os.path.dirname`` is "" for a bare name such as "my_model",
            # and ``os.makedirs("")`` raises, so only create a real parent.
            parent_dir = os.path.dirname(path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            self.model.save_pretrained(path)
            self.tokenizer.save_pretrained(path)
        st.success(f"Model saved at {path}!")
class DiffusionBuilder:
    """Loads, saves and runs a Stable Diffusion pipeline on the CPU."""

    def __init__(self):
        self.config = None
        self.pipeline = None

    def load_model(self, model_path: str, config: Optional[DiffusionConfig] = None):
        """Load the pipeline from *model_path* in float32 on CPU.  Returns ``self``."""
        with st.spinner(f"Loading diffusion model from {model_path}..."):
            self.pipeline = StableDiffusionPipeline.from_pretrained(model_path, torch_dtype=torch.float32).to("cpu")
            if config:
                self.config = config
        st.success("Diffusion model loaded!")
        return self

    def save_model(self, path: str):
        """Save the loaded pipeline under *path*."""
        with st.spinner("Saving diffusion model..."):
            # ``os.path.dirname`` is "" for a bare name, and
            # ``os.makedirs("")`` raises, so only create a real parent.
            parent_dir = os.path.dirname(path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            self.pipeline.save_pretrained(path)
        st.success(f"Diffusion model saved at {path}!")

    def generate(self, prompt: str):
        """Run the pipeline for 20 inference steps and return the first image."""
        return self.pipeline(prompt, num_inference_steps=20).images[0]
| # ------------------ OCR & Image Processing Functions ------------------ | |
async def process_ocr(image, output_file):
    """Run the GOT-OCR2_0 model on *image*, write the text to *output_file*
    and return it.

    *image* must provide ``save(path)``; it is written to a temporary PNG
    because the model's ``chat`` call is given a file path.
    """
    start_time = time.time()
    status = st.empty()
    status.text("Processing OCR... (0s)")
    tokenizer = AutoTokenizer.from_pretrained("ucaslcl/GOT-OCR2_0", trust_remote_code=True)
    model = AutoModel.from_pretrained("ucaslcl/GOT-OCR2_0", trust_remote_code=True, torch_dtype=torch.float32).to("cpu").eval()
    temp_file = f"temp_{int(time.time())}.png"
    try:
        image.save(temp_file)
        result = model.chat(tokenizer, temp_file, ocr_type='ocr')
    finally:
        # Remove the scratch image even when saving or OCR raises.
        if os.path.exists(temp_file):
            os.remove(temp_file)
    elapsed = int(time.time() - start_time)
    status.text(f"OCR completed in {elapsed}s!")
    # Explicit UTF-8 so recognised non-ASCII text does not depend on the
    # platform's default encoding.
    async with aiofiles.open(output_file, "w", encoding="utf-8") as f:
        await f.write(result)
    return result
async def process_image_gen(prompt, output_file):
    """Generate an image for *prompt*, save it to *output_file* and return it.

    Uses the pipeline of the ``DiffusionBuilder`` stored in the session when
    one is loaded; otherwise loads "OFA-Sys/small-stable-diffusion-v0" on CPU.
    """
    start_time = time.time()
    status = st.empty()
    status.text("Generating image... (0s)")
    builder = st.session_state.get('builder')
    pipeline = builder.pipeline if isinstance(builder, DiffusionBuilder) else None
    # A builder that was created but never loaded still has pipeline=None;
    # fall back to the default model instead of calling None.
    if pipeline is None:
        pipeline = StableDiffusionPipeline.from_pretrained("OFA-Sys/small-stable-diffusion-v0", torch_dtype=torch.float32).to("cpu")
    gen_image = pipeline(prompt, num_inference_steps=20).images[0]
    elapsed = int(time.time() - start_time)
    status.text(f"Image generation completed in {elapsed}s!")
    gen_image.save(output_file)
    return gen_image
def process_image_with_prompt(image, prompt, model="gpt-4o-mini", detail="auto", max_tokens=300):
    """Send *image* (PNG-encoded, inlined as a data URL) and *prompt* to the
    chat completions API and return the reply text.

    Any failure, including client construction, is returned as the string
    ``"Error: <message>"`` rather than raised.  *max_tokens* caps the reply
    length and defaults to the previous fixed value of 300.
    """
    buffered = BytesIO()
    image.save(buffered, format="PNG")
    img_str = base64.b64encode(buffered.getvalue()).decode("utf-8")
    messages = [{
        "role": "user",
        "content": [
            {"type": "text", "text": prompt},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{img_str}", "detail": detail}}
        ]
    }]
    try:
        # Built inside the try so a client-construction failure is reported
        # through the same "Error: ..." return value as a failed request.
        client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID'))
        response = client.chat.completions.create(model=model, messages=messages, max_tokens=max_tokens)
        return response.choices[0].message.content
    except Exception as e:
        return f"Error: {str(e)}"
def process_text_with_prompt(text, prompt, model="gpt-4o-mini", max_tokens=300):
    """Send ``prompt + blank line + text`` as one user message and return the
    reply text.

    Any failure, including client construction, is returned as the string
    ``"Error: <message>"`` rather than raised.  *max_tokens* caps the reply
    length and defaults to the previous fixed value of 300.
    """
    messages = [{"role": "user", "content": f"{prompt}\n\n{text}"}]
    try:
        # Built inside the try so a client-construction failure is reported
        # through the same "Error: ..." return value as a failed request.
        client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID'))
        response = client.chat.completions.create(model=model, messages=messages, max_tokens=max_tokens)
        return response.choices[0].message.content
    except Exception as e:
        return f"Error: {str(e)}"
| # ------------------ PDF Processing Functions ------------------ | |
async def process_pdf_snapshot(pdf_path, mode="single"):
    """Render pages of *pdf_path* to PNG files at 2x zoom and return their names.

    *mode* selects the pages: "single" renders page 0, "twopage" the first
    two pages (or fewer), "allpages" every page.  Any other mode renders
    nothing.  On any error the message is shown in the status slot and an
    empty list is returned.
    """
    start_time = time.time()
    status = st.empty()
    status.text(f"Processing PDF Snapshot ({mode})... (0s)")
    doc = None
    try:
        doc = fitz.open(pdf_path)
        page_count = len(doc)
        # (page index, filename stem) pairs for the requested mode.
        if mode == "single":
            targets = [(0, "single_snapshot")]
        elif mode == "twopage":
            targets = [(i, f"twopage_{i}") for i in range(min(2, page_count))]
        elif mode == "allpages":
            targets = [(i, f"page_{i}") for i in range(page_count)]
        else:
            targets = []
        output_files = []
        for page_index, stem in targets:
            pix = doc[page_index].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            output_file = generate_filename(stem, "png")
            pix.save(output_file)
            output_files.append(output_file)
        elapsed = int(time.time() - start_time)
        status.text(f"PDF Snapshot ({mode}) completed in {elapsed}s!")
        return output_files
    except Exception as e:
        status.error(f"Error: {str(e)}")
        return []
    finally:
        # Close the document on the error path as well.
        if doc is not None:
            doc.close()
| # ------------------ GPT & Chat Functions ------------------ | |
def process_text(text_input):
    """Add *text_input* to the chat history, request a completion for the
    whole history and return the assistant's reply.

    The user text and the reply are both rendered as chat messages and stored
    in ``st.session_state.messages``.  Returns an empty string when
    *text_input* is empty; previously that path hit an unbound variable.
    """
    if not text_input:
        return ""
    st.session_state.messages.append({"role": "user", "content": text_input})
    with st.chat_message("user"):
        st.markdown(text_input)
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID'))
    with st.chat_message("assistant"):
        completion = client.chat.completions.create(
            model="gpt-4o-2024-05-13",
            messages=st.session_state.messages,
            stream=False
        )
        return_text = completion.choices[0].message.content
        st.write("Assistant: " + return_text)
    st.session_state.messages.append({"role": "assistant", "content": return_text})
    return return_text
def process_text2(text_input, model="gpt-4o-2024-05-13"):
    """Request a completion for the current chat history and return the reply.

    *text_input* only gates the call; the request body is
    ``st.session_state.messages``, so the caller is expected to have appended
    the user message already.  The reply is rendered and appended to the
    history.  Returns an empty string when *text_input* is empty; previously
    that path hit an unbound variable.
    """
    if not text_input:
        return ""
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID'))
    completion = client.chat.completions.create(
        model=model,
        messages=st.session_state.messages,
        stream=False
    )
    return_text = completion.choices[0].message.content
    st.write("Assistant: " + return_text)
    st.session_state.messages.append({"role": "assistant", "content": return_text})
    return return_text
| # ------------------ Audio & Video Processing Functions ------------------ | |
def SpeechSynthesis(result):
    """Render an HTML component with *result* in a text area and a button
    that reads it aloud through the browser's ``speechSynthesis`` API."""
    import html

    # The text is placed inside a <textarea>; escape it so content such as
    # "</textarea><script>..." is shown as text instead of becoming markup.
    safe_result = html.escape(str(result))
    documentHTML5 = f'''
<!DOCTYPE html>
<html>
<head>
<title>Read It Aloud</title>
<script type="text/javascript">
function readAloud() {{
const text = document.getElementById("textArea").value;
const speech = new SpeechSynthesisUtterance(text);
window.speechSynthesis.speak(speech);
}}
</script>
</head>
<body>
<h1>🔊 Read It Aloud</h1>
<textarea id="textArea" rows="10" cols="80">{safe_result}</textarea>
<br>
<button onclick="readAloud()">🔊 Read Aloud</button>
</body>
</html>
'''
    components.html(documentHTML5, width=1280, height=300)
def process_audio(audio_input, text_input=''):
    """Transcribe *audio_input* with whisper-1 and return the transcript.

    *audio_input* may be raw bytes or an object exposing ``getvalue()``.
    The transcript is appended to the chat history as a user message,
    rendered, offered for read-aloud, and saved to a generated ``.md`` file.
    Returns None when *audio_input* is empty.  *text_input* is unused.
    """
    if not audio_input:
        return None
    # Accept plain bytes as well as buffer-like objects with getvalue().
    if hasattr(audio_input, "getvalue"):
        audio_bytes = audio_input.getvalue()
    else:
        audio_bytes = bytes(audio_input)
    with open("temp_audio.wav", "wb") as file:
        file.write(audio_bytes)
    client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID'))
    # Context manager so the handle passed to the API is closed afterwards.
    with open("temp_audio.wav", "rb") as audio_file:
        transcription = client.audio.transcriptions.create(model="whisper-1", file=audio_file)
    st.session_state.messages.append({"role": "user", "content": transcription.text})
    with st.chat_message("assistant"):
        st.markdown(transcription.text)
        SpeechSynthesis(transcription.text)
    filename = generate_filename(transcription.text, "md")
    with open(filename, "w", encoding="utf-8") as f:
        f.write(transcription.text)
    return transcription.text
def process_video_and_audio(video_input):
    """Save an uploaded video, show sampled frames and return its transcript.

    Frames are sampled roughly one per second of video and displayed.  The
    audio track is written to an ``.mp3`` beside the video and sent to
    whisper-1; if any part of that fails the transcript text is
    "No transcript available.".  Returns None when *video_input* is empty.
    """
    if not video_input:
        return None
    # Keep only the base name so an upload name with directory parts cannot
    # write outside the working directory.
    video_path = os.path.basename(video_input.name)
    with open(video_path, "wb") as f:
        f.write(video_input.getbuffer())

    # Extract frames
    base64Frames = []
    video = cv2.VideoCapture(video_path)
    total_frames = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = video.get(cv2.CAP_PROP_FPS)
    # At least 1: a reported fps below 1 would give a zero step and the
    # loop below would never advance.
    frames_to_skip = max(1, int(fps * 1))  # 1 second per frame
    curr_frame = 0
    while curr_frame < total_frames - 1:
        video.set(cv2.CAP_PROP_POS_FRAMES, curr_frame)
        success, frame = video.read()
        if not success:
            break
        _, buffer = cv2.imencode(".jpg", frame)
        base64Frames.append(base64.b64encode(buffer).decode("utf-8"))
        curr_frame += frames_to_skip
    video.release()

    # Audio transcription from video
    try:
        clip = VideoFileClip(video_path)
        try:
            audio_path = f"{os.path.splitext(video_path)[0]}.mp3"
            clip.audio.write_audiofile(audio_path, bitrate="32k")
            clip.audio.close()
        finally:
            clip.close()
        client = OpenAI(api_key=os.getenv('OPENAI_API_KEY'), organization=os.getenv('OPENAI_ORG_ID'))
        # Pass the real file handle: it carries the .mp3 name, which a bare
        # BytesIO does not.
        with open(audio_path, "rb") as audio_file:
            transcript_text = client.audio.transcriptions.create(model="whisper-1", file=audio_file).text
    except Exception as e:
        # Best effort: keep showing frames, but record why there is no transcript.
        logger.error(f"Video transcription failed for {video_path}: {e}")
        transcript_text = "No transcript available."

    # Display frames and transcript
    st.markdown("### Video Frames")
    for frame_b64 in base64Frames:
        st.image(f"data:image/jpg;base64,{frame_b64}", use_column_width=True)
    st.markdown("### Audio Transcription")
    st.write(transcript_text)
    return transcript_text
| # ------------------ Python Code Executor Functions ------------------ | |
def extract_python_code(markdown_text):
    """Return the bodies of all ```` ```python ```` fenced blocks in
    *markdown_text*, in order, with surrounding whitespace trimmed."""
    fence = re.compile(r"```python\s*(.*?)\s*```", re.DOTALL)
    return [found.group(1) for found in fence.finditer(markdown_text)]
def execute_code(code):
    """Execute *code* and return ``(captured_stdout, None)`` on success or
    ``(None, error_message)`` when it raises an ``Exception``.

    NOTE(security): this runs arbitrary Python in the current process with
    no isolation; it must only be given code the operator trusts.
    """
    buffer = io.StringIO()
    # A single dict serves as both globals and locals.  With separate dicts,
    # names bound at the top level of *code* land in the locals dict, which
    # functions defined in *code* cannot see, so e.g. a function using an
    # imported module fails with NameError.
    namespace = {"__name__": "__main__"}
    try:
        with redirect_stdout(buffer):
            exec(code, namespace)
        return buffer.getvalue(), None
    except Exception as e:
        return None, str(e)
    finally:
        buffer.close()
def create_and_save_file(filename, prompt, response, should_save=True):
    """Write *response* to ``<filename without extension>.md``.

    Nothing is written when *should_save* is false or when the extension of
    *filename* is not ``.txt``, ``.htm`` or ``.md``.  *prompt* is not used.
    """
    if should_save:
        stem, suffix = os.path.splitext(filename)
        if suffix in ('.txt', '.htm', '.md'):
            target = f"{stem}.md"
            with open(target, 'w', encoding='utf-8') as out:
                out.write(response)
| # ------------------ Integrated Workflow Function ------------------ | |
def integrated_workflow():
    """Render the paper-to-code workflow: upload, OCR, code generation, execution.

    Each step is started by its own button.  A button is only true during
    the script run caused by its click, so the OCR text and the generated
    code are kept in ``st.session_state``; later steps read them from there
    instead of being nested inside the earlier button's branch, where they
    could never run.
    """
    st.header("Integrated Workflow: From Paper to Code")
    st.markdown(
        "\n"
        "1. **Upload a PDF or Image** of a paper (double-page images work best).\n"
        "2. **Run OCR** to extract text.\n"
        "3. **Generate Python Code** based on the extracted text using GPT.\n"
        "4. **Review and Execute** the generated code.\n"
    )
    uploaded_file = st.file_uploader("Upload PDF or Image", type=["pdf", "png", "jpg", "jpeg"], key="integrated_file")
    if not uploaded_file:
        return

    # Save the uploaded file (base name only, so the name cannot add directories).
    file_path = f"uploaded_{os.path.basename(uploaded_file.name)}"
    with open(file_path, "wb") as f:
        f.write(uploaded_file.getvalue())
    st.success(f"Uploaded file saved as {file_path}")

    # Stored results belong to one upload; discard them when the file changes.
    if st.session_state.get("integrated_source") != file_path:
        st.session_state["integrated_source"] = file_path
        for stale_key in ("integrated_ocr_result", "integrated_code", "integrated_code_file"):
            st.session_state.pop(stale_key, None)

    # If PDF, show page snapshots; if image, load directly.
    is_pdf = uploaded_file.type == "application/pdf"
    if is_pdf:
        mode = st.selectbox("Snapshot Mode", ["single", "twopage", "allpages"])
        snapshots = asyncio.run(process_pdf_snapshot(file_path, mode))
        for snapshot in snapshots:
            st.image(Image.open(snapshot), caption=f"Snapshot: {snapshot}", use_column_width=True)
    else:
        st.image(Image.open(file_path), caption="Uploaded Image", use_column_width=True)

    # Run OCR on the file (using first page or the image itself)
    if st.button("Run OCR on File"):
        if is_pdf:
            doc = fitz.open(file_path)
            try:
                pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
                temp_img = f"ocr_{os.path.basename(file_path)}.png"
                pix.save(temp_img)
            finally:
                doc.close()
            image = Image.open(temp_img)
        else:
            image = Image.open(file_path)
        ocr_output_file = generate_filename("ocr_output", "txt")
        st.session_state["integrated_ocr_result"] = asyncio.run(process_ocr(image, ocr_output_file))
        # New OCR text invalidates code generated from the previous text.
        st.session_state.pop("integrated_code", None)
        st.session_state.pop("integrated_code_file", None)

    ocr_result = st.session_state.get("integrated_ocr_result")
    if ocr_result is None:
        return
    st.text_area("OCR Output", ocr_result, height=200)

    # Use extracted OCR text as prompt to generate python code
    st.markdown("### Generate Python Code from OCR Text")
    code_prompt = st.text_area("Edit Prompt for Code Generation", value=f"Generate a Python script that processes the following scientific text:\n\n{ocr_result}", height=200)
    if st.button("Generate Code"):
        code_generated = process_text_with_prompt(ocr_result, code_prompt, model="gpt-4o-mini")
        # Save generated code
        code_filename = generate_filename("generated_code", "py")
        with open(code_filename, "w", encoding="utf-8") as f:
            f.write(code_generated)
        st.session_state["integrated_code"] = code_generated
        st.session_state["integrated_code_file"] = code_filename

    code_generated = st.session_state.get("integrated_code")
    if code_generated is None:
        return
    st.code(code_generated, language="python")
    code_filename = st.session_state.get("integrated_code_file")
    if code_filename and os.path.exists(code_filename):
        st.markdown(get_download_link(code_filename, "text/plain", "Download Generated Code"), unsafe_allow_html=True)

    # Optionally execute the generated code
    if st.button("Execute Generated Code"):
        output, error = execute_code(code_generated)
        if error:
            st.error(f"Error executing code:\n{error}")
        else:
            st.success("Code executed successfully. Output:")
            st.code(output)
| # ------------------ Sidebar: Asset Gallery & Logs ------------------ | |
def update_gallery():
    """Show up to ``gallery_size`` (default 5) gallery files in the sidebar in
    two columns, each with a delete button that removes the file and reruns."""
    all_files = get_gallery_files()
    if not all_files:
        return
    # A multi-element container: an ``empty()`` placeholder holds a single
    # element, so the columns would replace the heading written before them.
    container = st.sidebar.container()
    container.markdown("### Asset Gallery")
    cols = container.columns(2)
    # Use ``st.rerun`` when this Streamlit provides it, else the older name.
    rerun = getattr(st, "rerun", None) or getattr(st, "experimental_rerun")
    for idx, file in enumerate(all_files[:st.session_state.get('gallery_size', 5)]):
        with cols[idx % 2]:
            if file.endswith('.png'):
                st.image(Image.open(file), caption=os.path.basename(file), use_column_width=True)
            else:
                st.markdown(os.path.basename(file))
            if st.button("Delete " + os.path.basename(file), key="del_" + file):
                os.remove(file)
                rerun()
update_gallery()
st.sidebar.subheader("Action Logs")
# ``record.asctime`` and ``record.message`` exist only after some formatter
# has processed the record, so format each captured record here rather than
# relying on another handler having done it first.
_sidebar_log_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
for record in log_records:
    st.sidebar.write(_sidebar_log_formatter.format(record))
| # ------------------ Main App Navigation ------------------ | |
st.title("Combined Multimodal AI Suite")
# One tab per modality; the later ``with tabs[i]`` sections index into this list.
_TAB_LABELS = [
    "Home",
    "Camera & Images",
    "PDF & Documents",
    "Multimodal Chat",
    "Code Executor",
    "Integrated Workflow",
]
tabs = st.tabs(_TAB_LABELS)

# --- Home Tab ---
_HOME_INTRO = (
    "\n"
    "This application integrates multiple AI functionalities:\n"
    "- **Camera & Image Processing:** Capture images, generate new images using diffusion models.\n"
    "- **PDF & Document Processing:** Download PDFs, perform OCR, and generate markdown summaries.\n"
    "- **Multimodal Chat:** Chat with GPT-4o using text, audio, image, and video inputs.\n"
    "- **Code Executor:** Write, generate, and execute Python code interactively.\n"
    "- **Integrated Workflow:** Seamlessly extract text from papers and generate & run Python code.\n"
    "Use the tabs above to explore each modality.\n"
)
with tabs[0]:
    st.header("Welcome to the Combined Multimodal AI Suite")
    st.markdown(_HOME_INTRO)
# --- Camera & Images Tab ---
with tabs[1]:
    st.header("Camera & Image Processing")
    st.subheader("Capture and Process Images")
    # Two side-by-side camera widgets, numbered 0 and 1, handled identically.
    for _cam_index, _cam_column in enumerate(st.columns(2)):
        with _cam_column:
            _shot = st.camera_input(f"Take a picture - Cam {_cam_index}", key=f"cam{_cam_index}")
            if _shot:
                _shot_path = generate_filename(f"cam{_cam_index}_snapshot", "png")
                with open(_shot_path, "wb") as _shot_file:
                    _shot_file.write(_shot.getvalue())
                st.image(Image.open(_shot_path), caption=f"Camera {_cam_index} Snapshot", use_column_width=True)
                st.session_state.history.append(f"Captured {_shot_path}")
    st.markdown("---")
    st.subheader("Generate New Image with Diffusion")
    prompt_img = st.text_input("Enter prompt for image generation", "A neon futuristic cityscape")
    if st.button("Generate Image"):
        output_file = generate_filename("gen_output", "png")
        result_img = asyncio.run(process_image_gen(prompt_img, output_file))
        st.image(result_img, caption="Generated Image", use_column_width=True)
# --- PDF & Documents Tab ---
with tabs[2]:
    st.header("PDF & Document Processing")
    st.subheader("Download and Process PDFs")
    url_input = st.text_area("Enter PDF URLs (one per line)", height=100)
    if st.button("Download PDFs"):
        urls = [u.strip() for u in url_input.splitlines() if u.strip()]
        progress_bar = st.progress(0)
        for idx, url in enumerate(urls):
            output_path = generate_filename(url, "pdf")
            if download_pdf(url, output_path):
                st.session_state.downloaded_pdfs[url] = output_path
                st.success(f"Downloaded: {output_path}")
            else:
                # Make a failed download visible instead of skipping it silently.
                st.error(f"Failed to download: {url}")
            progress_bar.progress((idx + 1) / len(urls))
    st.markdown("---")
    st.subheader("OCR & PDF Snapshot")
    # Only offer files the OCR branch below can open: PNGs directly, PDFs via
    # their first page.  Markdown files cannot be opened as PDFs.
    all_assets = get_gallery_files(["png", "pdf"])
    selected_asset = st.selectbox("Select an asset", all_assets) if all_assets else None
    if selected_asset and st.button("Run OCR on Selected"):
        if selected_asset.endswith('.png'):
            image = Image.open(selected_asset)
        else:
            doc = fitz.open(selected_asset)
            try:
                pix = doc[0].get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
                image = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)
            finally:
                doc.close()
        output_file = generate_filename("ocr_output", "txt")
        ocr_result = asyncio.run(process_ocr(image, output_file))
        st.text_area("OCR Result", ocr_result, height=200)
    st.markdown("---")
    st.subheader("Markdown Gallery")
    md_files = sorted(glob.glob("*.md"))
    for md in md_files:
        st.markdown(f"**{md}**")
        st.markdown(get_download_link(md, "text/markdown", "Download MD"), unsafe_allow_html=True)
# --- Multimodal Chat Tab ---
with tabs[3]:
    st.header("Multimodal Chat")
    st.markdown("Chat with GPT-4o using text, audio, image, or video inputs.")
    mode = st.selectbox("Select Mode", ["Text", "Image", "Audio", "Video"])
    if mode == "Text":
        text_input = st.text_input("Enter your text prompt")
        if st.button("Send Text"):
            response = process_text(text_input)
            st.markdown(response)
    elif mode == "Image":
        text_prompt = st.text_input("Enter prompt for image analysis", "Describe this image and list 10 facts.")
        image_file = st.file_uploader("Upload an image", type=["png", "jpg", "jpeg"], key="chat_image")
        if image_file:
            image = Image.open(image_file)
            st.image(image, caption="Uploaded Image", use_column_width=True)
            response = process_image_with_prompt(image, text_prompt)
            st.markdown(response)
    elif mode == "Audio":
        st.markdown("Record or upload an audio file for transcription.")
        audio_bytes = audio_recorder()
        if audio_bytes:
            st.audio(audio_bytes, format="audio/wav")
            # ``audio_bytes`` is passed to st.audio as raw data, while
            # process_audio reads its input through ``getvalue()``; wrap it
            # in BytesIO to provide that method.
            transcription = process_audio(BytesIO(audio_bytes))
            st.markdown(transcription)
    elif mode == "Video":
        video_file = st.file_uploader("Upload a video file", type=["mp4", "webm"], key="chat_video")
        if video_file:
            transcript = process_video_and_audio(video_file)
            st.markdown("Video Transcript:")
            st.write(transcript)
    st.markdown("---")
    st.subheader("Chat History")
    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"])
# --- Code Executor Tab ---
def _clear_code_editor():
    """Button callback: blank both the stored code and the editor widget.

    The editor has its own widget key, so changing only
    ``st.session_state.code`` would not alter what it displays; the widget's
    state is reset here, in a callback, before the widget is created again.
    """
    st.session_state.code = ""
    st.session_state.code_editor = ""


with tabs[4]:
    st.header("Python Code Executor")
    # NOTE(review): the text below says "sandboxed", but execute_code runs the
    # code with exec() in this process. Confirm the wording or add isolation.
    st.markdown("Enter Python code below or upload a .py/.md file. The code will be executed in a sandboxed environment.")
    uploaded_file = st.file_uploader("Upload Python (.py) or Markdown (.md) file", type=["py", "md"], key="code_file")
    if 'code' not in st.session_state:
        st.session_state.code = """import streamlit as st
st.write("Hello from the Python Code Executor!")"""
    if uploaded_file is None:
        code_input = st.text_area("Python Code Editor:", value=st.session_state.code, height=400, key="code_editor")
    else:
        content = uploaded_file.getvalue().decode()
        # Also go by the file name: the reported MIME type for a .md upload
        # is not guaranteed to be "text/markdown".
        is_markdown = uploaded_file.type == "text/markdown" or uploaded_file.name.lower().endswith(".md")
        if is_markdown:
            code_blocks = extract_python_code(content)
            if code_blocks:
                code_input = code_blocks[0]
            else:
                st.error("No Python code block found in the markdown file!")
                code_input = ""
        else:
            code_input = content
        st.code(code_input, language='python')
    col1, col2 = st.columns([1, 1])
    with col1:
        if st.button("▶️ Run Code"):
            if code_input:
                output, error = execute_code(code_input)
                if error:
                    st.error(f"Error:\n{error}")
                elif output:
                    st.code(output)
                else:
                    st.success("Code executed with no output.")
            else:
                st.warning("Please enter some code!")
    with col2:
        # The callback runs before the rerun that the click triggers, so no
        # explicit rerun call is needed.
        st.button("🗑️ Clear Code", on_click=_clear_code_editor)
    with st.expander("How to use the Code Executor"):
        st.markdown(
            "\n"
            "- Enter or upload Python code.\n"
            "- Click **Run Code** to execute.\n"
            "- The output (or any errors) will be displayed below.\n"
        )
# --- Integrated Workflow Tab ---
with tabs[5]:
    integrated_workflow()

# ------------------ Chat Input at Bottom ------------------
if prompt := st.chat_input("GPT-4o Multimodal ChatBot - How can I help you?"):
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)
    with st.chat_message("assistant"):
        # process_text2 appends the assistant reply to the history itself;
        # appending it here as well stored every reply twice.
        response = process_text2(prompt)