Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| import numpy as np | |
| from openai import OpenAI | |
| import pickle | |
| from huggingface_hub import hf_hub_download | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import httpx | |
| # ============================================ | |
| # CONFIGURATION | |
| # ============================================ | |
# Hugging Face repository id handed to KnowledgeBase.load_from_huggingface(),
# which fetches 'knowledge_base.pkl' from it with repo_type='dataset'.
HF_DATASET_REPO = "vijaykumaredstellar/edstellar-internal-linking-kb"
# Model name sent with every embeddings request (OpenRouterClient.get_embedding).
EMBEDDING_MODEL = "openai/text-embedding-3-small"
# Model name sent with every chat-completion request (OpenRouterClient.chat).
CHAT_MODEL = "deepseek/deepseek-chat"
| # ============================================ | |
| # KNOWLEDGE BASE | |
| # ============================================ | |
class KnowledgeBase:
    """In-memory store of paragraph records and their embedding vectors.

    Attributes:
        knowledge_base: List of paragraph records (dicts) read from the pickle.
        embeddings: Embedding data read from the pickle; indexed in parallel
            with ``knowledge_base``. ``None`` until a load succeeds.
        loaded: ``True`` once ``load_from_huggingface`` has succeeded.
    """

    def __init__(self):
        self.knowledge_base = []
        self.embeddings = None
        self.loaded = False

    def load_from_huggingface(self, repo_id, hf_token=None):
        """Download ``knowledge_base.pkl`` from a Hugging Face dataset repo and load it.

        Args:
            repo_id: Repository id of the dataset that holds the pickle.
            hf_token: Optional access token; a blank or whitespace-only value
                is treated as "no token".

        Returns:
            A ``(success, message)`` tuple. Exceptions are caught and reported
            through ``message`` rather than propagated.
        """
        try:
            token = hf_token.strip() if hf_token and hf_token.strip() else None
            kb_path = hf_hub_download(
                repo_id=repo_id,
                filename='knowledge_base.pkl',
                repo_type='dataset',
                token=token
            )
            # SECURITY: pickle.load() can execute arbitrary code embedded in
            # the file. Only point repo_id at a repository you fully trust.
            with open(kb_path, 'rb') as f:
                data = pickle.load(f)
            # Extract and validate everything first, then commit to the
            # instance in one go, so a malformed file cannot leave the
            # object half-updated (e.g. new paragraphs with old embeddings).
            knowledge_base = data['knowledge_base']
            embeddings = data['embeddings']
            num_posts = len({p['url'] for p in knowledge_base})
            self.knowledge_base = knowledge_base
            self.embeddings = embeddings
            self.loaded = True
            return True, f"β Loaded {len(knowledge_base)} paragraphs from {num_posts} blog posts"
        except Exception as e:
            return False, f"β Error: {str(e)}"

    def search(self, query_embedding, top_k=50):
        """Return the ``top_k`` paragraphs most similar to ``query_embedding``.

        Args:
            query_embedding: Embedding vector of the query (any 1-D sequence).
            top_k: Maximum number of results; values <= 0 yield no results.

        Returns:
            A list of paragraph records, best match first, each extended with
            a ``'similarity_score'`` float. Empty if nothing has been loaded.
        """
        if not self.loaded:
            return []
        top_k = int(top_k)
        # Guard explicitly: a slice of [-0:] would return *every* row.
        if top_k <= 0:
            return []
        query = np.asarray(query_embedding).reshape(1, -1)
        similarities = cosine_similarity(query, self.embeddings)[0]
        # argsort is ascending: take the last top_k entries, then reverse.
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        return [
            {
                **self.knowledge_base[idx],
                'similarity_score': float(similarities[idx])
            }
            for idx in top_indices
        ]
