# Code Review Assistant — Gradio app for Hugging Face Spaces.
# (Header reconstructed; the original lines were residue from the Spaces build page.)
| import gradio as gr | |
| from transformers import AutoTokenizer, AutoModelForCausalLM | |
| import torch | |
| from huggingface_hub import login | |
| import os | |
| import logging | |
| from datetime import datetime | |
| import json | |
| from typing import List, Dict | |
| import warnings | |
| import spaces | |
# Filter out warnings (transformers/torch are noisy at import and load time).
warnings.filterwarnings('ignore')

# Configure logging with more detail
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Environment variables with default model
MODEL_NAME = "google/gemma-2-2b-it"
# NOTE(review): HF_TOKEN is read here but never passed to login() or
# from_pretrained() anywhere in this file — confirm whether auth is intended.
HF_TOKEN = os.getenv("HUGGING_FACE_TOKEN")

# Hugging Face Spaces persistent storage directory (created eagerly at import).
STORAGE_DIR = os.path.join(os.getcwd(), "storage")
os.makedirs(STORAGE_DIR, exist_ok=True)

# History file in persistent storage
HISTORY_FILE = os.path.join(STORAGE_DIR, "review_history.json")
class Review:
    """A single code-review record: the reviewed code, the model's
    suggestions, and timing metadata for the metrics dashboard."""

    def __init__(self, code: str, language: str, suggestions: str):
        self.code = code                # full source text that was reviewed
        self.language = language        # language label, e.g. "python"
        self.suggestions = suggestions  # model-generated review text
        self.timestamp = datetime.now().isoformat()
        self.response_time = 0.0        # seconds; set by the caller after generation

    def to_dict(self) -> dict:
        """Serialize for JSON storage; code is truncated to keep the file small."""
        return {
            'timestamp': self.timestamp,
            'language': self.language,
            'code': code_snippet(self.code, max_length=200),
            'suggestions': self.suggestions,
            'response_time': self.response_time
        }

    @classmethod
    def from_dict(cls, data: dict) -> "Review":
        """Rebuild a Review from a dict produced by to_dict().

        Bug fix: the original method took `cls` but lacked @classmethod, so
        `Review.from_dict(r)` bound the data dict to `cls` and raised a
        TypeError (missing positional argument) on every history load.
        """
        review = cls(data['code'], data['language'], data['suggestions'])
        review.timestamp = data['timestamp']
        review.response_time = data.get('response_time', 0.0)
        return review
def code_snippet(code: str, max_length: int = 200) -> str:
    """Truncate *code* to at most *max_length* characters for storage.

    Short inputs pass through unchanged; longer ones are cut and marked
    with a trailing ellipsis.
    """
    if len(code) > max_length:
        return f"{code[:max_length]}..."
    return code
