# T-K-O-H
# huggingface problem 1.2
# a34656d
import gradio as gr
import os
from dotenv import load_dotenv
from sentence_transformers import SentenceTransformer, util
from youtube_transcript_api import YouTubeTranscriptApi
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langgraph.graph import StateGraph, END
from typing import Dict, TypedDict, Annotated, List, Tuple, Union, Optional
import json
# import PyPDF2
# import requests
# # from bs4 import BeautifulSoup
# import io
# # import trafilatura
# # import ragas
# # from ragas.metrics import (
# # faithfulness,
# # answer_relevancy,
# # context_recall,
# # context_precision
# # )
from datasets import Dataset
# import plotly.graph_objects as go
import numpy as np
from langchain_community.vectorstores import FAISS
import asyncio
import nest_asyncio
from langchain_chroma import Chroma
from langchain.schema import Document
from datetime import datetime
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
# from ragas import evaluate
# # Configure asyncio
# nest_asyncio.apply()
# asyncio.set_event_loop(asyncio.new_event_loop())
# Load environment variables via python-dotenv (verbose=True is passed straight to load_dotenv)
load_dotenv(verbose=True)
# Verify OpenAI API key: fail at import time when OPENAI_API_KEY is unset or empty,
# before any of the ChatOpenAI-based steps defined below can be reached.
if not os.getenv("OPENAI_API_KEY"):
    raise ValueError("OpenAI API key not found.")
# Define state types
class ProcessState(TypedDict):
    """State dictionary handed from node to node in the LangGraph workflow."""

    video_url: str  # YouTube URL supplied by the user
    transcript: str  # transcript text, joined from the fetched segments
    enhanced: str  # LLM-enhanced version of the transcript
    linkedin_post: str  # LinkedIn-formatted post generated from `enhanced`
    verification: dict  # parsed JSON verdict stored by verify_content
    error: str  # user-facing error message; non-empty makes should_continue return False
    status: str  # short human-readable progress/status line
    verification_score: float  # copy of verification["score"]
    enhancement_attempts: int  # verify_content only requests improvements while this is < 3
    needs_improvement: bool  # set by verify_content; routes the graph after agent_decide
    research_context: str  # NOTE(review): the UI seeds this with "{}", presumably a JSON string — confirm
class FineTunedModelManager:
    """Wrap the fine-tuned sentence-embedding model used across the app.

    Two handles are created for the same model name:

    * ``self.model`` - a ``SentenceTransformer`` used for direct encoding and
      cosine-similarity computations in the methods below.
    * ``self.embeddings`` - a LangChain ``HuggingFaceEmbeddings`` object
      (device ``cpu``, ``normalize_embeddings=True``) exposed for callers that
      need a LangChain embedding function.
    """

    def __init__(self, model_name: str = "Shipmaster1/finetuned_mpnet_matryoshka_mnr"):
        """Load both model handles for ``model_name``."""
        self.model = SentenceTransformer(model_name)
        self.embeddings = HuggingFaceEmbeddings(
            model_name=model_name,
            model_kwargs={'device': 'cpu'},
            encode_kwargs={'normalize_embeddings': True}
        )

    def get_embeddings(self, texts: List[str]) -> np.ndarray:
        """Get embeddings for a list of texts."""
        return self.model.encode(texts, convert_to_numpy=True)

    def semantic_search(self, query: str, documents: List[str], top_k: int = 3) -> List[Tuple[str, float]]:
        """Rank ``documents`` by cosine similarity to ``query``.

        Args:
            query: Text to search for.
            documents: Candidate texts to rank.
            top_k: Maximum number of results to return.

        Returns:
            Up to ``top_k`` ``(document, score)`` pairs, best match first.
            An empty ``documents`` list yields ``[]``.
        """
        # Nothing to rank: return early instead of encoding an empty batch.
        if not documents:
            return []
        query_embedding = self.model.encode(query, convert_to_numpy=True)
        doc_embeddings = self.model.encode(documents, convert_to_numpy=True)
        # cos_sim returns one row per query; there is a single query here.
        scores = util.cos_sim(query_embedding, doc_embeddings)[0]
        ranked = [(doc, float(score)) for score, doc in zip(scores, documents)]
        ranked.sort(key=lambda pair: pair[1], reverse=True)
        return ranked[:top_k]

    def find_similar_content(self, content: str, corpus: List[str], threshold: float = 0.7) -> List[Tuple[str, float]]:
        """Return the corpus entries whose similarity to ``content`` meets ``threshold``.

        Args:
            content: Reference text.
            corpus: Texts to compare against ``content``.
            threshold: Minimum cosine similarity (inclusive) for a match.

        Returns:
            ``(text, score)`` pairs sorted by descending score; ``[]`` when
            the corpus is empty or nothing reaches the threshold.
        """
        # Nothing to compare against: return early instead of encoding an empty batch.
        if not corpus:
            return []
        content_embedding = self.model.encode(content, convert_to_numpy=True)
        corpus_embeddings = self.model.encode(corpus, convert_to_numpy=True)
        scores = util.cos_sim(content_embedding, corpus_embeddings)[0]
        matches = [
            (text, float(score))
            for score, text in zip(scores, corpus)
            if float(score) >= threshold
        ]
        matches.sort(key=lambda pair: pair[1], reverse=True)
        return matches

    def process_text(self, text: str) -> Dict[str, float]:
        """Summarize the embedding of ``text`` as three scalar statistics.

        Returns:
            A dict with ``embedding_norm`` (L2 norm), ``embedding_mean`` and
            ``embedding_std`` of the embedding vector, all as Python floats.
        """
        embedding = self.model.encode(text, convert_to_numpy=True)
        return {
            'embedding_norm': float(np.linalg.norm(embedding)),
            'embedding_mean': float(np.mean(embedding)),
            'embedding_std': float(np.std(embedding))
        }
