Spaces:
Paused
Paused
| #!/usr/bin/env python3 | |
| """ | |
| 🗂️ D: Drev Harvester - Importerer lokal data til Neo4j Knowledge Graph | |
| """ | |
| import os | |
| import json | |
| import hashlib | |
| import zipfile | |
| from pathlib import Path | |
| from datetime import datetime | |
| from neo4j import GraphDatabase | |
| import re | |
class DDrevHarvester:
    """Harvest file metadata from selected D: drive folders into Neo4j.

    Every folder listed in ``PRIORITY_PATHS`` is walked, per-file metadata is
    collected (plus a short preview for small text files and an entry listing
    for zip archives) and merged into the graph as ``LocalFile`` nodes linked
    to one ``DataSource`` node and one ``Category`` node.
    """

    # Connection settings. Each value can be overridden with an environment
    # variable of the same name; the literals are kept only as fallbacks so
    # existing deployments keep working.
    NEO4J_URI = os.environ.get("NEO4J_URI", "neo4j+s://054eff27.databases.neo4j.io")
    NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
    # SECURITY: a credential committed to source control must be considered
    # leaked. Rotate it, supply the new one via NEO4J_PASSWORD, and delete
    # this fallback.
    NEO4J_PASSWORD = os.environ.get(
        "NEO4J_PASSWORD", "Qrt37mkb0xBZ7_ts5tG1J70K2mVDGPMF2L7Njlm7cg8"
    )

    # Prioritised folders to scan
    PRIORITY_PATHS = {
        "Intel": {
            "path": r"D:\Intel",
            "category": "OSINT_INTELLIGENCE",
            "priority": "CRITICAL"
        },
        "viden": {
            "path": r"D:\viden",
            "category": "KNOWLEDGE_BASE",
            "priority": "CRITICAL"
        },
        "oSINT": {
            "path": r"D:\oSINT",
            "category": "OSINT_TOOLS",
            "priority": "HIGH"
        },
        "Mulige_widgets": {
            "path": r"D:\Mulige widgets",
            "category": "WIDGET_CANDIDATES",
            "priority": "HIGH"
        },
        "PowerPointPlugIn": {
            "path": r"D:\PowerPointPlugIn",
            "category": "INTEGRATIONS",
            "priority": "MEDIUM"
        },
        "deepdarkCTI": {
            "path": r"D:\deepdarkCTI-main",
            "category": "THREAT_INTELLIGENCE",
            "priority": "HIGH"
        }
    }

    # Names to ignore. Each regex must match ONE WHOLE path component
    # (directory or file name), so 'dist' skips a "dist" folder but not
    # "distribution.md".
    IGNORE_PATTERNS = [
        r'node_modules', r'\.git', r'__pycache__', r'\.next',
        r'dist', r'build', r'\.env(?:\..*)?', r'venv', r'\.venv'
    ]

    # Lower-cased file extension -> coarse file type label.
    _FILE_TYPE_BY_EXT = {
        '.py': 'PYTHON',
        '.js': 'JAVASCRIPT',
        '.ts': 'TYPESCRIPT',
        '.tsx': 'REACT',
        '.jsx': 'REACT',
        '.md': 'MARKDOWN',
        '.json': 'JSON',
        '.yaml': 'YAML',
        '.yml': 'YAML',
        '.txt': 'TEXT',
        '.pdf': 'PDF',
        '.zip': 'ARCHIVE',
        '.pptx': 'POWERPOINT',
        '.docx': 'WORD',
        '.xlsx': 'EXCEL',
        '.html': 'HTML',
        '.css': 'CSS',
        '.sql': 'SQL',
        '.sh': 'SHELL',
        '.bat': 'BATCH',
        '.ps1': 'POWERSHELL'
    }

    # File types whose text is previewed, and the limits applied.
    _PREVIEW_TYPES = ('MARKDOWN', 'TEXT', 'JSON', 'YAML')
    _PREVIEW_MAX_BYTES = 50000   # only files smaller than this are read
    _PREVIEW_MAX_CHARS = 2000    # characters kept from the file's text
    _ZIP_LIST_LIMIT = 50         # archive entries inspected per zip file
    _ZIP_IMPORT_LIMIT = 20       # archive entries written to the graph

    def __init__(self):
        """Open the Neo4j driver and reset the harvest counters."""
        self.driver = GraphDatabase.driver(
            self.NEO4J_URI,
            auth=(self.NEO4J_USER, self.NEO4J_PASSWORD)
        )
        self.stats = {
            "files_scanned": 0,
            "files_imported": 0,
            "directories": 0,
            "zip_contents": 0
        }
        print("🗂️ D: Drev Harvester initialized")
        print(f" Neo4j: {self.NEO4J_URI}")

    def md5_hash(self, content: str) -> str:
        """Return the hex MD5 digest of ``content`` encoded as UTF-8.

        The digest is used only as a node key, not for anything
        security-sensitive.
        """
        return hashlib.md5(content.encode('utf-8')).hexdigest()

    def should_ignore(self, path: str) -> bool:
        """Return True if any component of ``path`` matches an ignore pattern.

        ``path`` is split on both ``/`` and ``\\`` so Windows and POSIX style
        paths behave the same. Patterns are matched against whole components
        to avoid substring false positives ("rebuild.txt" is not "build").
        """
        components = [part for part in re.split(r'[\\/]+', path) if part]
        return any(
            re.fullmatch(pattern, part)
            for part in components
            for pattern in self.IGNORE_PATTERNS
        )

    def get_file_type(self, filepath: str) -> str:
        """Map the extension of ``filepath`` to a type label, or 'OTHER'."""
        ext = Path(filepath).suffix.lower()
        return self._FILE_TYPE_BY_EXT.get(ext, 'OTHER')

    def extract_zip_contents(self, zip_path: Path) -> list:
        """List the files inside a zip archive without extracting it.

        Only the first ``_ZIP_LIST_LIMIT`` archive entries are inspected and
        directory entries are skipped. Each listed file increments
        ``stats["zip_contents"]``.

        Returns:
            A list of ``{"name", "size", "type"}`` dicts. An unreadable or
            corrupt archive yields whatever was collected so far (normally an
            empty list) instead of raising.
        """
        contents = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zf:
                for info in zf.infolist()[:self._ZIP_LIST_LIMIT]:
                    if info.is_dir():
                        continue
                    contents.append({
                        "name": info.filename,
                        "size": info.file_size,
                        "type": self.get_file_type(info.filename)
                    })
                    self.stats["zip_contents"] += 1
        except (zipfile.BadZipFile, zipfile.LargeZipFile, OSError) as exc:
            # Best effort: a broken archive must not abort the whole scan.
            print(f" ⚠️ Could not read zip {zip_path}: {exc}")
        return contents

    def _build_file_info(self, item: Path, base: Path, category: str,
                         priority: str) -> dict:
        """Collect the metadata dict for one file found below ``base``."""
        stat = item.stat()  # one stat call serves both size and mtime
        file_type = self.get_file_type(str(item))
        file_info = {
            "name": item.name,
            "path": str(item),
            "relative_path": str(item.relative_to(base)),
            "type": file_type,
            "size": stat.st_size,
            "category": category,
            "priority": priority,
            "modified": datetime.fromtimestamp(stat.st_mtime).isoformat()
        }

        # Zip file contents
        if file_type == "ARCHIVE" and item.suffix.lower() == '.zip':
            file_info["zip_contents"] = self.extract_zip_contents(item)

        # Read the beginning of small text files
        if file_type in self._PREVIEW_TYPES and stat.st_size < self._PREVIEW_MAX_BYTES:
            try:
                text = item.read_text(encoding='utf-8')
            except (OSError, UnicodeDecodeError):
                # Unreadable or non-UTF-8 file: keep the metadata, skip the preview.
                pass
            else:
                file_info["content_preview"] = text[:self._PREVIEW_MAX_CHARS]
        return file_info

    def scan_directory(self, base_path: str, category: str, priority: str) -> list:
        """Walk ``base_path`` and return metadata for every non-ignored file.

        Ignored directories are pruned before descending, so large trees such
        as ``node_modules`` are never traversed. Ignore patterns are applied
        to names below ``base_path`` only, never to ``base_path`` itself.
        Updates ``stats["files_scanned"]`` and ``stats["directories"]``.

        Returns:
            A list of file-info dicts; empty if ``base_path`` does not exist.
        """
        files = []
        base = Path(base_path)
        if not base.exists():
            print(f" ⚠️ Path ikke fundet: {base_path}")
            return files

        for dirpath, dirnames, filenames in os.walk(base):
            # Assigning through the slice prunes os.walk's own list, which is
            # what stops it from descending; sorting keeps the order stable.
            dirnames[:] = sorted(
                name for name in dirnames if not self.should_ignore(name)
            )
            self.stats["directories"] += len(dirnames)

            for filename in sorted(filenames):
                if self.should_ignore(filename):
                    continue
                item = Path(dirpath) / filename
                try:
                    if not item.is_file():
                        continue
                    self.stats["files_scanned"] += 1
                    files.append(
                        self._build_file_info(item, base, category, priority)
                    )
                except (OSError, ValueError, OverflowError) as exc:
                    # e.g. permission denied, file vanished, bogus timestamp
                    print(f" ⚠️ Skipping {item}: {exc}")
        return files

    def import_to_neo4j(self, files: list, source_name: str):
        """Merge the scanned ``files`` into Neo4j under one DataSource node.

        Args:
            files: file-info dicts as produced by ``scan_directory``.
            source_name: key of the scanned source; the DataSource node is
                named ``DDrev_<source_name>``.

        Increments ``stats["files_imported"]`` once per file.
        """
        print(f"\n 💾 Importing {len(files)} files from {source_name}...")
        source_label = f"DDrev_{source_name}"

        with self.driver.session() as session:
            # Create the DataSource node
            session.run("""
                MERGE (ds:DataSource {name: $name})
                ON CREATE SET
                    ds.type = 'local_drive',
                    ds.location = 'D:',
                    ds.createdAt = datetime()
                ON MATCH SET
                    ds.lastHarvest = datetime()
                """, name=source_label)

            for file_info in files:
                # NOTE: despite the property name, this key is derived from
                # path + modification time, not from the file's content. A
                # modified file therefore becomes a new LocalFile node.
                content_hash = self.md5_hash(
                    f"{file_info['path']}:{file_info['modified']}"
                )

                # Create the LocalFile node and link it to source and category
                session.run("""
                    MERGE (lf:LocalFile {contentHash: $hash})
                    ON CREATE SET
                        lf.name = $name,
                        lf.path = $path,
                        lf.relativePath = $rel_path,
                        lf.fileType = $file_type,
                        lf.size = $size,
                        lf.category = $category,
                        lf.priority = $priority,
                        lf.modified = $modified,
                        lf.harvestedAt = datetime()
                    ON MATCH SET
                        lf.lastSeen = datetime()
                    WITH lf
                    MERGE (ds:DataSource {name: $source})
                    MERGE (lf)-[:HARVESTED_FROM]->(ds)
                    WITH lf
                    MERGE (cat:Category {name: $category})
                    MERGE (lf)-[:BELONGS_TO]->(cat)
                    """,
                    hash=content_hash,
                    name=file_info['name'],
                    path=file_info['path'],
                    rel_path=file_info['relative_path'],
                    file_type=file_info['type'],
                    size=file_info['size'],
                    category=file_info['category'],
                    priority=file_info['priority'],
                    modified=file_info['modified'],
                    source=source_label
                )

                # Store the content preview if one was captured
                if file_info.get('content_preview'):
                    session.run("""
                        MATCH (lf:LocalFile {contentHash: $hash})
                        SET lf.contentPreview = $preview
                        """,
                        hash=content_hash,
                        preview=file_info['content_preview'][:self._PREVIEW_MAX_CHARS]
                    )

                # Store zip contents (capped to keep the graph small)
                if file_info.get('zip_contents'):
                    for entry in file_info['zip_contents'][:self._ZIP_IMPORT_LIMIT]:
                        session.run("""
                            MATCH (lf:LocalFile {contentHash: $hash})
                            MERGE (zf:ZipContent {name: $name, parent: $hash})
                            ON CREATE SET
                                zf.size = $size,
                                zf.fileType = $type
                            MERGE (zf)-[:CONTAINED_IN]->(lf)
                            """,
                            hash=content_hash,
                            name=entry['name'],
                            size=entry['size'],
                            type=entry['type']
                        )

                self.stats["files_imported"] += 1

    def run(self):
        """Run a full harvest of every configured path.

        Scans and imports each entry of ``PRIORITY_PATHS``, prints a summary,
        writes ``data/ddrev_harvest_summary.json`` and returns the list of all
        file-info dicts. The Neo4j driver is closed when the method exits,
        including when a scan or import raises.
        """
        try:
            print("\n" + "=" * 60)
            print("🗂️ D: DREV HARVESTER")
            print("=" * 60)

            all_files = []
            for name, config in self.PRIORITY_PATHS.items():
                print(f"\n📁 Scanning: {name} ({config['priority']})")
                print(f" Path: {config['path']}")
                files = self.scan_directory(
                    config['path'],
                    config['category'],
                    config['priority']
                )
                print(f" Found: {len(files)} files")
                if files:
                    self.import_to_neo4j(files, name)
                    all_files.extend(files)

            # Summary
            print("\n" + "=" * 60)
            print("📊 HARVEST COMPLETE")
            print("=" * 60)
            print(f" 📁 Directories scanned: {self.stats['directories']}")
            print(f" 📄 Files scanned: {self.stats['files_scanned']}")
            print(f" 💾 Files imported: {self.stats['files_imported']}")
            print(f" 📦 Zip contents indexed: {self.stats['zip_contents']}")
            print("=" * 60)

            # Save local summary
            summary_file = Path("data/ddrev_harvest_summary.json")
            summary_file.parent.mkdir(parents=True, exist_ok=True)
            summary = {
                "timestamp": datetime.now().isoformat(),
                "stats": self.stats,
                "sources": list(self.PRIORITY_PATHS.keys()),
                "sample_files": [info['name'] for info in all_files[:50]]
            }
            with open(summary_file, 'w', encoding='utf-8') as fh:
                json.dump(summary, fh, indent=2, ensure_ascii=False)
            print(f"\n📁 Summary saved: {summary_file}")
            return all_files
        finally:
            # Always release the connection pool, even after a failure.
            self.driver.close()
if __name__ == "__main__":
    # Script entry point: connect to Neo4j and perform one full harvest.
    harvester = DDrevHarvester()
    harvester.run()