from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer, pipeline
import difflib
import spacy
import re
from nltk.sentiment import SentimentIntensityAnalyzer
import nltk
from collections import Counter
import uvicorn
import os
import torch

# Download NLTK resources
try:
    nltk.download('vader_lexicon', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('stopwords', quiet=True)
except Exception:
    print("Could not download NLTK resources. Some features may be limited.")
app = FastAPI()

# Configure CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)
# Global variable for the pipeline
humanize_pipe = None

# Load NLP models
try:
    # Load spaCy model
    nlp = spacy.load("en_core_web_sm")
    # Initialize sentiment analyzer
    sentiment_analyzer = SentimentIntensityAnalyzer()
    print("NLP models loaded successfully!")
except Exception as e:
    print(f"Error loading NLP models: {e}")

    # Create a fallback function if the models fail to load
    def mock_function(text):
        return "Model could not be loaded. This is a fallback response."
def get_humanize_pipeline():
    """
    Lazy-load the humanization pipeline on first use.
    Uses standard settings that don't require accelerate.
    """
    global humanize_pipe
    if humanize_pipe is None:
        try:
            print("Loading the humanizer model on CPU...")
            # Force CPU usage
            device = torch.device("cpu")
            # Load model with basic settings (no accelerate needed)
            model = AutoModelForSeq2SeqLM.from_pretrained(
                "danibor/flan-t5-base-humanizer",
                torch_dtype=torch.float32  # Use float32 instead of float16 for CPU
            )
            tokenizer = AutoTokenizer.from_pretrained("danibor/flan-t5-base-humanizer")
            # Create pipeline with basic settings
            humanize_pipe = pipeline(
                "text2text-generation",
                model=model,
                tokenizer=tokenizer,
                device=device  # Explicitly specify CPU
            )
            print("Humanizer model loaded successfully!")
            return humanize_pipe
        except Exception as e:
            print(f"Error loading humanizer model: {e}")

            # Fall back to a pipeline-like function that reports the failure
            def simple_pipeline(text, **kwargs):
                return [{"generated_text": f"Could not process: {text} (Model failed to load)"}]

            humanize_pipe = simple_pipeline
            return humanize_pipe
    return humanize_pipe
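
# Illustrative usage sketch (hypothetical input text; assumes the model
# weights download successfully on first call):
#   pipe = get_humanize_pipeline()
#   out = pipe("Rewrite this sentence to sound more natural.")
#   print(out[0]["generated_text"])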
# Define request models
class TextRequest(BaseModel):
    text: str


class HumanizeResponse(BaseModel):
    original_text: str
    humanized_text: str
    diff: list
    original_word_count: int
    humanized_word_count: int
    nlp_analysis: dict


class AnalyzeResponse(BaseModel):
    text: str
    word_count: int
    sentiment: dict
    entities: dict
    key_phrases: list
    readability: dict
    complexity: dict
@app.post("/humanize", response_model=HumanizeResponse)  # route path assumed
async def humanize_text(request: TextRequest):
    input_text = request.text
    try:
        # Get or initialize the pipeline (named `pipe` to avoid shadowing
        # the `pipeline` factory imported from transformers)
        pipe = get_humanize_pipeline()
        # Generate humanized text with basic settings
        result = pipe(
            input_text,
            max_length=min(500, len(input_text) * 2),  # Limit max length
            do_sample=True
        )
        humanized_text = result[0]['generated_text']
        # Get the differences
        diff = get_diff(input_text, humanized_text)
        # Process both texts with NLP
        nlp_analysis = perform_nlp_analysis(input_text, humanized_text)
        return {
            'original_text': input_text,
            'humanized_text': humanized_text,
            'diff': diff,
            'original_word_count': len(input_text.split()),
            'humanized_word_count': len(humanized_text.split()),
            'nlp_analysis': nlp_analysis
        }
    except Exception as e:
        print(f"Error in humanize endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing text: {str(e)}")
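
# Illustrative request/response sketch (hypothetical values; the /humanize
# path above is an assumption, since the decorator was absent in the listing):
#   POST /humanize  {"text": "The results was good."}
#   -> {"original_text": "...", "humanized_text": "...",
#       "diff": [...], "original_word_count": 4, ...}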
def get_diff(text1, text2):
    """
    Generate a list of word-level changes between two texts.
    Returns a list of dicts {'operation': op, 'text': word},
    where op is '+' for addition, '-' for deletion, or ' ' for unchanged.
    """
    d = difflib.Differ()
    diff = list(d.compare(text1.split(), text2.split()))
    result = []
    for item in diff:
        operation = item[0]
        if operation in ['+', '-', ' ']:
            text = item[2:]
            result.append({'operation': operation, 'text': text})
    return result
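
# Worked example of the expected shape (difflib prefixes each word with a
# two-character code; '?' guide lines are filtered out above):
#   get_diff("the cat sat", "the dog sat")
#   -> [{'operation': ' ', 'text': 'the'},
#       {'operation': '-', 'text': 'cat'},
#       {'operation': '+', 'text': 'dog'},
#       {'operation': ' ', 'text': 'sat'}]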
def perform_nlp_analysis(original_text, humanized_text):
    """
    Perform comprehensive NLP analysis on both original and humanized text.
    """
    # Process both texts with spaCy
    original_doc = nlp(original_text)
    humanized_doc = nlp(humanized_text)
    # Sentiment analysis
    original_sentiment = sentiment_analyzer.polarity_scores(original_text)
    humanized_sentiment = sentiment_analyzer.polarity_scores(humanized_text)
    # Extract named entities
    original_entities = extract_entities(original_doc)
    humanized_entities = extract_entities(humanized_doc)
    # Extract key phrases using noun chunks
    original_phrases = extract_key_phrases(original_doc)
    humanized_phrases = extract_key_phrases(humanized_doc)
    # Readability metrics
    original_readability = calculate_readability(original_text)
    humanized_readability = calculate_readability(humanized_text)
    # Complexity metrics
    original_complexity = analyze_complexity(original_doc)
    humanized_complexity = analyze_complexity(humanized_doc)
    # Compile all results
    result = {
        'original': {
            'sentiment': original_sentiment,
            'entities': original_entities,
            'key_phrases': original_phrases,
            'readability': original_readability,
            'complexity': original_complexity
        },
        'humanized': {
            'sentiment': humanized_sentiment,
            'entities': humanized_entities,
            'key_phrases': humanized_phrases,
            'readability': humanized_readability,
            'complexity': humanized_complexity
        }
    }
    return result
def extract_entities(doc):
    """Extract and categorize named entities from a spaCy document."""
    entities = {}
    for ent in doc.ents:
        if ent.label_ not in entities:
            entities[ent.label_] = []
        if ent.text not in entities[ent.label_]:
            entities[ent.label_].append(ent.text)
    return entities
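
# Example of the expected shape (entity labels depend on the spaCy model,
# so treat these values as illustrative):
#   extract_entities(nlp("Apple was founded by Steve Jobs in California."))
#   -> {'ORG': ['Apple'], 'PERSON': ['Steve Jobs'], 'GPE': ['California']}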
def extract_key_phrases(doc):
    """Extract key phrases using noun chunks."""
    return [chunk.text for chunk in doc.noun_chunks][:10]  # Keep only the first 10
def calculate_readability(text):
    """Calculate basic readability metrics."""
    # Count sentences
    sentences = len(list(nltk.sent_tokenize(text)))
    if sentences == 0:
        sentences = 1  # Avoid division by zero
    # Count words
    words = len(text.split())
    if words == 0:
        words = 1  # Avoid division by zero
    # Average words per sentence
    avg_words_per_sentence = words / sentences
    # Count syllables (simplified approach)
    syllables = count_syllables(text)
    # Calculate Flesch Reading Ease
    flesch = 206.835 - 1.015 * (words / sentences) - 84.6 * (syllables / words)
    return {
        'sentence_count': sentences,
        'word_count': words,
        'avg_words_per_sentence': round(avg_words_per_sentence, 2),
        'syllable_count': syllables,
        'flesch_reading_ease': round(flesch, 2)
    }
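
# Worked example of the Flesch computation for "The cat sat on the mat."
# (1 sentence, 6 words, 6 syllables under the heuristic below):
#   206.835 - 1.015 * (6 / 1) - 84.6 * (6 / 6) = 116.145
# Higher scores mean easier text; ~90-100 is typical of very simple prose,
# and trivial one-syllable sentences can exceed 100, as here.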
def count_syllables(text):
    """Count syllables in text (simplified approach)."""
    # This is a simplified syllable counter
    text = text.lower()
    text = re.sub(r'[^a-zA-Z]', ' ', text)
    words = text.split()
    count = 0
    for word in words:
        word = word.strip()
        if not word:
            continue
        # Strip a trailing (often silent) 'e', then count vowel groups as syllables
        if word[-1] == 'e':
            word = word[:-1]
        vowel_count = len(re.findall(r'[aeiouy]+', word))
        if vowel_count == 0:
            vowel_count = 1
        count += vowel_count
    return count
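
# Illustrative counts from this heuristic (not exact linguistics):
#   "beautiful" -> 3 ('eau', 'i', 'u')    "code" -> 1 (trailing 'e' stripped)
#   "rhythm"    -> 1 ('y' counts as a vowel here)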
def analyze_complexity(doc):
    """Analyze text complexity using POS tags and dependency parsing."""
    # Count POS tags
    pos_counts = Counter(token.pos_ for token in doc)
    # Calculate lexical diversity (unique tokens / total tokens)
    total_tokens = len(doc)
    unique_tokens = len({token.text.lower() for token in doc})
    lexical_diversity = unique_tokens / total_tokens if total_tokens > 0 else 0
    # Count dependency relationship types
    dep_counts = Counter(token.dep_ for token in doc)
    return {
        'pos_distribution': dict(pos_counts),
        'lexical_diversity': round(lexical_diversity, 4),
        'dependency_types': dict(dep_counts)
    }
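
# Example of the lexical diversity ratio: "the cat and the dog" tokenizes
# to 5 tokens with 4 unique lowercase forms, giving 4 / 5 = 0.8.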
@app.post("/analyze", response_model=AnalyzeResponse)  # route path assumed
async def analyze_text(request: TextRequest):
    """Endpoint to analyze text without humanizing it."""
    input_text = request.text
    try:
        # Process text with NLP
        doc = nlp(input_text)
        # Analyze text
        sentiment = sentiment_analyzer.polarity_scores(input_text)
        entities = extract_entities(doc)
        key_phrases = extract_key_phrases(doc)
        readability = calculate_readability(input_text)
        complexity = analyze_complexity(doc)
        return {
            'text': input_text,
            'word_count': len(input_text.split()),
            'sentiment': sentiment,
            'entities': entities,
            'key_phrases': key_phrases,
            'readability': readability,
            'complexity': complexity
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error analyzing text: {str(e)}")
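
# Example call (the /analyze path is an assumption; adjust if the real
# route differs):
#   curl -X POST http://localhost:7860/analyze \
#        -H "Content-Type: application/json" \
#        -d '{"text": "FastAPI makes building small APIs straightforward."}'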
# Root endpoint for Hugging Face Spaces health check
@app.get("/")
async def root():
    return {"message": "Text Analysis and Humanization API is running!"}
# For local development
if __name__ == "__main__":
    uvicorn.run("app:app", host="0.0.0.0", port=7860, reload=True)