# Initialize the model manager once at import time; the module-level functions
# below (vector store, enhancement, verification) all use this shared instance.
model_manager = FineTunedModelManager()
def extract_video_id(url: str) -> str:
    """Extract the video ID from a YouTube URL.

    Recognized forms:

    * ``https://www.youtube.com/watch?v=ID`` (``v`` may appear anywhere in
      the query string; fragments are ignored)
    * ``https://youtu.be/ID`` (optional query string or trailing slash)
    * ``https://www.youtube.com/{embed,shorts,live,v}/ID``

    URLs without a scheme are accepted. Anything else falls back to the
    original string-splitting behaviour, so inputs that worked before
    (including a bare video ID) still return the same value.

    Args:
        url: The URL (or bare ID) entered by the user.

    Returns:
        The extracted video ID.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import parse_qs, urlparse

    cleaned = url.strip()
    try:
        # urlparse only fills in the host when a scheme is present.
        parsed = urlparse(cleaned if "://" in cleaned else f"https://{cleaned}")
        host = (parsed.hostname or "").lower()
    except ValueError:
        # Malformed netloc (e.g. broken IPv6 brackets): use the fallback below.
        parsed, host = None, ""

    if parsed is not None:
        segments = [part for part in parsed.path.split("/") if part]
        if host == "youtu.be" or host.endswith(".youtu.be"):
            if segments:
                return segments[0]
        elif (
            host == "youtube.com"
            or host.endswith(".youtube.com")
            or host.endswith("youtube-nocookie.com")
        ):
            ids = parse_qs(parsed.query).get("v")
            if ids and ids[0]:
                return ids[0]
            if len(segments) >= 2 and segments[0] in {"embed", "shorts", "live", "v"}:
                return segments[1]

    # Fallback: original behaviour for unrecognized input.
    if "youtu.be" in cleaned:
        return cleaned.split("/")[-1]
    return cleaned.split("v=")[-1].split("&")[0]
def get_transcript(state: ProcessState, progress=gr.Progress()) -> ProcessState:
    """Fetch the YouTube transcript for ``state["video_url"]``.

    On success the joined segment texts are stored in ``state["transcript"]``
    and the status is updated. Any exception is converted into
    ``state["error"]`` / ``state["status"]`` messages, with a dedicated
    message when the error text mentions rate limiting. The same ``state``
    object is returned in every case.
    """
    try:
        progress(0.25, desc="Fetching transcript...")
        segments = YouTubeTranscriptApi.get_transcript(
            extract_video_id(state["video_url"])
        )
        state["transcript"] = " ".join(piece["text"] for piece in segments)
        state["status"] = "✅ Transcript fetched"
    except Exception as exc:
        lowered = str(exc).lower()
        # Rate limiting is detected from the exception text.
        rate_limited = any(marker in lowered for marker in ("too many requests", "429"))
        if rate_limited:
            state["error"] = "⚠️ YouTube API rate limit reached. Please wait a few minutes and try again."
            state["status"] = "❌ Rate limit exceeded"
        else:
            state["error"] = f"⚠️ Error fetching transcript: {exc}"
            state["status"] = "❌ Failed to fetch transcript"
    return state
def get_chroma_collection(model_name: str = "Shipmaster1/finetuned_mpnet_matryoshka_mnr"):
    """Get or create the "youtube_videos" Chroma collection.

    The collection is persisted under ``./chroma_db`` and embeds with the
    shared ``model_manager.embeddings`` object.

    Args:
        model_name: Accepted for backward compatibility but currently unused;
            the embedding function always comes from ``model_manager``.

    Returns:
        The LangChain ``Chroma`` vector store.

    Raises:
        Exception: If the store cannot be created. The original error is
            attached as ``__cause__`` so its traceback is kept.
    """
    try:
        return Chroma(
            collection_name="youtube_videos",
            embedding_function=model_manager.embeddings,
            persist_directory="./chroma_db"
        )
    except Exception as e:
        # Chain the cause so the underlying traceback is not lost.
        raise Exception(f"Error creating Chroma collection: {str(e)}") from e
def enhance_content(state: ProcessState, progress=gr.Progress()) -> ProcessState:
    """Rewrite the transcript into enhanced content with the chat model.

    The prompt is given the transcript, the three most similar documents
    from the Chroma collection, and the embedding statistics returned by
    ``model_manager.process_text``. The result is stored in
    ``state["enhanced"]``. If the transcript is empty the state is returned
    untouched; any exception is reported through ``state["error"]`` and
    ``state["status"]``.
    """
    try:
        if not state["transcript"]:
            # Nothing to enhance.
            return state
        progress(0.50, desc="Enhancing content...")

        source_text = state["transcript"]
        # Similar documents already stored in the vector store.
        neighbours = get_chroma_collection().similarity_search(source_text, k=3)
        # Scalar statistics of the transcript embedding.
        embedding_stats = model_manager.process_text(source_text)

        chat_model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
        system_message = """You are an expert content enhancer. Transform this transcript into engaging content:
1. Identify and emphasize key points
2. Add context and examples
3. Make it more engaging and professional
4. Keep it concise (max 3000 characters)
5. Maintain factual accuracy
Transcript:
{transcript}
Similar Content for Context:
{similar_content}
Transcript Features:
{features}"""
        template = ChatPromptTemplate.from_messages([
            ("system", system_message),
            ("human", "Enhance this content for a professional audience."),
        ])
        pipeline = template | chat_model | StrOutputParser()
        prompt_inputs = {
            "transcript": state["transcript"],
            "similar_content": "\n".join(doc.page_content for doc in neighbours),
            "features": str(embedding_stats),
        }
        state["enhanced"] = pipeline.invoke(prompt_inputs)
        state["status"] = "✅ Content enhanced"
    except Exception as exc:
        state["error"] = f"⚠️ Error enhancing content: {exc}"
        state["status"] = "❌ Failed to enhance content"
    return state
def format_linkedin_post(state: ProcessState, progress=gr.Progress()) -> ProcessState:
    """Turn ``state["enhanced"]`` into a LinkedIn post via the chat model.

    The generated post is stored in ``state["linkedin_post"]``. If there is
    no enhanced content the state is returned untouched; any exception is
    reported through ``state["error"]`` and ``state["status"]``.
    """
    try:
        if not state["enhanced"]:
            # Nothing to format.
            return state
        progress(0.75, desc="Formatting for LinkedIn...")

        chat_model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
        system_message = """Create an engaging LinkedIn post from this content. The post should be:
1. Natural and conversational - write like a real person sharing insights
2. Focused on value - emphasize practical takeaways and actionable insights
3. Authentic - avoid overused phrases or corporate speak
4. Visually clean - use line breaks and emojis sparingly and purposefully
5. Under 1500 characters
Content Preservation Rules:
- MUST maintain the exact same topic and subject matter
- MUST keep all specific examples, techniques, and exercises mentioned
- MUST preserve the original context and purpose
- MUST include all key points from the original content
- MUST maintain the same level of technical detail
- MUST keep the same target audience in mind
- MUST preserve any specific terminology or jargon that's important to the topic
- MUST maintain the same tone and expertise level
Formatting Guidelines:
- Start with a hook that grabs attention
- Share insights in a natural flow
- Use 2-3 relevant hashtags maximum
- End with a genuine call to action
- Avoid numbered lists unless absolutely necessary
- Don't use section headers or dividers
- Don't use bullet points or emoji bullets
- Don't use multiple hashtag groups
Content to transform:
{content}
Remember: The goal is to make the content more engaging while keeping ALL the original information, examples, and technical details intact."""
        human_message = "Create a natural, engaging LinkedIn post that preserves all the original content and context."
        template = ChatPromptTemplate.from_messages([
            ("system", system_message),
            ("human", human_message),
        ])
        pipeline = template | chat_model | StrOutputParser()
        state["linkedin_post"] = pipeline.invoke({"content": state["enhanced"]})
        state["status"] = "✅ LinkedIn post formatted"
    except Exception as exc:
        state["error"] = f"⚠️ Error formatting LinkedIn post: {exc}"
        state["status"] = "❌ Failed to format LinkedIn post"
    return state
def _parse_verification_json(raw: str) -> dict:
    """Parse the verifier model's reply into a dict.

    Chat models often wrap JSON in Markdown code fences or add a sentence
    around it, so only the text between the first ``{`` and the last ``}``
    is parsed. Raises ``json.JSONDecodeError`` / ``ValueError`` when no JSON
    object can be recovered.
    """
    text = raw.strip()
    first, last = text.find("{"), text.rfind("}")
    if first != -1 and last > first:
        text = text[first:last + 1]
    parsed = json.loads(text)
    if not isinstance(parsed, dict):
        raise ValueError("Verification response is not a JSON object")
    return parsed


def verify_content(state: ProcessState, progress=gr.Progress()) -> ProcessState:
    """Verify the enhanced content against the original transcript.

    Steps:
        1. Compute the semantic similarity between ``state["enhanced"]`` and
           ``state["transcript"]`` (reported as 0.0 when below the 0.7
           threshold used by ``find_similar_content``).
        2. Ask the chat model for a JSON verdict with ``verified``, ``score``
           and ``feedback``.
        3. Store the verdict in ``state["verification"]`` and the numeric
           score in ``state["verification_score"]``.
        4. If the score is below 0.85 and fewer than 3 attempts were made,
           set ``needs_improvement`` and call ``agent_decide``; otherwise
           clear the flag and set a final status.

    If either text is empty the state is returned untouched. Any exception,
    including an unparseable model reply or a missing/non-numeric ``score``,
    is reported through ``state["error"]`` and ``state["status"]``.
    """
    try:
        if not state["enhanced"] or not state["transcript"]:
            return state
        progress(1.0, desc="Verifying content...")
        # Initialize enhancement attempts if not present
        if "enhancement_attempts" not in state:
            state["enhancement_attempts"] = 0
        # Calculate semantic similarity
        similar_content = model_manager.find_similar_content(
            state["enhanced"],
            [state["transcript"]],
            threshold=0.7
        )
        similarity_score = similar_content[0][1] if similar_content else 0.0
        # Initialize LLM for verification (temperature 0 for a stable verdict)
        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
        prompt = ChatPromptTemplate.from_messages([
            ("system", """Verify the enhanced content against the original:
