Spaces:
Sleeping
Sleeping
| import os | |
| import torch | |
| import json | |
| from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModel, AutoConfig | |
| from peft import LoraConfig, get_peft_model, PeftModel | |
| from araberta_setting.modeling_bilstm_crf import BERT_BiLSTM_CRF | |
| from seq2seq_inference import infer_t5_prompt | |
| from huggingface_hub import hf_hub_download | |
| from openai import OpenAI | |
# 🔑 OpenAI client (make sure OPENAI_API_KEY is set in Hugging Face Space secrets)
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Fine-tuned OpenAI model IDs (passed as the `model` argument to the chat API).
GPT35_FINETUNED = "ft:gpt-3.5-turbo-0125:asma:gpt-3-5-turbo-absa:Bb6gmwkE"
GPT4O_FINETUNED = "ft:gpt-4o-mini-2024-07-18:asma:gpt4-finetune-absa:BazoEjnp"

# Registry of selectable backends: "base" is the backbone checkpoint (or the
# literal string "openai" for API-hosted models) and "adapter" is the LoRA
# adapter repo or the fine-tuned OpenAI model id.
MODEL_OPTIONS = {
    "Araberta": {
        # NOTE(review): load_araberta() hardcodes "asmashayea/absa-arabert"
        # (no trailing 'a') instead of reading this entry — confirm which
        # repo id is the correct one.
        "base": "asmashayea/absa-araberta",
        "adapter": "asmashayea/absa-araberta"
    },
    "mT5": {
        "base": "google/mt5-base",
        "adapter": "asmashayea/mt4-absa"
    },
    "GPT3.5": {
        "base": "openai",
        "adapter": GPT35_FINETUNED
    },
    # "GPT4o": {
    #     "base": "openai",
    #     "adapter": GPT4O_FINETUNED
    # }
}

# Cache of loaded (tokenizer, model) pairs, keyed by MODEL_OPTIONS key.
cached_models = {}
| # --------------------------- | |
| # Araberta loader | |
| # --------------------------- | |
def load_araberta():
    """Load the AraBERT + LoRA + BiLSTM-CRF token-classification model.

    Pulls the tokenizer, the LoRA-wrapped encoder and the BiLSTM-CRF head
    weights from the Hub, moves the assembled model to GPU when available,
    memoizes the pair in ``cached_models`` and returns ``(tokenizer, model)``.
    """
    repo_id = "asmashayea/absa-arabert"
    target = "cuda" if torch.cuda.is_available() else "cpu"

    tokenizer = AutoTokenizer.from_pretrained(repo_id)

    # Wrap the base encoder with its LoRA adapter configuration.
    backbone = AutoModel.from_pretrained(repo_id)
    peft_cfg = LoraConfig.from_pretrained(repo_id)
    adapted = get_peft_model(backbone, peft_cfg)

    # The BiLSTM-CRF head ships as a raw state-dict file in the same repo.
    head_path = hf_hub_download(repo_id=repo_id, filename="bilstm_crf_head.pt")
    model = BERT_BiLSTM_CRF(adapted, AutoConfig.from_pretrained(repo_id))
    model.load_state_dict(torch.load(head_path, map_location=torch.device(target)))
    model.to(target).eval()

    cached_models["Araberta"] = (tokenizer, model)
    return tokenizer, model
def infer_araberta(text):
    """Run AraBERT BiLSTM-CRF token classification on *text* and group the
    predicted BIO labels into aspect spans.

    Parameters:
        text: the raw review string to analyze.

    Returns:
        dict of the form ``{"aspects": [{"aspect": str, "sentiment": str}, ...]}``.

    Changes vs. previous version: removed the dead ``token_predictions``
    computation (its return key was commented out), and deduplicated the
    span-flush logic that appeared three times into a single ``_flush`` helper.
    """
    if "Araberta" not in cached_models:
        tokenizer, model = load_araberta()
    else:
        tokenizer, model = cached_models["Araberta"]

    device = next(model.parameters()).device
    inputs = tokenizer(
        text,
        return_tensors='pt',
        truncation=True,
        padding='max_length',
        max_length=128
    )
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)

    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    # NOTE: 'logits' here holds decoded label ids (CRF output), not raw scores.
    predicted_ids = outputs['logits'][0].cpu().tolist()
    tokens = tokenizer.convert_ids_to_tokens(input_ids[0].cpu())
    predicted_labels = [model.id2label.get(p, 'O') for p in predicted_ids]

    # Drop special tokens ([CLS], [SEP], [PAD], ...) before span grouping;
    # filter tokens and labels in one pass instead of two parallel ones.
    specials = set(tokenizer.all_special_tokens)
    clean = [(t, l) for t, l in zip(tokens, predicted_labels) if t not in specials]

    def _join_tokens(tok_list):
        """Merge WordPiece tokens: join '##' subwords, space-separate words."""
        merged = ""
        for t in tok_list:
            if t.startswith("##"):
                merged += t[2:]
            else:
                if merged:
                    merged += " "
                merged += t
        return merged

    aspects = []
    current_tokens, current_sentiment = [], None

    def _flush():
        # Close the currently open aspect span, if any, and reset state.
        nonlocal current_tokens, current_sentiment
        if current_tokens:
            aspects.append({
                "aspect": _join_tokens(current_tokens),
                "sentiment": current_sentiment
            })
        current_tokens, current_sentiment = [], None

    for token, label in clean:
        if label.startswith("B-"):
            _flush()
            current_tokens = [token]
            current_sentiment = label.split("-")[1]
        elif label.startswith("I-") and current_sentiment == label.split("-")[1]:
            current_tokens.append(token)
        else:
            # 'O' label, or an I- tag whose sentiment does not continue the
            # open span: terminate the span.
            _flush()
    _flush()  # close any span still open at the end of the sequence

    return {
        "aspects": aspects,
    }
