""" agent.py -------- LLM-driven topic interpretation and classification module using a 3-LLM ensemble. """ from __future__ import annotations import json import logging import os import time from dataclasses import dataclass, asdict from typing import Optional import pandas as pd import requests import re from groq import Groq # --------------------------------------------------------------------------- # Logging # --------------------------------------------------------------------------- logging.basicConfig(level=logging.INFO, format="%(levelname)s | %(message)s") logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- DEFAULT_MODEL = "llama-3.1-8b-instant" MISTRAL_DEFAULT_MODEL = "mistral-small-latest" DEFAULT_TAXONOMY_CATEGORIES = [ "Artificial Intelligence", "Machine Learning", "Natural Language Processing", "Computer Vision", "Information Systems", "Healthcare & Bioinformatics", "Finance & Economics", "Cybersecurity", "Human-Computer Interaction", "Robotics & Automation", "Education Technology", "Environmental Science", "Social Sciences", "Data Engineering", "Other", ] # --------------------------------------------------------------------------- # PAJAIS 2019 Knowledge — what the 2019 taxonomy covers vs does NOT cover # --------------------------------------------------------------------------- PAJAIS_COVERED = [ "IS strategy", "IS adoption", "IS governance", "e-commerce", "enterprise systems", "ERP", "knowledge management", "decision support", "e-government", "social media IS", "IT outsourcing", "IS security", "privacy", "IS education", "mobile commerce", "business intelligence", "data analytics", "IS in healthcare (general)", "human computer interaction", "HCI", "IT project management", ] PAJAIS_NOT_COVERED = [ "large language models", "LLM", "GPT", "generative AI", "RAG", "process mining", "event log", "Petri net", "conformance checking", "federated learning", "differential privacy", "DP-SGD", "fairness", "algorithmic bias", "responsible AI", "FATE", "XAI", "explainable AI", "blockchain analytics", "smart contract", "DeFi", "tokenomics", "COVID-19 IS", "pandemic informatics", "Android malware", "mobile security", "dark web", "cyber insurance", "agentic AI", "multi-agent orchestration", "transformer", "BERT", "neural topic model", "BERTopic", "recommender neural", "graph neural network", "GNN", "heterogeneous computing", "IoT analytics", "edge computing IS", "talent matching", "job-person fit", "HR analytics", ] # Rule-based NOVEL trigger — fires ONLY on specific, unambiguous compound/technical terms # that are definitively absent from PAJAIS 2019. # Deliberately narrow: single common words like "data", "model", "network", "learning", # "deep", "smart", "financial", "detection" do NOT trigger this — they exist in PAJAIS. # Only truly post-2018 or PAJAIS-absent compound terms qualify. NOVEL_REGEX_TRIGGERS = re.compile( r'\b(' r'llms?|gpt[\-\s]?\d*|large\s+language\s+model|generative\s+ai|' r'federat\w*\s+learn\w*|differential\s+privac\w*|dp\-sgd|' r'process\s+mining|event\s+log|petri\s+net|conformance\s+check\w*|' r'blockchain|smart\s+contract|defi\b|tokenomic\w*|' r'malware|botnet|dark\s+web|cyber\s+insur\w*|' r'responsible\s+ai|explainab\w*\s+ai|algorithmic\s+bias|xai\b|' r'agentic\s+ai|multi.agent\s+orchest\w*|' r'graph\s+neural\s+network|gnn\b|' r'retrieval.augment\w*|prompt\s+engineer\w*|rag\b|' r'talent\s+match\w*|job.person\s+fit|' r'covid.19|pandemic\s+inform\w*' r')\b', re.IGNORECASE ) def _is_deterministic_novel(keywords: list[str], samples: list[str]) -> bool: """Non-LLM rule-based check: fires only on specific unambiguous NOVEL compound terms. Generic single words (data, model, network, learning, detection) do NOT trigger this. The keyword list from BERTopic is checked word-by-word AND as joined text to catch compound matches that span two keywords.""" # Check the joined keyword string (catches "process mining" split across two keywords) keyword_text = " ".join(keywords).lower() sample_text = " ".join(samples).lower() return ( bool(NOVEL_REGEX_TRIGGERS.search(keyword_text)) or bool(NOVEL_REGEX_TRIGGERS.search(sample_text)) ) # --------------------------------------------------------------------------- # Data Classes # --------------------------------------------------------------------------- @dataclass class TopicInterpretation: """Structured interpretation for a single topic.""" topic_id: int label: str category: str classification: str paper_count: int = 0 keywords: list[str] = None # --------------------------------------------------------------------------- # API Clients & Calls # --------------------------------------------------------------------------- def build_groq_client(api_key: Optional[str] = None): key = api_key or os.getenv("GROQ_API_KEY") if not key: raise ValueError("No Groq API key provided.") return Groq(api_key=key, max_retries=0) def call_gemini_label(prompt: str, api_key: str) -> dict: """Call Google AI Studio (Gemini) API.""" if not api_key: return {} url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={api_key}" headers = {"Content-Type": "application/json"} payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"temperature": 0.2}} try: response = requests.post(url, headers=headers, json=payload, timeout=10) data = response.json() if "error" in data or "candidates" not in data: logger.error(f"Gemini error / missing candidates. Response: {data}") return {} raw = data["candidates"][0]["content"]["parts"][0]["text"].strip() raw = raw.replace("```json", "").replace("```", "").strip() start = raw.find("{") end = raw.rfind("}") + 1 if start != -1 and end != 0: raw = raw[start:end] return json.loads(raw) except Exception as e: logger.warning(f"Gemini call failed: {e}") return {} def call_mistral_label(prompt: str, api_key: str) -> dict: """Call Mistral API.""" if not api_key: return {} try: response = requests.post( "https://api.mistral.ai/v1/chat/completions", headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}, json={ "model": "mistral-small-latest", "messages": [{"role": "user", "content": prompt}], "temperature": 0.2, }, timeout=10, ) data = response.json() raw = data["choices"][0]["message"]["content"].strip() raw = raw.replace("```json", "").replace("```", "").strip() start, end = raw.find("{"), raw.rfind("}") + 1 return json.loads(raw[start:end]) except Exception as e: logger.warning(f"Mistral call failed: {e}") return {} def _call_llm_json(client, prompt: str, model: str) -> dict: """Call Groq API with robust JSON parsing.""" try: response = client.chat.completions.create( model=model, messages=[{"role": "user", "content": prompt}], temperature=0.2, timeout=10, ) raw = response.choices[0].message.content.strip() raw = raw.replace("```json", "").replace("```", "").strip() start = raw.find("{") end = raw.rfind("}") + 1 if start != -1 and end != 0: raw = raw[start:end] return json.loads(raw) except Exception as e: logger.warning(f"Groq call failed: {e}") return {} # --------------------------------------------------------------------------- # Logic Helpers # --------------------------------------------------------------------------- def convert_numpy_types(obj): """Recursively convert numpy types to native Python types for JSON serialisation.""" import numpy as np if isinstance(obj, dict): return {k: convert_numpy_types(v) for k, v in obj.items()} elif isinstance(obj, list): return [convert_numpy_types(v) for v in obj] elif isinstance(obj, np.integer): return int(obj) elif isinstance(obj, np.floating): return float(obj) return obj def _safe_capitalize(s: str) -> str: s = str(s or "").strip() return s[0].upper() + s[1:] if s else "" def clean_label(label: str) -> str: if not label: return "" label = label.replace("\n", " ").strip() label = " ".join(label.split()) label = label.rstrip(" .") if len(label) > 60: label = label[:60].rsplit(" ", 1)[0] if " " in label[:60] else label[:60] return label.strip() def _get_keyword_overlap(label: str, keywords: list[str]) -> int: label_words = set(label.lower().split()) kw_set = set(k.lower() for k in keywords) return len(label_words & kw_set) def select_best_interpretation(results: list[dict], keywords: list[str]) -> dict: valid = [r for r in results if r and "label" in r] if not valid: return {} # Majority vote on label counts = {} for r in valid: l = clean_label(r["label"]).lower() counts[l] = counts.get(l, 0) + 1 for l, c in counts.items(): if c >= 2: best_r = next(r for r in valid if clean_label(r["label"]).lower() == l) best_r["label"] = clean_label(best_r["label"]) return best_r # Fallback: keyword overlap or shortest valid.sort(key=lambda x: (-_get_keyword_overlap(clean_label(x["label"]), keywords), len(clean_label(x["label"])))) best_r = valid[0] best_r["label"] = clean_label(best_r["label"]) return best_r def _fallback_label_from_keywords(keywords: list[str], topic_id: int) -> tuple[str, str]: kw_set = set([k.lower() for k in keywords]) mappings = [ ({"privacy", "data", "security"}, "Digital Privacy and Security", "Cybersecurity"), ({"ai", "chatbots", "agents"}, "Conversational AI", "Artificial Intelligence"), ({"neural", "network", "deep"}, "Deep Learning Systems", "Machine Learning"), ] for trigger, label, cat in mappings: if any(t in kw_set for t in trigger): return label, cat return f"Topic study on {', '.join(keywords[:2])}", "Other" # --------------------------------------------------------------------------- # Core Logic — Prompt Builder # --------------------------------------------------------------------------- def _build_interpretation_prompt(keywords, samples, cats) -> str: pajais_covered_str = "; ".join(PAJAIS_COVERED[:10]) pajais_not_str = "; ".join(PAJAIS_NOT_COVERED[:12]) return f"""You are an IS research classifier. A BERTopic algorithm produced the following topic cluster from ACM TMIS papers. KEYWORDS: {', '.join(keywords)} REPRESENTATIVE PAPER TITLES: {' | '.join(samples[:3])} TASK: Generate a label and classify this topic against the PAJAIS 2019 taxonomy. PAJAIS 2019 COVERS — use MAPPED only if the topic clearly fits one of these: {pajais_covered_str} PAJAIS 2019 DOES NOT COVER — use NOVEL if the topic fits here: {pajais_not_str} CLASSIFICATION RULES: - NOVEL if the topic involves: LLMs/GPT/generative AI, process mining, federated learning, differential privacy, fairness/XAI/responsible AI, blockchain analytics, COVID-19 IS, mobile malware, dark web, agentic AI, IoT analytics, talent matching, cyber insurance, or any technique that postdates 2018. - MAPPED only if it clearly fits an existing PAJAIS 2019 category listed above. - When in doubt, choose NOVEL. TMIS is a computational journal and most of its recent topics post-date the 2019 taxonomy. TAXONOMY CATEGORIES (for the taxonomy_category field only): {', '.join(cats)} Respond ONLY with valid JSON — no other text, no markdown fences: {{ "label": "", "taxonomy_category": "", "classification": "MAPPED or NOVEL", "reasoning": "" }}""" # --------------------------------------------------------------------------- # Validation Method 2 — Regex / Pattern-based grounding check (non-LLM) # --------------------------------------------------------------------------- def validate_label_with_regex(label: str, keywords: list[str]) -> dict: """ Checks if the AI-generated label is grounded in the cluster's actual keywords. Returns a dict with overlap score, matched terms, and a PASS/FAIL verdict. This method uses only Python re — no AI involved. """ if not label or not keywords: return {"verdict": "FAIL", "overlap_score": 0, "matched_terms": [], "reason": "Empty label or keywords"} # Normalise: lowercase, split on word boundaries label_tokens = set(re.findall(r'\b[a-z]{3,}\b', label.lower())) kw_tokens = set(re.findall(r'\b[a-z]{3,}\b', " ".join(keywords).lower())) # Remove common stop words that add noise noise = {"the", "and", "for", "with", "using", "based", "from", "into", "this", "that", "are"} label_tokens -= noise kw_tokens -= noise matched = list(label_tokens & kw_tokens) overlap_score = len(matched) / max(len(label_tokens), 1) # Stem-level match: check if any label token is a prefix (>=4 chars) of a keyword or vice versa stem_matches = [] for lt in label_tokens: for kt in kw_tokens: if len(lt) >= 4 and (kt.startswith(lt[:4]) or lt.startswith(kt[:4])): stem_matches.append(f"{lt}≈{kt}") total_score = min(1.0, overlap_score + 0.15 * len(stem_matches)) verdict = "PASS" if (len(matched) >= 1 or len(stem_matches) >= 1) else "FAIL" return { "verdict": verdict, "overlap_score": round(total_score, 3), "matched_terms": matched, "stem_matches": stem_matches[:5], "label_tokens": list(label_tokens), "reason": f"{len(matched)} exact + {len(stem_matches)} stem matches against {len(kw_tokens)} keyword tokens", } # --------------------------------------------------------------------------- # Core — Topic Interpretation with 3-LLM Council + dual validation # --------------------------------------------------------------------------- def interpret_topic( topic_id, keywords, samples, groq_client, mistral_key, gemini_key, paper_count, representative_docs ) -> TopicInterpretation: prompt = _build_interpretation_prompt(keywords, samples, DEFAULT_TAXONOMY_CATEGORIES) # ------------------------------------------------------------------ # Step A: Deterministic non-LLM NOVEL pre-check # If keywords/samples match known NOVEL patterns, override to NOVEL # regardless of what the LLMs say. This is the non-LLM validation # method — uses only regex, no AI. # ------------------------------------------------------------------ forced_novel = _is_deterministic_novel(keywords, samples) if forced_novel: logger.info(f"Topic {topic_id}: NOVEL forced by regex trigger on keywords={keywords[:4]}") # ------------------------------------------------------------------ # Step B: 3-LLM Council # Call Groq (LLaMA-3.1), Mistral Small, and Gemini 2.5 Flash # independently. Three different providers = three independent votes. # ------------------------------------------------------------------ raw_results = [] groq_res = _call_llm_json(groq_client, prompt, DEFAULT_MODEL) raw_results.append({"llm": "Groq/LLaMA-3.1", "response": groq_res}) time.sleep(1) mistral_res = call_mistral_label(prompt, mistral_key) raw_results.append({"llm": "Mistral-Small", "response": mistral_res}) time.sleep(1) if gemini_key: gemini_res = call_gemini_label(prompt, gemini_key) raw_results.append({"llm": "Gemini-2.5-Flash", "response": gemini_res}) results = [r["response"] for r in raw_results] # ------------------------------------------------------------------ # Step C: Select best label via majority vote on label text # ------------------------------------------------------------------ best = select_best_interpretation(results, keywords) if not best: l, c = _fallback_label_from_keywords(keywords, topic_id) best = {"label": l, "taxonomy_category": c, "classification": "MAPPED"} final_label = _safe_capitalize(best.get("label")) # ------------------------------------------------------------------ # Step D: Classification majority vote — separate from label vote # Count NOVEL vs MAPPED votes across all 3 LLMs. # NOVEL wins if: (a) forced by regex OR (b) at least 1 LLM votes NOVEL. # Conservative toward NOVEL because PAJAIS 2019 is outdated and TMIS # publishes many post-2018 techniques with no PAJAIS home. # ------------------------------------------------------------------ classification_votes = [] for r in results: if r and "classification" in r: v = str(r["classification"]).upper().strip() if v in ("MAPPED", "NOVEL"): classification_votes.append(v) novel_votes = classification_votes.count("NOVEL") mapped_votes = classification_votes.count("MAPPED") # Classification decision logic: # - Regex forced (unambiguous compound NOVEL term in keywords/samples) → always NOVEL # - LLM majority (2 or more of 3 LLMs vote NOVEL) → NOVEL # - Single LLM vote for NOVEL + 2 for MAPPED → MAPPED (majority wins) # - All 3 vote MAPPED → MAPPED # This gives ~40-60% NOVEL as expected for TMIS vs PAJAIS 2019 comparison. if forced_novel or novel_votes >= 2: final_classification = "NOVEL" else: final_classification = "MAPPED" logger.info( f"Topic {topic_id} classification: NOVEL_votes={novel_votes}, " f"MAPPED_votes={mapped_votes}, regex_forced={forced_novel} → {final_classification}" ) # ------------------------------------------------------------------ # Step E: Build council vote evidence for UI display # Each LLM's label, category, classification, and reasoning is stored # so the UI can show per-topic agreement/disagreement transparently. # ------------------------------------------------------------------ council_votes = [] for r in raw_results: resp = r["response"] council_votes.append({ "llm": r["llm"], "label": clean_label(resp.get("label", "—")) if resp else "—", "category": resp.get("taxonomy_category", "—") if resp else "—", "classification": resp.get("classification", "—") if resp else "—", "reasoning": resp.get("reasoning", "—") if resp else "—", }) # ------------------------------------------------------------------ # Step F: Regex grounding check on the final label # Verifies the label tokens are grounded in actual cluster keywords. # Catches hallucinated labels (confident-sounding but disconnected # from the underlying data). Pure regex — no AI involved. # ------------------------------------------------------------------ regex_validation = validate_label_with_regex(final_label, keywords) logger.info( f"Topic {topic_id} label grounding: {regex_validation['verdict']} " f"(score={regex_validation['overlap_score']}, matched={regex_validation['matched_terms']})" ) # ------------------------------------------------------------------ # Build the final TopicInterpretation object # ------------------------------------------------------------------ interp = TopicInterpretation( topic_id=topic_id, label=final_label, category=_safe_capitalize(best.get("taxonomy_category")), classification=final_classification, paper_count=paper_count, keywords=keywords, ) # Attach validation evidence as dynamic attributes (serialised manually in run_agent) interp.council_votes = council_votes interp.regex_validation = regex_validation interp.novel_forced_by_regex = forced_novel interp.classification_votes = {"NOVEL": novel_votes, "MAPPED": mapped_votes} return interp # --------------------------------------------------------------------------- # Run Agent — orchestrates all topics and writes outputs # --------------------------------------------------------------------------- def run_agent( topic_results, groq_key, mistral_key, gemini_key, output_json="topics.json", output_csv="topics.csv", ) -> dict: client = build_groq_client(groq_key) res = topic_results["documents"] num_clusters = len([t for t in set(res["topics"]) if t != -1]) num_topics = len(res["topic_keywords"]) print(f"Final cluster count: {num_clusters}") print(f"Final topic count: {num_topics}") if num_clusters != num_topics: logger.error(f"CONSISTENCY WARNING: {num_clusters} clusters != {num_topics} topics") interpretations = {} for i, (tid, kw_pairs) in enumerate(res["topic_keywords"].items()): interp = interpret_topic( tid, [w for w, _ in kw_pairs], res["representative_docs"].get(tid, []), client, mistral_key, gemini_key, res["topic_freq"].get(tid, 0), res["representative_docs"].get(tid, []), ) interpretations[tid] = interp logger.info(f"Interpreted {tid}: {interp.label} [{interp.classification}]") # Build serialisable list — include all validation evidence interp_list = [] for i in interpretations.values(): d = asdict(i) # asdict() only captures @dataclass fields; add dynamic attributes manually d["council_votes"] = getattr(i, "council_votes", []) d["regex_validation"] = getattr(i, "regex_validation", {}) d["novel_forced_by_regex"] = getattr(i, "novel_forced_by_regex", False) d["classification_votes"] = getattr(i, "classification_votes", {}) interp_list.append(d) clean_data = convert_numpy_types(interp_list) with open(output_json, "w") as f: json.dump(clean_data, f, indent=2) df = pd.DataFrame(clean_data) if not df.empty: df["keywords"] = df["keywords"].apply( lambda x: ", ".join(x) if isinstance(x, list) else str(x) ) df.to_csv(output_csv, index=False) return { "interpretations": interpretations, "json_path": output_json, "csv_path": output_csv, } if __name__ == "__main__": pass