1. Check factual accuracy
2. Ensure key messages are preserved
3. Look for any misrepresentations
Return JSON in this format:
{{
"verified": boolean,
"score": float between 0-1,
"feedback": string with details
}}
Original:
{original}
Enhanced:
{enhanced}
Semantic Similarity Score: {similarity_score}"""),
            ("human", "Verify this content.")
        ])
        chain = prompt | llm | StrOutputParser()
        verification_result = _parse_verification_json(chain.invoke({
            "original": state["transcript"],
            "enhanced": state["enhanced"],
            "similarity_score": similarity_score
        }))
        # Coerce the score once so the threshold comparison below and the
        # later ":.2f" display formatting never see a string such as "0.9".
        score = float(verification_result["score"])
        verification_result["score"] = score
        # Update state with verification results
        state["verification"] = verification_result
        state["verification_score"] = score
        # Trigger agent decision if score is below threshold
        if score < 0.85 and state["enhancement_attempts"] < 3:
            state["needs_improvement"] = True
            # Create improvement plan
            state = agent_decide(state)
            state["status"] = f"🔄 Planning improvements (Attempt {state['enhancement_attempts'] + 1}/3)"
        else:
            state["needs_improvement"] = False
            if score >= 0.85:
                state["status"] = "✅ Content quality threshold met"
            else:
                state["status"] = "⚠️ Max enhancement attempts reached"
        return state
    except Exception as e:
        state["error"] = f"⚠️ Error verifying content: {str(e)}"
        state["status"] = "❌ Failed to verify content"
        return state
def should_continue(state: ProcessState) -> bool:
    """Return True while no error message has been recorded in ``state``."""
    has_error = bool(state.get("error", ""))
    return not has_error
def create_workflow() -> StateGraph:
    """Create the LangGraph workflow.

    Main flow::

        get_transcript -> enhance_content -> format_linkedin -> verify_content
            -> agent_decide -> (research_content -> enhance_again
            -> verify_content ...) | END

    Every hop is a conditional edge guarded by ``should_continue``, so a
    node that records ``state["error"]`` ends the run instead of feeding
    later nodes. The hops are deliberately NOT also registered with
    ``add_edge``: an unconditional edge from the same source would still
    schedule the next node when the guard routes to END.

    Returns:
        The uncompiled ``StateGraph``; callers invoke ``.compile()``.
    """
    workflow = StateGraph(ProcessState)

    # Add nodes
    nodes = (
        ("get_transcript", get_transcript),
        ("enhance_content", enhance_content),
        ("format_linkedin", format_linkedin_post),
        ("verify_content", verify_content),
        ("agent_decide", agent_decide),
        ("research_content", research_content),
        ("enhance_again", enhance_again),
    )
    for node_name, node_fn in nodes:
        workflow.add_node(node_name, node_fn)

    # Set entry point
    workflow.set_entry_point("get_transcript")

    # Linear hops of the main flow and of the enhancement loop, each
    # short-circuited to END when an error has been recorded.
    guarded_hops = (
        ("get_transcript", "enhance_content"),
        ("enhance_content", "format_linkedin"),
        ("format_linkedin", "verify_content"),
        ("verify_content", "agent_decide"),
        ("research_content", "enhance_again"),
        ("enhance_again", "verify_content"),
    )
    for source, target in guarded_hops:
        workflow.add_conditional_edges(
            source,
            should_continue,
            {True: target, False: END},
        )

    # Agentic branch: enter the improvement loop or finish. ``.get`` keeps a
    # state that never set the flag from raising KeyError in the router.
    # NOTE(review): verify_content already calls agent_decide itself, so on
    # the improvement path agent_decide appears to run twice — confirm intent.
    workflow.add_conditional_edges(
        "agent_decide",
        lambda state: bool(state.get("needs_improvement", False)),
        {True: "research_content", False: END},
    )
    return workflow
def process_video(video_url: str, progress=gr.Progress()) -> tuple:
    """Process a YouTube video and generate a LinkedIn post.

    Args:
        video_url: URL entered by the user; surrounding whitespace is ignored.
        progress: Gradio progress tracker (not used directly here; the
            workflow nodes create their own).

    Returns:
        A 6-tuple ``(error, status, transcript, enhanced, linkedin_post,
        verification_text)``. Failures are reported through the first two
        items rather than raised.
    """
    try:
        # Input validation
        video_url = (video_url or "").strip()
        if not video_url:
            return (
                "⚠️ Please enter a YouTube URL",  # error
                "❌ Failed: No URL provided",  # status
                "",  # transcript
                "",  # enhanced
                "",  # linkedin
                ""  # verification
            )
        if "youtube.com" not in video_url and "youtu.be" not in video_url:
            return (
                "⚠️ Invalid URL. Please enter a YouTube URL",  # error
                "❌ Failed: Invalid URL",  # status
                "",  # transcript
                "",  # enhanced
                "",  # linkedin
                ""  # verification
            )
        # Initialize every key declared on ProcessState so nodes (and the
        # graph's router) never hit a missing key on the first pass.
        initial_state = ProcessState(
            video_url=video_url,
            transcript="",
            enhanced="",
            linkedin_post="",
            verification={},
            error="",
            status="Starting...",
            verification_score=0.0,
            enhancement_attempts=0,
            needs_improvement=False,
            research_context="{}",  # same seed value the UI state uses
        )
        # Create and run workflow
        app = create_workflow().compile()
        final_state = app.invoke(initial_state)
        # The shared formatter tolerates missing verdict keys, so a partial
        # model reply no longer discards the whole result.
        verification_text = format_verification_text(final_state.get("verification"))
        return (
            final_state.get("error", ""),  # error
            final_state.get("status", ""),  # status
            final_state.get("transcript", ""),  # transcript
            final_state.get("enhanced", ""),  # enhanced
            final_state.get("linkedin_post", ""),  # linkedin
            verification_text  # verification
        )
    except Exception as e:
        return (
            f"⚠️ Error: {str(e)}",  # error
            "❌ Processing failed",  # status
            "",  # transcript
            "",  # enhanced
            "",  # linkedin
            ""  # verification
        )
def process_from_stage(state: ProcessState, start_stage: str, progress=gr.Progress()) -> tuple:
    """Re-run the workflow for content regenerated from a given stage.

    Args:
        state: Current pipeline state.
        start_stage: ``"enhance"`` requires an existing transcript and
            ``"format"`` requires existing enhanced content; any other value
            runs without a precondition.
        progress: Gradio progress tracker (not used directly here).

    Returns:
        A 6-tuple ``(error, status, transcript, enhanced, linkedin_post,
        verification_text)``. Failures are reported through the first two
        items rather than raised.
    """
    try:
        # Preconditions for the requested stage.
        if start_stage == "enhance" and not state.get("transcript"):
            return (
                "⚠️ No transcript available to enhance",
                "❌ Failed: No transcript",
                state.get("transcript", ""),
                "",
                "",
                ""
            )
        if start_stage == "format" and not state.get("enhanced"):
            return (
                "⚠️ No enhanced content available to format",
                "❌ Failed: No enhanced content",
                state.get("transcript", ""),
                state.get("enhanced", ""),
                "",
                ""
            )
        # NOTE(review): every stage builds the same graph, whose entry point
        # is get_transcript, so the run restarts from the top regardless of
        # start_stage — confirm whether a stage-specific entry was intended.
        app = create_workflow().compile()
        final_state = app.invoke(state)
        # The shared formatter tolerates missing verdict keys.
        verification_text = format_verification_text(final_state.get("verification"))
        return (
            final_state.get("error", ""),
            final_state.get("status", ""),
            final_state.get("transcript", ""),
            final_state.get("enhanced", ""),
            final_state.get("linkedin_post", ""),
            verification_text
        )
    except Exception as e:
        return (
            f"⚠️ Error: {str(e)}",
            "❌ Processing failed",
            state.get("transcript", ""),
            state.get("enhanced", ""),
            state.get("linkedin_post", ""),
            ""
        )
def format_verification_text(verification: dict) -> str:
    """Format verification results into a readable string.

    Args:
        verification: Verdict dict with optional ``verified``, ``score`` and
            ``feedback`` keys. ``None`` or an empty dict yields ``""``.

    Returns:
        A four-line summary. A missing, ``None`` or non-numeric ``score`` is
        shown as ``0.00``; numeric strings such as ``"0.9"`` are accepted.
    """
    if not verification:
        return ""
    try:
        score = float(verification.get("score", 0))
    except (TypeError, ValueError):
        # The verdict comes from a model, so the score may not be numeric.
        score = 0.0
    status = "✅ Verified" if verification.get("verified") else "❌ Not Verified"
    feedback = verification.get("feedback", "No feedback available")
    return "\n".join([
        "Verification Results:",
        f"• Status: {status}",
        f"• Accuracy Score: {score:.2f}",
        f"• Feedback: {feedback}",
    ])
def safe_json_loads(json_str: str, default: Optional[dict] = None) -> dict:
    """Parse a JSON string, returning ``default`` instead of raising.

    Args:
        json_str: Text to parse. Empty or ``None`` yields ``default``.
        default: Value returned when parsing is not possible; a new empty
            dict when omitted.

    Returns:
        The parsed value, or ``default`` when the input is empty, is not
        valid JSON, or is not a string/bytes object at all.
    """
    if default is None:
        default = {}
    if not json_str:
        return default
    try:
        return json.loads(json_str)
    except (TypeError, ValueError):
        # ValueError covers json.JSONDecodeError; TypeError covers inputs
        # that are not str/bytes.
        return default
def format_improvement_plan(plan: dict) -> str:
    """Render an improvement-plan dict as display text.

    Sections are emitted only for the keys present in ``plan``:
    ``improvement_areas``, ``research_priorities`` and
    ``enhancement_strategy``. Missing fields inside a section are shown as
    ``N/A``. An empty or ``None`` plan yields a fixed placeholder message.
    """
    if not plan:
        return "No improvement plan available"

    pieces = ["📋 Improvement Plan:\n\n"]

    # Improvement areas
    if "improvement_areas" in plan:
        pieces.append("🎯 Priority Areas:\n")
        for entry in plan["improvement_areas"]:
            pieces.append(f"• {entry.get('area', 'N/A')} (Priority: {entry.get('priority', 'N/A')}/5)\n")
            pieces.append(f" Strategy: {entry.get('strategy', 'N/A')}\n")
            pieces.append(f" Research Focus: {entry.get('research_focus', 'N/A')}\n\n")

    # Research priorities
    if "research_priorities" in plan:
        pieces.append("🔍 Research Priorities:\n")
        for entry in plan["research_priorities"]:
            pieces.append(f"• {entry.get('topic', 'N/A')}\n")
            pieces.append(f" Reason: {entry.get('reason', 'N/A')}\n")
            pieces.append(f" Expected Impact: {entry.get('expected_impact', 'N/A')}\n\n")

    # Enhancement strategy
    if "enhancement_strategy" in plan:
        approach_info = plan["enhancement_strategy"]
        pieces.append("⚡ Enhancement Strategy:\n")
        pieces.append(f"• Approach: {approach_info.get('approach', 'N/A')}\n")
        pieces.append(f"• Key Focus: {approach_info.get('key_focus', 'N/A')}\n")
        pieces.append("• Expected Improvements:\n")
        pieces.extend(
            f" - {improvement}\n"
            for improvement in approach_info.get("expected_improvements", [])
        )

    return "".join(pieces)
def format_research_results(research: dict) -> str:
    """Render a research-results dict as display text.

    For each area under ``focused_research`` the priority, strategy and the
    first finding (truncated to 200 characters) are listed. Up to two
    entries of ``similar_content`` follow, truncated the same way. An empty
    or ``None`` input yields a fixed placeholder message.
    """
    if not research:
        return "No research results available"

    pieces = ["📚 Research Results:\n\n"]

    # Focused research, one entry per area
    if "focused_research" in research:
        pieces.append("🎯 Focused Research by Area:\n")
        for area_name, details in research["focused_research"].items():
            pieces.append(f"• {area_name} (Priority: {details.get('priority', 'N/A')}/5)\n")
            pieces.append(f" Strategy: {details.get('strategy', 'N/A')}\n")
            pieces.append(" Key Findings:\n")
            # Only the first finding is shown.
            pieces.extend(
                f" - {finding[:200]}...\n\n"
                for finding in details.get("content", [])[:1]
            )

    # Additional research: first two snippets only
    if research.get("similar_content"):
        pieces.append("📖 Additional Research:\n")
        pieces.extend(
            f"• {snippet[:200]}...\n\n"
            for snippet in research["similar_content"][:2]
        )

    return "".join(pieces)
def create_synthetic_dataset():
    """Build the RAG-evaluation dataset from ``synthetic_data.json``.

    The file is read from the current working directory and is expected to
    hold a top-level ``"data"`` list whose items each have ``question``,
    ``answer`` and ``context`` keys.

    Returns:
        A ``datasets.Dataset`` with ``question``, ``answer`` and ``context``
        columns.

    Raises:
        Exception: If the file cannot be read or has an unexpected layout.
            The original error is attached as ``__cause__``.
    """
    try:
        # JSON is UTF-8 by specification; do not depend on the locale default.
        with open('synthetic_data.json', 'r', encoding='utf-8') as f:
            data = json.load(f)
        items = data['data']
        return Dataset.from_dict({
            'question': [item['question'] for item in items],
            'answer': [item['answer'] for item in items],
            'context': [item['context'] for item in items]
        })
    except Exception as e:
        # Chain the cause so the underlying traceback is not lost.
        raise Exception(f"Error generating synthetic dataset: {str(e)}") from e
class SentenceTransformerWrapper:
    """Adapter exposing a SentenceTransformer-style model through embedding
    methods (``embed_documents``, ``embed_query``, ``embed_text``, ``embed``)
    for use with RAGAS evaluation."""

    def __init__(self, model):
        """Store the wrapped model and start with an empty run configuration."""
        self.run_config = {}
        self.model = model

    def embed_documents(self, texts: List[str]) -> List[List[float]]:
        """Encode several texts and return one list of floats per text."""
        return self.model.encode(texts).tolist()

    def embed_query(self, text: str) -> List[float]:
        """Encode one text (as a one-element batch) and return its vector."""
        vectors = self.model.encode([text])
        return vectors[0].tolist()

    def embed_text(self, text: Union[str, List[str]]) -> Union[List[float], List[List[float]]]:
        """Embed text (required by RAGAS): a string gives one vector, a list gives many."""
        handler = self.embed_query if isinstance(text, str) else self.embed_documents
        return handler(text)

    def set_run_config(self, config: dict) -> None:
        """Replace the stored run configuration."""
        self.run_config = config

    def embed(self, text: Union[str, List[str]]) -> Union[List[float], List[List[float]]]:
        """Synchronous alias for :meth:`embed_text`."""
        return self.embed_text(text)
# def evaluate_models(dataset):
# """Evaluate embedding models using RAGAS metrics."""
# try:
# # Initialize models
# openai_model = OpenAIEmbeddings(model="text-embedding-3-small")
# base_mpnet = SentenceTransformer("sentence-transformers/all-mpnet-base-v2")
# base_mpnet_wrapper = SentenceTransformerWrapper(base_mpnet)
# fine_tuned_model = SentenceTransformer("Shipmaster1/finetuned_mpnet_matryoshka_mnr")
# fine_tuned_wrapper = SentenceTransformerWrapper(fine_tuned_model)
# # Initialize evaluation metrics
# metrics = [
# faithfulness, # How well answers align with context
# answer_relevancy, # How relevant answers are to questions
# context_recall, # How well context covers required information
# context_precision # How focused and precise the context is
# ]
# # Create evaluation dataset with all required columns
# eval_dataset = Dataset.from_dict({
# "question": dataset["question"],
# "answer": dataset["answer"],
# "context": dataset["context"],
# "retrieved_contexts": [[ctx] for ctx in dataset["context"]], # Each context in its own list
# "reference": dataset["context"] # Using context as reference for recall calculation
# })
# # Evaluate each model and store results
# results = {}
# # OpenAI model evaluation
# openai_eval = evaluate(
# eval_dataset,
# metrics=metrics,
# embeddings=openai_model,
# llm=ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# )
# results["OpenAI"] = {
# "faithfulness": float(openai_eval._repr_dict["faithfulness"]),
# "answer_relevancy": float(openai_eval._repr_dict["answer_relevancy"]),
# "context_recall": float(openai_eval._repr_dict["context_recall"]),
# "context_precision": float(openai_eval._repr_dict["context_precision"])
# }
# # Base MPNet evaluation
# base_mpnet_eval = evaluate(
# eval_dataset,
# metrics=metrics,
# embeddings=base_mpnet_wrapper,
# llm=ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# )
# results["Base MPNet"] = {
# "faithfulness": float(base_mpnet_eval._repr_dict["faithfulness"]),
# "answer_relevancy": float(base_mpnet_eval._repr_dict["answer_relevancy"]),
# "context_recall": float(base_mpnet_eval._repr_dict["context_recall"]),
# "context_precision": float(base_mpnet_eval._repr_dict["context_precision"])
# }
# # Fine-tuned MPNet evaluation
# fine_tuned_eval = evaluate(
# eval_dataset,
# metrics=metrics,
# embeddings=fine_tuned_wrapper,
# llm=ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# )
# results["Fine-tuned MPNet"] = {
# "faithfulness": float(fine_tuned_eval._repr_dict["faithfulness"]),
# "answer_relevancy": float(fine_tuned_eval._repr_dict["answer_relevancy"]),
# "context_recall": float(fine_tuned_eval._repr_dict["context_recall"]),
# "context_precision": float(fine_tuned_eval._repr_dict["context_precision"])
# }
# return results
# except Exception as e:
# print(f"Error evaluating models: {str(e)}")
# return {}
# def create_comparison_plot(results):
# """Create a comparison plot of the evaluation metrics."""
# # Define metrics we're using
# metrics = [
# 'faithfulness',
# 'answer_relevancy',
# 'context_recall',
# 'context_precision'
# ]
# # Extract scores for each model
# models = list(results.keys())
# model_scores = {
# model: [results[model][metric] for metric in metrics]
# for model in models
# }
# fig = go.Figure()
# # Add traces for each model
# colors = {
# "OpenAI": 'rgb(55, 83, 109)',
# "Base MPNet": 'rgb(26, 118, 255)',
# "Fine-tuned MPNet": 'rgb(15, 196, 141)'
# }
# for model in models:
# fig.add_trace(go.Bar(
# name=model,
# x=metrics,
# y=model_scores[model],
# marker_color=colors.get(model, 'rgb(128, 128, 128)')
# ))
# # Update layout
# fig.update_layout(
# title='Model Comparison Metrics',
# xaxis_title='Metrics',
# yaxis_title='Score',
# barmode='group',
# yaxis=dict(range=[0, 1]),
# showlegend=True
# )
# return fig
# def run_ragas_evaluation():
# """Run the complete RAGAS evaluation process."""
# try:
# # Generate synthetic dataset
# dataset = create_synthetic_dataset()
# # Evaluate models
# results = evaluate_models(dataset)
# # Create comparison plot
# plot = create_comparison_plot(results)
# # Format results as markdown
# results_md = """## Model Evaluation Results
# ### Models Being Compared
# - **OpenAI Model**: text-embedding-3-small
# - **Base MPNet**: sentence-transformers/all-mpnet-base-v2
# - **Fine-tuned Model**: Shipmaster1/finetuned_mpnet_matryoshka_mnr
# ### OpenAI Model (text-embedding-3-small)
# """
# for metric in results["OpenAI"]:
# results_md += "- {}: {:.3f}\n".format(metric, results["OpenAI"][metric])
# results_md += "\n### Base MPNet Model (all-mpnet-base-v2)\n"
# for metric in results["Base MPNet"]:
# results_md += "- {}: {:.3f}\n".format(metric, results["Base MPNet"][metric])
# results_md += "\n### Fine-tuned Model (finetuned_mpnet_matryoshka_mnr)\n"
# for metric in results["Fine-tuned MPNet"]:
# results_md += "- {}: {:.3f}\n".format(metric, results["Fine-tuned MPNet"][metric])
# return results_md, plot
# except Exception as e:
# return f"Error during evaluation: {str(e)}", None
def create_ui():
with gr.Blocks(theme='JohnSmith9982/small_and_pretty') as demo:
current_state = gr.State({
"video_url": "",
"transcript": "",
"enhanced": "",
"linkedin_post": "",
"verification": {},
"error": "",
"status": "",
"improvement_plan": {},
"research_context": "{}",
"enhancement_attempts": 0,
"needs_improvement": False
})
gr.Markdown(
"""
# YouTube to LinkedIn Post Converter
Transform your YouTube videos into professional LinkedIn posts with AI content enhancement.
### 🎬 Sample Videos to Try
Copy any of these URLs to test the application:
```
1. Open AI video: https://www.youtube.com/watch?v=LsMxX86mm2Y
Agent will likely find high quality initial content and not improve
2. Financial News: https://www.youtube.com/watch?v=hvP1UNALZ3g
Agent will likely decide to not improve this post
3. Video About AI: https://www.youtube.com/watch?v=Yq0QkCxoTHM
Agent will likely decide to improve this post
```
These videos are chosen to show the application's ability to handle different types of professional content.
"""
)
with gr.Row():
with gr.Column():
video_url = gr.Textbox(
label="YouTube URL",
placeholder="https://www.youtube.com/watch?v=e1GJ5tZePjk",
show_label=True
)
youtube_convert_btn = gr.Button("🚀 Generate from YouTube", variant="primary", size="lg")
status = gr.Textbox(
label="Status",
value="Ready to process...",
interactive=False
)
error = gr.Textbox(
label="Error",
visible=False,
interactive=False
)
with gr.Tabs() as tabs:
with gr.TabItem("📝 Content"):
with gr.Row():
with gr.Column():
transcript = gr.TextArea(
label="📄 Raw Transcript",
interactive=False,
show_copy_button=True,
lines=8
)
with gr.Column():
enhanced = gr.TextArea(
label="✨ Enhanced Content",
interactive=False,
show_copy_button=True,
lines=8
)
with gr.Row():
with gr.Column():
linkedin = gr.TextArea(
label="🔗 LinkedIn Post",
interactive=False,
show_copy_button=True,
lines=6
)
with gr.Row():
with gr.Column():
verification = gr.TextArea(
label="✓ Verification Results",
interactive=False,
lines=4
)
with gr.Row():
with gr.Column():
improvement_plan = gr.TextArea(
label="📋 Improvement Plan",
interactive=False,
show_copy_button=True,
lines=8,
visible=True,
value="Waiting for verification..."
)
with gr.Row():
with gr.Column():
research_results = gr.TextArea(
label="🔍 Research Results",
interactive=False,
show_copy_button=True,
lines=8,
visible=True,
value="Waiting for research..."
)
with gr.Row():
with gr.Column():
improved_linkedin = gr.TextArea(
label="🚀 Improved LinkedIn Post Final",
interactive=False,
show_copy_button=True,
lines=6,
visible=True,
value="Waiting for improvements..."
)
# Loading indicators
with gr.Row(visible=False) as loading_indicators:
transcript_loading = gr.Markdown("🔄 Fetching transcript...")
enhanced_loading = gr.Markdown("🔄 Enhancing content...")
linkedin_loading = gr.Markdown("🔄 Formatting for LinkedIn...")
verify_loading = gr.Markdown("🔄 Verifying content...")
plan_loading = gr.Markdown("🔄 Creating improvement plan...")
research_loading = gr.Markdown("🔄 Researching content...")
improved_loading = gr.Markdown("🔄 Creating improved post...")
with gr.TabItem("ℹ️ Help"):
gr.Markdown(
"""
### How to Use
1. **Input**: Paste a YouTube video URL in the input field
2. **Process**: Click the "Generate Post" button
3. **Wait**: The system will process your video through multiple steps
4. **Review**: Check the generated content in each tab
5. **Copy**: Use the copy button to grab your LinkedIn post
### 🔄 Regeneration Options
- Click 🔄 next to "Enhanced Content" to regenerate from the enhancement stage
- Click 🔄 next to "LinkedIn Post" to regenerate from the formatting stage
### 💡 Tips for Best Results
- Use videos with clear English audio
- Optimal video length: 5-15 minutes
- Ensure videos have accurate captions
- Review and personalize the post before sharing
- Consider your target audience when selecting videos
"""
)
# with gr.TabItem("RAGAS Evaluation"):
# gr.Markdown(
# """
# # RAGAS Model Evaluation
# Compare the performance of three embedding models using synthetic data.
# ### Models Being Evaluated
# - **OpenAI Model**: text-embedding-3-small (Not Free)
# - **Base MPNet**: sentence-transformers/all-mpnet-base-v2 (Open Source)
# - **Fine-tuned Model**: Shipmaster1/finetuned_mpnet_matryoshka_mnr (Free Custom, trained on YouTube transcript handling)
# The evaluation uses GPT-3.5 Turbo to assess the quality of the embeddings through various metrics:
# - Faithfulness: How well the answers align with the provided context
# - Answer Relevancy: How relevant the answers are to the questions
# - Context Recall: How well the model retrieves relevant context
# - Context Precision: How precise the retrieved context is
# Click the run button to find out how well the models perform on the synthetic data.
# """
# )
# with gr.Row():
# evaluate_btn = gr.Button("Run Evaluation", variant="primary", size="lg")
# with gr.Row():
# results_md = gr.Markdown(label="Evaluation Results")
# plot_output = gr.Plot(label="Comparison Plot")
# evaluate_btn.click(
# fn=run_ragas_evaluation,
# outputs=[results_md, plot_output]
# )
def update_loading_state(stage: str):
"""Update loading indicators based on current stage."""
states = {
"transcript": [True, False, False, False, False, False, False],
"enhance": [False, True, False, False, False, False, False],
"format": [False, False, True, False, False, False, False],
"verify": [False, False, False, True, False, False, False],
"plan": [False, False, False, False, True, False, False],
"research": [False, False, False, False, False, True, False],
"improved": [False, False, False, False, False, False, True],
"done": [False, False, False, False, False, False, False]
}
# Loading messages for each stage
loading_messages = {
"transcript": "🔄 Fetching transcript...\n⏳ Please wait...",
"enhance": "✨ Enhancing content...\n⚡ AI is working its magic...",
"format": "🎨 Formatting for LinkedIn...\n📝 Creating engaging post...",
"verify": "🔍 Verifying content...\n⚖️ Checking accuracy...",
"plan": "🔄 Creating improvement plan...",
"research": "🔎 Researching content...\n📚 Finding relevant information...",
"improved": "🚀 Creating improved LinkedIn post...\n✨ Applying enhancements..."
}
# Get current stage message
current_message = loading_messages.get(stage, "")
# Return loading states and message
return [
gr.update(visible=state) for state in states.get(stage, [False] * 7)
], current_message
def process_with_loading(url, state):
    """Run the video-to-LinkedIn pipeline, streaming one UI frame per stage.

    This is a generator. Every ``yield`` is a 17-item list in the order of
    the outputs wired to the convert button: error, status, transcript,
    enhanced, linkedin, verification, improvement plan, research results,
    improved linkedin, the session state, then the seven loading-indicator
    updates produced by ``update_loading_state``.

    Args:
        url: Video URL; stored in ``state["video_url"]`` before the
            transcript is fetched.
        state: Mutable session dict. ``None`` is treated as an empty dict.

    Yields:
        list: One output frame per pipeline stage, ending with either a
        completion frame or a "Processing failed" frame.

    Exceptions raised by any step are caught and reported in the final
    frame instead of propagating to the caller.
    """
    # A missing session state would break both the pipeline and the error
    # frame below, so normalise it before entering the try block.
    if state is None:
        state = {}

    def frame(stage, status_text, snapshot, *, error_text="", transcript="",
              enhanced="", linkedin="", verification="", plan="",
              research="", improved=""):
        """Assemble one output frame; panels that are not given stay empty."""
        indicators, _ = update_loading_state(stage)
        return [
            error_text,
            status_text,
            transcript,
            enhanced,
            linkedin,
            verification,
            plan,
            research,
            improved,
            snapshot,
            *indicators,
        ]

    def loading_text(stage):
        """Return the placeholder text shown while ``stage`` is running."""
        return update_loading_state(stage)[1]

    def summarise(snapshot):
        """Format the verification, plan and research panels from a state."""
        return {
            "verification": format_verification_text(snapshot.get("verification", {})),
            "plan": format_improvement_plan(snapshot.get("improvement_plan", {})),
            "research": format_research_results(
                safe_json_loads(snapshot.get("research_context", "{}"))
            ),
        }

    try:
        # Seed the keys later steps read, without clobbering existing values.
        state.setdefault("improvement_plan", {})
        state.setdefault("research_context", "{}")
        state.setdefault("enhancement_attempts", 0)
        state.setdefault("needs_improvement", False)
        # The state is both an input and an output of the click handler, so
        # results of a previous run would otherwise leak into this one.
        state["error"] = ""
        state.pop("improved_linkedin", None)

        yield frame(
            "transcript",
            "Processing...",
            state,
            transcript=loading_text("transcript"),
            plan="Waiting for verification...",
            research="Waiting for research...",
            improved="Waiting for improvements...",
        )

        # Get transcript
        state["video_url"] = url
        transcript_text = get_transcript(state)["transcript"]

        yield frame(
            "enhance",
            "Enhancing content...",
            state,
            transcript=transcript_text,
            enhanced=loading_text("enhance"),
        )

        # Enhance content
        state["transcript"] = transcript_text
        enhanced_text = enhance_content(state)["enhanced"]

        yield frame(
            "format",
            "Formatting for LinkedIn...",
            state,
            transcript=transcript_text,
            enhanced=enhanced_text,
            linkedin=loading_text("format"),
        )

        # Format LinkedIn post
        state["enhanced"] = enhanced_text
        linkedin_text = format_linkedin_post(state)["linkedin_post"]

        yield frame(
            "verify",
            "Verifying content...",
            state,
            transcript=transcript_text,
            enhanced=enhanced_text,
            linkedin=linkedin_text,
            verification="🔍 Verifying...\n⚖️ Analyzing accuracy...",
        )

        # Verify content
        state["linkedin_post"] = linkedin_text
        final_state = verify_content(state)
        shown = {
            "transcript": transcript_text,
            "enhanced": enhanced_text,
            "linkedin": linkedin_text,
            **summarise(final_state),
        }

        if final_state.get("needs_improvement", False):
            # The stored counter holds completed attempts, so the attempt
            # now starting is one past it (avoids showing "Attempt 0/3").
            attempt = final_state.get("enhancement_attempts", 0) + 1

            yield frame(
                "plan",
                f"Creating improvement plan (Attempt {attempt}/3)...",
                state,
                **shown,
            )
            yield frame(
                "research",
                f"Researching content (Attempt {attempt}/3)...",
                state,
                **shown,
            )

            # Research content
            state = research_content(state)
            shown["research"] = format_research_results(
                safe_json_loads(state.get("research_context", "{}"))
            )

            yield frame(
                "enhance",
                f"Enhancing content again (Attempt {attempt}/3)...",
                state,
                **shown,
            )

            # Enhance again, then re-format and re-verify the new text
            state = enhance_again(state)
            enhanced_text = state["enhanced"]
            linkedin_text = format_linkedin_post(state)["linkedin_post"]
            state["linkedin_post"] = linkedin_text
            final_state = verify_content(state)
            shown.update(
                enhanced=enhanced_text,
                linkedin=linkedin_text,
                **summarise(final_state),
            )

            if final_state.get("needs_improvement", False):
                yield frame(
                    "improved",
                    f"Creating improved LinkedIn post (Attempt {attempt}/3)...",
                    state,
                    improved=loading_text("improved"),
                    **shown,
                )
                improved_state = format_linkedin_post(final_state)
                final_state["improved_linkedin"] = improved_state["linkedin_post"]

        # research_content and enhance_again record failures in
        # state["error"] instead of raising; surface them rather than
        # reporting an unqualified success.
        step_error = final_state.get("error") or ""
        if step_error:
            final_status = "⚠️ Processing finished with errors"
        else:
            final_status = "✅ Processing complete!"
        yield frame(
            "done",
            final_status,
            final_state,
            error_text=step_error,
            improved=final_state.get("improved_linkedin", "No improvements needed"),
            **shown,
        )
    except Exception as e:
        yield frame(
            "done",
            "❌ Processing failed",
            state,
            error_text=f"⚠️ Error: {str(e)}",
            transcript=state.get("transcript", ""),
            enhanced=state.get("enhanced", ""),
            linkedin=state.get("linkedin_post", ""),
            plan="Error occurred during processing",
            research="Error occurred during processing",
            improved="Error occurred during processing",
        )