| # ============================================ | |
| # OPENROUTER CLIENT | |
| # ============================================ | |
class OpenRouterClient:
    """Thin wrapper around the OpenAI SDK client, pointed at the OpenRouter API."""

    def __init__(self, api_key):
        """Create the underlying SDK client.

        Args:
            api_key: OpenRouter API key. Surrounding whitespace (common when a
                key is pasted into a form) is stripped before use.
        """
        http_client = httpx.Client(
            headers={
                "HTTP-Referer": "https://edstellar.com",
                "X-Title": "Edstellar Internal Linking Tool"
            },
            timeout=60.0
        )
        self.client = OpenAI(
            base_url="https://openrouter.ai/api/v1",
            api_key=api_key.strip() if api_key else api_key,
            http_client=http_client
        )

    def get_embedding(self, text):
        """Return the embedding vector for ``text``.

        Only the first 8000 characters of ``text`` are sent.
        """
        response = self.client.embeddings.create(
            model=EMBEDDING_MODEL,
            input=text[:8000]
        )
        return response.data[0].embedding

    def chat(self, messages, temperature=0.3):
        """Send ``messages`` to the chat model and return the reply text.

        Args:
            messages: Chat messages in the OpenAI ``[{"role": ..., "content": ...}]`` form.
            temperature: Sampling temperature forwarded to the API.

        Returns:
            The content of the first choice as a string. An empty string is
            returned when the API delivers no content, so callers can safely
            apply string methods to the result.
        """
        response = self.client.chat.completions.create(
            model=CHAT_MODEL,
            messages=messages,
            temperature=temperature
        )
        # message.content is nullable in the API schema; normalise to str.
        return response.choices[0].message.content or ""
