# Spaces: No application file
| import streamlit as st | |
| import os | |
| import requests | |
| import hashlib | |
| from typing import List, Dict, Any | |
| from datetime import datetime | |
| import json | |
| import re | |
| from urllib.parse import quote | |
| import time | |
| import random | |
| import functools | |
| # Import required libraries | |
| from crewai import Agent, Task, Crew, Process | |
| from crewai.tools import BaseTool | |
| import nltk | |
| from textstat import flesch_reading_ease, flesch_kincaid_grade | |
| from bs4 import BeautifulSoup | |
| import concurrent.futures | |
| from duckduckgo_search import DDGS | |
| # Import Ollama and LangChain components | |
| from langchain_community.chat_models import ChatOllama | |
| from langchain_core.prompts import ChatPromptTemplate | |
| from langchain_core.output_parsers import StrOutputParser | |
# Download the NLTK data sets at import time. This is best-effort: a failed
# download must not stop the app from starting, so errors are ignored.
try:
    for _resource in ('punkt', 'stopwords', 'wordnet'):
        nltk.download(_resource, quiet=True)
except Exception:
    # Deliberately swallowed. Catching Exception (rather than a bare
    # ``except:``) still lets KeyboardInterrupt and SystemExit propagate.
    pass
| # Custom Tools for CrewAI | |
class WebSearchTool(BaseTool):
    """CrewAI tool that runs a DuckDuckGo text search."""

    name: str = "web_search"
    description: str = "Search the web for content to check plagiarism"

    def _run(self, query: str) -> str:
        """Search DuckDuckGo for ``query`` and return the hits as a JSON string.

        The JSON is a list of objects with 'title', 'body' and 'url' keys
        (at most five are requested). If anything raises, a
        "Search failed: ..." message is returned instead of JSON.
        """
        try:
            # Pause before each request to avoid overwhelming the search API.
            time.sleep(1)
            with DDGS() as client:
                hits = list(client.text(query, max_results=5))
            payload = [
                {
                    'title': hit.get('title', ''),
                    'body': hit.get('body', ''),
                    'url': hit.get('href', ''),
                }
                for hit in hits
            ]
            return json.dumps(payload)
        except Exception as exc:
            return f"Search failed: {str(exc)}"
class TextAnalysisTool(BaseTool):
    """CrewAI tool that reports readability and size metrics for a text."""

    name: str = "text_analysis"
    description: str = "Analyze text for readability and quality metrics"

    def _run(self, text: str) -> str:
        """Analyze ``text`` and return the metrics as a JSON string.

        The JSON object has the keys 'word_count', 'sentence_count',
        'avg_words_per_sentence', 'flesch_reading_ease',
        'flesch_kincaid_grade' and 'readability_level'. If anything raises,
        an "Analysis failed: ..." message is returned instead of JSON.
        """
        try:
            # Readability scores come from textstat.
            flesch_score = flesch_reading_ease(text)
            fk_grade = flesch_kincaid_grade(text)

            words = text.split()
            # Splitting on '.' leaves an empty fragment after a trailing
            # period (and between consecutive periods); drop those so the
            # sentence count is not inflated.
            sentences = [s for s in text.split('.') if s.strip()]

            analysis = {
                'word_count': len(words),
                'sentence_count': len(sentences),
                # max(..., 1) guards against division by zero on empty text.
                'avg_words_per_sentence': len(words) / max(len(sentences), 1),
                'flesch_reading_ease': flesch_score,
                'flesch_kincaid_grade': fk_grade,
                'readability_level': self._get_readability_level(flesch_score),
            }
            return json.dumps(analysis)
        except Exception as e:
            return f"Analysis failed: {str(e)}"

    def _get_readability_level(self, score: float) -> str:
        """Map a Flesch reading-ease score to a descriptive label."""
        # (minimum score, label) pairs, checked from highest to lowest.
        bands = (
            (90, "Very Easy"),
            (80, "Easy"),
            (70, "Fairly Easy"),
            (60, "Standard"),
            (50, "Fairly Difficult"),
            (30, "Difficult"),
        )
        for floor, label in bands:
            if score >= floor:
                return label
        return "Very Difficult"
