# HuggingFace Space source (page status banner at scrape time: "Runtime error")
# Install necessary libraries
#!pip install -q transformers accelerate gguf datasets gradio sympy matplotlib pandas
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import matplotlib.pyplot as plt
import pandas as pd
# Define model paths
# Generator model: small instruct-tuned Llama served through transformers.
MODEL_NAME = "meta-llama/Llama-3.2-1B-Instruct"
# NOTE(review): this downloads the quantized PRM checkpoint at import time —
# network access (and HF auth for gated repos) is required on first run.
QUANTIZED_PRM_PATH = hf_hub_download(
    repo_id="mradermacher/Llama3.1-8B-PRM-Mistral-Data-GGUF",
    filename="Llama3.1-8B-PRM-Mistral-Data.Q4_K_S.gguf"
)
# Default device used when moving tokenized inputs to the model.
device = "cuda" if torch.cuda.is_available() else "cpu"
def load_model(model_name, quantized=False, quantized_model_path=None):
    """Load a text model in one of two backends.

    Args:
        model_name: HF hub id for the transformers path (ignored when
            ``quantized`` is True).
        quantized: when True, load a GGUF checkpoint through llama.cpp.
        quantized_model_path: local path to the GGUF file (quantized path only).

    Returns:
        (model, tokenizer) — tokenizer is None for the llama.cpp path,
        since GGUF models carry their own vocabulary.
    """
    if quantized:
        # Offload every layer to the GPU when CUDA is present, none otherwise.
        gpu_layers = -1 if torch.cuda.is_available() else 0
        gguf_model = Llama(
            model_path=quantized_model_path,
            n_ctx=2048,
            n_batch=512,
            n_gpu_layers=gpu_layers,
            verbose=False,
        )
        return gguf_model, None
    # transformers path: left padding so batched generation aligns prompts.
    tokenizer = AutoTokenizer.from_pretrained(model_name, padding_side='left')
    if tokenizer.pad_token is None:
        # Llama tokenizers ship without a pad token; reuse EOS.
        tokenizer.pad_token = tokenizer.eos_token
    hf_model = AutoModelForCausalLM.from_pretrained(model_name, device_map="auto")
    return hf_model, tokenizer
# Load models
# Generator: transformers model + tokenizer, placed via device_map="auto".
llama_model, llama_tokenizer = load_model(MODEL_NAME)
# Process-reward model: quantized GGUF run through llama.cpp (no HF tokenizer).
prm_model, _ = load_model(None, quantized=True, quantized_model_path=QUANTIZED_PRM_PATH)
# Strategies
def majority_voting(model, tokenizer, prompt, num_samples=5):
    """Draw several completions and return the most frequent one.

    Signature matches the existing call sites in this file
    (``compare_strategies`` and the ``run_single_strategy`` lambdas), which
    pass the model and tokenizer explicitly.

    Args:
        model: causal LM exposing ``.generate()`` and ``.device``.
        tokenizer: matching tokenizer (callable, with ``.decode()``).
        prompt: input text.
        num_samples: number of completions to draw.

    Returns:
        dict with ``'outputs'`` (all completions, in order) and
        ``'final_result'`` (the modal completion) — the shape the UI
        helpers in this file read.
    """
    outputs = []
    for _ in range(num_samples):
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
        # do_sample=True: without sampling every greedy draw is identical,
        # which makes majority voting degenerate.
        generated = model.generate(input_ids, max_new_tokens=50, do_sample=True)
        outputs.append(tokenizer.decode(generated[0], skip_special_tokens=True))
    winner = max(set(outputs), key=outputs.count)
    return {"outputs": outputs, "final_result": winner}