| # ============================================ | |
| # ORPHAN PAGE ANALYZER | |
| # ============================================ | |
class OrphanPageAnalyzer:
    """Recommend knowledge-base pages (and paragraphs) that should link to an orphan page."""

    # Column order of the summary table returned by analyze(); kept explicit
    # so an empty result still produces a table with headers.
    _TABLE_COLUMNS = [
        'Source Page',
        'Paragraph #',
        'Score',
        'Anchor Text',
        'Current Sentence',
        'New Sentence',
    ]

    def __init__(self, kb, client):
        """Store the collaborators.

        Args:
            kb: Object providing ``search(query_embedding, top_k=...)``.
            client: Object providing ``get_embedding(text)`` and ``chat(messages)``.
        """
        self.kb = kb
        self.client = client

    def analyze(self, orphan_url, orphan_title, orphan_keyword, num_sources=3):
        """Find pages in the knowledge base that should link TO the orphan page.

        The orphan page does NOT need to be in the knowledge base.

        Args:
            orphan_url: URL of the page that needs inbound links.
            orphan_title: Title of that page.
            orphan_keyword: Primary keyword of that page.
            num_sources: Maximum number of source pages to recommend
                (floats such as ``3.0`` are accepted).

        Returns:
            A ``(report, table)`` tuple: a markdown report string and a
            ``pandas.DataFrame`` summarising one row per recommended link.
        """
        # Create search query from orphan page info
        search_query = f"{orphan_title} {orphan_keyword}"
        print(f"π Searching for pages related to: {search_query}")
        query_embedding = self.client.get_embedding(search_query)
        candidates = self.kb.search(query_embedding, top_k=50)
        print(f"π Found {len(candidates)} candidate paragraphs")

        sources = self._group_by_source(candidates, orphan_url)
        print(f"π Found {len(sources)} unique source pages")

        # A UI slider may deliver a float; slice bounds must be integers.
        limit = None if num_sources is None else int(num_sources)
        top_sources = self._rank_sources(sources)[:limit]
        print(f"β Selected top {len(top_sources)} sources")

        results = []
        for idx, source in enumerate(top_sources, 1):
            print(f"π Processing source {idx}/{len(top_sources)}: {source['title']}")
            results.append(
                self._build_recommendation(source, orphan_url, orphan_title, orphan_keyword)
            )

        report = self.generate_report(orphan_url, orphan_title, results)
        table = pd.DataFrame(
            [
                {
                    'Source Page': r['source_title'][:50],
                    'Paragraph #': r['paragraph_index'],
                    'Score': r['score'],
                    'Anchor Text': r['anchor_text'],
                    'Current Sentence': self._preview(r['current_sentence']),
                    'New Sentence': self._preview(r['new_sentence']),
                }
                for r in results
            ],
            columns=self._TABLE_COLUMNS,
        )
        return report, table

    @staticmethod
    def _same_page(url_a, url_b):
        """Return True when two URLs differ at most by trailing slashes."""
        return str(url_a).rstrip('/') == str(url_b).rstrip('/')

    @staticmethod
    def _preview(text, limit=100):
        """Shorten ``text`` to ``limit`` characters, adding '...' only if it was cut."""
        return text if len(text) <= limit else text[:limit] + '...'

    def _group_by_source(self, candidates, orphan_url):
        """Group candidate paragraphs by their page URL, skipping the orphan page itself."""
        sources = {}
        for item in candidates:
            url = item['url']
            # Skip if somehow the orphan URL is in KB
            if self._same_page(url, orphan_url):
                continue
            page = sources.setdefault(url, {
                'url': url,
                'title': item['title'],
                'category': item['category'],
                'keyword': item['keyword'],
                'paragraphs': []
            })
            page['paragraphs'].append({
                'index': item['paragraph_index'],
                'text': item['text'],
                'similarity': item['similarity_score']
            })
        return sources

    @staticmethod
    def _rank_sources(sources):
        """Score each page as the mean of its average and best paragraph similarity; best first."""
        ranked = []
        for data in sources.values():
            similarities = [p['similarity'] for p in data['paragraphs']]
            score = np.mean(similarities) * 0.5 + max(similarities) * 0.5
            ranked.append({**data, 'score': score})
        ranked.sort(key=lambda x: x['score'], reverse=True)
        return ranked

    def _build_recommendation(self, source, orphan_url, orphan_title, orphan_keyword):
        """Ask the LLM for an anchor text and a rewritten paragraph for one source page."""
        # Get best paragraph in this source
        best_para = max(source['paragraphs'], key=lambda x: x['similarity'])

        anchor_prompt = (
            "Generate a natural 2-4 word anchor text to link to this page:\n"
            f"Target Page Title: {orphan_title}\n"
            f"Target Keyword: {orphan_keyword}\n"
            "Context where link will be placed:\n"
            f"{best_para['text'][:200]}...\n"
            "Provide ONLY the anchor text, no quotes or explanation."
        )
        raw_anchor = self.client.chat([
            {"role": "user", "content": anchor_prompt}
        ]) or ""
        anchor_text = raw_anchor.strip().strip('"').strip("'")
        # An empty reply would make the link unusable; fall back to the
        # keyword (or title) the caller supplied.
        if not anchor_text:
            anchor_text = orphan_keyword or orphan_title

        modify_prompt = (
            "Modify this sentence to naturally include an internal link.\n"
            "Current sentence:\n"
            f"{best_para['text']}\n"
            "Add this internal link:\n"
            f'- Anchor text: "{anchor_text}"\n'
            f"- Target page: {orphan_title}\n"
            f"- Target URL: {orphan_url}\n"
            "Provide ONLY the modified sentence with the anchor text naturally integrated."
        )
        raw_sentence = self.client.chat([
            {"role": "user", "content": modify_prompt}
        ]) or ""
        new_sentence = raw_sentence.strip()

        return {
            'source_url': source['url'],
            'source_title': source['title'],
            'score': int(source['score'] * 100),
            'paragraph_index': best_para['index'],
            'current_sentence': best_para['text'],
            'new_sentence': new_sentence,
            'anchor_text': anchor_text,
            'target_url': orphan_url
        }

    @staticmethod
    def _insert_link(sentence, anchor_text, target_url):
        """Wrap the first (case-insensitive) occurrence of ``anchor_text`` in an ``<a>`` tag.

        The sentence is returned unchanged when the anchor text is empty or
        does not occur in it. Only one link is inserted even if the anchor
        text appears several times.
        """
        import html
        import re

        if not anchor_text:
            # str.replace('', x) would insert x between every character.
            return sentence
        match = re.search(re.escape(anchor_text), sentence, flags=re.IGNORECASE)
        if match is None:
            return sentence
        href = html.escape(str(target_url), quote=True)
        # Keep the sentence's own casing for the linked words.
        link = f'<a href="{href}">{match.group(0)}</a>'
        return sentence[:match.start()] + link + sentence[match.end():]

    def generate_report(self, orphan_url, orphan_title, results):
        """Generate the markdown report.

        Args:
            orphan_url: URL of the orphan page (shown as the target URL).
            orphan_title: Title of the orphan page.
            results: Recommendation dicts with the keys ``source_url``,
                ``source_title``, ``score``, ``paragraph_index``,
                ``current_sentence``, ``new_sentence``, ``anchor_text`` and
                ``target_url``.

        Returns:
            The report as a single markdown string.
        """
        parts = [
            f"# π Internal Linking Report\n\n",
            f"**Orphan Page:** {orphan_title}\n",
            f"**Target URL:** `{orphan_url}`\n",
            f"**Links Generated:** {len(results)}\n\n",
            "---\n\n",
        ]
        for i, result in enumerate(results, 1):
            html_code = self._insert_link(
                result['new_sentence'],
                result['anchor_text'],
                result['target_url'],
            )
            parts.extend([
                f"## Link {i}: {result['source_title']}\n\n",
                f"**Source URL:** `{result['source_url']}`\n",
                f"**Paragraph #:** {result['paragraph_index']}\n",
                f"**Relevance Score:** {result['score']}/100\n",
                f"**Anchor Text:** \"{result['anchor_text']}\"\n\n",
                "### Current Sentence:\n",
                "```\n",
                result['current_sentence'] + "\n",
                "```\n\n",
                "### New Sentence (with link):\n",
                "```\n",
                result['new_sentence'] + "\n",
                "```\n\n",
                "### HTML Code:\n",
                "```html\n",
                html_code + "\n",
                "```\n\n",
                "---\n\n",
            ])
        return "".join(parts)
