Spaces:
Sleeping
Sleeping
| # app.py | |
| import sys | |
| import os | |
| import traceback | |
| import gradio as gr | |
| import logging | |
| import asyncio | |
| import torch | |
| from concurrent.futures import ThreadPoolExecutor | |
| from datetime import datetime | |
| from transformers import AutoModelForCausalLM, AutoTokenizer | |
| from ai_core import AICore | |
| from aegis_integration import AegisBridge | |
| from aegis_integration.config import AEGIS_CONFIG | |
| from components.search_engine import SearchEngine | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # Initialize language model | |
| logger.info("Initializing language model...") | |
| model_name = "gpt2-large" # Using larger model for better responses | |
| try: | |
| # Initialize components with proper error handling | |
| try: | |
| # Initialize tokenizer with padding | |
| tokenizer = AutoTokenizer.from_pretrained(model_name) | |
| tokenizer.pad_token = tokenizer.eos_token | |
| logger.info("Tokenizer initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Error initializing tokenizer: {e}") | |
| raise | |
| try: | |
| # Load model with optimal settings | |
| model = AutoModelForCausalLM.from_pretrained( | |
| model_name, | |
| pad_token_id=tokenizer.eos_token_id, | |
| repetition_penalty=1.2 | |
| ) | |
| logger.info("Model loaded successfully") | |
| except Exception as e: | |
| logger.error(f"Error loading model: {e}") | |
| raise | |
| # Use GPU if available | |
| try: | |
| if torch.cuda.is_available(): | |
| model = model.cuda() | |
| logger.info("Using GPU for inference") | |
| else: | |
| logger.info("Using CPU for inference") | |
| # Set to evaluation mode | |
| model.eval() | |
| except Exception as e: | |
| logger.error(f"Error configuring model device: {e}") | |
| raise | |
| try: | |
| # Initialize AI Core with full component setup | |
| ai_core = AICore() | |
| ai_core.model = model | |
| ai_core.tokenizer = tokenizer | |
| ai_core.model_id = model_name | |
| # Initialize cognitive processor with default modes | |
| from cognitive_processor import CognitiveProcessor | |
| cognitive_modes = ["scientific", "creative", "quantum", "philosophical"] | |
| ai_core.cognitive_processor = CognitiveProcessor( | |
| modes=cognitive_modes, | |
| quantum_state={"coherence": 0.5} | |
| ) | |
| logger.info( | |
| f"AI Core initialized successfully with modes: {cognitive_modes}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error initializing AI Core: {e}") | |
| raise | |
| # Initialize AEGIS | |
| aegis_bridge = AegisBridge(ai_core, AEGIS_CONFIG) | |
| ai_core.set_aegis_bridge(aegis_bridge) | |
| # Initialize cocoon manager | |
| try: | |
| from utils.cocoon_manager import CocoonManager | |
| cocoon_manager = CocoonManager("./cocoons") | |
| cocoon_manager.load_cocoons() | |
| # Set up AI core with cocoon data | |
| ai_core.cocoon_manager = cocoon_manager | |
| ai_core.quantum_state = cocoon_manager.get_latest_quantum_state() | |
| logger.info( | |
| f"Loaded {len(cocoon_manager.cocoon_data)} existing cocoons " | |
| f"with quantum coherence {ai_core.quantum_state.get('coherence', 0.5)}" | |
| ) | |
| except Exception as e: | |
| logger.error(f"Error initializing cocoon manager: {e}") | |
| # Initialize with defaults if cocoon loading fails | |
| ai_core.quantum_state = {"coherence": 0.5} | |
| logger.info("Core systems initialized successfully") | |
| except Exception as e: | |
| logger.error(f"Error initializing model: {e}") | |
| sys.exit(1) | |
| async def process_message(message: str, history: list) -> tuple: | |
| """Process chat messages with improved context management""" | |
| try: | |
| # Clean input | |
| message = message.strip() | |
| if not message: | |
| return "", history | |
| try: | |
| # Get response from AI core asynchronously | |
| if hasattr(ai_core, 'generate_text_async'): | |
| response = await ai_core.generate_text_async(message) | |
| else: | |
| # Fallback to sync version in ThreadPoolExecutor | |
| loop = asyncio.get_event_loop() | |
| with ThreadPoolExecutor() as pool: | |
| response = await loop.run_in_executor( | |
| pool, | |
| ai_core.generate_text, | |
| message | |
| ) | |
| # Clean and validate response | |
| if response is None: | |
| raise ValueError("Generated response is None") | |
| if len(response) > 1000: # Increased safety check limit | |
| response = response[:997] + "..." | |
| # Update history | |
| history.append((message, response)) | |
| return "", history | |
| except Exception as e: | |
| logger.error(f"Error generating response: {e}") | |
| raise | |
| except Exception as e: | |
| logger.error(f"Error in chat: {str(e)}\n{traceback.format_exc()}") | |
| error_msg = ( | |
| "I apologize, but I encountered an error processing your request. " | |
| "Please try again with a different query." | |
| ) | |
| history.append((message, error_msg)) | |
| return "", history | |
| def clear_history(): | |
| """Clear the chat history and AI core memory""" | |
| ai_core.response_memory = [] # Clear AI memory | |
| ai_core.last_clean_time = datetime.now() | |
| return [], [] | |
| # Initialize search engine | |
| search_engine = SearchEngine() | |
| async def search_knowledge(query: str) -> str: | |
| """Perform a search and return formatted results""" | |
| try: | |
| return await search_engine.get_knowledge(query) | |
| except Exception as e: | |
| logger.error(f"Search error: {e}") | |
| return f"I encountered an error while searching: {str(e)}" | |
| def sync_search(query: str) -> str: | |
| """Synchronous wrapper for search function""" | |
| return asyncio.run(search_knowledge(query)) | |
| # Create the Gradio interface with improved chat components and search | |
| with gr.Blocks(title="Codette", theme=gr.themes.Soft()) as iface: | |
| gr.Markdown("""# 🤖 Codette | |
| Your AI programming assistant with chat and search capabilities.""") | |
| with gr.Tabs(): | |
| with gr.Tab("Chat"): | |
| chatbot = gr.Chatbot( | |
| [], | |
| elem_id="chatbot", | |
| bubble_full_width=False, | |
| avatar_images=("👤", "🤖"), | |
| height=500, | |
| show_label=False, | |
| container=True | |
| ) | |
| with gr.Row(): | |
| txt = gr.Textbox( | |
| show_label=False, | |
| placeholder="Type your message here...", | |
| container=False, | |
| scale=8, | |
| autofocus=True | |
| ) | |
| submit_btn = gr.Button("Send", scale=1, variant="primary") | |
| with gr.Row(): | |
| clear_btn = gr.Button("Clear Chat") | |
| # Set up chat event handlers with proper async queuing | |
| txt.submit( | |
| process_message, | |
| [txt, chatbot], | |
| [txt, chatbot], | |
| api_name="chat_submit", | |
| queue=True # Enable queuing for async | |
| ).then( | |
| lambda: None, # Cleanup callback | |
| None, | |
| None, | |
| api_name=None | |
| ) | |
| submit_btn.click( | |
| process_message, | |
| [txt, chatbot], | |
| [txt, chatbot], | |
| api_name="chat_button", | |
| queue=True # Enable queuing for async | |
| ).then( | |
| lambda: None, # Cleanup callback | |
| None, | |
| None, | |
| api_name=None | |
| ) | |
| clear_btn.click( | |
| clear_history, | |
| None, | |
| [chatbot, txt], | |
| queue=False, | |
| api_name="clear_chat" | |
| ) | |
| with gr.Tab("Search"): | |
| gr.Markdown("""### 🔍 Knowledge Search | |
| Search through Codette's knowledge base for information about AI, programming, and technology.""") | |
| with gr.Row(): | |
| search_input = gr.Textbox( | |
| show_label=False, | |
| placeholder="Enter your search query...", | |
| container=False, | |
| scale=8 | |
| ) | |
| search_btn = gr.Button("Search", scale=1, variant="primary") | |
| search_output = gr.Markdown() | |
| # Set up search event handlers | |
| search_btn.click(sync_search, search_input, search_output) | |
| search_input.submit(sync_search, search_input, search_output) | |
| # Run the Gradio interface with proper async handling | |
| async def shutdown(): | |
| """Cleanup function for graceful shutdown""" | |
| try: | |
| # Save final quantum state if available | |
| if hasattr(ai_core, 'cocoon_manager') and ai_core.cocoon_manager: | |
| try: | |
| ai_core.cocoon_manager.save_cocoon({ | |
| "type": "shutdown", | |
| "quantum_state": ai_core.quantum_state | |
| }) | |
| logger.info("Final quantum state saved") | |
| except Exception as e: | |
| logger.error(f"Error saving final quantum state: {e}") | |
| # Shutdown AI core | |
| try: | |
| await ai_core.shutdown() | |
| logger.info("AI Core shutdown complete") | |
| except Exception as e: | |
| logger.error(f"Error shutting down AI Core: {e}") | |
| # Clear CUDA cache if GPU was used | |
| if torch.cuda.is_available(): | |
| try: | |
| torch.cuda.empty_cache() | |
| logger.info("CUDA cache cleared") | |
| except Exception as e: | |
| logger.error(f"Error clearing CUDA cache: {e}") | |
| except Exception as e: | |
| logger.error(f"Error during shutdown: {e}") | |
| raise | |
| if __name__ == "__main__": | |
| try: | |
| # Set up exception handling | |
| def handle_exception(loop, context): | |
| msg = context.get("exception", context["message"]) | |
| logger.error(f"Caught exception: {msg}") | |
| # Set up asyncio event loop with proper error handling | |
| loop = asyncio.new_event_loop() | |
| loop.set_exception_handler(handle_exception) | |
| asyncio.set_event_loop(loop) | |
| # Launch Gradio interface | |
| iface.queue().launch( | |
| prevent_thread_lock=True, | |
| share=False, | |
| server_name="127.0.0.1", | |
| show_error=True | |
| ) | |
| try: | |
| # Keep the main loop running | |
| loop.run_forever() | |
| except Exception as e: | |
| logger.error(f"Error in main loop: {e}") | |
| traceback.print_exc() | |
| except KeyboardInterrupt: | |
| logger.info("Shutting down gracefully...") | |
| try: | |
| loop.run_until_complete(shutdown()) | |
| except Exception as e: | |
| logger.error(f"Error during shutdown: {e}") | |
| finally: | |
| try: | |
| tasks = asyncio.all_tasks(loop) | |
| for task in tasks: | |
| task.cancel() | |
| loop.run_until_complete(asyncio.gather(*tasks, return_exceptions=True)) | |
| loop.close() | |
| except Exception as e: | |
| logger.error(f"Error closing loop: {e}") | |
| sys.exit(1) | |
| sys.exit(0) |