class PlagiarismChecker(BaseTool):
    """CrewAI tool that compares a text's sentences with web search results."""

    name: str = "plagiarism_checker"
    description: str = "Check text for potential plagiarism by comparing with web content"

    def _run(self, text: str, search_results: str) -> str:
        """Check ``text`` for plagiarism against ``search_results``.

        Args:
            text: The text to check; it is split into sentences on '.'.
            search_results: JSON string holding a list of objects with
                'title', 'body' and 'url' keys.

        Returns:
            A JSON string with 'plagiarism_score' (percentage of sentences
            flagged), 'total_sentences', 'flagged_sentences' and
            'flagged_content' (the three highest-scoring matches), or a
            "Plagiarism check failed: ..." message if anything raises
            (for example when ``search_results`` is not valid JSON).
        """
        try:
            results = json.loads(search_results)
            text_sentences = [s.strip() for s in text.split('.') if s.strip()]
            total_sentences = len(text_sentences)
            plagiarism_results = []

            for sentence in text_sentences:
                if len(sentence.split()) < 5:  # Skip very short sentences
                    continue
                for result in results:
                    content = result.get('body', '') + ' ' + result.get('title', '')
                    # Compute the score once and reuse it for both the
                    # threshold test and the report entry.
                    score = self._calculate_similarity(sentence, content)
                    if score > 0.7:
                        plagiarism_results.append({
                            'sentence': sentence,
                            'source': result.get('url', 'Unknown'),
                            'similarity_score': score,
                        })
                        # One match is enough to flag the sentence.
                        break

            # Every flagged sentence contributes exactly one entry.
            flagged_sentences = len(plagiarism_results)
            plagiarism_score = (flagged_sentences / max(total_sentences, 1)) * 100

            # Rank by score so the slice below really is the top matches
            # rather than the first ones encountered.
            plagiarism_results.sort(
                key=lambda match: match['similarity_score'], reverse=True
            )
            return json.dumps({
                'plagiarism_score': plagiarism_score,
                'total_sentences': total_sentences,
                'flagged_sentences': flagged_sentences,
                'flagged_content': plagiarism_results[:3],  # Top 3 matches
            })
        except Exception as e:
            return f"Plagiarism check failed: {str(e)}"

    def _calculate_similarity(self, text1: str, text2: str) -> float:
        """Return the word-set overlap of two texts, between 0.0 and 1.0.

        The score is the number of shared lowercase whitespace-separated
        words divided by the number of distinct words in either text.
        Returns 0.0 when either text has no words.
        """
        # NOTE(review): the denominator includes every word of ``text2``, so
        # a short sentence compared with a longer snippet scores low even
        # when fully contained in it; confirm the 0.7 threshold used by
        # ``_run`` is intended.
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())
        if not words1 or not words2:
            return 0.0
        return len(words1 & words2) / len(words1 | words2)
def rate_limit_handler(max_retries=5, base_delay=2, max_delay=60):
    """Build a decorator that retries a call when it hits a rate limit.

    Kept for other potential API calls; it is not strictly needed for a
    local Ollama model.

    An exception counts as a rate-limit error when its lowercased message
    contains "rate_limit" or "429". Such errors are retried with
    exponential backoff plus up to one second of random jitter; any other
    exception is re-raised immediately.

    Args:
        max_retries: Maximum number of attempts. With a value of 0 or less
            the wrapped function is never called and None is returned.
        base_delay: Delay in seconds before the first retry; it doubles
            with each further attempt.
        max_delay: Upper bound in seconds for a single delay.

    Returns:
        A decorator. The decorated function returns whatever the wrapped
        function returns and re-raises its last rate-limit error once the
        attempts are exhausted.
    """
    def decorator(func):
        # Preserve the wrapped function's name, docstring and metadata.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    error_message = str(e).lower()
                    if "rate_limit" not in error_message and "429" not in error_message:
                        # Not a rate-limit problem: do not retry.
                        raise
                    if attempt >= max_retries - 1:
                        st.error(f"Max retries reached for rate limit: {e}")
                        raise
                    delay = min(max_delay, base_delay * (2 ** attempt) + random.uniform(0, 1))
                    st.warning(f"Rate limit hit. Retrying in {delay:.1f} seconds... (Attempt {attempt + 1}/{max_retries})")
                    time.sleep(delay)
            # Only reached when max_retries <= 0 (the loop body never ran).
            return None
        return wrapper
    return decorator