| # ============================================ | |
| # GLOBAL STATE | |
| # ============================================ | |
# Module-wide knowledge base instance; setup() fills it by calling
# kb.load_from_huggingface().
kb = KnowledgeBase()
# Set to an OrphanPageAnalyzer by setup() once the knowledge base has loaded;
# analyze_orphan() refuses to run while this is still None.
analyzer = None
| # ============================================ | |
| # GRADIO FUNCTIONS | |
| # ============================================ | |
| def setup(api_key, hf_token): | |
| """Setup API and load knowledge base""" | |
| global analyzer | |
| if not api_key or not api_key.strip(): | |
| return "β Please enter your OpenRouter API key", None | |
| try: | |
| client = OpenRouterClient(api_key) | |
| status = ["β API key configured"] | |
| except Exception as e: | |
| return f"β API Error: {str(e)}", None | |
| # Load knowledge base | |
| success, message = kb.load_from_huggingface(HF_DATASET_REPO, hf_token) | |
| if not success: | |
| return f"β API key configured\n{message}", None | |
| status.append(message) | |
| # Create analyzer | |
| analyzer = OrphanPageAnalyzer(kb, client) | |
| status.append("β System ready!") | |
| return "\n".join(status), None | |
def analyze_orphan(orphan_url, orphan_title, orphan_keyword, num_sources):
    """Validate the form input and run the orphan-page analysis.

    Args:
        orphan_url: URL of the orphan page (required).
        orphan_title: Title of the orphan page (required).
        orphan_keyword: Optional primary keyword; when missing or blank the
            title is used instead.
        num_sources: Number of source pages to request from the analyzer.

    Returns:
        A ``(report, table)`` tuple. On validation failure or error the first
        element is a message and the second is ``None``.
    """
    if not analyzer:
        return "β Please complete setup first", None

    url = (orphan_url or "").strip()
    if not url:
        return "β Please enter an orphan page URL", None

    title = (orphan_title or "").strip()
    if not title:
        return "β Please enter the orphan page title", None

    # A whitespace-only keyword must fall back to the title as well,
    # otherwise an empty keyword would be sent to the analyzer.
    keyword = (orphan_keyword or "").strip() or title

    try:
        report, table = analyzer.analyze(url, title, keyword, num_sources)
        return report, table
    except Exception as e:
        import traceback
        error_detail = traceback.format_exc()
        return f"β Error: {str(e)}\n\nDetails:\n{error_detail}", None
