"""AI pipeline for ad content analysis: sentiment, topics, and named entities."""

import json
import logging
import re

import spacy
from textblob import TextBlob
from transformers import pipeline

logger = logging.getLogger(__name__)


class ProcessingError(Exception):
    """Exception raised when ad processing fails."""


class AIPipeline:
    def __init__(self):
        """Initialize the AI pipeline with the required models."""
        try:
            # spaCy model used for noun chunks and named-entity recognition.
            self.nlp = spacy.load('en_core_web_sm')
            # Transformer pipeline for binary (POSITIVE/NEGATIVE) sentiment.
            self.sentiment = pipeline(
                'sentiment-analysis',
                model='distilbert-base-uncased-finetuned-sst-2-english',
            )
            logger.info("AI Pipeline initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing AI Pipeline: {e}")
            raise

    def _analyze_sentiment(self, text: str) -> float:
        """Analyze sentiment of text and return a score between -1 and 1."""
        try:
            result = self.sentiment(text)[0]

            # Map the transformer output onto [-1, 1]: keep the confidence
            # for a POSITIVE label, negate it for a NEGATIVE one.
            if result['label'] == 'POSITIVE':
                score = result['score']
            else:
                score = -result['score']

            # TextBlob polarity already lies in [-1, 1]; average the two
            # signals to smooth out disagreement between the models.
            blob = TextBlob(text)
            blob_score = blob.sentiment.polarity

            return (score + blob_score) / 2
        except Exception as e:
            logger.error(f"Error in sentiment analysis: {e}")
            return 0.0

    def _extract_topics(self, text: str) -> list:
        """Extract main topics from text."""
        try:
            doc = self.nlp(text)

            # Candidate topics: noun phrases plus selected entity types.
            noun_phrases = [chunk.text.lower() for chunk in doc.noun_chunks]
            entities = [ent.text.lower() for ent in doc.ents
                        if ent.label_ in ['ORG', 'PRODUCT', 'EVENT', 'WORK_OF_ART']]
            all_topics = noun_phrases + entities

            # Strip punctuation, collapse whitespace, and drop short or
            # stopword-like candidates.
            cleaned_topics = []
            for topic in all_topics:
                topic = re.sub(r'[^\w\s]', '', topic)
                topic = ' '.join(topic.split())
                if len(topic) > 3 and topic not in ['the', 'this', 'that', 'these', 'those']:
                    cleaned_topics.append(topic)

            # Deduplicate and return at most five topics in a stable order.
            unique_topics = sorted(set(cleaned_topics))
            return unique_topics[:5]
        except Exception as e:
            logger.error(f"Error in topic extraction: {e}")
            return []

    def _extract_entities(self, text: str) -> list:
        """Extract named entities from text."""
        try:
            doc = self.nlp(text)

            # Pair each entity with its label and a human-readable
            # description of that label.
            entities = []
            for ent in doc.ents:
                entities.append({
                    'text': ent.text,
                    'type': ent.label_,
                    'description': spacy.explain(ent.label_),
                })
            return entities
        except Exception as e:
            logger.error(f"Error in entity extraction: {e}")
            return []

    def process_ad(self, ad) -> dict:
        """Process an ad and return analysis results."""
        empty_result = {'sentiment': 0.0, 'topics': [], 'entities': []}
        try:
            # Ads without usable text content get a neutral, empty analysis.
            if not hasattr(ad, 'content') or not ad.content:
                return empty_result

            return {
                'sentiment': self._analyze_sentiment(ad.content),
                'topics': self._extract_topics(ad.content),
                'entities': self._extract_entities(ad.content),
            }
        except Exception as e:
            logger.error(f"Error in ad processing: {e}")
            return empty_result
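

# Minimal usage sketch, not part of the pipeline itself. Assumption: any
# object exposing a non-empty ``content`` string counts as an "ad", so a
# SimpleNamespace stands in for the real ad model here.
if __name__ == '__main__':
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)

    ai = AIPipeline()
    demo_ad = SimpleNamespace(content="Acme's new SuperWidget makes spring cleaning fast and fun!")
    print(json.dumps(ai.process_ad(demo_ad), indent=2))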