def best_of_n(model, tokenizer, prm_model, prompt, num_samples=5):
    """Sample several completions and return the one the PRM scores highest.

    Signature matches the existing call sites in this file, which pass
    (model, tokenizer, prm_model, prompt, num_samples). The original body
    referenced an undefined ``prm_tokenizer`` and called ``.logits`` on a
    llama.cpp ``Llama`` object, which has no such attribute.

    Returns:
        dict with ``'outputs'`` (all sampled completions) and
        ``'final_result'`` (the highest-scoring one).
    """
    def _prm_score(text):
        # Score through llama.cpp: evaluate the text with echo=True and use
        # the mean prompt-token logprob as a scalar reward proxy.
        # NOTE(review): the PRM's intended step-scoring protocol isn't
        # visible here — confirm against the model card.
        completion = prm_model(text, max_tokens=1, echo=True, logprobs=1, temperature=0.0)
        token_logprobs = completion["choices"][0]["logprobs"]["token_logprobs"]
        values = [lp for lp in token_logprobs if lp is not None]
        return sum(values) / len(values) if values else float("-inf")

    scored = []
    for _ in range(num_samples):
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
        # Sampling, not greedy: N identical candidates defeat best-of-N.
        generated = model.generate(input_ids, max_new_tokens=50, do_sample=True)
        response = tokenizer.decode(generated[0], skip_special_tokens=True)
        scored.append((response, _prm_score(response)))
    best_response = max(scored, key=lambda pair: pair[1])[0]
    return {"outputs": [resp for resp, _ in scored], "final_result": best_response}
def beam_search(model, tokenizer, prompt, num_beams=5):
    """Beam-search decode and return all beams plus the top beam.

    Signature matches the existing call sites in this file, which pass
    (model, tokenizer, prompt, num_beams). Returns the same dict shape as
    the other strategies so ``compare_strategies`` / ``run_single_strategy``
    can read ``'outputs'`` and ``'final_result'`` (the original returned a
    bare list, which crashed those callers).
    """
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
    generated = model.generate(
        input_ids,
        max_new_tokens=50,
        num_beams=num_beams,
        num_return_sequences=num_beams,
    )
    decoded = [tokenizer.decode(seq, skip_special_tokens=True) for seq in generated]
    # HF generate orders returned sequences by score, so beam 0 is the best.
    return {"outputs": decoded, "final_result": decoded[0]}
def dvts(model, tokenizer, prm_model, prompt, depth=3, breadth=2):
    """Diverse-verifier tree search: expand candidates, keep PRM-best branches.

    Signature made consistent with the sibling strategies (model-first).
    The original body referenced an undefined ``prm_tokenizer`` and called
    ``.logits`` on a llama.cpp ``Llama`` object — both fixed here.

    Args:
        model / tokenizer: generator pair.
        prm_model: llama.cpp ``Llama`` used as a scalar scorer.
        depth: number of expansion rounds (1 means no extension step).
        breadth: candidates generated per round / branches kept.

    Returns:
        The highest-scoring response string found anywhere in the tree.
    """
    def _prm_score(text):
        # Mean prompt-token logprob under the PRM as a reward proxy.
        # NOTE(review): confirm against the PRM's documented scoring protocol.
        completion = prm_model(text, max_tokens=1, echo=True, logprobs=1, temperature=0.0)
        token_logprobs = completion["choices"][0]["logprobs"]["token_logprobs"]
        values = [lp for lp in token_logprobs if lp is not None]
        return sum(values) / len(values) if values else float("-inf")

    def _generate(text):
        input_ids = tokenizer(text, return_tensors="pt").input_ids.to(model.device)
        generated = model.generate(input_ids, max_new_tokens=50, do_sample=True)
        return tokenizer.decode(generated[0], skip_special_tokens=True)

    results = []
    # Round 1: breadth independent candidates from the original prompt.
    for _ in range(breadth):
        response = _generate(prompt)
        results.append((response, _prm_score(response)))
    # Remaining rounds: extend the current best `breadth` branches,
    # feeding each response back in as the next prompt (as the original did).
    for _ in range(depth - 1):
        best_branches = sorted(results, key=lambda pair: pair[1], reverse=True)[:breadth]
        for response, _unused_score in best_branches:
            extended = _generate(response)
            results.append((extended, _prm_score(extended)))
    return max(results, key=lambda pair: pair[1])[0]
def temperature_sampling(model, tokenizer, prompt, temperature=0.7, num_samples=5):
    """Sample completions at a given softmax temperature.

    Bug fixed: HF ``generate`` defaults to greedy decoding, so passing
    ``temperature`` without ``do_sample=True`` silently ignored it (and
    newer transformers versions warn about exactly this). Also uses
    ``model.device`` instead of the module-level ``device`` global, which
    is the correct target when the model was loaded with device_map="auto".

    Returns:
        dict with 'outputs' (all samples) and 'final_result' (the first).
    """
    outputs = []
    for _ in range(num_samples):
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
        generated = model.generate(
            input_ids,
            max_new_tokens=50,
            do_sample=True,  # required for temperature to take effect
            temperature=temperature,
        )
        outputs.append(tokenizer.decode(generated[0], skip_special_tokens=True))
    return {
        "outputs": outputs,
        "final_result": outputs[0]
    }