# No custom LLM wrapper class is defined here: the former GroqLLM class was
# removed and ChatOllama is used directly with CrewAI.
# The agents below are kept simple for better token management.
def create_agents(llm):
    """Build the two agents that make up the crew.

    Args:
        llm: The language model object handed to both agents.

    Returns:
        A ``(analysis_agent, paraphrasing_agent)`` tuple.
    """
    # Settings that are identical for both agents.
    shared_settings = {"verbose": True, "allow_delegation": False, "llm": llm}

    # Handles plagiarism checking and text analysis in a single agent.
    analyzer = Agent(
        role="Content Analyzer",
        goal="Analyze text for plagiarism and quality metrics",
        backstory="You are an expert in content analysis and plagiarism detection.",
        tools=[WebSearchTool(), PlagiarismChecker(), TextAnalysisTool()],
        **shared_settings,
    )

    # Rewrites the text; it is given no tools.
    rewriter = Agent(
        role="Content Rewriter",
        goal="Rewrite text to be original while maintaining meaning",
        backstory="You are an expert writer who creates original content.",
        **shared_settings,
    )

    return analyzer, rewriter
def create_tasks(input_text, agents, max_words=350):
    """Create the analysis and paraphrasing tasks for the crew.

    Args:
        input_text: The user's text. If it has more than ``max_words``
            whitespace-separated words, only the first ``max_words`` are
            kept (re-joined with single spaces) and "..." is appended.
        agents: The ``(analysis_agent, paraphrasing_agent)`` pair.
        max_words: Word limit applied to ``input_text`` before it is
            embedded in the task prompts.

    Returns:
        ``[analysis_task, paraphrasing_task]`` in execution order.
    """
    analysis_agent, paraphrasing_agent = agents

    # Truncate over-long input so the prompts stay small.
    words = input_text.split()
    if len(words) > max_words:
        input_text = ' '.join(words[:max_words]) + "..."

    # Task 1: combined plagiarism/readability analysis.
    analysis_task = Task(
        description=(
            "\n"
            "Analyze this text briefly:\n"
            f"Text: {input_text}\n"
            "Provide:\n"
            "1. Basic plagiarism check\n"
            "2. Readability score\n"
            "3. Word count\n"
            "Keep response under 200 words.\n"
        ),
        agent=analysis_agent,
        expected_output="Brief analysis with plagiarism score and readability metrics",
    )

    # Task 2: paraphrasing.
    paraphrasing_task = Task(
        description=(
            "\n"
            "Rewrite this text to be original:\n"
            f"Original: {input_text}\n"
            "Requirements:\n"
            "1. Maintain meaning\n"
            "2. Use different words\n"
            "3. Keep it clear and readable\n"
            "Provide only the rewritten text.\n"
        ),
        agent=paraphrasing_agent,
        expected_output="Paraphrased text that maintains original meaning",
        # ``context`` is the Task field CrewAI documents for passing another
        # task's output in; the previous ``dependencies`` keyword is not part
        # of that documented interface.
        context=[analysis_task],
    )

    return [analysis_task, paraphrasing_task]
def run_crew_analysis(input_text, selected_model):
    """Run the two-task crew on ``input_text`` with the given Ollama model.

    Args:
        input_text: The text to analyze and paraphrase.
        selected_model: Name of the Ollama model passed to ``ChatOllama``.

    Returns:
        Whatever ``crew.kickoff()`` returns, or None if any step raised
        (the error is reported through ``st.error``).
    """
    try:
        # The Ollama server must be running and the model pulled
        # (e.g., ollama run llama2).
        agents = create_agents(ChatOllama(model=selected_model))
        tasks = create_tasks(input_text, agents)
        crew = Crew(
            agents=list(agents),
            tasks=tasks,
            process=Process.sequential,
            verbose=True,
        )
        # Show a spinner for as long as the crew is working.
        with st.spinner("Analyzing text with AI agents..."):
            return crew.kickoff()
    except Exception as exc:
        st.error(f"Error in crew analysis: {str(exc)}")
        return None