| # --------------------------- | |
| # Hugging Face seq2seq loaders | |
| # --------------------------- | |
def load_model(model_key):
    """Load (and memoize) a seq2seq tokenizer/PEFT-model pair for *model_key*.

    Looks up the backbone and LoRA adapter repos in ``MODEL_OPTIONS``, places
    the model on GPU when available, sets it to eval mode and caches the
    resulting ``(tokenizer, model)`` pair in ``cached_models``.
    """
    cached = cached_models.get(model_key)
    if cached is not None:
        return cached

    option = MODEL_OPTIONS[model_key]
    target = "cuda" if torch.cuda.is_available() else "cpu"

    tokenizer = AutoTokenizer.from_pretrained(option["adapter"])
    backbone = AutoModelForSeq2SeqLM.from_pretrained(option["base"]).to(target)
    model = PeftModel.from_pretrained(backbone, option["adapter"]).to(target)
    model.eval()

    cached_models[model_key] = (tokenizer, model)
    return tokenizer, model
| # --------------------------- | |
| # OpenAI inference | |
| # --------------------------- | |
def infer_openai(text, model_name):
    """Query a fine-tuned OpenAI chat model for ABSA on *text*.

    Parameters:
        text: the review to analyze.
        model_name: the fine-tuned OpenAI model id to call.

    Returns:
        The parsed JSON object on success, or ``{"raw_output": <reply>}``
        when the model reply is not valid JSON.
    """
    prompt = f"Extract aspects and their sentiment from this review:\n\n{text}\n\nReturn JSON with 'aspect' and 'sentiment'."
    response = client.chat.completions.create(
        model=model_name,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=512,
        temperature=0  # deterministic output for structured extraction
    )
    output = response.choices[0].message.content.strip()
    try:
        return json.loads(output)
    except json.JSONDecodeError:
        # Narrowed from a bare `except:` which also swallowed unrelated
        # errors (KeyboardInterrupt included). Only a malformed model reply
        # should fall back to returning the raw text.
        return {"raw_output": output}
| # --------------------------- | |
| # Unified predictor | |
| # --------------------------- | |
def predict_absa(text, model_choice):
    """Dispatch *text* to the ABSA backend selected by *model_choice*.

    Supported choices: 'mT5' / 'mBART' (seq2seq + LoRA), 'Araberta' (token
    classification), 'GPT3.5' / 'GPT4o' (fine-tuned OpenAI models). Any
    unknown or unconfigured choice returns an ``{"error": ...}`` dict
    instead of raising.
    """
    if model_choice in ('mT5', 'mBART'):
        # Guard: 'mBART' is routed here but has no MODEL_OPTIONS entry;
        # without this check load_model() would raise KeyError.
        if model_choice not in MODEL_OPTIONS:
            return {"error": f"Model {model_choice} not supported"}
        tokenizer, model = load_model(model_choice)
        return infer_t5_prompt(text, tokenizer, model)
    elif model_choice == 'Araberta':
        return infer_araberta(text)
    elif model_choice in ('GPT3.5', 'GPT4o'):
        # Guard: 'GPT4o' is currently commented out of MODEL_OPTIONS; a
        # direct subscript would crash with KeyError instead of reporting
        # a friendly error to the UI.
        if model_choice not in MODEL_OPTIONS:
            return {"error": f"Model {model_choice} not supported"}
        return infer_openai(text, MODEL_OPTIONS[model_choice]["adapter"])
    else:
        return {"error": f"Model {model_choice} not supported"}