# Set up event handlers
youtube_convert_btn.click(
fn=process_with_loading,
inputs=[video_url, current_state],
outputs=[
error,
status,
transcript,
enhanced,
linkedin,
verification,
improvement_plan,
research_results,
improved_linkedin,
current_state,
transcript_loading,
enhanced_loading,
linkedin_loading,
verify_loading,
plan_loading,
research_loading,
improved_loading
],
show_progress=True, # Show progress bar
api_name="convert" # Name the API endpoint
)
# Update error visibility with immediate feedback
error.change(
lambda x: gr.update(visible=bool(x), value=x), # Update both visibility and value
error,
error,
queue=False # Process immediately
)
# Add loading state visibility updates
def update_loading_visibility(is_loading):
    """Return one visibility update per loading indicator.

    Args:
        is_loading: Value passed as ``visible`` for every indicator.

    Returns:
        dict: Maps each of the seven loading-indicator components to its own
        ``gr.update(visible=is_loading)``.
    """
    indicators = (
        transcript_loading,
        enhanced_loading,
        linkedin_loading,
        verify_loading,
        plan_loading,
        research_loading,
        improved_loading,
    )
    updates = {}
    for indicator in indicators:
        # A separate update object per component, as in a dict comprehension.
        updates[indicator] = gr.update(visible=is_loading)
    return updates