| # Streamlit UI | |
def _render_sidebar():
    """Draw the configuration sidebar and return the selected Ollama model name."""
    with st.sidebar:
        st.header("π§ Configuration")
        st.markdown("**Ollama Setup:**\n\n1. Download and install Ollama from [ollama.ai](https://ollama.ai/).\n2. Run `ollama run <model_name>` in your terminal (e.g., `ollama run llama2` or `ollama run mistral`).\n3. Ensure the Ollama server is running before using this app.")
        model_options = [
            "llama2",   # A good general-purpose model
            "mistral",  # Another strong contender
            "phi3",     # Smaller, faster model for local use
            # Add other Ollama models as needed
        ]
        selected_model = st.selectbox(
            "Select Ollama Model",
            model_options,
            index=0,  # Default to llama2
            help="Choose an Ollama model you have pulled locally."
        )
        st.markdown("---")
        st.markdown("### π Features")
        st.markdown("- Smart plagiarism detection")
        st.markdown("- Intelligent paraphrasing")
        st.markdown("- Readability analysis")
        st.markdown("- Local LLM support (Ollama)")
    return selected_model


def _render_input_panel(selected_model):
    """Draw the text input, word-count feedback and the analyze button."""
    st.header("π Input Text")
    st.info("π‘ For best results, keep text under 400 words")
    input_text = st.text_area(
        "Enter text to analyze and paraphrase:",
        height=300,
        placeholder="Paste your text here (max 400 words recommended)..."
    )

    # Word-count feedback, shown only once something has been entered.
    if input_text:
        word_count = len(input_text.split())
        if word_count > 400:
            st.warning(f"β οΈ Text has {word_count} words. Consider shortening for optimal results.")
        else:
            st.success(f"β Text has {word_count} words!")

    if st.button("π Analyze & Paraphrase", type="primary", use_container_width=True):
        if not input_text.strip():
            st.error("Please enter some text to analyze!")
            return
        result = run_crew_analysis(input_text, selected_model)
        if result:
            st.session_state.analysis_result = result
            st.session_state.original_text = input_text
            # Remember which model produced this result; the sidebar
            # selection may change on a later rerun.
            st.session_state.model_used = selected_model
            st.success("β Analysis completed!")


def _render_results_panel(selected_model):
    """Draw the stored analysis result, or a hint when there is none yet."""
    st.header("π Analysis Results")
    if "analysis_result" not in st.session_state:
        st.info("π Enter text and click 'Analyze & Paraphrase' to see results")
        return

    result = st.session_state.analysis_result
    paraphrased_text = str(result)
    tab1, tab2 = st.tabs(["π Paraphrased Text", "π Analysis"])

    with tab1:
        st.subheader("π Paraphrased Text")
        st.text_area(
            "Paraphrased version:",
            value=paraphrased_text,
            height=300,
            help="This is the AI-generated paraphrased version"
        )
        st.download_button(
            label="π₯ Download Paraphrased Text",
            data=paraphrased_text,
            file_name="paraphrased_text.txt",
            mime="text/plain"
        )

    with tab2:
        st.subheader("π Analysis Summary")
        original_words = len(st.session_state.original_text.split())
        paraphrased_words = len(paraphrased_text.split())
        # Show the model that produced the stored result, falling back to
        # the current selection if none was recorded.
        model_used = st.session_state.get("model_used", selected_model)

        col_a, col_b = st.columns(2)
        with col_a:
            st.metric("Original Words", original_words)
            st.metric("Processing Status", "β Complete")
        with col_b:
            st.metric("Paraphrased Words", paraphrased_words)
            st.metric("Model Used", model_used)

        # Simple comparison chart
        st.bar_chart({
            "Original": [original_words],
            "Paraphrased": [paraphrased_words]
        })


def main():
    """Run the Streamlit page: sidebar, input column and results column."""
    # NOTE(review): several emoji in the string literals of this page appear
    # mis-encoded (e.g. the page icon); confirm them against the deployed file.
    st.set_page_config(
        page_title="AI Paraphrasing & Plagiarism Checker",
        page_icon="π€",
        layout="wide"
    )
    st.title("π€ AI-Powered Paraphrasing & Plagiarism Checker")
    st.markdown("*Built with CrewAI Multi-Agent Framework and Ollama (Local LLM)*")

    selected_model = _render_sidebar()

    # Main content area: input on the left, results on the right. The input
    # column is rendered first so a fresh result is visible in the same run.
    col1, col2 = st.columns([1, 1])
    with col1:
        _render_input_panel(selected_model)
    with col2:
        _render_results_panel(selected_model)


if __name__ == "__main__":
    main()