# Spaces: Runtime error
# (Status banner captured alongside this source; it is not part of the program.)
import json
import logging
import os
import re
import tempfile
import threading
from dataclasses import asdict, dataclass, field
from datetime import datetime
from enum import Enum  # We're bringing in the big guns with Enum!
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union

# Our magical models and transformers
import faiss
import gradio as gr
import numpy as np
import torch
from PIL import Image
from sentence_transformers import SentenceTransformer
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
# Root logging: every record goes to the console and to a log file in the
# current working directory.
logging.basicConfig(
    handlers=[logging.StreamHandler(), logging.FileHandler('gradio_builder.log')],
    datefmt='%Y-%m-%d %H:%M:%S',
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Port the Gradio server listens on.
DEFAULT_PORT = 7860

# Working locations, all relative to the current working directory.
MODEL_CACHE_DIR = Path("model_cache")
TEMPLATE_DIR = Path("templates")
TEMP_DIR = Path("temp")
DATABASE_PATH = Path("code_database.json")

# Create the working directories up front so later code can assume they exist.
for directory in (MODEL_CACHE_DIR, TEMPLATE_DIR, TEMP_DIR):
    directory.mkdir(parents=True, exist_ok=True)
@dataclass
class Template:
    """A stored code template.

    Attributes:
        code: Source text of the template.
        description: Label for the template; ``TemplateManager`` uses it as
            the dictionary key when templates are loaded from disk.
        components: Names of the functions and classes found in ``code``.
            This is always recomputed from ``code`` after construction, so a
            value passed to the constructor (for example one read back from
            a JSON file) is replaced.
    """

    code: str
    description: str
    components: List[str] = field(default_factory=list)

    def __post_init__(self):
        """Derive ``components`` from ``code`` once the fields are set."""
        self.components = self._extract_components()

    def _extract_components(self) -> List[str]:
        """Return function names followed by class names defined in ``code``.

        Returns an empty list when ``code`` is not a string (e.g. ``None``
        coming from a malformed template file).
        """
        if not isinstance(self.code, str):
            return []
        function_names = re.findall(r'def (\w+)\(', self.code)
        # Accept both ``class Foo:`` and ``class Foo(Base):``.
        class_names = re.findall(r'class (\w+)\s*[:(]', self.code)
        return function_names + class_names
class TemplateManager:
    """Keep ``Template`` objects in memory and persist them as JSON files.

    Each template lives in ``<template_dir>/<name>.json``. Templates loaded
    from disk are keyed by their ``description`` field; templates saved
    through :meth:`save_template` are keyed by the ``name`` they were saved
    under.
    """

    def __init__(self, template_dir: Path):
        """Remember ``template_dir`` and load every ``*.json`` file in it."""
        self.template_dir = template_dir
        self.templates: Dict[str, "Template"] = {}
        self._load_templates()

    def _load_templates(self):
        """Populate ``self.templates`` from the JSON files in the directory.

        A file that cannot be read, is not valid JSON, lacks a
        ``description`` key, or does not match the ``Template`` fields is
        logged and skipped; the remaining files are still loaded.
        """
        # Sorted so that, if two files share a description, the winner is
        # deterministic rather than dependent on directory listing order.
        for file_path in sorted(self.template_dir.glob("*.json")):
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    template_data = json.load(f)
                self.templates[template_data['description']] = Template(**template_data)
            except (OSError, json.JSONDecodeError, KeyError, TypeError) as e:
                logger.error(f"Oh no! An error loading template from {file_path}: {e}")

    def save_template(self, name: str, template: "Template") -> bool:
        """Write ``template`` to ``<name>.json`` and register it in memory.

        Args:
            name: File stem and in-memory key for the template. It must be a
                plain file name; anything containing a path component is
                rejected so a caller-supplied name cannot write outside
                ``template_dir``.
            template: Dataclass instance to serialise.

        Returns:
            ``True`` if the file was written, ``False`` otherwise (the
            reason is logged).
        """
        if not name or Path(name).name != name:
            logger.error(f"An unfortunate error saving template: invalid template name {name!r}")
            return False
        file_path = self.template_dir / f"{name}.json"
        try:
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(asdict(template), f, indent=2)
        except Exception as e:
            logger.error(f"An unfortunate error saving template to {file_path}: {e}")
            return False
        # Make the template retrievable immediately, without a reload.
        self.templates[name] = template
        return True

    def get_template(self, name: str) -> Optional[str]:
        """Return the code of the template stored under ``name``.

        Returns an empty string when no such template is known.
        """
        template = self.templates.get(name)
        return template.code if template is not None else ""
class RAGSystem:
    """Retrieval-augmented code generator.

    Code snippets and their sentence embeddings are stored in the JSON file
    at ``DATABASE_PATH`` and searched through a FAISS L2 index. The nearest
    snippets, together with a template, form the prompt for a
    text-generation pipeline.

    Either model may fail to load. The failure is logged and the matching
    feature degrades instead of raising:

    * without a text-generation pipeline, :meth:`generate_code` returns a
      placeholder;
    * without an embedding model, retrieval returns ``[]`` and nothing can
      be added to the database.
    """

    # Budget for newly generated tokens; the prompt is not counted against it.
    MAX_NEW_TOKENS = 500

    def __init__(self, model_name: str = "gpt3-incredible", device: str = "cuda" if torch.cuda.is_available() else "cpu", embedding_model="all-knowing-embedder"):
        """Load both models (best effort) and the snippet database.

        Args:
            model_name: Identifier handed to ``AutoTokenizer`` /
                ``AutoModelForCausalLM``.
            device: Torch device string the language model is moved to.
            embedding_model: Identifier handed to ``SentenceTransformer``.
        """
        self.device = device
        self.tokenizer = None
        self.model = None
        self.pipe = None
        self.embedding_model = None
        self.code_embeddings = self._as_matrix([])
        self.index = None
        self.database = {'codes': [], 'embeddings': []}

        # The two models are loaded independently so that a failure in one
        # does not disable the other.
        try:
            self.tokenizer = AutoTokenizer.from_pretrained(model_name, cache_dir=MODEL_CACHE_DIR)
            self.model = AutoModelForCausalLM.from_pretrained(model_name, cache_dir=MODEL_CACHE_DIR).to(device)
            self.pipe = pipeline("text-generation", model=self.model, tokenizer=self.tokenizer, device=self.device)
        except Exception as e:
            self.pipe = None
            logger.error(f"A dark force prevented loading the language model: {e}. The placeholder generation shall be used.")

        try:
            self.embedding_model = SentenceTransformer(embedding_model)
        except Exception as e:
            self.embedding_model = None
            logger.error(f"A dark force prevented loading the embedding model: {e}. Similar code retrieval shall be unavailable.")

        self._load_database()
        if self.pipe is not None and self.embedding_model is not None:
            logger.info("RAG system initialized with incredible power!")

    @staticmethod
    def _as_matrix(vectors) -> np.ndarray:
        """Return ``vectors`` as a C-contiguous 2-D float32 array.

        FAISS only accepts float32 input, whereas values read back from JSON
        become float64 by default. Empty input yields a ``(0, 0)`` array and
        a single 1-D vector becomes one row.
        """
        matrix = np.asarray(vectors, dtype=np.float32)
        if matrix.size == 0:
            return np.empty((0, 0), dtype=np.float32)
        return np.ascontiguousarray(np.atleast_2d(matrix))

    @staticmethod
    def _extract_generated_code(generated_text: str, prompt: str) -> str:
        """Pull the generated snippet out of the pipeline output.

        The prompt ends with an opening code fence, so the snippet is
        whatever follows the prompt up to the next fence. If the output does
        not echo the prompt, the text after the last ``Generated Code:``
        marker is used and a leading fence with its ``python`` tag is removed.
        """
        if generated_text.startswith(prompt):
            completion = generated_text[len(prompt):]
        else:
            completion = generated_text.rpartition("Generated Code:")[2].lstrip()
            if completion.startswith("```"):
                completion = completion[3:]
                if completion.startswith("python"):
                    completion = completion[len("python"):]
        return completion.split('```')[0].strip()

    def _load_database(self):
        """Load the snippet database from ``DATABASE_PATH`` and index it.

        A missing, unreadable or malformed file results in a fresh empty
        database. If the number of codes and embeddings differ and an
        embedding model is available, the embeddings are rebuilt.
        """
        database = {'codes': [], 'embeddings': []}
        embeddings = self._as_matrix([])
        if DATABASE_PATH.exists():
            try:
                with open(DATABASE_PATH, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                candidate = {'codes': list(loaded['codes']), 'embeddings': list(loaded['embeddings'])}
                # Converting inside the try also rejects ragged/non-numeric data.
                embeddings = self._as_matrix(candidate['embeddings'])
                database = candidate
                logger.info("Ancient code database has been loaded.")
            except (OSError, json.JSONDecodeError, KeyError, TypeError, ValueError) as e:
                logger.error(f"A curse has been cast upon the code database: {e}. A new database shall be created.")
                embeddings = self._as_matrix([])
        else:
            logger.info("No code database has been found. A new one shall be created.")

        self.database = database
        self.code_embeddings = embeddings
        self._build_index()

        if self.embedding_model is None:
            logger.warning("Embeddings are not supported in this realm. Proceed with caution.")
        elif len(self.database['codes']) != len(self.database['embeddings']):
            logger.warning("A mysterious mismatch between codes and embeddings has occurred. The embeddings shall be rebuilt.")
            self.rebuild_embeddings()

    def _build_index(self):
        """Rebuild the FAISS index from ``self.code_embeddings``.

        The index is left as ``None`` when there is nothing to index or no
        embedding model, so a stale index is never kept around.
        """
        self.index = None
        if self.embedding_model is None or len(self.code_embeddings) == 0:
            return
        self.index = faiss.IndexFlatL2(self.code_embeddings.shape[1])  # exact L2 search
        self.index.add(self.code_embeddings)

    def add_to_database(self, code: str):
        """Embed ``code``, add it to the index and database, and save.

        Errors are logged rather than raised. The in-memory state is only
        modified after the vector has been accepted by the index, so a
        failure cannot leave codes and embeddings out of step.
        """
        try:
            if self.embedding_model is None:
                raise ValueError("The embedding model has not been summoned.")
            vector = self._as_matrix(self.embedding_model.encode(code))  # one row
            if self.index is None:
                # First snippet ever: the index could not be created before
                # because the embedding dimension was unknown.
                self.index = faiss.IndexFlatL2(vector.shape[1])
            self.index.add(vector)
            self.database['codes'].append(code)
            self.database['embeddings'].append(vector[0].tolist())
            if len(self.code_embeddings) == 0:
                self.code_embeddings = vector
            else:
                self.code_embeddings = np.vstack((self.code_embeddings, vector))
            self._save_database()
            logger.info(f"A new code snippet has been added to the ancient database. Total size: {len(self.database['codes'])}.")
        except Exception as e:
            logger.error(f"A dark force prevented adding to the database: {e}")

    def _save_database(self):
        """Write the database to ``DATABASE_PATH``; failures are logged."""
        try:
            with open(DATABASE_PATH, 'w', encoding='utf-8') as f:
                json.dump(self.database, f, indent=2)
            logger.info(f"The ancient database has been saved to {DATABASE_PATH}.")
        except Exception as e:
            logger.error(f"A curse has been cast upon saving the database: {e}")

    def rebuild_embeddings(self):
        """Re-encode every stored code, rebuild the index and save.

        Errors (including a missing embedding model) are logged, not raised.
        """
        try:
            if self.embedding_model is None:
                raise ValueError("The embedding model has not been summoned.")
            codes = self.database['codes']
            if codes:
                self.code_embeddings = self._as_matrix(self.embedding_model.encode(codes))
            else:
                self.code_embeddings = self._as_matrix([])
            self.database['embeddings'] = self.code_embeddings.tolist()
            self._build_index()
            self._save_database()
            logger.info("The embeddings have been rebuilt and saved with enhanced power.")
        except Exception as e:
            logger.error(f"A dark force prevented rebuilding the embeddings: {e}")

    def retrieve_similar_code(self, description: str, top_k: int = 3) -> List[str]:
        """Return up to ``top_k`` stored codes closest to ``description``.

        Returns ``[]`` when there is no embedding model or index, when the
        index is empty, or when the search fails (the error is logged).
        """
        if self.embedding_model is None or self.index is None:
            logger.warning("The embedding model or index is missing. Similar code retrieval is beyond our reach.")
            return []
        try:
            # Never ask for more neighbours than exist: FAISS pads the result
            # with -1, which would otherwise index the last code in the list.
            k = min(top_k, self.index.ntotal)
            if k <= 0:
                return []
            query = self._as_matrix(self.embedding_model.encode(description))
            _, neighbours = self.index.search(query, k)
            codes = self.database['codes']
            results = [codes[i] for i in neighbours[0] if 0 <= i < len(codes)]
            logger.info(f"{len(results)} similar code snippets have been retrieved for the description: {description}. Prepare to be amazed!")
            return results
        except Exception as e:
            logger.error(f"A dark force prevented retrieving similar code: {e}. The retrieval shall be attempted again.")
            return []

    def generate_code(self, description: str, template_code: str) -> str:
        """Generate code for ``description`` based on ``template_code``.

        Returns:
            The generated snippet; ``template_code`` if the pipeline raises;
            or a placeholder comment followed by ``template_code`` when no
            pipeline is available.
        """
        retrieved_codes = self.retrieve_similar_code(description)
        prompt = f"Description: {description} Retrieved Code Snippets: {''.join([f'```python {code} ```' for code in retrieved_codes])} Template: ```python {template_code} ``` Generated Code: ```python "
        if not self.pipe:
            logger.warning("The text generation pipeline is beyond our reach. A placeholder code shall be returned.")
            return f"# Placeholder code generation. Description: {description} {template_code}"
        try:
            # max_new_tokens rather than max_length: the prompt (template plus
            # retrieved snippets) can easily exceed a total-length limit.
            outputs = self.pipe(prompt, max_new_tokens=self.MAX_NEW_TOKENS, num_return_sequences=1)
            generated_code = self._extract_generated_code(outputs[0]['generated_text'], prompt)
            logger.info("Incredible code has been generated!")
            return generated_code
        except Exception as e:
            logger.error(f"A dark force prevented code generation with the language model: {e}. The template code shall be returned.")
            return template_code
class GradioInterface:
    """Gradio front end tying the template store to the RAG generator."""

    def __init__(self):
        """Create the template manager and RAG system, then build the UI."""
        self.template_manager = TemplateManager(TEMPLATE_DIR)
        self.rag_system = RAGSystem()
        self.interface = self._build_interface()

    def _extract_components(self, code: str) -> List[str]:
        """Return function names followed by class names defined in ``code``."""
        function_names = re.findall(r'def (\w+)\(', code)
        # Accept both ``class Foo:`` and ``class Foo(Base):``.
        class_names = re.findall(r'class (\w+)\s*[:(]', code)
        components = function_names + class_names
        logger.info(f"Components have been extracted: {components}")
        return components

    def _get_template_choices(self) -> List[str]:
        """Return the keys of all templates currently held by the manager."""
        return list(self.template_manager.templates.keys())

    def _build_interface(self) -> "gr.Blocks":
        """Build the Blocks UI and wire the two buttons to their handlers."""
        with gr.Blocks() as interface:
            gr.Markdown("## Code Generation Interface")
            description_input = gr.Textbox(label="Description", placeholder="Enter a description for the code you wish to bring to life.")
            code_output = gr.Textbox(label="Generated Code", interactive=False)
            generate_button = gr.Button("Generate Code")
            template_choice = gr.Dropdown(label="Select Template", choices=self._get_template_choices(), value=None)
            # The dropdown can only pick existing templates, so a new template
            # needs its own free-text name field.
            template_name_input = gr.Textbox(label="Template Name", placeholder="Enter a name for the template to save.")
            save_button = gr.Button("Save as Template")
            status_output = gr.Textbox(label="Status", interactive=False)

            def generate_code_wrapper(description, template_choice):
                """Return ``(generated_code, status)`` for the Generate button."""
                try:
                    template_code = self.template_manager.get_template(template_choice) if template_choice else ""
                    # generate_code returns a single string, not a (code, status) pair.
                    generated_code = self.rag_system.generate_code(description, template_code or "")
                    return generated_code, "Code generation has completed."
                except Exception as e:
                    return "", f"A dark force prevented code generation: {e}"

            def save_template_wrapper(code, name, description):
                """Return ``(code, status, dropdown_update)`` for the Save button.

                ``description`` is accepted to keep the handler signature
                stable, but the template is stored with ``description=name``:
                templates loaded from disk are keyed by their description, so
                this keeps the lookup key the same before and after a restart.
                """
                try:
                    name = (name or "").strip()
                    if not name:
                        return code, "A template name must be provided to seal its destiny.", gr.update()
                    if not code:
                        return code, "Code cannot be empty. It must be filled with potential.", gr.update()
                    components = self._extract_components(code)
                    template = Template(code=code, description=name, components=components)
                    if self.template_manager.save_template(name, template):
                        self.rag_system.add_to_database(code)
                        # Refresh the dropdown with whatever the manager now holds.
                        return code, f"Template '{name}' has been saved for eternity.", gr.update(choices=self._get_template_choices())
                    return code, "A mysterious force prevented saving the template.", gr.update()
                except Exception as e:
                    return code, f"An error occurred while saving the template: {e}", gr.update()

            generate_button.click(
                fn=generate_code_wrapper,
                inputs=[description_input, template_choice],
                outputs=[code_output, status_output]
            )
            save_button.click(
                fn=save_template_wrapper,
                inputs=[code_output, template_name_input, description_input],
                outputs=[code_output, status_output, template_choice]
            )
        logger.info("The Gradio interface is ready to be unveiled.")
        return interface

    def launch(self, **kwargs):
        """Launch the interface, logging startup and shutdown.

        ``server_port``, ``share`` and ``debug`` default to ``DEFAULT_PORT``,
        ``False`` and ``True``; any of them, and any other ``Blocks.launch``
        option, may be overridden through ``kwargs``. Exceptions from the
        launch are logged and re-raised.
        """
        logger.info("=== Application Startup ===")
        # Merge instead of passing both explicit keywords and **kwargs, which
        # raised TypeError whenever a caller overrode one of the defaults.
        launch_options = {"server_port": DEFAULT_PORT, "share": False, "debug": True}
        launch_options.update(kwargs)
        try:
            self.interface.launch(**launch_options)
        except Exception as e:
            logger.error(f"An unexpected error has occurred: {e}. The application shall be shut down.")
            raise
        finally:
            logger.info("=== Application Shutdown ===")
def main():
    """Build the Gradio application and run it.

    Any exception raised while constructing or launching the interface is
    logged and then propagated to the caller.
    """
    logger.info("=== Application Initiation ===")
    try:
        GradioInterface().launch()
    except Exception as exc:
        logger.error(f"A critical error has occurred: {exc}. The application shall be terminated.")
        raise


# Run the application only when this file is executed as a script.
if __name__ == "__main__":
    main()