def top_p_sampling(model, tokenizer, prompt, top_p=0.9, num_samples=5):
    """Sample completions with nucleus (top-p) filtering.

    Bug fixed: as with temperature, ``top_p`` has no effect under HF's
    default greedy decoding — ``do_sample=True`` is required. Also moves
    inputs to ``model.device`` rather than the module-level global.

    Returns:
        dict with 'outputs' (all samples) and 'final_result' (the first).
    """
    outputs = []
    for _ in range(num_samples):
        input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)
        generated = model.generate(
            input_ids,
            max_new_tokens=50,
            do_sample=True,  # required for top_p to take effect
            top_p=top_p,
        )
        outputs.append(tokenizer.decode(generated[0], skip_special_tokens=True))
    return {
        "outputs": outputs,
        "final_result": outputs[0]
    }
def custom_strategy(prompt, flow):
    """Run a pipeline of strategies, chaining each result into the next prompt.

    Bug fixed: the original called ``top_p_sampling(prompt, **params)``
    although that function is defined to take (model, tokenizer, prompt, ...),
    and called the other strategies without the model arguments their call
    sites elsewhere in this file use. All steps now pass the module-level
    llama_model / llama_tokenizer / prm_model explicitly.

    Args:
        prompt: initial input text.
        flow: list of {"strategy": name, "params": {...}} steps; unknown
            strategy names are skipped (as before).

    Returns:
        list of {"strategy": name, "result": result-dict} per executed step.
    """
    dispatch = {
        "majority_voting": lambda p, **kw: majority_voting(llama_model, llama_tokenizer, p, **kw),
        "best_of_n": lambda p, **kw: best_of_n(llama_model, llama_tokenizer, prm_model, p, **kw),
        "beam_search": lambda p, **kw: beam_search(llama_model, llama_tokenizer, p, **kw),
        "top_p_sampling": lambda p, **kw: top_p_sampling(llama_model, llama_tokenizer, p, **kw),
    }
    intermediate_results = []
    for step in flow:
        strategy = step.get("strategy")
        params = step.get("params", {})
        runner = dispatch.get(strategy)
        if runner is None:
            continue  # silently skip unknown strategies, as the original did
        result = runner(prompt, **params)
        intermediate_results.append({"strategy": strategy, "result": result})
        # Chain: the winning completion becomes the next step's prompt.
        prompt = result["final_result"]
    return intermediate_results
def compare_strategies(model, tokenizer, prm_model, prompt, num_samples=5):
    """Run each strategy on one prompt; plot output counts; return results.

    Bug fixed: the beam width was hard-coded to 5 (the original's
    ``#num_beams`` comment shows the intent); it now follows ``num_samples``
    like the other strategies, so the caller's sample count is respected.

    Returns:
        (strategies, df): dict of strategy-name -> result dict (with
        'outputs' / 'final_result'), and a pandas DataFrame summary.
    """
    print("Running comparison...")
    strategies = {
        "Majority Voting": majority_voting(model, tokenizer, prompt, num_samples),
        "Best-of-N": best_of_n(model, tokenizer, prm_model, prompt, num_samples),
        "Beam Search": beam_search(model, tokenizer, prompt, num_samples),
    }
    # Bar chart: how many candidate outputs each strategy produced.
    plt.figure(figsize=(10, 6))
    plt.bar(strategies.keys(), [len(s["outputs"]) for s in strategies.values()])
    plt.title("Strategy Comparison")
    plt.ylabel("Number of Outputs")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.show()
    df = pd.DataFrame.from_dict({
        strategy: {
            "Final Result": data["final_result"],
            "Outputs": data["outputs"]
        } for strategy, data in strategies.items()
    }, orient="index")
    return strategies, df
def test_generation():
    """Smoke-test the comparison pipeline on a fixed prompt with 1 sample."""
    sample_prompt = "Explain the concept of neural networks in simple terms."
    print("Starting generation test...")
    results, df = compare_strategies(
        llama_model, llama_tokenizer, prm_model, sample_prompt, 1
    )
    print("\nResults DataFrame:")
    print(df)
    return results, df