class CodeReviewer:
    """Runs LLM-backed code reviews with lazy model loading.

    Persists a JSON history (capped at the last 100 entries on disk) and
    simple aggregate metrics. The model is loaded on first use so the app
    starts quickly even when the weights are large.
    """

    def __init__(self):
        self.model = None        # AutoModelForCausalLM, set by initialize_model()
        self.tokenizer = None    # AutoTokenizer, set by initialize_model()
        self.device = None       # torch device the model landed on
        self.review_history: List[Review] = []
        self.metrics = {
            'total_reviews': 0,
            'avg_response_time': 0.0,
            'reviews_today': 0
        }
        self._initialized = False
        self.load_history()

    def load_history(self):
        """Load review history from file with error handling.

        Any failure resets in-memory state to empty rather than crashing.
        """
        try:
            if os.path.exists(HISTORY_FILE):
                with open(HISTORY_FILE, 'r') as f:
                    data = json.load(f)
                self.review_history = [Review.from_dict(r) for r in data.get('history', [])]
                self.metrics = data.get('metrics', {
                    'total_reviews': 0,
                    'avg_response_time': 0.0,
                    'reviews_today': 0
                })
                logger.info(f"Loaded {len(self.review_history)} reviews from history")
            else:
                logger.info("No history file found, starting fresh")
                self.save_history()
        except Exception as e:
            logger.error(f"Error loading history: {e}")
            self.review_history = []
            self.metrics = {
                'total_reviews': 0,
                'avg_response_time': 0.0,
                'reviews_today': 0
            }

    def save_history(self):
        """Atomically persist the last 100 reviews and current metrics.

        Bug fix: temp_file is bound *before* the try block, so the cleanup
        path can no longer raise NameError when an early step (e.g.
        os.makedirs) fails before temp_file was assigned.
        """
        temp_file = HISTORY_FILE + '.tmp'
        try:
            os.makedirs(os.path.dirname(HISTORY_FILE), exist_ok=True)
            data = {
                'history': [r.to_dict() for r in self.review_history[-100:]],
                'metrics': self.metrics
            }
            # Write-then-rename so readers never observe a partial file.
            with open(temp_file, 'w') as f:
                json.dump(data, f, indent=2)
            os.replace(temp_file, HISTORY_FILE)
            logger.info("Saved review history successfully")
        except Exception as e:
            logger.error(f"Error saving history: {e}")
            if os.path.exists(temp_file):
                try:
                    os.remove(temp_file)
                except OSError:  # was a bare except; narrowed to file-system errors
                    pass

    def initialize_model(self):
        """Initialize the model and tokenizer. Returns True on success.

        Bug fix: the original unconditionally overwrote the tokenizer's
        bos/eos tokens with '<s>'/'</s>' and always injected '[PAD]'.
        Gemma ships its own special tokens, and clobbering them degrades
        generation. A pad token is now added only when one is missing.
        """
        try:
            logger.info(f"Initializing model {MODEL_NAME}")
            # Initialize tokenizer
            logger.info("Loading tokenizer...")
            self.tokenizer = AutoTokenizer.from_pretrained(
                MODEL_NAME,
                trust_remote_code=True
            )
            # Add a pad token only if the tokenizer lacks one.
            num_added = 0
            if self.tokenizer.pad_token is None:
                num_added = self.tokenizer.add_special_tokens({'pad_token': '[PAD]'})
            logger.info(f"Added {num_added} special tokens")
            # Initialize model
            logger.info("Loading model...")
            self.model = AutoModelForCausalLM.from_pretrained(
                MODEL_NAME,
                device_map="auto",
                torch_dtype=torch.float16,
                trust_remote_code=True,
                low_cpu_mem_usage=True
            )
            # Resize embeddings only if the vocabulary actually grew.
            if num_added > 0:
                self.model.resize_token_embeddings(len(self.tokenizer))
            self.device = next(self.model.parameters()).device
            logger.info(f"Model loaded successfully on {self.device}")
            self._initialized = True
            return True
        except Exception as e:
            logger.error(f"Error initializing model: {str(e)}")
            self._initialized = False
            return False

    def create_review_prompt(self, code: str, language: str) -> str:
        """Create a structured prompt for code review."""
        return f"""Review this {language} code. List specific points in these sections:
Issues:
Improvements:
Best Practices:
Security:
Code:
```{language}
{code}
```"""

    def review_code(self, code: str, language: str) -> str:
        """Perform code review using the model.

        Returns the model's suggestions, or a human-readable error string —
        the Gradio handler displays whatever comes back.
        """
        try:
            if not self._initialized:
                logger.info("Model not initialized, attempting initialization...")
                if not self.initialize_model():
                    return "Error: Model initialization failed. Please check logs for details."
            start_time = datetime.now()
            prompt = self.create_review_prompt(code, language)
            try:
                inputs = self.tokenizer(
                    prompt,
                    return_tensors="pt",
                    truncation=True,
                    max_length=512,
                    padding=True
                ).to(self.device)
            except Exception as token_error:
                logger.error(f"Tokenization error: {str(token_error)}")
                return f"Error during tokenization: {str(token_error)}"
            try:
                with torch.no_grad():
                    # Note: early_stopping=True was removed — it only applies
                    # to beam search and triggers a warning with num_beams=1.
                    outputs = self.model.generate(
                        **inputs,
                        max_new_tokens=512,
                        do_sample=True,
                        temperature=0.7,
                        top_p=0.95,
                        num_beams=1,
                        pad_token_id=self.tokenizer.pad_token_id,
                        eos_token_id=self.tokenizer.eos_token_id
                    )
            except Exception as gen_error:
                logger.error(f"Generation error: {str(gen_error)}")
                return f"Error during generation: {str(gen_error)}"
            try:
                response = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
                # Strip the echoed prompt; keep only the model's continuation.
                suggestions = response[len(prompt):].strip()
            except Exception as decode_error:
                logger.error(f"Decoding error: {str(decode_error)}")
                return f"Error decoding response: {str(decode_error)}"
            # Create and save review
            end_time = datetime.now()
            review = Review(code, language, suggestions)
            review.response_time = (end_time - start_time).total_seconds()
            # Bug fix: append to history *before* updating metrics so that
            # 'reviews_today' includes the review just performed (the original
            # order undercounted by one).
            self.review_history.append(review)
            self.update_metrics(review)
            self.save_history()
            # Free GPU tensors between requests to keep VRAM pressure down.
            if self.device.type == "cuda":
                del inputs, outputs
                torch.cuda.empty_cache()
            return suggestions
        except Exception as e:
            logger.error(f"Error during code review: {str(e)}")
            return f"Error performing code review: {str(e)}"

    def update_metrics(self, review: Review):
        """Fold a completed review into the aggregate metrics."""
        try:
            self.metrics['total_reviews'] += 1
            # Running average: recover the previous total, add this review,
            # then re-average over the new count.
            total_time = self.metrics['avg_response_time'] * (self.metrics['total_reviews'] - 1)
            total_time += review.response_time
            self.metrics['avg_response_time'] = total_time / self.metrics['total_reviews']
            today = datetime.now().date()
            self.metrics['reviews_today'] = sum(
                1 for r in self.review_history
                if datetime.fromisoformat(r.timestamp).date() == today
            )
        except Exception as e:
            logger.error(f"Error updating metrics: {e}")

    def get_history(self) -> List[Dict]:
        """Return the 10 most recent reviews, newest first, formatted for display."""
        try:
            return [
                {
                    'timestamp': r.timestamp,
                    'language': r.language,
                    'code': code_snippet(r.code),
                    'suggestions': r.suggestions,
                    'response_time': f"{r.response_time:.2f}s"
                }
                for r in reversed(self.review_history[-10:])
            ]
        except Exception as e:
            logger.error(f"Error getting history: {e}")
            return []

    def get_metrics(self) -> Dict:
        """Return current metrics in display-ready form (never raises)."""
        try:
            return {
                'Total Reviews': self.metrics['total_reviews'],
                'Average Response Time': f"{self.metrics['avg_response_time']:.2f}s",
                'Reviews Today': self.metrics['reviews_today'],
                'Device': str(self.device) if self.device else "Not initialized"
            }
        except Exception as e:
            logger.error(f"Error getting metrics: {e}")
            return {
                'Total Reviews': 0,
                'Average Response Time': '0.00s',
                'Reviews Today': 0,
                'Device': 'Error'
            }