youtube_convert_btn.click(
lambda: update_loading_visibility(True),
None,
[transcript_loading, enhanced_loading, linkedin_loading,
verify_loading, plan_loading, research_loading, improved_loading],
queue=False
)
return demo
def print_graph():
    """Write the ASCII workflow diagram to standard output.

    The output is a title line, a dashed rule, the diagram with its
    error-type and enhancement-loop legends, and a closing dashed rule
    followed by a blank line. Returns ``None``.
    """
    rule = "-----------------------------"
    diagram = """
Main Workflow with Error Handling and Enhancement:
[YouTube URL] -> [get_transcript] -----> [enhance_content] -> [format_linkedin] -> [verify_content] -> [agent_decide] -> [END]
| | | | | |
| | | | | |
v v v v v v
[Invalid] [Rate Limit] [ERROR] [ERROR] [ERROR] [needs_improvement]
| | | | | |
v v v v v |
[END] [Wait & Retry] [END] [END] [END] |
|
v
[research_content]
|
v
[enhance_again]
|
v
[verify_content]
|
v
[Improvement Loop]
(max 3 attempts)
Error Types:
- Rate Limit: YouTube API quota exceeded
- Invalid: Malformed or unsupported URL
- ERROR: General processing errors
Enhancement Loop:
- Triggered if content needs improvement
- Limited to 3 attempts
- Includes research and verification
"""
    # One call with a newline separator emits the same bytes as printing the
    # four pieces one after another.
    print("\nWorkflow Graph Visualization:", rule, diagram, rule + "\n", sep="\n")
