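"""FastAPI app serving TinyLlama-1.1B-Chat with three PEFT adapters
(IA3, LoRA, QLoRA), plus an optional QA.json exact-match lookup.
Endpoint paths marked "assumed" below are illustrative guesses."""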
import os
import json
import warnings

warnings.filterwarnings("ignore")

# Point every Hugging Face cache at a writable directory. This must happen
# before transformers is imported so the libraries pick the paths up.
cache_dir = "/tmp/huggingface_cache"
os.makedirs(cache_dir, exist_ok=True)
os.environ['HF_HOME'] = cache_dir
os.environ['TRANSFORMERS_CACHE'] = cache_dir
os.environ['HF_DATASETS_CACHE'] = cache_dir

import torch
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from pydantic import BaseModel
from transformers import AutoModelForCausalLM, AutoTokenizer
from peft import PeftModel, PeftConfig

app = FastAPI()
templates = Jinja2Templates(directory="templates")
@app.get("/", response_class=HTMLResponse)  # route decorator assumed for the index page
def read_index(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})
class TinyLlamaInput(BaseModel):
    prompt: str

base_model = "TinyLlama/TinyLlama-1.1B-Chat-v1.0"
# Optional exact-match QA table: maps normalized questions to canned answers.
qa_lookup = {}
try:
    with open("QA.json", "r", encoding="utf-8") as f:
        qa_data = json.load(f)
    qa_lookup = {q["question"].lower().strip(): q["answer"] for q in qa_data}
    print("✅ QA data loaded")
except Exception:
    print("⚠️ No QA data found")
def load_model_safe(lora_dir, model_name):
    """Load a PEFT adapter from lora_dir, falling back to the base model on failure."""
    print(f"\n🔄 Loading {model_name} from {lora_dir}...")
    if not os.path.exists(lora_dir):
        print(f"❌ Directory {lora_dir} not found for {model_name}")
        return load_base_model_fallback(model_name)

    # Prefer the tokenizer saved with the adapter; fall back to the base tokenizer.
    try:
        tokenizer = AutoTokenizer.from_pretrained(lora_dir)
        print(f"✅ Tokenizer loaded for {model_name}")
    except Exception:
        try:
            tokenizer = AutoTokenizer.from_pretrained(base_model)
            print(f"⚠️ Using base model tokenizer for {model_name}")
        except Exception as e:
            print(f"❌ Failed to load tokenizer: {e}")
            return None, None
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    try:
        peft_config = PeftConfig.from_pretrained(lora_dir)
        adapter_type = peft_config.peft_type.value if hasattr(peft_config.peft_type, 'value') else str(peft_config.peft_type)
        print(f"✅ PEFT config loaded for {model_name} (Type: {adapter_type})")
        base = AutoModelForCausalLM.from_pretrained(
            peft_config.base_model_name_or_path,
            device_map="auto",
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
            trust_remote_code=True,
            cache_dir=cache_dir,
        )
        model = PeftModel.from_pretrained(
            base,
            lora_dir,
            torch_dtype=torch.float16,
            is_trainable=False,
        )
        print(f"✅ {model_name} PEFT adapter loaded successfully")
        # LoRA-style adapters can be merged into the base weights for faster inference.
        try:
            if adapter_type.lower() in ['lora', 'adalora']:
                model = model.merge_and_unload()
                print(f"✅ Merged {adapter_type} adapter for {model_name}")
            else:
                print(f"ℹ️ Keeping {adapter_type} adapter separate for {model_name} (merge not supported)")
        except Exception as merge_error:
            print(f"⚠️ Could not merge adapter for {model_name}: {merge_error}. Using as-is.")
    except Exception as peft_error:
        print(f"⚠️ PEFT loading failed for {model_name}: {peft_error}")
        return load_base_model_fallback(model_name)

    model.eval()
    return tokenizer, model
def load_base_model_fallback(model_name):
    """Load the plain base model when no adapter can be loaded."""
    print(f"⚠️ Loading base model as fallback for {model_name}")
    try:
        model = AutoModelForCausalLM.from_pretrained(
            base_model,
            device_map="auto",
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
            trust_remote_code=True,
            cache_dir=cache_dir,
        )
        tokenizer = AutoTokenizer.from_pretrained(base_model)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        model.eval()
        print(f"✅ Base model loaded as fallback for {model_name}")
        return tokenizer, model
    except Exception as e:
        print(f"❌ Complete failure loading fallback for {model_name}: {e}")
        return None, None
# Load the three adapters at startup; any failure leaves that slot as None.
try:
    tokenizer_1, model_1 = load_model_safe("./ia3-tinyllama", "IA3 Model")
    if model_1 is not None:
        print("✅ Model 1 (IA3) loaded successfully")
except Exception as e:
    print(f"❌ Error loading model 1: {e}")
    tokenizer_1, model_1 = None, None

try:
    tokenizer_2, model_2 = load_model_safe("./lora-tinyllama", "LoRA Model")
    if model_2 is not None:
        print("✅ Model 2 (LoRA) loaded successfully")
except Exception as e:
    print(f"❌ Error loading model 2: {e}")
    tokenizer_2, model_2 = None, None

try:
    tokenizer_3, model_3 = load_model_safe("./qlora-tinyllama", "QLoRA Model")
    if model_3 is not None:
        print("✅ Model 3 (QLoRA) loaded successfully")
except Exception as e:
    print(f"❌ Error loading model 3: {e}")
    tokenizer_3, model_3 = None, None
def generate_response(prompt, tokenizer, model):
    if tokenizer is None or model is None:
        return "Model not available"

    # Check the QA table first: exact match, then substring match in either direction.
    prompt_lower = prompt.lower().strip()
    if prompt_lower in qa_lookup:
        return qa_lookup[prompt_lower]
    for known_q, answer in qa_lookup.items():
        if known_q in prompt_lower or prompt_lower in known_q:
            return answer

    full_prompt = f"""<|system|>
You are a helpful, respectful and honest assistant. Always answer as helpfully as possible.
If a question does not make any sense, or is not factually coherent, explain why instead of answering something not correct.
<|end|>
<|user|>
{prompt}
<|end|>
<|assistant|>
"""
    try:
        inputs = tokenizer(full_prompt, return_tensors="pt", truncation=True, max_length=512)
        device = next(model.parameters()).device
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            output = model.generate(
                **inputs,
                max_new_tokens=256,
                temperature=0.7,
                top_p=0.9,
                top_k=50,
                do_sample=True,
                pad_token_id=tokenizer.eos_token_id,
                eos_token_id=tokenizer.eos_token_id,
                repetition_penalty=1.1,
                no_repeat_ngram_size=3,
            )
        full_response = tokenizer.decode(output[0], skip_special_tokens=True)
        # Keep only the text after the assistant tag; fall back to the whole decode.
        if "<|assistant|>" in full_response:
            response = full_response.split("<|assistant|>")[-1].strip()
        else:
            response = full_response.strip()
        return response if response else "No response generated"
    except Exception as e:
        return f"Generation error: {str(e)}"
@app.post("/predict/model1")  # route path assumed
def predict_model1(input_data: TinyLlamaInput):
    if model_1 is None:
        return {"error": "IA3 Model is not available"}
    answer = generate_response(input_data.prompt, tokenizer_1, model_1)
    return {"model": "TinyLlama IA3 Model", "response": answer}

@app.post("/predict/model2")  # route path assumed
def predict_model2(input_data: TinyLlamaInput):
    if model_2 is None:
        return {"error": "LoRA Model is not available"}
    answer = generate_response(input_data.prompt, tokenizer_2, model_2)
    return {"model": "TinyLlama LoRA Model", "response": answer}

@app.post("/predict/model3")  # route path assumed
def predict_model3(input_data: TinyLlamaInput):
    if model_3 is None:
        return {"error": "QLoRA Model is not available"}
    answer = generate_response(input_data.prompt, tokenizer_3, model_3)
    return {"model": "TinyLlama QLoRA Model", "response": answer}
@app.get("/status")  # route path assumed
def get_model_status():
    """Get status of all loaded models"""
    return {
        "model1": {
            "name": "IA3 Model",
            "status": "available" if model_1 is not None else "unavailable",
        },
        "model2": {
            "name": "LoRA Model",
            "status": "available" if model_2 is not None else "unavailable",
        },
        "model3": {
            "name": "QLoRA Model",
            "status": "available" if model_3 is not None else "unavailable",
        },
    }
@app.post("/predict/all")  # route path assumed
def predict_all_models(input_data: TinyLlamaInput):
    """Get responses from all available models"""
    responses = {}
    if model_1 is not None:
        responses["model1"] = {
            "name": "IA3 Model",
            "response": generate_response(input_data.prompt, tokenizer_1, model_1),
        }
    if model_2 is not None:
        responses["model2"] = {
            "name": "LoRA Model",
            "response": generate_response(input_data.prompt, tokenizer_2, model_2),
        }
    if model_3 is not None:
        responses["model3"] = {
            "name": "QLoRA Model",
            "response": generate_response(input_data.prompt, tokenizer_3, model_3),
        }
    return {"prompt": input_data.prompt, "responses": responses}
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)
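
# --- Example client (sketch) -------------------------------------------------
# A minimal way to exercise the endpoints once the server is running; the
# /predict/... paths match the assumed decorators above and may differ from
# the original deployment.
#
#   import requests
#   r = requests.post("http://localhost:7860/predict/all",
#                     json={"prompt": "What is LoRA?"})
#   print(r.json())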