import os
import logging
import json
import re
from typing import Any, Dict, List, Optional

import pandas as pd
import spacy
from dotenv import load_dotenv
from langchain.schema import Document as LangchainDocument
from langchain_community.vectorstores import FAISS
from langchain_together.chat_models import ChatTogether
from langchain_together.embeddings import TogetherEmbeddings

# Configure logging: emit to both a log file and the console.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('fact_checker.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

load_dotenv()
logger.info("Environment variables loaded")


# ---------- API Key Helper -------------------------------------------------
def get_together_api_key() -> str:
    """Get Together AI API key from environment variables.

    Returns:
        The API key string.

    Raises:
        EnvironmentError: If TOGETHER_API_KEY is not set in the environment
            or in a .env file.
    """
    try:
        key = os.getenv("TOGETHER_API_KEY")
        if key:
            logger.info("Together AI API key found")
            return key
        # If not found, raise error with setup instructions.
        error_msg = (
            "TOGETHER_API_KEY not found. Please set it in one of these ways:\n"
            "1. Create a .env file with: TOGETHER_API_KEY=your_key_here\n"
            "2. Set environment variable: export TOGETHER_API_KEY=your_key_here"
        )
        logger.error(error_msg)
        raise EnvironmentError(error_msg)
    except Exception:
        logger.exception("Error retrieving Together AI API key")
        raise


# ========================================================================
# FACT-CHECKING SYSTEM COMPONENTS (OOP Architecture)
# ========================================================================

class ClaimExtractor:
    """
    Handles claim and entity extraction using NLP (spaCy).
    Follows Single Responsibility Principle.
    """

    # Supported entity types for extraction
    ENTITY_TYPES = ['ORG', 'GPE', 'PERSON', 'DATE', 'EVENT', 'MONEY',
                    'PERCENT', 'LAW', 'PRODUCT']

    def __init__(self, model_name: str = "en_core_web_sm"):
        """
        Initialize the ClaimExtractor with a spaCy model.

        Args:
            model_name: Name of the spaCy model to use
        """
        self.model_name = model_name
        # Loaded lazily by the `nlp` property on first access.
        self._nlp = None

    @property
    def nlp(self):
        """Lazy load spaCy model to avoid startup overhead."""
        if self._nlp is None:
            try:
                logger.info(f"Loading spaCy model: {self.model_name}")
                self._nlp = spacy.load(self.model_name)
                logger.info(f"Successfully loaded spaCy model: {self.model_name}")
            except OSError:
                # spaCy raises OSError when the model package is not installed.
                logger.error(f"spaCy model '{self.model_name}' not found")
                raise RuntimeError(
                    f"spaCy model '{self.model_name}' not found. "
                    f"Please install it with: python -m spacy download {self.model_name}"
                ) from None
            except Exception:
                logger.exception(f"Unexpected error loading spaCy model: {self.model_name}")
                raise
        return self._nlp

    def extract_entities(self, doc) -> List[Dict[str, Any]]:
        """
        Extract named entities from a spaCy document.

        Args:
            doc: spaCy document object

        Returns:
            List of entity dictionaries with text, type, and position
        """
        try:
            entities = []
            for ent in doc.ents:
                if ent.label_ in self.ENTITY_TYPES:
                    entities.append({
                        'text': ent.text,
                        'type': ent.label_,
                        'start': ent.start_char,
                        'end': ent.end_char
                    })
            logger.debug(f"Extracted {len(entities)} entities")
            return entities
        except Exception:
            # Best-effort: entity extraction failures degrade to "no entities".
            logger.exception("Error extracting entities")
            return []

    def extract_claims(self, text: str, min_length: int = 10) -> List[Dict[str, Any]]:
        """
        Extract key claims and named entities from input text.

        Args:
            text: Input text (e.g., news post, social media statement)
            min_length: Minimum length for a sentence to be considered a claim

        Returns:
            List of claim dictionaries with 'text', 'type', and 'entities'
        """
        try:
            logger.info(f"Extracting claims from text ({len(text)} chars)")
            doc = self.nlp(text)
            entities = self.extract_entities(doc)

            # Extract sentences as potential claims
            claims = []
            for sent in doc.sents:
                sent_text = sent.text.strip()
                if len(sent_text) >= min_length:
                    # Find entities whose character span falls inside this sentence.
                    sent_entities = [
                        e for e in entities
                        if e['start'] >= sent.start_char and e['end'] <= sent.end_char
                    ]
                    claims.append({
                        'text': sent_text,
                        'type': 'statement',
                        'entities': sent_entities
                    })

            # If no claims extracted, treat entire text as one claim
            if not claims:
                logger.debug("No sentences found, using entire text as claim")
                claims.append({
                    'text': text.strip(),
                    'type': 'statement',
                    'entities': entities
                })

            logger.info(f"Extracted {len(claims)} claim(s)")
            return claims
        except Exception:
            logger.exception("Error extracting claims")
            # Return fallback claim so the pipeline can continue.
            return [{
                'text': text.strip(),
                'type': 'statement',
                'entities': []
            }]