# NOTE(review): executes the full generation pipeline at import time —
# confirm this is intended before importing this module from elsewhere.
test_generation()
# ---------------------------------------------------------------------------
# Part 2: Gradio demo application
# ---------------------------------------------------------------------------
# NOTE(review): these largely duplicate the imports in part 1 — this file
# looks like two concatenated scripts; re-importing is harmless in Python.
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer
from llama_cpp import Llama
from huggingface_hub import hf_hub_download
import matplotlib.pyplot as plt
import pandas as pd
import gradio as gr
import time
import json
import numpy as np
from datetime import datetime
def calculate_metrics(text):
    """Return simple size statistics for a generated text.

    token_count counts whitespace-separated chunks; char_count is the raw
    length; sentence_count counts non-blank '.'-separated fragments.
    """
    fragments = [part for part in text.split('.') if part.strip()]
    return {
        'token_count': len(text.split()),
        'char_count': len(text),
        'sentence_count': len(fragments),
    }
def create_performance_plot(times, strategies):
    """Bar chart of wall-clock generation time per strategy.

    Returns the pyplot module (its current figure is what Gradio renders).
    """
    figure = plt.figure(figsize=(10, 5))
    axes = figure.gca()
    axes.bar(strategies, times)
    axes.set_title('Generation Time by Strategy')
    axes.set_ylabel('Time (seconds)')
    plt.xticks(rotation=45)
    figure.tight_layout()
    return plt
def create_token_plot(tokens, strategies):
    """Bar chart of output token counts per strategy.

    Returns the pyplot module (its current figure is what Gradio renders).
    """
    figure = plt.figure(figsize=(10, 5))
    axes = figure.gca()
    axes.bar(strategies, tokens)
    axes.set_title('Output Token Count by Strategy')
    axes.set_ylabel('Number of Tokens')
    plt.xticks(rotation=45)
    figure.tight_layout()
    return plt
def format_metrics(metrics):
    """Render a metrics dict as a markdown bullet list.

    Bugs fixed: leftover debug ``print``s removed, and the body indexed
    ``metrics[0]`` although its main caller (``run_single_strategy``)
    passes a plain dict, which raised ``KeyError``/``TypeError``. A
    non-empty list/tuple whose first element is the metrics dict is still
    accepted for backward compatibility. ``generation_time`` is optional
    and rendered as 0.00s when absent.
    """
    if isinstance(metrics, (list, tuple)):
        metrics = metrics[0]
    return f"""
### Metrics
- Token Count: {metrics['token_count']}
- Character Count: {metrics['char_count']}
- Sentence Count: {metrics['sentence_count']}
- Generation Time: {metrics.get('generation_time', 0.0):.2f}s
"""
def run_single_strategy(prompt, strategy, num_samples):
    """Run one named strategy and build the Gradio outputs.

    Bug fixed: the "All Outputs" section called
    ``format_metrics(result['outputs'])`` — feeding a list of strings into
    a metrics formatter, which crashed. The raw outputs are now listed
    directly.

    Args:
        prompt: user prompt (empty → error message tuple).
        strategy: one of "Majority Voting" / "Best-of-N" / "Beam Search".
        num_samples: sample count (doubles as beam width for Beam Search).

    Returns:
        (markdown report, performance figure, token figure, metrics dict);
        figures/metrics are None on validation failure.
    """
    if not prompt:
        return "Please enter a prompt.", None, None, None
    start_time = time.time()
    strategies = {
        "Majority Voting": lambda: majority_voting(llama_model, llama_tokenizer, prompt, num_samples),
        "Best-of-N": lambda: best_of_n(llama_model, llama_tokenizer, prm_model, prompt, num_samples),
        "Beam Search": lambda: beam_search(llama_model, llama_tokenizer, prompt, num_beams=num_samples)
    }
    if strategy not in strategies:
        return "Invalid strategy selected.", None, None, None
    result = strategies[strategy]()
    generation_time = time.time() - start_time
    # Calculate metrics on the winning completion.
    metrics = calculate_metrics(result['final_result'])
    metrics['generation_time'] = generation_time
    # Create visualizations.
    performance_fig = create_performance_plot([generation_time], [strategy])
    token_fig = create_token_plot([metrics['token_count']], [strategy])
    all_outputs_md = "\n".join(f"- {output}" for output in result['outputs'])
    formatted_output = f"""
# Results for {strategy}
## Final Result
{result['final_result']}
{format_metrics(metrics)}
## All Outputs
{all_outputs_md}
## Generation Details
- Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
- Number of samples: {num_samples}
- Model: {MODEL_NAME}
- Device: {device}
"""
    return formatted_output, performance_fig, token_fig, metrics