# Initialize reviewer (module-level singleton shared by all Gradio handlers).
reviewer = CodeReviewer()

# Create Gradio interface
with gr.Blocks(theme=gr.themes.Soft()) as iface:
    gr.Markdown("# Code Review Assistant v2")
    gr.Markdown("An automated code review system powered by Gemma-2-2b-it")

    with gr.Tabs():
        # Tab 1: submit code and read the model's review.
        with gr.Tab("Review Code"):
            with gr.Row():
                with gr.Column():
                    code_input = gr.Textbox(
                        lines=10,
                        placeholder="Enter your code here...",
                        label="Code"
                    )
                    language_input = gr.Dropdown(
                        choices=["python", "javascript", "java", "cpp", "typescript", "go", "rust"],
                        value="python",
                        label="Language"
                    )
                    submit_btn = gr.Button("Submit for Review", variant="primary")
                with gr.Column():
                    output = gr.Textbox(
                        label="Review Results",
                        lines=10
                    )
        # Tab 2: markdown-rendered list of recent reviews.
        with gr.Tab("History"):
            with gr.Row():
                refresh_history = gr.Button("Refresh History", variant="secondary")
            history_output = gr.Markdown(
                value="Click 'Refresh History' to view review history"
            )
        # Tab 3: aggregate metrics as JSON.
        with gr.Tab("Metrics"):
            with gr.Row():
                refresh_metrics = gr.Button("Refresh Metrics", variant="secondary")
            metrics_output = gr.JSON(
                label="Performance Metrics"
            )

    def review_code_interface(code: str, language: str) -> str:
        """Handler: validate input, run the review, surface errors as text."""
        if not code.strip():
            return "Please enter some code to review."
        try:
            result = reviewer.review_code(code, language)
            return result
        except Exception as e:
            logger.error(f"Interface error: {str(e)}")
            return f"Error: {str(e)}"

    def get_history_interface() -> str:
        """Handler: render the recent review history as a markdown document."""
        try:
            history = reviewer.get_history()
            if not history:
                return "No reviews yet."
            result = ""
            for review in history:
                result += f"### Review from {review['timestamp']}\n\n"
                result += f"**Language:** {review['language']}\n\n"
                result += f"**Response Time:** {review['response_time']}\n\n"
                result += "**Code:**\n```\n" + review['code'] + "\n```\n\n"
                result += "**Suggestions:**\n" + review['suggestions'] + "\n\n"
                result += "---\n\n"
            return result
        except Exception as e:
            logger.error(f"History error: {str(e)}")
            return f"Error retrieving history: {str(e)}"

    def get_metrics_interface() -> Dict:
        """Handler: fetch metrics for the JSON component; errors become a dict."""
        try:
            return reviewer.get_metrics()
        except Exception as e:
            logger.error(f"Metrics error: {str(e)}")
            return {"error": str(e)}

    def update_all_outputs(code: str, language: str) -> tuple:
        """Update all outputs after code review."""
        result = review_code_interface(code, language)
        history = get_history_interface()
        metrics = get_metrics_interface()
        return result, history, metrics

    # Connect the interface: submit refreshes all three panes at once.
    submit_btn.click(
        update_all_outputs,
        inputs=[code_input, language_input],
        outputs=[output, history_output, metrics_output]
    )
    refresh_history.click(
        get_history_interface,
        outputs=history_output
    )
    refresh_metrics.click(
        get_metrics_interface,
        outputs=metrics_output
    )

    # Add example inputs
    gr.Examples(
        examples=[
            ["""def add_numbers(a, b):
    return a + b""", "python"],
            ["""function calculateSum(numbers) {
    let sum = 0;
    for(let i = 0; i < numbers.length; i++) {
        sum += numbers[i];
    }
    return sum;
}""", "javascript"]
        ],
        inputs=[code_input, language_input]
    )

# Launch the app (0.0.0.0:7860 is the standard Hugging Face Spaces binding).
if __name__ == "__main__":
    iface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True
    )