# def extract_text_from_webpage(url: str) -> str:
# """Extract main content text from a webpage."""
# try:
# # Use trafilatura for better content extraction
# downloaded = trafilatura.fetch_url(url)
# if downloaded:
# text = trafilatura.extract(downloaded, include_links=False, include_images=False)
# if text:
# return text.strip()
# # Fallback to basic BeautifulSoup extraction
# response = requests.get(url)
# response.raise_for_status()
# soup = BeautifulSoup(response.text, 'html.parser')
# # Remove script and style elements
# for script in soup(["script", "style"]):
# script.decompose()
# text = soup.get_text()
# lines = (line.strip() for line in text.splitlines())
# chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
# text = ' '.join(chunk for chunk in chunks if chunk)
# return text.strip()
# except Exception as e:
# raise Exception(f"Error extracting webpage content: {str(e)}")
def process_youtube_video(video_url: str, model_name: str = "Shipmaster1/finetuned_mpnet_matryoshka_mnr"):
    """Fetch a video's transcript, chunk it and add it to the vector store.

    Args:
        video_url: URL of the video to ingest.
        model_name: Passed to ``get_chroma_collection`` to select the
            collection the chunks are written to.

    Returns:
        A ``(document, message)`` pair. On success ``document`` is the
        un-split ``Document`` holding the whole transcript and ``message``
        is "Successfully processed video". On failure ``document`` is
        ``None`` and ``message`` describes the problem; exceptions are not
        propagated.
    """
    try:
        # Use .get so a result without a "transcript" key lands on the
        # intended failure message below instead of surfacing as the
        # opaque text of a KeyError.
        transcript = get_transcript({"video_url": video_url}).get("transcript")
        if not transcript:
            return None, "Failed to get video transcript"

        # Title and channel are fixed placeholders; only the id, URL and
        # ingestion time are specific to this video.
        video_info = {
            "video_id": extract_video_id(video_url),
            "title": "Untitled Video",
            "channel": "Unknown Channel",
            "url": video_url,
            "timestamp": datetime.now().isoformat()
        }
        doc = Document(
            page_content=transcript,
            metadata=video_info
        )

        # Split into overlapping 500-character chunks (100 characters of
        # overlap) before indexing.
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=500,
            chunk_overlap=100,
            length_function=len,
        )
        chunks = text_splitter.split_documents([doc])

        # Store in Chroma using LangChain's abstraction
        collection = get_chroma_collection(model_name)
        collection.add_documents(chunks)
        return doc, "Successfully processed video"
    except Exception as e:
        return None, f"Error processing video: {str(e)}"
