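"""
agent.py
--------
LLM-driven topic interpretation and classification module. A three-LLM
ensemble (Groq/LLaMA, Mistral and Gemini) labels each topic and votes on
whether it is MAPPED to, or NOVEL relative to, the PAJAIS 2019 taxonomy.
"""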
|
|
from __future__ import annotations

import json
import logging
import os
import re
import time
from dataclasses import asdict, dataclass, field
from typing import Optional

import pandas as pd
import requests
from groq import Groq
|
|
| |
| |
| |
# Module-wide logging: INFO and above, rendered as "LEVEL | message".
logging.basicConfig(
    format="%(levelname)s | %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
|
|
| |
| |
| |
# Model identifiers used for the Groq and Mistral calls.
DEFAULT_MODEL = "llama-3.1-8b-instant"
MISTRAL_DEFAULT_MODEL = "mistral-small-latest"

# Closed list of categories offered to the LLMs for the `taxonomy_category` field.
DEFAULT_TAXONOMY_CATEGORIES = [
    "Artificial Intelligence",
    "Machine Learning",
    "Natural Language Processing",
    "Computer Vision",
    "Information Systems",
    "Healthcare & Bioinformatics",
    "Finance & Economics",
    "Cybersecurity",
    "Human-Computer Interaction",
    "Robotics & Automation",
    "Education Technology",
    "Environmental Science",
    "Social Sciences",
    "Data Engineering",
    "Other",
]
|
|
| |
| |
| |
# Topics treated as covered by the PAJAIS 2019 taxonomy. Only the first ten
# entries are interpolated into the LLM prompt.
PAJAIS_COVERED = [
    "IS strategy",
    "IS adoption",
    "IS governance",
    "e-commerce",
    "enterprise systems",
    "ERP",
    "knowledge management",
    "decision support",
    "e-government",
    "social media IS",
    "IT outsourcing",
    "IS security",
    "privacy",
    "IS education",
    "mobile commerce",
    "business intelligence",
    "data analytics",
    "IS in healthcare (general)",
    "human computer interaction",
    "HCI",
    "IT project management",
]


# Topics treated as NOT covered by the PAJAIS 2019 taxonomy. Only the first
# twelve entries are interpolated into the LLM prompt.
PAJAIS_NOT_COVERED = [
    "large language models",
    "LLM",
    "GPT",
    "generative AI",
    "RAG",
    "process mining",
    "event log",
    "Petri net",
    "conformance checking",
    "federated learning",
    "differential privacy",
    "DP-SGD",
    "fairness",
    "algorithmic bias",
    "responsible AI",
    "FATE",
    "XAI",
    "explainable AI",
    "blockchain analytics",
    "smart contract",
    "DeFi",
    "tokenomics",
    "COVID-19 IS",
    "pandemic informatics",
    "Android malware",
    "mobile security",
    "dark web",
    "cyber insurance",
    "agentic AI",
    "multi-agent orchestration",
    "transformer",
    "BERT",
    "neural topic model",
    "BERTopic",
    "recommender neural",
    "graph neural network",
    "GNN",
    "heterogeneous computing",
    "IoT analytics",
    "edge computing IS",
    "talent matching",
    "job-person fit",
    "HR analytics",
]
|
|
| |
| |
| |
| |
| |
# Deterministic (non-LLM) trigger for NOVEL topics. Every alternative is a
# specific compound term or acronym; generic single words are deliberately
# absent. The whole alternation is wrapped in \b...\b, so each alternative has
# to tolerate its own plural form explicitly -- otherwise the trailing \b
# rejects "large language models", "smart contracts", "graph neural networks"
# and similar. "(?:chat)?gpt" with an optional version suffix also accepts
# "ChatGPT" and "GPT-4o", which the leading/trailing \b would otherwise reject.
NOVEL_REGEX_TRIGGERS = re.compile(
    r'\b('
    r'llms?|(?:chat)?gpt(?:[\-\s]?\d\w*)?|large\s+language\s+models?|generative\s+ai|'
    r'federat\w*\s+learn\w*|differential\s+privac\w*|dp\-sgd|'
    r'process\s+mining|event\s+logs?|petri\s+nets?|conformance\s+check\w*|'
    r'blockchains?|smart\s+contracts?|defi\b|tokenomic\w*|'
    r'malware|botnets?|dark\s+web|cyber\s+insur\w*|'
    r'responsible\s+ai|explainab\w*\s+ai|algorithmic\s+bias(?:es)?|xai\b|'
    r'agentic\s+ai|multi.agent\s+orchest\w*|'
    r'graph\s+neural\s+networks?|gnns?\b|'
    r'retrieval.augment\w*|prompt\s+engineer\w*|rag\b|'
    r'talent\s+match\w*|job.person\s+fit|'
    r'covid.19|pandemic\s+inform\w*'
    r')\b',
    re.IGNORECASE,
)
|
|
def _is_deterministic_novel(keywords: list[str], samples: list[str]) -> bool:
    """Rule-based (non-LLM) check for unambiguous NOVEL terms.

    The keywords are joined with spaces into one text, and so are the samples;
    each text is searched with ``NOVEL_REGEX_TRIGGERS``. Joining lets a
    compound term match even when its words arrive as two adjacent keywords.

    Args:
        keywords: Topic keywords; ``None`` or empty is treated as no text.
        samples: Representative texts for the topic; ``None`` or empty is
            treated as no text.

    Returns:
        True if either joined text contains a trigger term, else False.
    """
    for texts in (keywords, samples):
        # Coerce entries to str and drop None so one malformed entry cannot
        # abort the whole topic. The pattern is compiled with IGNORECASE, so
        # no lower-casing is needed here.
        joined = " ".join(str(t) for t in (texts or []) if t is not None)
        if NOVEL_REGEX_TRIGGERS.search(joined):
            return True
    return False
|
|
| |
| |
| |
@dataclass
class TopicInterpretation:
    """Structured interpretation for a single topic.

    The last four fields are filled in by ``interpret_topic`` after
    construction. Declaring them here (with defaults, so existing positional
    and keyword construction keeps working) makes them visible to ``asdict``
    and to readers, instead of being attached ad hoc to the instance.
    """
    topic_id: int
    label: str
    category: str
    classification: str
    paper_count: int = 0
    # None (not an empty list) remains the default so existing callers that
    # test for None keep working; the annotation now says so.
    keywords: Optional[list[str]] = None
    # One summary dict per LLM that was consulted.
    council_votes: list[dict] = field(default_factory=list)
    # Result dict of validate_label_with_regex for the final label.
    regex_validation: dict = field(default_factory=dict)
    # True when the deterministic regex trigger forced the NOVEL verdict.
    novel_forced_by_regex: bool = False
    # Vote tally, e.g. {"NOVEL": 2, "MAPPED": 1}.
    classification_votes: dict = field(default_factory=dict)
|
|
| |
| |
| |
def build_groq_client(api_key: Optional[str] = None):
    """Create a Groq client with the SDK's automatic retries turned off.

    Args:
        api_key: Explicit key. When falsy, the ``GROQ_API_KEY`` environment
            variable is used instead.

    Returns:
        A ``Groq`` client built with ``max_retries=0``.

    Raises:
        ValueError: If neither the argument nor the environment supplies a key.
    """
    resolved_key = api_key if api_key else os.getenv("GROQ_API_KEY")
    if resolved_key:
        return Groq(api_key=resolved_key, max_retries=0)
    raise ValueError("No Groq API key provided.")
|
|
def call_gemini_label(prompt: str, api_key: str) -> dict:
    """Call the Google AI Studio (Gemini) API and parse its JSON answer.

    Args:
        prompt: Full prompt text sent as a single user part.
        api_key: Gemini API key. When falsy, no request is made.

    Returns:
        The JSON object found in the model's reply, or ``{}`` when the key is
        missing, the request fails, the API reports an error, or the reply
        does not contain a JSON object.
    """
    if not api_key:
        return {}
    url = "https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent"
    # The key travels in a header rather than the query string: `requests`
    # exceptions embed the URL, and the except-branch below logs the exception
    # text, so a key in the URL would end up in the logs.
    headers = {"Content-Type": "application/json", "x-goog-api-key": api_key}
    payload = {"contents": [{"parts": [{"text": prompt}]}], "generationConfig": {"temperature": 0.2}}
    try:
        response = requests.post(url, headers=headers, json=payload, timeout=10)
        data = response.json()
        if "error" in data or "candidates" not in data:
            logger.error(f"Gemini error / missing candidates. Response: {data}")
            return {}
        raw = data["candidates"][0]["content"]["parts"][0]["text"].strip()
        # Strip markdown fences, then keep only the outermost {...} span.
        raw = raw.replace("```json", "").replace("```", "").strip()
        start = raw.find("{")
        end = raw.rfind("}") + 1
        if start != -1 and end != 0:
            raw = raw[start:end]
        parsed = json.loads(raw)
    except Exception as e:
        # Deliberately best-effort: one failing LLM must not abort the ensemble.
        logger.warning(f"Gemini call failed: {e}")
        return {}
    # Valid non-object JSON (a list, string or number) would break callers
    # that use .get() on the result, so only a dict is passed through.
    return parsed if isinstance(parsed, dict) else {}
|
|
def call_mistral_label(prompt: str, api_key: str, model: Optional[str] = None) -> dict:
    """Call the Mistral chat-completions API and parse its JSON answer.

    Args:
        prompt: Full prompt text sent as a single user message.
        api_key: Mistral API key. When falsy, no request is made.
        model: Model identifier; defaults to ``MISTRAL_DEFAULT_MODEL``.

    Returns:
        The JSON object found in the model's reply, or ``{}`` when the key is
        missing, the request fails, the API returns no choices, or the reply
        does not contain a JSON object.
    """
    if not api_key:
        return {}
    try:
        response = requests.post(
            "https://api.mistral.ai/v1/chat/completions",
            headers={"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"},
            json={
                # Use the module constant instead of a duplicated literal.
                "model": model or MISTRAL_DEFAULT_MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "temperature": 0.2,
            },
            timeout=10,
        )
        data = response.json()
        if "choices" not in data:
            # Log the API's own error payload rather than an opaque KeyError.
            logger.error(f"Mistral error / missing choices. Response: {data}")
            return {}
        raw = data["choices"][0]["message"]["content"].strip()
        # Strip markdown fences, then keep only the outermost {...} span.
        raw = raw.replace("```json", "").replace("```", "").strip()
        start = raw.find("{")
        end = raw.rfind("}") + 1
        if start != -1 and end != 0:
            raw = raw[start:end]
        parsed = json.loads(raw)
    except Exception as e:
        # Deliberately best-effort: one failing LLM must not abort the ensemble.
        logger.warning(f"Mistral call failed: {e}")
        return {}
    # Only a JSON object is useful to callers that use .get() on the result.
    return parsed if isinstance(parsed, dict) else {}
|
|
| def _call_llm_json(client, prompt: str, model: str) -> dict: |
| """Call Groq API with robust JSON parsing.""" |
| try: |
| response = client.chat.completions.create( |
| model=model, messages=[{"role": "user", "content": prompt}], temperature=0.2, timeout=10, |
| ) |
| raw = response.choices[0].message.content.strip() |
| raw = raw.replace("```json", "").replace("```", "").strip() |
| start = raw.find("{") |
| end = raw.rfind("}") + 1 |
| if start != -1 and end != 0: |
| raw = raw[start:end] |
| return json.loads(raw) |
| except Exception as e: |
| logger.warning(f"Groq call failed: {e}") |
| return {} |
|
|
| |
| |
| |
def convert_numpy_types(obj):
    """Recursively convert numpy values to native Python types for JSON.

    Handles dicts (keys and values), lists, tuples, ``np.ndarray``,
    ``np.bool_``, ``np.integer`` and ``np.floating``. Anything else is
    returned unchanged.

    Args:
        obj: Arbitrarily nested structure that may contain numpy values.

    Returns:
        A structure of the same shape built from native Python types. Lists
        stay lists, tuples stay tuples, arrays become (nested) lists.
    """
    import numpy as np
    if isinstance(obj, dict):
        # Keys are converted too: json.dump rejects numpy integer keys.
        return {convert_numpy_types(k): convert_numpy_types(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [convert_numpy_types(v) for v in obj]
    if isinstance(obj, tuple):
        return tuple(convert_numpy_types(v) for v in obj)
    if isinstance(obj, np.ndarray):
        return convert_numpy_types(obj.tolist())
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    return obj
|
|
| def _safe_capitalize(s: str) -> str: |
| s = str(s or "").strip() |
| return s[0].upper() + s[1:] if s else "" |
|
|
def clean_label(label: str, max_len: int = 60) -> str:
    """Normalise an LLM-generated label to a single short line.

    Whitespace (including newlines) is collapsed to single spaces, trailing
    spaces and periods are removed, and labels longer than ``max_len``
    characters are cut back to the last complete word that fits.

    Args:
        label: Raw label. Falsy values give ``""``; non-string values are
            converted with ``str()`` instead of raising, because LLM JSON
            output is not guaranteed to hold a string here.
        max_len: Maximum label length in characters (default 60).

    Returns:
        The cleaned label, possibly empty.
    """
    if not label:
        return ""
    text = " ".join(str(label).split()).rstrip(" .")
    if len(text) > max_len:
        head = text[:max_len]
        # Drop the last word only when the cut actually split it; if the next
        # character is a space, the head already ends on a complete word.
        if text[max_len] != " " and " " in head:
            head = head.rsplit(" ", 1)[0]
        text = head
    return text.strip()
|
|
| def _get_keyword_overlap(label: str, keywords: list[str]) -> int: |
| label_words = set(label.lower().split()) |
| kw_set = set(k.lower() for k in keywords) |
| return len(label_words & kw_set) |
|
|
def select_best_interpretation(results: list[dict], keywords: list[str]) -> dict:
    """Pick one interpretation from the ensemble's responses.

    Selection rule:
      1. If at least two responses share the same cleaned label
         (case-insensitive), the first response carrying it wins.
      2. Otherwise the response whose label shares the most words with
         ``keywords`` wins; ties go to the shorter label, then to the
         earlier response.

    Responses that are not dicts, or whose label is missing, not a string, or
    empty after cleaning, are ignored -- otherwise two failed/empty answers
    could form a "majority" and produce an empty final label.

    Args:
        results: One response per LLM; entries may be empty or malformed.
        keywords: Topic keywords used for the overlap tie-break.

    Returns:
        A shallow copy of the winning response with its ``"label"`` cleaned,
        or ``{}`` when no response is usable. The inputs are not mutated.
    """
    candidates = []
    for response in results:
        if not isinstance(response, dict):
            continue
        raw_label = response.get("label")
        if not isinstance(raw_label, str):
            continue
        cleaned = clean_label(raw_label)
        if cleaned:
            candidates.append((cleaned, response))
    if not candidates:
        return {}

    # Majority vote on the normalised label.
    counts = {}
    for cleaned, _ in candidates:
        key = cleaned.lower()
        counts[key] = counts.get(key, 0) + 1
    for cleaned, response in candidates:
        if counts[cleaned.lower()] >= 2:
            return {**response, "label": cleaned}

    # No agreement: highest keyword overlap first, then the shortest label.
    # min() returns the first minimal element, so earlier responses win ties.
    cleaned, response = min(
        candidates,
        key=lambda item: (-_get_keyword_overlap(item[0], keywords), len(item[0])),
    )
    return {**response, "label": cleaned}
|
|
| def _fallback_label_from_keywords(keywords: list[str], topic_id: int) -> tuple[str, str]: |
| kw_set = set([k.lower() for k in keywords]) |
| mappings = [ |
| ({"privacy", "data", "security"}, "Digital Privacy and Security", "Cybersecurity"), |
| ({"ai", "chatbots", "agents"}, "Conversational AI", "Artificial Intelligence"), |
| ({"neural", "network", "deep"}, "Deep Learning Systems", "Machine Learning"), |
| ] |
| for trigger, label, cat in mappings: |
| if any(t in kw_set for t in trigger): return label, cat |
| return f"Topic study on {', '.join(keywords[:2])}", "Other" |
|
|
| |
| |
| |
def _build_interpretation_prompt(keywords: list[str], samples: list[str], cats: list[str]) -> str:
    """Build the labelling/classification prompt sent to every LLM.

    Args:
        keywords: Topic keywords, listed in full.
        samples: Representative paper titles; only the first three are used.
        cats: Allowed values for the ``taxonomy_category`` field.

    Returns:
        The prompt text. It embeds the first ten ``PAJAIS_COVERED`` entries
        and the first twelve ``PAJAIS_NOT_COVERED`` entries, and asks for a
        JSON object with ``label``, ``taxonomy_category``, ``classification``
        and ``reasoning``.
    """
    pajais_covered_str = "; ".join(PAJAIS_COVERED[:10])
    pajais_not_str = "; ".join(PAJAIS_NOT_COVERED[:12])
    # str() guards against non-string entries coming out of the topic model.
    keywords_str = ", ".join(str(k) for k in keywords)
    titles_str = " | ".join(str(s) for s in samples[:3])
    cats_str = ", ".join(str(c) for c in cats)
    # The separators after "COVERS", "DOES NOT COVER" and "valid JSON" are em
    # dashes; they had been corrupted into a stray Greek beta by a bad decode.
    return f"""You are an IS research classifier. A BERTopic algorithm produced the following topic cluster from ACM TMIS papers.

KEYWORDS: {keywords_str}
REPRESENTATIVE PAPER TITLES: {titles_str}

TASK: Generate a label and classify this topic against the PAJAIS 2019 taxonomy.

PAJAIS 2019 COVERS — use MAPPED only if the topic clearly fits one of these:
{pajais_covered_str}

PAJAIS 2019 DOES NOT COVER — use NOVEL if the topic fits here:
{pajais_not_str}

CLASSIFICATION RULES:
- NOVEL if the topic involves: LLMs/GPT/generative AI, process mining, federated learning, differential privacy, fairness/XAI/responsible AI, blockchain analytics, COVID-19 IS, mobile malware, dark web, agentic AI, IoT analytics, talent matching, cyber insurance, or any technique that postdates 2018.
- MAPPED only if it clearly fits an existing PAJAIS 2019 category listed above.
- When in doubt, choose NOVEL. TMIS is a computational journal and most of its recent topics post-date the 2019 taxonomy.

TAXONOMY CATEGORIES (for the taxonomy_category field only): {cats_str}

Respond ONLY with valid JSON — no other text, no markdown fences:
{{
"label": "<concise 5-8 word label>",
"taxonomy_category": "<one category from the list>",
"classification": "MAPPED or NOVEL",
"reasoning": "<one sentence explaining the MAPPED vs NOVEL decision>"
}}"""
|
|
| |
| |
| |
def validate_label_with_regex(label: str, keywords: list[str]) -> dict:
    """Check whether a generated label is grounded in the cluster's keywords.

    Uses only ``re`` -- no AI involved. Label and keywords are tokenised into
    lower-case alphabetic words of three or more letters, a small stop-word
    list is removed, and two kinds of evidence are collected:

    * exact matches: tokens present in both sets;
    * stem matches: pairs of *different* tokens where the label token has at
      least four letters and one token starts with the other's first four
      letters.

    Args:
        label: The label to validate.
        keywords: The topic's keywords.

    Returns:
        A dict with the same keys on every path: ``verdict`` ("PASS" when
        there is at least one exact or stem match, else "FAIL"),
        ``overlap_score`` (exact-match fraction of the label tokens plus 0.15
        per stem match, capped at 1.0, rounded to 3 places),
        ``matched_terms``, ``stem_matches`` (at most five), ``label_tokens``
        and ``reason``. List values are sorted so the output is reproducible
        across runs.
    """
    if not label or not keywords:
        return {
            "verdict": "FAIL",
            "overlap_score": 0,
            "matched_terms": [],
            "stem_matches": [],
            "label_tokens": [],
            "reason": "Empty label or keywords",
        }

    token_pattern = r'\b[a-z]{3,}\b'
    noise = {"the", "and", "for", "with", "using", "based", "from", "into", "this", "that", "are"}
    label_tokens = set(re.findall(token_pattern, str(label).lower())) - noise
    kw_tokens = set(re.findall(token_pattern, " ".join(str(k) for k in keywords).lower())) - noise

    matched = sorted(label_tokens & kw_tokens)
    overlap_score = len(matched) / max(len(label_tokens), 1)

    # Identical pairs are skipped: they are already counted as exact matches,
    # and counting them again would inflate the score by 0.15 each.
    stem_matches = [
        f"{lt}≈{kt}"
        for lt in sorted(label_tokens)
        if len(lt) >= 4
        for kt in sorted(kw_tokens)
        if lt != kt and (kt.startswith(lt[:4]) or lt.startswith(kt[:4]))
    ]

    total_score = min(1.0, overlap_score + 0.15 * len(stem_matches))
    verdict = "PASS" if (matched or stem_matches) else "FAIL"

    return {
        "verdict": verdict,
        "overlap_score": round(total_score, 3),
        "matched_terms": matched,
        "stem_matches": stem_matches[:5],
        "label_tokens": sorted(label_tokens),
        "reason": f"{len(matched)} exact + {len(stem_matches)} stem matches against {len(kw_tokens)} keyword tokens",
    }
|
|
| |
| |
| |
def _tally_classification_votes(responses: list) -> tuple[int, int]:
    """Count valid NOVEL and MAPPED votes; return ``(novel, mapped)``."""
    novel_votes = 0
    mapped_votes = 0
    for response in responses:
        if not isinstance(response, dict) or "classification" not in response:
            continue
        vote = str(response["classification"]).upper().strip()
        if vote == "NOVEL":
            novel_votes += 1
        elif vote == "MAPPED":
            mapped_votes += 1
    return novel_votes, mapped_votes


def _summarise_council_votes(raw_results: list[dict]) -> list[dict]:
    """Build one display row per consulted LLM, with "—" for missing fields."""
    placeholder = "—"
    rows = []
    for entry in raw_results:
        # A failed call yields {}; anything that is not a dict is treated the
        # same way instead of crashing on .get().
        resp = entry["response"] if isinstance(entry["response"], dict) else {}
        raw_label = resp.get("label")
        rows.append({
            "llm": entry["llm"],
            "label": clean_label(raw_label) if isinstance(raw_label, str) else placeholder,
            "category": resp.get("taxonomy_category", placeholder),
            "classification": resp.get("classification", placeholder),
            "reasoning": resp.get("reasoning", placeholder),
        })
    return rows


def interpret_topic(
    topic_id, keywords, samples, groq_client, mistral_key, gemini_key,
    paper_count, representative_docs
) -> TopicInterpretation:
    """Label and classify one topic with the LLM ensemble.

    Steps:
      1. Build one shared prompt and run the deterministic regex trigger.
      2. Query Groq, then Mistral, then (only when ``gemini_key`` is set)
         Gemini, pausing one second after each of the first two calls.
      3. Choose the label/category via ``select_best_interpretation``; if no
         response is usable, fall back to a keyword-derived label.
      4. Classify as NOVEL when the regex trigger fired or when NOVEL votes
         outnumber MAPPED votes; otherwise MAPPED.
      5. Validate the final label against the keywords.

    Args:
        topic_id: Identifier of the topic.
        keywords: Topic keywords.
        samples: Representative texts used in the prompt and regex check.
        groq_client: Client passed to ``_call_llm_json``.
        mistral_key: API key passed to ``call_mistral_label``.
        gemini_key: API key passed to ``call_gemini_label``; falsy skips Gemini.
        paper_count: Number of papers in the topic.
        representative_docs: Accepted for interface compatibility; not used.

    Returns:
        A ``TopicInterpretation`` that additionally carries ``council_votes``,
        ``regex_validation``, ``novel_forced_by_regex`` and
        ``classification_votes``.
    """
    prompt = _build_interpretation_prompt(keywords, samples, DEFAULT_TAXONOMY_CATEGORIES)

    forced_novel = _is_deterministic_novel(keywords, samples)
    if forced_novel:
        logger.info(f"Topic {topic_id}: NOVEL forced by regex trigger on keywords={keywords[:4]}")

    # --- Consult the council -------------------------------------------------
    raw_results = []

    groq_res = _call_llm_json(groq_client, prompt, DEFAULT_MODEL)
    raw_results.append({"llm": "Groq/LLaMA-3.1", "response": groq_res})
    time.sleep(1)  # pacing between provider calls

    mistral_res = call_mistral_label(prompt, mistral_key)
    raw_results.append({"llm": "Mistral-Small", "response": mistral_res})
    time.sleep(1)  # pacing between provider calls

    if gemini_key:
        gemini_res = call_gemini_label(prompt, gemini_key)
        raw_results.append({"llm": "Gemini-2.5-Flash", "response": gemini_res})

    results = [entry["response"] for entry in raw_results]

    # --- Label and category --------------------------------------------------
    best = select_best_interpretation(results, keywords)
    if not best:
        fallback_label, fallback_cat = _fallback_label_from_keywords(keywords, topic_id)
        best = {"label": fallback_label, "taxonomy_category": fallback_cat, "classification": "MAPPED"}

    final_label = _safe_capitalize(best.get("label"))

    # --- Classification ------------------------------------------------------
    novel_votes, mapped_votes = _tally_classification_votes(results)

    # Majority of the votes actually cast. With three valid votes this equals
    # "at least two NOVEL". Unlike a fixed threshold of two, it does not turn
    # a lone surviving NOVEL vote (other models failed or were skipped) into
    # MAPPED. Ties and the no-vote case stay MAPPED.
    if forced_novel or novel_votes > mapped_votes:
        final_classification = "NOVEL"
    else:
        final_classification = "MAPPED"

    logger.info(
        f"Topic {topic_id} classification: NOVEL_votes={novel_votes}, "
        f"MAPPED_votes={mapped_votes}, regex_forced={forced_novel} → {final_classification}"
    )

    council_votes = _summarise_council_votes(raw_results)

    # --- Label grounding -----------------------------------------------------
    regex_validation = validate_label_with_regex(final_label, keywords)
    logger.info(
        f"Topic {topic_id} label grounding: {regex_validation['verdict']} "
        f"(score={regex_validation['overlap_score']}, matched={regex_validation['matched_terms']})"
    )

    interp = TopicInterpretation(
        topic_id=topic_id,
        label=final_label,
        category=_safe_capitalize(best.get("taxonomy_category")),
        classification=final_classification,
        paper_count=paper_count,
        keywords=keywords,
    )

    # Attached after construction so this works whether or not the dataclass
    # declares these attributes as fields.
    interp.council_votes = council_votes
    interp.regex_validation = regex_validation
    interp.novel_forced_by_regex = forced_novel
    interp.classification_votes = {"NOVEL": novel_votes, "MAPPED": mapped_votes}

    return interp
|
|
| |
| |
| |
def run_agent(
    topic_results,
    groq_key,
    mistral_key,
    gemini_key,
    output_json="topics.json",
    output_csv="topics.csv",
) -> dict:
    """Interpret every topic and write the results to JSON and CSV.

    Reads ``topics``, ``topic_keywords``, ``representative_docs`` and
    ``topic_freq`` from ``topic_results["documents"]``, calls
    ``interpret_topic`` once per entry of ``topic_keywords``, and serialises
    the interpretations (including the council votes, regex validation and
    vote tally attached to each one).

    Args:
        topic_results: Mapping whose ``"documents"`` entry holds the keys
            listed above.
        groq_key: Groq API key (or None to use the environment variable).
        mistral_key: Mistral API key.
        gemini_key: Gemini API key; falsy skips Gemini.
        output_json: Path of the JSON file to write.
        output_csv: Path of the CSV file to write.

    Returns:
        ``{"interpretations": {topic_id: TopicInterpretation},
        "json_path": output_json, "csv_path": output_csv}``.

    Raises:
        ValueError: If no Groq API key is available.
    """
    client = build_groq_client(groq_key)
    res = topic_results["documents"]

    # Sanity check: non-outlier cluster ids (-1 is excluded) should line up
    # with the keyword table.
    num_clusters = len({t for t in res["topics"] if t != -1})
    num_topics = len(res["topic_keywords"])
    print(f"Final cluster count: {num_clusters}")
    print(f"Final topic count: {num_topics}")
    if num_clusters != num_topics:
        logger.error(f"CONSISTENCY WARNING: {num_clusters} clusters != {num_topics} topics")

    interpretations = {}
    for tid, kw_pairs in res["topic_keywords"].items():
        rep_docs = res["representative_docs"].get(tid, [])
        interp = interpret_topic(
            tid,
            [w for w, _ in kw_pairs],
            rep_docs,
            client,
            mistral_key,
            gemini_key,
            res["topic_freq"].get(tid, 0),
            rep_docs,
        )
        interpretations[tid] = interp
        logger.info(f"Interpreted {tid}: {interp.label} [{interp.classification}]")

    records = []
    for interp in interpretations.values():
        record = asdict(interp)
        # These may be plain instance attributes rather than dataclass fields,
        # so copy them explicitly instead of relying on asdict().
        record["council_votes"] = getattr(interp, "council_votes", [])
        record["regex_validation"] = getattr(interp, "regex_validation", {})
        record["novel_forced_by_regex"] = getattr(interp, "novel_forced_by_regex", False)
        record["classification_votes"] = getattr(interp, "classification_votes", {})
        records.append(record)

    clean_data = convert_numpy_types(records)

    with open(output_json, "w", encoding="utf-8") as f:
        json.dump(clean_data, f, indent=2)

    df = pd.DataFrame(clean_data)
    if "keywords" in df.columns:
        df["keywords"] = df["keywords"].apply(
            lambda x: ", ".join(x) if isinstance(x, list) else str(x)
        )
    # Always write the CSV, even when there are no topics, so the returned
    # csv_path never points at a missing or stale file.
    df.to_csv(output_csv, index=False)

    return {
        "interpretations": interpretations,
        "json_path": output_json,
        "csv_path": output_csv,
    }
|
|
|
|
# Running this file as a script is a no-op: the guard body is only `pass`.
if __name__ == "__main__":
    pass