class FactsDatabase:
    """
    Manages the verified facts database and vector store.
    Handles loading, embedding, and persistence.
    """

    DEFAULT_CSV_PATH = "verified_facts_db.csv"
    DEFAULT_INDEX_PATH = "faiss_index_facts"
    EMBEDDING_MODEL = "BAAI/bge-base-en-v1.5"

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize the FactsDatabase.

        Args:
            api_key: Together AI API key (optional, can use get_together_api_key)
        """
        logger.info("Initializing FactsDatabase")
        self.api_key = api_key or get_together_api_key()
        try:
            self.embeddings = TogetherEmbeddings(
                model=self.EMBEDDING_MODEL,
                api_key=self.api_key
            )
            logger.info(f"Embeddings initialized with model: {self.EMBEDDING_MODEL}")
            # Initialize ClaimExtractor for entity extraction from facts
            self.claim_extractor = ClaimExtractor()
            logger.info("ClaimExtractor initialized for database entity extraction")
        except Exception:
            logger.exception("Error initializing embeddings")
            raise

    def load_from_csv(
        self,
        csv_path: Optional[str] = None,
        index_path: Optional[str] = None
    ) -> str:
        """
        Load verified facts from CSV and create FAISS vector store.

        Args:
            csv_path: Path to verified facts CSV file
            index_path: Path to save FAISS index

        Returns:
            Status message with count of loaded facts

        Raises:
            FileNotFoundError: If the CSV file does not exist.
            RuntimeError: For any other failure while loading/embedding.
        """
        csv_path = csv_path or self.DEFAULT_CSV_PATH
        index_path = index_path or self.DEFAULT_INDEX_PATH
        try:
            logger.info(f"Loading facts from CSV: {csv_path}")

            # Read verified facts
            df = pd.read_csv(csv_path)
            logger.info(f"Loaded {len(df)} rows from CSV")

            # Handle different CSV formats: prefer 'fact_text', fall back to 'fact'.
            if 'fact_text' in df.columns:
                fact_column = 'fact_text'
                logger.debug("Using 'fact_text' column")
            elif 'fact' in df.columns:
                fact_column = 'fact'
                logger.debug("Using 'fact' column")
            else:
                error_msg = "CSV must contain a 'fact' or 'fact_text' column"
                logger.error(error_msg)
                raise ValueError(error_msg)

            # Create documents with metadata
            logger.info("Creating documents with metadata")
            documents = self._create_documents(df, fact_column)
            logger.info(f"Created {len(documents)} documents")

            # Create FAISS index
            logger.info("Creating FAISS vector index...")
            vector_store = FAISS.from_documents(documents, self.embeddings)
            logger.info("FAISS index created successfully")

            # Save to disk
            logger.info(f"Saving FAISS index to: {index_path}")
            vector_store.save_local(index_path)
            logger.info("FAISS index saved successfully")

            return f"✅ Successfully loaded {len(documents)} verified facts into vector store"
        except FileNotFoundError:
            raise FileNotFoundError(f"Verified facts CSV not found at: {csv_path}") from None
        except Exception as e:
            # Chain the cause so the original traceback is preserved.
            raise RuntimeError(f"Error loading verified facts: {str(e)}") from e

    def _create_documents(
        self,
        df: pd.DataFrame,
        fact_column: str
    ) -> List[LangchainDocument]:
        """
        Create LangChain documents from DataFrame with entity extraction.

        Args:
            df: Pandas DataFrame with facts
            fact_column: Name of the column containing fact text

        Returns:
            List of LangChain documents with metadata including extracted entities
        """
        try:
            documents = []
            multi_sentence_count = 0
            pronoun_count = 0

            for idx, row in df.iterrows():
                fact_text = row[fact_column]

                # Extract fact_id if available; otherwise synthesize one (F000, F001, ...).
                if 'fact_id' in df.columns:
                    fact_id = row['fact_id']
                else:
                    fact_id = f"F{idx:03d}"

                # DATA VALIDATION: Check for multi-sentence facts
                sentences = fact_text.split('.')
                if len([s for s in sentences if s.strip()]) > 1:
                    multi_sentence_count += 1
                    logger.warning(
                        f"Fact {fact_id} contains multiple sentences ({len(sentences)} sentences). "
                        f"Consider splitting for better retrieval: {fact_text[:80]}..."
                    )

                # DATA VALIDATION: Check for unresolved pronouns
                pronouns = ['he ', 'she ', 'it ', 'they ', 'them ', 'his ', 'her ', 'their ']
                if any(pronoun in fact_text.lower() for pronoun in pronouns):
                    pronoun_count += 1
                    logger.warning(
                        f"Fact {fact_id} contains pronouns - may cause coreference issues: {fact_text[:80]}..."
                    )

                # ENTITY EXTRACTION: Extract entities from fact text
                entities = []
                entities_dict = {}
                try:
                    claims = self.claim_extractor.extract_claims(fact_text)
                    if claims and len(claims) > 0:
                        entities = claims[0].get('entities', [])
                        # Convert entities list to dict for easier access
                        entities_dict = {
                            'organizations': [e['text'] for e in entities if e['type'] in ['ORG', 'ORGANIZATION']],
                            'locations': [e['text'] for e in entities if e['type'] in ['GPE', 'LOC', 'LOCATION']],
                            'persons': [e['text'] for e in entities if e['type'] in ['PERSON', 'PER']],
                            'dates': [e['text'] for e in entities if e['type'] == 'DATE'],
                            'percentages': [e['text'] for e in entities if e['type'] in ['PERCENT', 'PERCENTAGE']],
                            'money': [e['text'] for e in entities if e['type'] in ['MONEY', 'CURRENCY']],
                            'all_entities': [e['text'] for e in entities]
                        }
                        logger.debug(f"Fact {fact_id}: Extracted {len(entities)} entities")
                except Exception as e:
                    # Entity extraction is best-effort; the fact is still indexed.
                    logger.warning(f"Failed to extract entities from fact {fact_id}: {str(e)}")

                # Create metadata with entities
                # NOTE(review): row.get falls back when optional columns are
                # absent, but a present-but-NaN cell still passes through.
                metadata = {
                    'source': row.get('source', 'Verified Database'),
                    'date': row.get('date', 'N/A'),
                    'category': row.get('category', 'General'),
                    'fact_id': fact_id,
                    'entities': entities,  # Full entity list with types
                    'entities_dict': entities_dict  # Organized by type for easy filtering
                }

                # Create LangChain document with metadata
                doc = LangchainDocument(
                    page_content=fact_text,
                    metadata=metadata
                )
                documents.append(doc)

            # Summary logging
            logger.info(f"Created {len(documents)} documents from DataFrame")
            if multi_sentence_count > 0:
                logger.warning(
                    f"⚠️ {multi_sentence_count}/{len(documents)} facts contain multiple sentences. "
                    f"Consider atomic splitting for better granularity."
                )
            if pronoun_count > 0:
                logger.warning(
                    f"⚠️ {pronoun_count}/{len(documents)} facts contain pronouns. "
                    f"Consider coreference resolution."
                )

            # Log entity extraction statistics
            total_entities = sum(len(doc.metadata.get('entities', [])) for doc in documents)
            avg_entities = total_entities / len(documents) if documents else 0
            logger.info(
                f"Entity extraction complete: {total_entities} total entities "
                f"({avg_entities:.1f} avg per fact)"
            )

            return documents
        except Exception:
            logger.exception("Error creating documents from DataFrame")
            raise


class FactRetriever:
    """
    Retrieves similar facts from the vector store using semantic search.
    Implements retrieval strategies and similarity scoring.
    """

    DEFAULT_INDEX_PATH = "faiss_index_facts"
    EMBEDDING_MODEL = "BAAI/bge-base-en-v1.5"

    def __init__(self, api_key: Optional[str] = None, index_path: Optional[str] = None):
        """
        Initialize the FactRetriever.

        Args:
            api_key: Together AI API key
            index_path: Path to FAISS index
        """
        self.api_key = api_key or get_together_api_key()
        self.index_path = index_path or self.DEFAULT_INDEX_PATH
        logger.info(f"Initializing FactRetriever with index path: {self.index_path}")
        try:
            self.embeddings = TogetherEmbeddings(
                model=self.EMBEDDING_MODEL,
                api_key=self.api_key
            )
            logger.info(f"Embeddings model initialized: {self.EMBEDDING_MODEL}")
        except Exception:
            logger.exception("Error initializing embeddings model")
            raise
        # Loaded lazily by the `vector_store` property on first access.
        self._vector_store = None

    @property
    def vector_store(self):
        """Lazy load vector store to avoid unnecessary I/O."""
        if self._vector_store is None:
            try:
                logger.info(f"Loading FAISS index from: {self.index_path}")
                self._vector_store = FAISS.load_local(
                    self.index_path,
                    self.embeddings,
                    allow_dangerous_deserialization=True
                )
                logger.info("FAISS index loaded successfully")
            except FileNotFoundError:
                error_msg = f"FAISS index not found at: {self.index_path}. Please initialize the database first."
                logger.error(error_msg)
                raise FileNotFoundError(error_msg) from None
            except Exception as e:
                logger.exception("Error loading FAISS index")
                raise RuntimeError(f"Error loading FAISS index: {str(e)}") from e
        return self._vector_store

    def retrieve(
        self,
        claim: str,
        top_k: int = 3,
        similarity_threshold: float = 0.0
    ) -> List[Dict[str, Any]]:
        """
        Retrieve most similar verified facts for a given claim.

        Args:
            claim: The claim text to verify
            top_k: Number of similar facts to retrieve
            similarity_threshold: Minimum similarity score (0-1)

        Returns:
            List of dictionaries with 'fact', 'metadata', and 'similarity'
        """
        try:
            logger.info(f"Retrieving top-{top_k} facts for claim: {claim[:100]}...")

            # Perform similarity search with scores
            docs_with_scores = self.vector_store.similarity_search_with_score(
                claim,
                k=top_k
            )
            logger.debug(f"Retrieved {len(docs_with_scores)} documents from FAISS")

            # Format and filter results
            similar_facts = []
            for doc, score in docs_with_scores:
                # FAISS returns distance, convert to similarity
                similarity = self._normalize_similarity(score)
                if similarity >= similarity_threshold:
                    similar_facts.append({
                        'fact': doc.page_content,
                        'metadata': doc.metadata,
                        'similarity': round(similarity, 3)
                    })
                    logger.debug(f"Fact similarity: {similarity:.3f} - {doc.page_content[:50]}...")

            logger.info(f"Filtered to {len(similar_facts)} facts above threshold {similarity_threshold}")
            return similar_facts
        except Exception as e:
            logger.exception("Error retrieving similar facts")
            raise RuntimeError(f"Error retrieving similar facts: {str(e)}") from e

    @staticmethod
    def _normalize_similarity(distance: float) -> float:
        """
        Convert FAISS distance to similarity score (0-1 range).

        Args:
            distance: FAISS distance score (lower = more similar)

        Returns:
            Normalized similarity score
        """
        # Monotonic mapping: distance 0 -> similarity 1, large distance -> ~0.
        return 1 / (1 + distance)


class ClaimClassifier:
    """
    Uses LLM to classify claims as True/False/Unverifiable.
    Handles prompt engineering and response parsing.
    """

    LLM_MODEL = "meta-llama/Meta-Llama-3.1-8B-Instruct-Turbo"
    TEMPERATURE = 0.3

    # Verdict constants
    VERDICT_TRUE = "Likely True"
    VERDICT_FALSE = "Likely False"
    VERDICT_UNVERIFIABLE = "Unverifiable"

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize the ClaimClassifier.

        Args:
            api_key: Together AI API key
        """
        self.api_key = api_key or get_together_api_key()
        logger.info(f"Initializing ClaimClassifier with model: {self.LLM_MODEL}")
        try:
            self.llm = ChatTogether(
                model=self.LLM_MODEL,
                temperature=self.TEMPERATURE,
                api_key=self.api_key
            )
            logger.info(f"LLM initialized successfully (temperature={self.TEMPERATURE})")
        except Exception:
            logger.exception("Error initializing LLM")
            raise

    def classify(
        self,
        claim: str,
        retrieved_facts: List[Dict[str, Any]]
    ) -> Dict[str, Any]:
        """
        Classify a claim against retrieved facts using LLM.

        Args:
            claim: The original claim to verify
            retrieved_facts: List of similar facts with metadata

        Returns:
            Dictionary with 'verdict', 'confidence', 'reasoning', 'evidence_used'
        """
        logger.info(f"Classifying claim with {len(retrieved_facts)} retrieved facts")

        # Build prompt with evidence
        prompt = self._build_prompt(claim, retrieved_facts)
        logger.debug(f"Built prompt with {len(prompt)} characters")

        try:
            # Get LLM response
            logger.info("Invoking LLM for claim classification")
            response = self.llm.invoke([{"role": "user", "content": prompt}])
            response_text = response.content.strip()
            logger.debug(f"LLM response received ({len(response_text)} chars)")

            # Parse JSON response
            result = self._parse_response(response_text)
            logger.info(f"Classification result: {result['verdict']} (confidence: {result['confidence']})")

            # Add retrieved facts as evidence details
            result['evidence_details'] = retrieved_facts
            return result
        except json.JSONDecodeError as e:
            logger.error(f"JSON parsing failed: {str(e)}")
            return self._fallback_response(retrieved_facts, "JSON parsing failed")
        except Exception as e:
            logger.exception("Error during claim classification")
            return self._fallback_response(retrieved_facts, str(e))

    def _build_prompt(
        self,
        claim: str,
        retrieved_facts: List[Dict[str, Any]]
    ) -> str:
        """
        Build the classification prompt for the LLM.

        Args:
            claim: The claim to verify
            retrieved_facts: Retrieved evidence

        Returns:
            Formatted prompt string
        """
        # Format evidence
        evidence_text = self._format_evidence(retrieved_facts)

        # Construct prompt (double braces render literal JSON braces in the f-string).
        prompt = f"""You are a fact-checking assistant. Your task is to verify the following claim against verified evidence.

CLAIM TO VERIFY: "{claim}"

VERIFIED EVIDENCE FROM DATABASE:
{evidence_text}

INSTRUCTIONS:
1. Compare the claim against the verified evidence carefully
2. Classify the claim as one of:
   - "{self.VERDICT_TRUE}" - if evidence strongly supports the claim
   - "{self.VERDICT_FALSE}" - if evidence contradicts the claim
   - "{self.VERDICT_UNVERIFIABLE}" - if insufficient or conflicting evidence
3. Provide your analysis in EXACTLY this JSON format (no additional text):
{{
  "verdict": "{self.VERDICT_TRUE}" | "{self.VERDICT_FALSE}" | "{self.VERDICT_UNVERIFIABLE}",
  "confidence": "high" | "medium" | "low",
  "reasoning": "Explain your decision in 2-3 sentences",
  "evidence_used": ["fact 1", "fact 2"]
}}

IMPORTANT:
- Be objective and base your verdict only on the evidence provided
- If the evidence is vague or irrelevant, mark as "{self.VERDICT_UNVERIFIABLE}"
- Consider dates, entities, and specific details when comparing
- Return ONLY the JSON object, no other text

YOUR RESPONSE:"""
        return prompt

    def _format_evidence(self, retrieved_facts: List[Dict[str, Any]]) -> str:
        """
        Format retrieved facts for the prompt.

        Args:
            retrieved_facts: List of facts with metadata

        Returns:
            Formatted evidence string
        """
        if not retrieved_facts:
            return "No similar verified facts found in the database."

        evidence_lines = []
        for i, fact in enumerate(retrieved_facts, 1):
            lines = [
                f"Evidence {i}:",
                f"{fact['fact']}",
                f"Source: {fact['metadata'].get('source', 'Unknown')}",
                f"Date: {fact['metadata'].get('date', 'Unknown')}",
                f"Similarity: {fact['similarity']:.2f}"
            ]
            evidence_lines.append("\n".join(lines))
        return "\n\n".join(evidence_lines)

    def _parse_response(self, response_text: str) -> Dict[str, Any]:
        """
        Parse LLM JSON response.

        Args:
            response_text: Raw LLM response

        Returns:
            Parsed result dictionary
        """
        try:
            # Try to extract JSON if LLM added extra text
            json_match = re.search(r'\{.*\}', response_text, re.DOTALL)
            if json_match:
                response_text = json_match.group(0)
                logger.debug("Extracted JSON from LLM response")

            result = json.loads(response_text)
            logger.debug("Successfully parsed JSON response")

            # Validate required fields; backfill anything missing.
            required_fields = ['verdict', 'confidence', 'reasoning', 'evidence_used']
            missing_fields = [field for field in required_fields if field not in result]
            if missing_fields:
                logger.warning(f"Missing fields in LLM response: {missing_fields}")
                for field in missing_fields:
                    result[field] = "Unknown" if field != 'evidence_used' else []

            return result
        except Exception:
            logger.exception("Error parsing LLM response")
            raise

    def _fallback_response(
        self,
        retrieved_facts: List[Dict[str, Any]],
        error_msg: str
    ) -> Dict[str, Any]:
        """
        Create fallback response on error.

        Args:
            retrieved_facts: Retrieved evidence
            error_msg: Error message

        Returns:
            Fallback response dictionary
        """
        logger.warning(f"Creating fallback response due to: {error_msg}")
        return {
            'verdict': self.VERDICT_UNVERIFIABLE,
            'confidence': 'low',
            'reasoning': f'Error during fact-checking: {error_msg}',
            'evidence_used': [],
            'evidence_details': retrieved_facts,
            'error': error_msg
        }