# def process_webpage(url: str, model_name: str = "Shipmaster1/finetuned_mpnet_matryoshka_mnr"):
# """Process a webpage and store its content in the vector store using LangChain."""
# try:
# # Get webpage content
# content = extract_text_from_webpage(url)
# if not content:
# return None, "Failed to extract webpage content"
# # Create document with metadata
# doc = Document(
# page_content=content,
# metadata={
# "url": url,
# "source": "webpage",
# "timestamp": datetime.now().isoformat()
# }
# )
# # Split text into chunks
# text_splitter = RecursiveCharacterTextSplitter(
# chunk_size=1000,
# chunk_overlap=200,
# length_function=len,
# )
# chunks = text_splitter.split_documents([doc])
# # Store in Chroma using LangChain's abstraction
# collection = get_chroma_collection(model_name)
# collection.add_documents(chunks)
# return doc, "Successfully processed webpage"
# except Exception as e:
# return None, f"Error processing webpage: {str(e)}"
def agent_decide(state: ProcessState, progress=gr.Progress()) -> ProcessState:
    """Ask the LLM for an improvement plan and decide whether to iterate.

    Reads the verification score and feedback, the number of enhancement
    attempts so far and the enhanced text from ``state``, asks the model
    for a JSON improvement plan, and writes the decision back.

    Args:
        state: Pipeline state. ``state["enhanced"]`` must be present.
        progress: Progress reporter; called once with 0.95.

    Returns:
        The same ``state`` object with ``verification_score``,
        ``enhancement_attempts``, ``needs_improvement``,
        ``improvement_plan`` and ``status`` updated. ``needs_improvement``
        is True only when the plan asks for more work and fewer than three
        attempts have been made. On any failure ``error`` and ``status``
        describe the problem and the other keys are left as they were when
        the failure occurred; no exception is propagated.
    """
    max_attempts = 3
    quality_threshold = 0.95
    try:
        progress(0.95, desc="Analyzing content quality and planning improvements...")
        # Get verification score and attempts
        verification = state.get("verification", {})
        score = verification.get("score", 0)
        attempts = state.get("enhancement_attempts", 0)
        feedback = verification.get("feedback", "")
        # Initialize LLM for agentic decision making
        llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
        prompt = ChatPromptTemplate.from_messages([
            ("system", """You are an expert content strategist. Analyze the content quality and create an improvement plan.
Current Content:
{content}
Verification Results:
- Score: {score}
- Feedback: {feedback}
- Previous Attempts: {attempts}
Create a detailed improvement plan in JSON format:
{{
"needs_improvement": boolean,
"improvement_areas": [
{{
"area": string,
"priority": number (1-5),
"strategy": string,
"research_focus": string
}}
],
"research_priorities": [
{{
"topic": string,
"reason": string,
"expected_impact": string
}}
],
"enhancement_strategy": {{
"approach": string,
"key_focus": string,
"expected_improvements": [string]
}}
}}
Consider:
1. Content quality and engagement
2. Information accuracy and completeness
3. Target audience needs
4. Previous enhancement attempts
5. Available research context"""),
            ("human", "Analyze this content and create an improvement plan.")
        ])
        chain = prompt | llm | StrOutputParser()
        raw_plan = chain.invoke({
            "content": state["enhanced"],
            "score": score,
            "feedback": feedback,
            "attempts": attempts
        })
        try:
            plan = json.loads(raw_plan)
        except json.JSONDecodeError:
            # Chat models may wrap the JSON in a Markdown fence or add prose
            # around it; retry on the outermost {...} span before giving up.
            start = raw_plan.find("{")
            end = raw_plan.rfind("}")
            if start == -1 or end <= start:
                raise
            plan = json.loads(raw_plan[start:end + 1])

        wants_more = bool(plan["needs_improvement"])
        can_retry = attempts < max_attempts

        # Update state with plan. The flag is capped by the attempt limit so
        # it never disagrees with the "max attempts reached" status below.
        state["verification_score"] = score
        state["enhancement_attempts"] = attempts
        state["needs_improvement"] = wants_more and can_retry
        state["improvement_plan"] = plan

        # Create detailed status message
        if wants_more and can_retry:
            status = f"🔄 Planning improvements (Attempt {attempts + 1}/{max_attempts})\n"
            status += "Key focus areas:\n"
            # Only the first two listed areas are shown.
            for area in plan["improvement_areas"][:2]:
                status += f"• {area['area']} (Priority: {area['priority']})\n"
            state["status"] = status
        elif score >= quality_threshold:
            state["status"] = "✅ Content quality threshold met"
        elif not can_retry:
            state["status"] = "⚠️ Max enhancement attempts reached"
        else:
            # The planner declined further work although attempts remain;
            # do not misreport this as an exhausted retry budget.
            state["status"] = "✅ No further improvements needed"
        return state
    except Exception as e:
        state["error"] = f"⚠️ Error in agent decision: {str(e)}"
        state["status"] = "❌ Failed to analyze content"
        return state