def run_all_strategies(prompt, num_samples):
    """Run every strategy, concatenate their reports, build comparison plots.

    Bug fixed: the original took ``run_single_strategy(...)[0]`` — the
    markdown STRING — and then indexed it like a dict
    (``result['final_result']``), raising TypeError. The full tuple is now
    unpacked and the metrics dict returned by ``run_single_strategy``
    (which already contains 'generation_time') is reused instead of being
    recomputed.

    Returns:
        (markdown report, performance figure, token figure, metrics-by-strategy).
    """
    if not prompt:
        return "Please enter a prompt.", None, None, None
    all_metrics = {}
    all_times = []
    all_tokens = []
    completed = []  # strategies that actually produced metrics
    strategies = ["Majority Voting", "Best-of-N", "Beam Search"]
    output_text = "# Results from All Strategies\n\n"
    for strategy in strategies:
        report, _perf, _tok, metrics = run_single_strategy(prompt, strategy, num_samples)
        if not isinstance(metrics, dict):
            # Validation failure upstream — surface the message, skip plots.
            output_text += f"\n## {strategy}\n{report}\n---\n"
            continue
        all_metrics[strategy] = metrics
        all_times.append(metrics['generation_time'])
        all_tokens.append(metrics['token_count'])
        completed.append(strategy)
        output_text += f"""
## {strategy}
{report}
---
"""
    # Create comparison visualizations (only over strategies that ran).
    performance_fig = create_performance_plot(all_times, completed)
    token_fig = create_token_plot(all_tokens, completed)
    # Add comparison summary.
    output_text += """
# Strategy Comparison Summary
"""
    for strategy, metrics in all_metrics.items():
        output_text += f"""
## {strategy}
{format_metrics(metrics)}
"""
    return output_text, performance_fig, token_fig, all_metrics
# Create the enhanced Gradio interface
# Layout: left column holds the prompt + controls, right column the markdown
# report; plots and raw metrics render beneath it.
# NOTE(review): nesting reconstructed from flattened source — verify against
# the running Space.
with gr.Blocks(title="Advanced Text Generation Strategies") as demo:
    gr.Markdown("# Advanced Text Generation Strategies Demo")
    with gr.Row():
        with gr.Column(scale=2):
            # Free-form prompt entry.
            prompt_input = gr.Textbox(
                label="Enter your prompt",
                placeholder="Type your prompt here...",
                lines=3
            )
            with gr.Row():
                # Doubles as the beam width for "Beam Search".
                num_samples = gr.Slider(
                    minimum=1,
                    maximum=10,
                    value=5,
                    step=1,
                    label="Number of samples/beams"
                )
                strategy_dropdown = gr.Dropdown(
                    choices=["Majority Voting", "Best-of-N", "Beam Search"],
                    label="Select Strategy",
                    value="Majority Voting"
                )
            with gr.Row():
                single_strategy_btn = gr.Button("Run Selected Strategy")
                all_strategies_btn = gr.Button("Run All Strategies")
        with gr.Column(scale=3):
            # Markdown report produced by run_single_strategy / run_all_strategies.
            output_display = gr.Markdown(label="Results")
            with gr.Row():
                performance_plot = gr.Plot(label="Performance Comparison")
                token_plot = gr.Plot(label="Token Count Comparison")
            # Raw metrics dict for inspection.
            metrics_display = gr.JSON(label="Detailed Metrics")
    # Set up event handlers
    single_strategy_btn.click(
        fn=run_single_strategy,
        inputs=[prompt_input, strategy_dropdown, num_samples],
        outputs=[output_display, performance_plot, token_plot, metrics_display]
    )
    all_strategies_btn.click(
        fn=run_all_strategies,
        inputs=[prompt_input, num_samples],
        outputs=[output_display, performance_plot, token_plot, metrics_display]
    )
if __name__ == "__main__":
    # debug=True shows tracebacks in the UI while developing.
    demo.launch(debug=True)