| # ============================================ | |
| # INTERFACE | |
| # ============================================ | |
# Gradio UI: a setup accordion (API key + HF token), the orphan-page form,
# and the report / summary-table outputs.
# NOTE(review): this listing had lost its indentation, so the nesting of the
# layout containers below is a reconstruction — confirm against the real app.
# NOTE(review): several labels start with mis-encoded characters (probably
# emoji damaged in transit) — confirm the file's encoding.
with gr.Blocks(title="Edstellar Internal Linking Tool", theme=gr.themes.Soft()) as app:
    gr.Markdown("# π Edstellar Internal Linking Tool")
    gr.Markdown("Find the best existing blog posts to link to your orphan page")
    # Setup Section
    with gr.Accordion("βοΈ Setup (Do this once)", open=True):
        gr.Markdown("### Configure API Keys")
        with gr.Row():
            api_key = gr.Textbox(
                label="OpenRouter API Key",
                placeholder="sk-or-v1-...",
                type="password",
                scale=2
            )
            hf_token = gr.Textbox(
                label="Hugging Face Token",
                placeholder="hf_...",
                type="password",
                scale=2
            )
        setup_btn = gr.Button("π Setup System", variant="primary", size="lg")
        setup_status = gr.Textbox(label="Setup Status", lines=3, interactive=False)
    gr.Markdown("---")
    # Analysis Section
    gr.Markdown("### π Analyze Orphan Page")
    gr.Markdown("Enter details about the orphan page you want to get links FOR")
    with gr.Row():
        with gr.Column(scale=3):
            orphan_url_input = gr.Textbox(
                label="Orphan Page URL",
                placeholder="https://edstellar.com/blog/your-orphan-page",
                info="The page that needs backlinks"
            )
            orphan_title_input = gr.Textbox(
                label="Orphan Page Title",
                placeholder="Business Development Manager Roles",
                info="The title/topic of your orphan page"
            )
            orphan_keyword_input = gr.Textbox(
                label="Primary Keyword (Optional)",
                placeholder="business development",
                info="Main keyword for anchor text generation"
            )
        with gr.Column(scale=1):
            # Passed to analyze_orphan() as num_sources.
            num_sources_input = gr.Slider(
                label="Number of Sources",
                minimum=3,
                maximum=5,
                value=3,
                step=1,
                info="How many source pages to find"
            )
    analyze_btn = gr.Button("π Analyze & Generate Report", variant="primary", size="lg")
    gr.Markdown("---")
    # Results Section
    gr.Markdown("### π Report")
    report_output = gr.Markdown()
    gr.Markdown("### π Summary Table")
    table_output = gr.Dataframe(
        label="Quick Overview",
        wrap=True,
        interactive=False
    )
    # Wire up events
    # setup() returns (status_text, None); the None goes to the summary table.
    setup_btn.click(
        setup,
        inputs=[api_key, hf_token],
        outputs=[setup_status, table_output]
    )
    # analyze_orphan() returns (report, table) matching the two outputs below.
    analyze_btn.click(
        analyze_orphan,
        inputs=[orphan_url_input, orphan_title_input, orphan_keyword_input, num_sources_input],
        outputs=[report_output, table_output]
    )
# Launch
# Start the Gradio app only when this file is executed as a script.
if __name__ == "__main__":
    app.launch()