Spaces:
Running
Running
File size: 4,867 Bytes
import os, sys, json
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from datetime import datetime
from loguru import logger
# Directory three levels above this module's file; the "data/processed"
# output folder is resolved relative to it.
_PROJECT_ROOT = os.path.dirname(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)

# Date stamp (YYYYMMDD) taken once, when the module is loaded. The filename
# therefore carries the import date, not the date of each discovery run.
_RUN_STAMP = datetime.now().strftime("%Y%m%d")

# Destination of the JSON report written by PatternLearner.discover_patterns().
PATTERN_FILE = os.path.join(
    _PROJECT_ROOT,
    "data",
    "processed",
    "pattern_candidates_" + _RUN_STAMP + ".json",
)
# Catalogue of graph patterns checked on every discovery run.
#
# Each entry has:
#   id          - stable identifier copied into the report as "pattern_id"
#   description - human-readable summary copied into the report
#   cypher      - query that must return a single row with an integer column "n"
#   threshold   - the pattern is reported when n >= threshold
KNOWN_PATTERNS = [
    {
        "id": "politician_company_contract",
        "description": "Politician -> directs Company -> wins Contract",
        "cypher": (
            "MATCH (p:Politician)-[:DIRECTOR_OF]->(c:Company)"
            "-[:WON_CONTRACT]->(ct:Contract) "
            "RETURN count(*) AS n"
        ),
        "threshold": 3,
    },
    {
        "id": "audit_flagged_ministry_contract",
        "description": "Ministry with CAG flag -> Company contract",
        "cypher": (
            "MATCH (a:AuditReport)-[:AUDITS]->(m:Ministry)"
            "<-[:AWARDED_BY]-(ct:Contract) "
            "RETURN count(*) AS n"
        ),
        "threshold": 2,
    },
    {
        "id": "high_value_single_vendor",
        # The query keeps (company, buyer) pairs with n >= 3, so the
        # description says ">= 3" (it previously said "> 3", which did not
        # match what the query actually counts).
        "description": "Single company wins >= 3 contracts from same buyer",
        "cypher": (
            "MATCH (c:Company)-[:WON_CONTRACT]->(ct:Contract) "
            "WITH c, ct.buyer_org AS buyer, count(*) AS n "
            "WHERE n >= 3 "
            "RETURN count(*) AS n"
        ),
        "threshold": 1,
    },
]
class PatternLearner:
    """Run the KNOWN_PATTERNS queries plus one motif query and report candidates.

    Results are returned as a dict and, when at least one candidate exists,
    also written as JSON to PATTERN_FILE.
    """

    def __init__(self, driver=None):
        """Remember the graph driver.

        Args:
            driver: Object whose ``session()`` returns a context manager
                yielding something with ``run(query)``. Presumably a Neo4j
                driver -- confirm against the caller. A falsy value (the
                default ``None``) selects the offline fallback paths.
        """
        self.driver = driver

    def discover_patterns(self) -> dict:
        """Evaluate every known pattern and the motif query.

        Returns:
            Dict with ``run_date`` (ISO timestamp), ``patterns_found``
            (number of candidates) and ``candidates`` (list of candidate
            dicts). The same dict is dumped to PATTERN_FILE when the
            candidate list is non-empty.
        """
        logger.info("[PatternLearner] Running pattern discovery...")
        found = []
        for pattern in KNOWN_PATTERNS:
            count = self._check_pattern(pattern)
            if count < pattern["threshold"]:
                continue
            found.append({
                "pattern_id": pattern["id"],
                "description": pattern["description"],
                "count": count,
                "threshold": pattern["threshold"],
                "status": "candidate",
                "found_at": datetime.now().isoformat(),
            })
            logger.info(
                f"[PatternLearner] Found: {pattern['id']} "
                f"(count={count})"
            )

        found.extend(self._discover_new_motifs())

        result = {
            "run_date": datetime.now().isoformat(),
            "patterns_found": len(found),
            "candidates": found,
        }
        # Only touch the filesystem when there is something to report.
        if found:
            os.makedirs(os.path.dirname(PATTERN_FILE), exist_ok=True)
            with open(PATTERN_FILE, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
            logger.success(
                f"[PatternLearner] {len(found)} candidates -> "
                f"{os.path.basename(PATTERN_FILE)}"
            )
        return result

    def _check_pattern(self, pattern: dict) -> int:
        """Return the match count for one pattern.

        Args:
            pattern: Dict with at least ``cypher`` and ``threshold`` keys.

        Returns:
            The integer in column ``n`` of the query's single row, or 0 when
            the query yields no row or raises. Without a driver the
            pattern's own threshold is returned.
        """
        if not self.driver:
            # NOTE: offline fallback. Returning the threshold makes every
            # known pattern satisfy ``count >= threshold`` in
            # discover_patterns(), so all of them are reported.
            return pattern["threshold"]
        try:
            with self.driver.session() as session:
                row = session.run(pattern["cypher"]).single()
                return int(row["n"]) if row else 0
        except Exception as e:
            # Best effort: a failing query counts as "no matches".
            logger.warning(f"[PatternLearner] Query failed: {e}")
            return 0

    def _discover_new_motifs(self) -> list:
        """Look for politicians holding directorships in five or more companies.

        Returns:
            Up to five candidate dicts (the query is limited to 5 rows,
            ordered by company count descending). An empty list is returned
            when there is no driver or when the query fails.
        """
        if not self.driver:
            return []
        try:
            with self.driver.session() as session:
                rows = session.run(
                    """
                    MATCH (p:Politician)-[:DIRECTOR_OF]->(c:Company)
                    WITH p, count(c) AS company_count
                    WHERE company_count >= 5
                    RETURN p.name AS name, company_count
                    ORDER BY company_count DESC LIMIT 5
                    """
                ).data()
            return [
                {
                    "pattern_id": "high_directorship_count",
                    "description": (
                        f"{row['name']} holds directorships in "
                        f"{row['company_count']} companies"
                    ),
                    "count": row["company_count"],
                    "threshold": 5,
                    "status": "candidate",
                    "found_at": datetime.now().isoformat(),
                }
                for row in rows
            ]
        except Exception as e:
            # Still best effort, but no longer silent: log the failure the
            # same way _check_pattern does so a broken query is visible.
            logger.warning(f"[PatternLearner] Motif discovery failed: {e}")
            return []
if __name__ == "__main__":
    # Manual smoke test: runs discovery without a graph driver, so the
    # learner takes its offline fallback paths, then prints a summary.
    banner = "=" * 55
    print(banner)
    print("BharatGraph - Pattern Learner Test")
    print(banner)

    outcome = PatternLearner(driver=None).discover_patterns()

    print(f"\n Patterns found: {outcome['patterns_found']}")
    for candidate in outcome["candidates"]:
        # Descriptions are clipped to 60 characters to keep one line each.
        print(f" [{candidate['count']}x] {candidate['description'][:60]}")
    print("\nDone!")
|