# -*- coding: utf-8 -*-
"""
Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1p8LZ5eICRuSfjSRLGIDv4TDW32GSm4Wf
"""

#!pip install torch gradio transformers pandas langchain-fireworks fireworks stanza sentence_transformers anytree
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import gradio as gr
import pandas as pd
from collections import Counter, defaultdict
import os
from huggingface_hub import login
import requests
from bs4 import BeautifulSoup
import numpy as np
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from googlesearch import search
import time
import random
from lxml import html
import nltk
nltk.download('punkt')
from sentence_transformers import SentenceTransformer, util

# Sentence-embedding model used to rank scraped sentences by centrality.
model_ranker = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
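# A minimal, illustrative check of the ranker (the example sentences are made
# up): similar sentences should score noticeably higher than unrelated ones.
# emb = model_ranker.encode(["The court heard the case.", "The trial began today."], convert_to_tensor=True)
# print(util.pytorch_cos_sim(emb[0], emb[1]))  # higher value = more semantic overlap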
Question = [
    "RG Kar recent rape and murder case"
    # "Who won the physics nobel prize in 2023?",
    # "Who has been awarded the Nobel Prize in Physics in 2023?",
]
headers = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
}
# Boilerplate phrases to strip from scraped article text. Several entries are
# deliberate variants with missing spaces, matching how some pages render them.
exclude = [
    "Thank you for your patience", "Subscribe", "subscribe", "trouble retrieving the article content", "browser settings",
    "Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.",
    "Thank you for your patience while we verify access.",
    "Already a subscriber? Log in.",
    "Want all of The Times? Subscribe.",
    "Advertisement",
    "Site Index",
    "Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.",
    "Already a subscriber?Log in.",
    "Want all of The Times?Subscribe.",
    "Site Information Navigation",
    "Please enable JS and disable any ad blocker"
]
def fetch_article_text_sequential(url):
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    }
    # Local copy of the boilerplate filter (shadows the module-level list).
    exclude = [
        "Thank you for your patience", "Subscribe", "subscribe", "trouble retrieving the article content", "browser settings",
        "Thank you for your patience while we verify access. If you are in Reader mode please exit and log into your Times account, or subscribe for all of The Times.",
        "Thank you for your patience while we verify access.",
        "Already a subscriber? Log in.",
        "Want all of The Times? Subscribe.",
        "Advertisement",
        "Site Index",
        "Thank you for your patience while we verify access. If you are in Reader mode please exit andlog intoyour Times account, orsubscribefor all of The Times.",
        "Already a subscriber?Log in.",
        "Want all of The Times?Subscribe.",
        "Site Information Navigation"
    ]
    try:
        # Send a request to the webpage with the specified headers
        response = requests.get(url, headers=headers, timeout=5)
        response.raise_for_status()  # Check that the request was successful
        # Parse the webpage content
        soup = BeautifulSoup(response.text, 'html.parser')
        # Collect headline and paragraph text in the order it appears
        article_content = []
        tags_of_interest = ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p']
        for tag in soup.find_all(tags_of_interest):
            if not any(excluded_phrase in tag.get_text() for excluded_phrase in exclude):
                article_content.append(tag.get_text(strip=True))
        return '\n'.join(article_content)
    except Exception:
        # Network errors, bad status codes, and parse failures all yield None.
        return None
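# Example usage (a sketch; the URL is a placeholder, and the call may return
# None if the request fails or the page blocks scraping):
# article = fetch_article_text_sequential("https://example.com/some-article")
# if article:
#     print(article[:500])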
def fetch_article_text_sequential_new(url):
    user_agents = [
        'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.0.3 Safari/605.1.15',
        # Add more User-Agents here
    ]
    headers = {
        'User-Agent': random.choice(user_agents)
    }
    try:
        # verify=False skips TLS verification so certificate problems do not
        # abort the scrape; it does weaken transport security.
        response = requests.get(url, timeout=5, verify=False, headers=headers)
        response.raise_for_status()  # Check for HTTP errors
        response.encoding = 'utf-8'
        content = response.text
        if not content.strip():
            return ""
        try:
            tree = html.fromstring(content)
        except Exception:
            return ""
        # Extract headlines and paragraphs in document order: a single XPath
        # union preserves the original reading sequence, matching the name.
        scraped_data = []
        xpath_expr = ' | '.join(f'//{tag}' for tag in ['h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'p'])
        for element in tree.xpath(xpath_expr):
            scraped_data.append(element.text_content())
        return '\n'.join(scraped_data)
    except Exception:
        return ""
def get_google_search_results(query, start=0):
    search_url = "https://www.google.com/search"
    params = {"q": query, "start": start}
    response = requests.get(search_url, timeout=5, verify=False, params=params, headers=headers)
    soup = BeautifulSoup(response.text, "html.parser")
    search_results = []
    for g in soup.find_all(class_="g"):
        title = g.find("h3").text if g.find("h3") else "No title"
        link = g.find("a")["href"] if g.find("a") else "No link"
        # Skip direct PDF links (lowercasing already covers '.PDF').
        if not link.lower().endswith('.pdf'):
            search_results.append({"title": title, "link": link})
    return search_results
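# Example usage (illustrative only; Google may throttle or block scripted
# queries, in which case the result list comes back empty):
# for r in get_google_search_results("RG Kar case verdict")[:3]:
#     print(r["title"], "->", r["link"])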
def fetch_sentences_from_html(html_content):
    # The parameter is named html_content so it does not shadow lxml's `html`.
    try:
        if html_content is None:
            return []
        soup = BeautifulSoup(html_content, 'html.parser')
        paragraphs = soup.find_all("p")
        text = " ".join(p.get_text() for p in paragraphs)
        # Split on sentence boundaries while avoiding common abbreviations.
        sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?)\s', text)
        return sentences
    except Exception:
        return []
# Rank sentences by centrality: average cosine similarity to all other sentences.
def rank_sentences(sentences):
    if not sentences:
        return []  # Return an empty list if no sentences are found
    embeddings = model_ranker.encode(sentences, convert_to_tensor=True)
    # Compute pairwise cosine similarity between sentences
    similarities = util.pytorch_cos_sim(embeddings, embeddings).cpu().numpy()
    # Calculate the average similarity for each sentence
    avg_similarities = np.mean(similarities, axis=1)
    # Rank sentences from most to least central
    ranked_sentences = sorted(zip(sentences, avg_similarities), key=lambda x: x[1], reverse=True)
    ranked_sentences = [sentence for sentence, _ in ranked_sentences]
    return ranked_sentences[:2000]  # Slicing clamps automatically for shorter lists
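# Toy example (made-up sentences): the sentences most similar to the others on
# average should rank first, while the off-topic one ranks last.
# print(rank_sentences([
#     "The court announced its verdict on Monday.",
#     "The verdict was announced by the court.",
#     "Bananas are rich in potassium.",
# ]))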
def rank_sentences_new(sentences, query, top_n=20):
    if sentences is None:
        return []
    sentences = re.split("\n", sentences.strip())
    # Remove any empty strings from the list
    sentences = [sentence.strip() for sentence in sentences if sentence.strip()]
    vectorizer = TfidfVectorizer().fit_transform([query] + sentences)
    vectors = vectorizer.toarray()
    query_vector = vectors[0]
    sentences_vectors = vectors[1:]
    cosine_similarities = cosine_similarity([query_vector], sentences_vectors).flatten()
    ranked_indices = cosine_similarities.argsort()[-top_n:][::-1]
    return [sentences[idx] for idx in ranked_indices]
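# Example (note the input contract differs from rank_sentences above: this one
# takes a single newline-joined string plus the query):
# print(rank_sentences_new("The court ruled today.\nAn unrelated line here.",
#                          query="court ruling", top_n=1))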
# Candidate news/reference domains (collected here but not yet used to filter results).
domains = [
    "wikipedia.org", "nytimes.com", "cnn.com", "bbc.com", "theguardian.com",
    "forbes.com", "reuters.com", "cnbc.com", "bloomberg.com", "foxnews.com",
    "npr.org", "washingtonpost.com", "wsj.com", "aljazeera.com", "ft.com",
    "huffpost.com", "nationalgeographic.com", "scientificamerican.com",
    "nature.com", "time.com", "usatoday.com", "apnews.com", "abcnews.go.com",
    "cbsnews.com", "nbcnews.com", "news.yahoo.com", "theatlantic.com",
    "vox.com", "politico.com", "economist.com"
]
# Define the number of results we want to retrieve
num_results_needed = 40
all_results = []
start = 0
def get_web_content(user_query, num_results_needed):
    # Pass the requested result count through to the search call (the count is
    # assumed to be the second argument, as in the original positional call).
    all_results = search(user_query, num_results_needed)
    t1 = time.time()
    text_combined = []
    web_context = []
    for result in all_results:
        text = fetch_article_text_sequential_new(result)
        print("===============================")
        print(result)
        print("\n\n")
        print(text)
        print("===============================")
        text_combined.extend(text.splitlines())
    # Keep only substantive lines: drop known boilerplate and very short lines.
    for line in text_combined:
        if not any(excluded_phrase in line for excluded_phrase in exclude):
            if len(line.split()) > 8:
                web_context.append(line)
    top_sentences = rank_sentences(web_context)
    t2 = time.time()
    minutes, seconds = divmod(t2 - t1, 60)
    print(f"{int(minutes)} minutes and {seconds:.1f} seconds")
    ans = "\n".join(sentence.strip() for sentence in top_sentences if sentence.strip())
    return ans
# Get the token from the environment variable
api_token = os.getenv('HF_TOKEN')
# Load pre-trained model and tokenizer
model_name = "gpt2-large"
model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = GPT2Tokenizer.from_pretrained(model_name)
#device = torch.device("mps")
#model.to(device)
model.eval()

def create_ngrams(tokens, n):
    return [tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1)]
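# Quick illustration with toy tokens (not GPT-2 output):
# create_ngrams(["the", "cat", "sat"], 2) == [("the", "cat"), ("cat", "sat")]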
### Smoothing
def kneser_ney_smoothing(ngram_counts, lower_order_counts, discount=0.75):
    """
    Apply Kneser-Ney smoothing to n-gram counts.

    Args:
        ngram_counts (Counter): Counts of n-grams (e.g., 4-grams or 3-grams).
        lower_order_counts (Counter): Counts of (n-1)-grams (e.g., 3-grams or 2-grams).
        discount (float): Discounting parameter.

    Returns:
        defaultdict: Smoothed probabilities, keyed by (n-1)-gram prefix.
    """
    # Continuation counts: for each final word, the number of distinct n-gram
    # contexts it appears in (each distinct n-gram contributes once).
    continuation_counts = Counter()
    for ngram in ngram_counts:
        continuation_counts[ngram[-1]] += 1
    total_continuations = sum(continuation_counts.values())

    def continuation_probability(word):
        return continuation_counts[word] / total_continuations if total_continuations else 0.0

    probabilities = defaultdict(lambda: defaultdict(float))
    for ngram, count in ngram_counts.items():
        lower_ngram = ngram[:-1]
        lower_count = lower_order_counts[lower_ngram]
        if lower_count == 0:
            continue  # Prefix unseen at the lower order; nothing to normalize by.
        discounted_count = max(count - discount, 0)
        # Back-off weight: the discounted mass redistributed according to the
        # continuation probability of the final word.
        lambda_factor = (discount / lower_count) * len(continuation_counts)
        probabilities[lower_ngram][ngram[-1]] = (
            discounted_count / lower_count
            + lambda_factor * continuation_probability(ngram[-1])
        )
    return probabilities
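# Usage sketch with toy counts (real inputs are Counters built from the
# tokenized web context, as in get_probability_from_context below):
# toy_tokens = ["a", "b", "c", "d", "a", "b", "c", "e"]
# probs = kneser_ney_smoothing(Counter(create_ngrams(toy_tokens, 4)),
#                              Counter(create_ngrams(toy_tokens, 3)))
# probs[("a", "b", "c")] then maps each observed next token ("d", "e") to a
# smoothed probability.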
def get_probability_from_context(Context):
    context_tokens = tokenizer.tokenize(Context)
    four_grams = create_ngrams(context_tokens, 4)
    three_grams = create_ngrams(context_tokens, 3)
    four_gram_counts = Counter(four_grams)
    three_gram_counts = Counter(three_grams)
    probabilities = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
    return probabilities, four_gram_counts, three_gram_counts

def predict_next_token(probabilities, three_gram):
    return probabilities.get(three_gram, {})
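# Example (continuing the toy sketch above): look up the distribution for a
# 3-gram prefix; an empty dict means the prefix never occurred in the context.
# predict_next_token(probs, ("a", "b", "c"))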
def generate_text_with_probs(initial_context, context_text, top_p, max_length, top_k, threshold=0.6):
    Tokens = {}
    #input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='mps')
    input_ids = tokenizer.encode(initial_context, return_tensors="pt").to(device='cpu')
    generated_text = initial_context
    token_tables = []
    token_no = 1
    # Build n-gram counts of every order from the retrieved web context.
    context_tokens = tokenizer.tokenize(context_text)
    four_grams = create_ngrams(context_tokens, 4)
    three_grams = create_ngrams(context_tokens, 3)
    two_grams = create_ngrams(context_tokens, 2)
    one_grams = create_ngrams(context_tokens, 1)
    four_gram_counts = Counter(four_grams)
    three_gram_counts = Counter(three_grams)
    two_gram_counts = Counter(two_grams)
    one_gram_counts = Counter(one_grams)
    prob_list = ["four_gram", "three_gram", "two_gram", "one_gram"]  # back-off order
    prob = [four_gram_counts, three_gram_counts, two_gram_counts, one_gram_counts]
    probs = kneser_ney_smoothing(four_gram_counts, three_gram_counts)
    use_llm = 0          # iterations taken (the LLM distribution is always computed)
    use_llm_back_up = 0  # tokens actually emitted by the LLM
    use_ngram = 0        # tokens emitted by the n-gram model
    flag = False
    count = 0
    Token_index = 0
    colored_text = initial_context
    with torch.no_grad():
        for _ in range(max_length):
            outputs = model(input_ids=input_ids)
            next_token_logits = outputs.logits[:, -1, :]
            # Nucleus (top-p) filtering: keep the smallest set of tokens whose
            # cumulative probability exceeds top_p and mask out the rest.
            sorted_logits, sorted_indices = torch.sort(next_token_logits, descending=True)
            cumulative_probs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
            sorted_indices_to_remove = cumulative_probs > top_p
            sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[..., :-1].clone()
            sorted_indices_to_remove[..., 0] = 0
            indices_to_remove = sorted_indices[sorted_indices_to_remove]
            next_token_logits[:, indices_to_remove] = -float('Inf')
            probabilities = torch.softmax(next_token_logits, dim=-1)
            top_tokens = sorted_indices[0, :top_k]
            top_probs = probabilities[0, top_tokens]
            top_token_probs = [(tokenizer.decode([token.item()]), p.item())
                               for token, p in zip(top_tokens, top_probs)]
            df = pd.DataFrame(top_token_probs, columns=["Token", "Probability"])
            df.index = df.index + 1
            token_tables.append((f"{token_no}>> Next token options from LLM", df))
            # Cumulative probability mass of the LLM's top-k candidates.
            cumulative_prob = cumulative_probs[0, top_k - 1].item()
            # Shannon entropy of the top-k LLM distribution (diagnostic only).
            entropy = -np.sum(np.array(df['Probability']) * np.log(df['Probability']))
            input_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
            input_tokens = tokenizer.tokenize(input_text)
            use_llm += 1
            # Back off through n-gram orders (4-gram -> 3-gram -> 2-gram) until
            # some order has seen the current suffix of the generated text.
            __token_pob__ = {}
            num = 0
            while __token_pob__ == {} and num < 3:
                probs = kneser_ney_smoothing(prob[num], prob[num + 1])
                __inputs__ = tuple(input_tokens[-(3 - num):])
                __token_pob__ = probs.get(__inputs__, {})
                num += 1
            df = pd.DataFrame(list(__token_pob__.items()), columns=['Token', 'Probability'])
            df.index = df.index + 1
            token_tables.append((f"{token_no}>> Next token options from N_gram", df))
            token_no += 1
            # Prefer the n-gram model when the LLM is unsure (its top-k mass is
            # below the threshold, after a 4-token warm-up) or when the n-gram
            # distribution carries more mass than the LLM's top-k.
            if (cumulative_prob < threshold and __token_pob__ != {} and flag and count >= 4) \
                    or np.sum(df['Probability']) > cumulative_prob:
                Token_index += 1
                next_token = max(__token_pob__, key=__token_pob__.get)
                # 'Ċ' is GPT-2's newline token; fall back to the runner-up if it wins.
                if next_token == 'Ċ':
                    sorted_tokens = sorted(__token_pob__.items(), key=lambda x: x[1], reverse=True)
                    if len(sorted_tokens) > 1:
                        next_token = sorted_tokens[1][0]
                Tokens[Token_index] = [next_token, "ngram", __token_pob__[next_token]]
                color_code = "#78bfd3"  # Light blue for n-gram tokens
                colored_text += f"<span style='color: {color_code}'>{tokenizer.convert_tokens_to_string([next_token])}</span>"
                input_tokens.append(next_token)
                generated_text = tokenizer.convert_tokens_to_string(input_tokens)
                initial_context = generated_text
                #input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='mps')
                input_ids = tokenizer.encode(generated_text, return_tensors="pt").to(device='cpu')
                use_ngram += 1
            else:
                Token_index += 1
                next_token = torch.multinomial(probabilities, num_samples=1)
                next_token_prob = probabilities[0, next_token].item()
                next_token_text = tokenizer.decode(next_token.item())
                Tokens[Token_index] = [next_token_text, "llm", next_token_prob]
                color_code = "#c99a6e"  # Light brown for LLM tokens
                colored_text += f"<span style='color: {color_code}'>{next_token_text}</span>"
                count += 1
                if count >= 4:
                    flag = True
                input_ids = torch.cat([input_ids, next_token], dim=-1)
                if next_token.item() == tokenizer.eos_token_id:
                    break
                generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
                initial_context = generated_text
                use_llm_back_up += 1
    generated_text = tokenizer.decode(input_ids[0], skip_special_tokens=True)
    return generated_text, Tokens, token_tables, colored_text
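# Example call (a sketch only: it is slow, since it scrapes web context and
# decodes token by token; parameter values mirror the Gradio defaults below):
# text, tokens, tables, html_out = generate_text_with_probs(
#     initial_context="The RG Kar case",
#     context_text=get_web_content(Question[0], 10),
#     top_p=0.9, max_length=4, top_k=5, threshold=0.9,
# )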
def save_content_as_file(question, docs):
    # Fetch the web content based on the question
    content = get_web_content(question, docs)
    # Define file path to save the content
    file_path = "fetched_content.txt"
    # Write the content to a text file
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(content)
    # Return the file path to download
    return file_path
def combined_model_predictions(query, initial_context, top_p, max_length, top_k, threshold, docs):
    Question = [query]
    context_text = get_web_content(Question[0], docs)
    print('Content Fetched')
    # Write context_text to a .txt file
    file_name = "context_corpora.txt"
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(context_text)
    # Generate the text using the model
    generated_text, tokens, token_tables, colored_html = generate_text_with_probs(initial_context, context_text, top_p, max_length, top_k, threshold)
    # Create a DataFrame for tokens
    data_list = [(token_index, tupes[0], tupes[1], tupes[2]) for token_index, tupes in tokens.items()]
    df = pd.DataFrame(data_list, columns=['Token_pos', 'Token', 'Source Model', 'Probability'])
    # Return the file path for download, colored HTML, and DataFrames
    return file_name, colored_html, df, token_tables
# Gradio interface
iface = gr.Interface(
    fn=combined_model_predictions,
    inputs=[
        gr.Textbox(lines=2, placeholder="Enter query here..."),
        gr.Textbox(lines=2, placeholder="Enter initial context here..."),
        gr.Slider(0, 1, step=0.01, value=0.9, label="Top-p (nucleus) sampling"),
        gr.Slider(1, 100, value=4, step=1, label="Max Length"),
        gr.Slider(1, 50, value=5, step=1, label="Top-k"),
        gr.Slider(0, 1, step=0.01, value=0.9, label="LLM cumulative Threshold"),
        gr.Slider(1, 50, step=1, value=10, label="Web-retrieved Docs to fetch")
    ],
    outputs=[
        gr.File(label="Download Context Corpora"),
        gr.HTML(label="Generated Text"),
        gr.Dataframe(label="Tokens"),
        gr.Dataframe(label="Token tables"),
    ],
    title="Next Token Visualizer (GPT-2-large - 774M param.)"
)
iface.launch()