def research_content(state: ProcessState, progress=gr.Progress()) -> ProcessState:
    """Collect supporting material from the vector store for the current plan.

    For every entry in the plan's ``improvement_areas`` the two most similar
    stored documents are retrieved and passed through
    ``model_manager.process_text``; for every entry in
    ``research_priorities`` the single most similar document is retrieved.

    Args:
        state: Pipeline state; must hold a non-empty ``improvement_plan``.
        progress: Progress reporter; called once with 0.96.

    Returns:
        The same ``state`` object. On success ``research_context`` holds a
        JSON string with the keys ``similar_content``, ``focused_research``
        and ``verification_feedback``. On failure ``error`` and ``status``
        describe the problem; no exception is propagated.
    """
    try:
        progress(0.96, desc="Researching based on improvement plan...")

        improvement_plan = state.get("improvement_plan", {})
        if not improvement_plan:
            raise Exception("No improvement plan found")

        findings = {
            "similar_content": [],
            "focused_research": {},
            "verification_feedback": state.get("verification", {}).get("feedback", "")
        }
        store = get_chroma_collection()

        # Per-area research: two nearest documents plus model features.
        for area in improvement_plan["improvement_areas"]:
            query = f"{area['area']} {area['research_focus']}"
            hits = store.similarity_search(query, k=2)
            passages = [hit.page_content for hit in hits]
            features = model_manager.process_text("\n".join(passages))
            findings["focused_research"][area["area"]] = {
                "content": passages,
                "features": features,
                "priority": area["priority"],
                "strategy": area["strategy"]
            }

        # Per-topic research: keep the single nearest document, if any.
        for priority in improvement_plan["research_priorities"]:
            hits = store.similarity_search(priority["topic"], k=1)
            if hits:
                for hit in hits:
                    findings["similar_content"].append(hit.page_content)

        state["research_context"] = json.dumps(findings)
        state["status"] = "✅ Research completed based on improvement plan"
        return state
    except Exception as e:
        state["error"] = f"⚠️ Error researching content: {str(e)}"
        state["status"] = "❌ Failed to research content"
        return state
def enhance_again(state: ProcessState, progress=gr.Progress()) -> ProcessState:
    """Rewrite the enhanced text using the research and improvement plan.

    Args:
        state: Pipeline state; must hold ``research_context`` (a JSON
            string), a non-empty ``improvement_plan`` with an
            ``enhancement_strategy`` entry, and the current ``enhanced``
            text.
        progress: Progress reporter; called once with 0.97.

    Returns:
        The same ``state`` object. On success ``enhanced`` is replaced by
        the model output, ``enhancement_attempts`` is incremented and
        ``status`` reports the attempt number. On failure ``error`` and
        ``status`` describe the problem and the attempt counter is left
        unchanged; no exception is propagated.
    """
    system_prompt = """You are an expert content enhancer. Improve the content based on the research and improvement plan while maintaining the original topic and key messages.
Current Content:
{content}
Improvement Plan:
{plan}
Research Results:
{research}
Enhancement Strategy:
{strategy}
Create enhanced content that:
1. Maintains the original topic and key messages
2. Addresses each improvement area according to its priority
3. Incorporates relevant research findings
4. Follows the enhancement strategy
5. Improves engagement and clarity
6. Keeps the same core subject matter and examples
Important:
- DO NOT change the main topic or subject matter
- DO NOT replace specific examples with generic ones
- DO NOT lose the original context or purpose
- DO NOT generate content about a different topic
- DO preserve and enhance the original message"""
    human_prompt = "Enhance this content while maintaining its original topic and key messages."
    try:
        progress(0.97, desc="Enhancing content based on research and plan...")

        # Decode first so a malformed research payload fails before any
        # model call is set up.
        research_findings = json.loads(state["research_context"])
        improvement_plan = state.get("improvement_plan", {})
        if not improvement_plan:
            raise Exception("No improvement plan found")

        model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
        template = ChatPromptTemplate.from_messages([
            ("system", system_prompt),
            ("human", human_prompt)
        ])
        pipeline = template | model | StrOutputParser()

        prompt_inputs = {
            "content": state["enhanced"],
            "plan": json.dumps(improvement_plan),
            "research": json.dumps(research_findings),
            "strategy": json.dumps(improvement_plan["enhancement_strategy"])
        }
        rewritten = pipeline.invoke(prompt_inputs)

        completed_attempts = state.get("enhancement_attempts", 0) + 1
        state["enhanced"] = rewritten
        state["enhancement_attempts"] = completed_attempts
        state["status"] = f"✅ Content enhanced with research (Attempt {state['enhancement_attempts']}/3)"
        return state
    except Exception as e:
        state["error"] = f"⚠️ Error enhancing content: {str(e)}"
        state["status"] = "❌ Failed to enhance content"
        return state
if __name__ == "__main__":
    # Show the workflow diagram on the console before the UI starts.
    print_graph()

    launch_options = {
        "server_name": "0.0.0.0",  # Required for Hugging Face Spaces
        "server_port": 7860,  # Standard port for Hugging Face Spaces
        "show_error": True,
        "share": True,  # Enable sharing for development
        "show_api": False,
    }

    demo = create_ui()
    # Enable queuing for better handling of concurrent requests
    demo.queue()
    demo.launch(**launch_options)