class FactChecker:
    """
    Main orchestrator for the fact-checking pipeline.
    Coordinates ClaimExtractor, FactRetriever, and ClaimClassifier.
    Follows Facade pattern to provide simple interface.
    """

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize the FactChecker with all required components.

        Args:
            api_key: Together AI API key
        """
        logger.info("Initializing FactChecker pipeline")
        self.api_key = api_key or get_together_api_key()
        try:
            # Initialize components (Dependency Injection)
            logger.debug("Initializing ClaimExtractor")
            self.claim_extractor = ClaimExtractor()
            logger.debug("Initializing FactRetriever")
            self.fact_retriever = FactRetriever(api_key=self.api_key)
            logger.debug("Initializing ClaimClassifier")
            self.claim_classifier = ClaimClassifier(api_key=self.api_key)
            logger.info("FactChecker initialization complete")
        except Exception:
            logger.exception("Error initializing FactChecker")
            raise

    def check_claim(self, user_claim: str, top_k: int = 3) -> Dict[str, Any]:
        """
        Main fact-checking pipeline that orchestrates the entire process.

        Args:
            user_claim: User's input claim/statement to verify
            top_k: Number of similar facts to retrieve

        Returns:
            Complete fact-check result with verdict, evidence, and reasoning
        """
        logger.info("=" * 60)
        logger.info(f"Starting fact-check pipeline for claim: {user_claim[:100]}...")
        logger.info("=" * 60)
        try:
            # Step 1: Extract claims from input
            logger.info("Step 1: Extracting claims from input")
            claims = self.claim_extractor.extract_claims(user_claim)
            # For simplicity, fact-check the first/main claim
            main_claim = claims[0]['text'] if claims else user_claim
            logger.info(f"Main claim identified: {main_claim[:100]}...")

            # Step 2: Retrieve similar facts
            logger.info(f"Step 2: Retrieving top-{top_k} similar facts")
            similar_facts = self.fact_retriever.retrieve(main_claim, top_k=top_k)
            logger.info(f"Retrieved {len(similar_facts)} similar facts")

            # Step 3: Classify using LLM
            logger.info("Step 3: Classifying claim using LLM")
            result = self.claim_classifier.classify(main_claim, similar_facts)

            # Step 4: Add metadata
            logger.info("Step 4: Adding metadata to result")
            result['original_input'] = user_claim
            result['extracted_claim'] = main_claim
            result['entities_found'] = claims[0].get('entities', []) if claims else []
            result['total_claims_extracted'] = len(claims)

            logger.info(f"Fact-check complete: {result['verdict']}")
            logger.info("=" * 60)
            return result
        except Exception as e:
            logger.exception("Error in fact-checking pipeline")
            logger.info("=" * 60)
            return self._error_response(user_claim, str(e))

    def _error_response(self, user_claim: str, error_msg: str) -> Dict[str, Any]:
        """
        Create error response when pipeline fails.

        Args:
            user_claim: Original user claim
            error_msg: Error message

        Returns:
            Error response dictionary
        """
        logger.error(f"Creating error response for claim: {error_msg}")
        return {
            'verdict': 'Unverifiable',
            'confidence': 'low',
            'reasoning': f'Error during fact-checking pipeline: {error_msg}',
            'evidence_used': [],
            'evidence_details': [],
            'original_input': user_claim,
            'extracted_claim': user_claim,
            'entities_found': [],
            'error': error_msg
        }


# ========================================================================
# LEGACY FUNCTION WRAPPERS (for backward compatibility)
# ========================================================================

def load_verified_facts(csv_path: str = "verified_facts_db.csv") -> str:
    """
    Legacy wrapper for backward compatibility.
    Uses FactsDatabase class internally.

    Args:
        csv_path: Path to verified facts CSV file

    Returns:
        Status message
    """
    db = FactsDatabase()
    return db.load_from_csv(csv_path)


def retrieve_similar_facts(
    claim: str,
    top_k: int = 3,
    similarity_threshold: float = 0.0
) -> List[Dict[str, Any]]:
    """
    Legacy wrapper for backward compatibility.
    Uses FactRetriever class internally.

    Args:
        claim: The claim text to verify
        top_k: Number of similar facts to retrieve
        similarity_threshold: Minimum similarity score (0-1)

    Returns:
        List of dictionaries with 'fact', 'metadata', and 'similarity'
    """
    retriever = FactRetriever()
    return retriever.retrieve(claim, top_k, similarity_threshold)


def classify_claim(claim: str, retrieved_facts: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Legacy wrapper for backward compatibility.
    Uses ClaimClassifier class internally.

    Args:
        claim: The original claim to verify
        retrieved_facts: List of similar facts with metadata

    Returns:
        Dictionary with 'verdict', 'confidence', 'reasoning', 'evidence_used'
    """
    classifier = ClaimClassifier()
    return classifier.classify(claim, retrieved_facts)


def fact_check_claim(user_claim: str, top_k: int = 3) -> Dict[str, Any]:
    """
    Legacy wrapper for backward compatibility.
    Uses FactChecker class internally.

    Args:
        user_claim: User's input claim/statement to verify
        top_k: Number of similar facts to retrieve

    Returns:
        Complete fact-check result with verdict, evidence, and reasoning
    """
    checker = FactChecker()
    return checker.check_claim(user_claim, top_k)