widgettdc-api / apps /backend /python /m365_realtime_watcher.py
Kraft102's picture
Update backend source
34367da verified
#!/usr/bin/env python3
"""
👁️ M365 REALTIME WATCHER
=========================
Overvåger ændringer i M365 lokale filer og harvester automatisk.
Features:
- File system watcher på OneDrive/SharePoint sync
- Outlook COM events (new mail)
- Teams IndexedDB changes
- Auto-harvest ved ændringer
- Neo4j realtime sync
"""
import os
import sys
import time
import json
import hashlib
import threading
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import List, Dict, Callable, Optional
from queue import Queue
import re
# Watchdog for file system monitoring
try:
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler, FileModifiedEvent, FileCreatedEvent
    WATCHDOG_AVAILABLE = True
except ImportError:
    # Keep the module importable without watchdog. M365FileHandler subclasses
    # FileSystemEventHandler at class-definition time, so without a stand-in
    # the import itself would die with NameError before the watcher gets the
    # chance to report the missing dependency via WATCHDOG_AVAILABLE.
    WATCHDOG_AVAILABLE = False
    Observer = None
    FileSystemEventHandler = object
    FileModifiedEvent = FileCreatedEvent = None
    print("⚠️ watchdog ikke installeret. Kør: pip install watchdog")
# Neo4j
from neo4j import GraphDatabase
# ============================================================
# CONFIGURATION
# ============================================================
# Neo4j connection settings. Each value can be overridden through an
# environment variable of the same name.
# FIXME(security): the literal fallbacks below are credentials committed to
# source control. They are kept only so existing deployments keep working;
# rotate the password and remove the fallbacks once NEO4J_PASSWORD is
# provisioned through the environment.
NEO4J_URI = os.environ.get("NEO4J_URI", "neo4j+s://054eff27.databases.neo4j.io")
NEO4J_USER = os.environ.get("NEO4J_USER", "neo4j")
NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD", "Qrt37mkb0xBZ7_ts5tG1J70K2mVDGPMF2L7Njlm7cg8")

# Base directories, taken from the Windows environment variables when present
# and derived from the user's home directory otherwise.
USER_HOME = Path(os.environ.get("USERPROFILE", os.path.expanduser("~")))
APPDATA_LOCAL = Path(os.environ.get("LOCALAPPDATA", USER_HOME / "AppData" / "Local"))

# Directories to watch, keyed by the source name recorded on each event.
WATCH_PATHS = {
    "onedrive": USER_HOME / "OneDrive",
    "teams": APPDATA_LOCAL / "Packages" / "MSTeams_8wekyb3d8bbwe" / "LocalCache" / "Microsoft" / "MSTeams",
}

# File extensions (lower-case, with leading dot) that trigger processing.
WATCH_EXTENSIONS = {'.docx', '.xlsx', '.pptx', '.pdf', '.txt', '.md', '.doc', '.xls', '.ppt', '.json'}

# Keywords matched case-insensitively against file names, file content and mail text.
SEARCH_KEYWORDS = [
    "strategi", "cyber", "NIS2", "SOC", "MDR", "cloud", "Azure", "AI",
    "Copilot", "Columbus", "ERP", "budget", "forecast", "kunde", "kontrakt",
    "rammeaftale", "SKI", "produkt", "CloudKey", "arkitektur", "roadmap",
    "projekt", "meeting", "beslutning", "action", "deadline"
]
# ============================================================
# DATA CLASSES
# ============================================================
@dataclass
class WatchEvent:
    """One change picked up by a watcher, ready to be queued and persisted."""

    # Kind of change; the watchers in this file emit "created", "modified"
    # and "new_email".
    event_type: str
    # File path, or a synthetic "Inbox/<subject>" label for mail.
    path: str
    # Originating watcher: "onedrive", "teams" or "outlook".
    source: str
    # ISO-8601 string produced when the event was observed.
    timestamp: str
    # Search keywords that matched the name/content of the item.
    keywords: List[str]
    # Leading slice of the extracted text, used for display and storage.
    content_preview: str
# ============================================================
# FILE CONTENT EXTRACTOR
# ============================================================
class ContentExtractor:
    """Extract a bounded plain-text preview from the supported file types.

    Plain-text files (.txt, .md, .json) are read directly. Office Open XML
    files (.docx, .xlsx, .pptx) are opened as zip archives and their XML is
    reduced to text by dropping the tags. Every other extension yields an
    empty string. Extraction is best-effort: any failure yields "" instead
    of raising.
    """

    # Upper bound, in characters, on the text returned by any extractor.
    MAX_CHARS = 5000

    @staticmethod
    def extract(filepath: Path) -> str:
        """Return up to ``MAX_CHARS`` characters of text from *filepath*.

        Returns "" for unsupported extensions and for files that cannot be
        read or parsed.
        """
        try:
            suffix = filepath.suffix.lower()
            if suffix in ('.txt', '.md', '.json'):
                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                    # Read only what is needed instead of loading the whole file.
                    return f.read(ContentExtractor.MAX_CHARS)
            if suffix == '.docx':
                return ContentExtractor._extract_docx(filepath)
            if suffix == '.xlsx':
                return ContentExtractor._extract_xlsx(filepath)
            if suffix == '.pptx':
                return ContentExtractor._extract_pptx(filepath)
        except Exception:
            # Deliberate best-effort: files are frequently locked or half-written
            # while a sync client is still working on them.
            return ""
        return ""

    @staticmethod
    def _xml_to_text(xml: str) -> str:
        """Replace every XML tag with a blank and collapse whitespace runs."""
        without_tags = re.sub(r'<[^>]+>', ' ', xml)
        return re.sub(r'\s+', ' ', without_tags)

    @staticmethod
    def _extract_docx(filepath: Path) -> str:
        """Return the text of ``word/document.xml``, or "" on any failure."""
        try:
            import zipfile
            with zipfile.ZipFile(filepath, 'r') as z:
                xml = z.read('word/document.xml').decode('utf-8', errors='ignore')
            return ContentExtractor._xml_to_text(xml)[:ContentExtractor.MAX_CHARS]
        except Exception:
            return ""

    @staticmethod
    def _extract_xlsx(filepath: Path) -> str:
        """Return the text of ``xl/sharedStrings.xml``.

        Returns "" when the archive has no shared-strings part or cannot be
        read.
        """
        try:
            import zipfile
            with zipfile.ZipFile(filepath, 'r') as z:
                if 'xl/sharedStrings.xml' not in z.namelist():
                    return ""
                xml = z.read('xl/sharedStrings.xml').decode('utf-8', errors='ignore')
            return ContentExtractor._xml_to_text(xml)[:ContentExtractor.MAX_CHARS]
        except Exception:
            return ""

    @staticmethod
    def _extract_pptx(filepath: Path) -> str:
        """Return the concatenated text of all slide parts, or "" on failure.

        Slides are taken in the order the archive lists them. Whitespace is
        collapsed before truncating (as for docx/xlsx) so the character
        budget is spent on text rather than on the blanks left by removed tags.
        """
        try:
            import zipfile
            slide_xml = []
            with zipfile.ZipFile(filepath, 'r') as z:
                for name in z.namelist():
                    if name.startswith('ppt/slides/slide') and name.endswith('.xml'):
                        slide_xml.append(z.read(name).decode('utf-8', errors='ignore'))
            combined = ContentExtractor._xml_to_text(' '.join(slide_xml))
            return combined[:ContentExtractor.MAX_CHARS]
        except Exception:
            return ""
# ============================================================
# FILE SYSTEM WATCHER
# ============================================================
class M365FileHandler(FileSystemEventHandler):
    """Turn file-system notifications for one watched tree into WatchEvents.

    For every created or modified file with a watched extension, the handler
    extracts a text preview, matches it against the configured keywords and
    puts a ``WatchEvent`` on the shared queue.
    """

    # Repeated notifications for the same path inside this window are ignored.
    COOLDOWN_SECONDS = 2
    # Once the cooldown table reaches this size, expired entries are dropped so
    # a long-running watcher does not accumulate every path it has ever seen.
    _COOLDOWN_MAX_ENTRIES = 1024
    # Office writes "~$<name>" owner/lock files next to open documents. They
    # carry a watched extension but hold no document content.
    _OFFICE_LOCK_PREFIX = "~$"

    def __init__(self, source: str, event_queue: Queue, keywords: List[str]):
        """
        Args:
            source: Label stored on every event (e.g. "onedrive", "teams").
            event_queue: Queue that receives the produced ``WatchEvent`` objects.
            keywords: Keywords to look for, matched case-insensitively.
        """
        super().__init__()
        self.source = source
        self.event_queue = event_queue
        self.keywords = keywords
        self.processed_files = set()  # not used by this class; kept for compatibility
        self.cooldown = {}  # path -> time.time() of the last accepted event

    def _should_process(self, path: str) -> bool:
        """Return True when *path* is a watched file that is not in cooldown.

        Accepting a path records the current time for it, so the next
        notification within ``COOLDOWN_SECONDS`` is rejected.
        """
        filepath = Path(path)
        if filepath.suffix.lower() not in WATCH_EXTENSIONS:
            return False
        if filepath.name.startswith(self._OFFICE_LOCK_PREFIX):
            return False

        now = time.time()
        last_seen = self.cooldown.get(path)
        if last_seen is not None and (now - last_seen) < self.COOLDOWN_SECONDS:
            return False

        if len(self.cooldown) >= self._COOLDOWN_MAX_ENTRIES:
            # Entries older than the window can no longer suppress anything.
            self.cooldown = {
                seen_path: seen_at
                for seen_path, seen_at in self.cooldown.items()
                if (now - seen_at) < self.COOLDOWN_SECONDS
            }
        self.cooldown[path] = now
        return True

    def _match_keywords(self, text: str) -> List[str]:
        """Return the configured keywords that occur in *text*, ignoring case."""
        if not text:
            return []
        text_lower = text.lower()
        return [kw for kw in self.keywords if kw.lower() in text_lower]

    def on_created(self, event):
        """Handle a creation notification; directories are ignored."""
        if event.is_directory:
            return
        if self._should_process(event.src_path):
            self._process_event("created", event.src_path)

    def on_modified(self, event):
        """Handle a modification notification; directories are ignored."""
        if event.is_directory:
            return
        if self._should_process(event.src_path):
            self._process_event("modified", event.src_path)

    def _process_event(self, event_type: str, path: str):
        """Extract content from *path*, match keywords and enqueue a WatchEvent."""
        filepath = Path(path)
        content = ContentExtractor.extract(filepath)

        # The file name takes part in matching, so a hit is possible even when
        # no text could be extracted.
        keywords = self._match_keywords(f"{filepath.name} {content}")

        self.event_queue.put(WatchEvent(
            event_type=event_type,
            path=path,
            source=self.source,
            timestamp=datetime.now().isoformat(),
            keywords=keywords,
            content_preview=content[:500] if content else "",
        ))
# ============================================================
# OUTLOOK WATCHER (COM Events)
# ============================================================
class OutlookWatcher:
    """Poll the default Outlook inbox over COM and queue an event per new mail.

    The watcher runs in a daemon thread. It compares the inbox item count
    between polls; when the count has grown, the newest items are read and a
    ``WatchEvent`` with ``source="outlook"`` is queued for each mail item.
    Because only the count is compared, a poll interval in which one mail
    leaves the inbox and another arrives is not detected.
    """

    # Values passed to / compared against the Outlook object model.
    _OL_FOLDER_INBOX = 6        # default-folder id used for the inbox
    _OL_MAIL_ITEM_CLASS = 43    # item class reported by mail items
    _POLL_SECONDS = 5
    _ERROR_BACKOFF_SECONDS = 10
    _MAX_ITEMS_PER_POLL = 10

    def __init__(self, event_queue: Queue, keywords: List[str]):
        """
        Args:
            event_queue: Queue that receives the produced ``WatchEvent`` objects.
            keywords: Keywords to look for, matched case-insensitively.
        """
        self.event_queue = event_queue
        self.keywords = keywords
        self.running = False
        self.outlook = None

    def _match_keywords(self, text: str) -> List[str]:
        """Return the configured keywords that occur in *text*, ignoring case."""
        if not text:
            return []
        text_lower = text.lower()
        return [kw for kw in self.keywords if kw.lower() in text_lower]

    def start(self):
        """Start polling in a daemon thread and return that thread."""
        self.running = True
        thread = threading.Thread(target=self._watch_loop, daemon=True)
        thread.start()
        return thread

    def stop(self):
        """Ask the polling loop to exit; it notices after its current sleep."""
        self.running = False

    def _watch_loop(self):
        """Thread body: set up COM, poll until stopped, always tear COM down."""
        try:
            import win32com.client
            import pythoncom
            pythoncom.CoInitialize()
            try:
                self._poll_inbox(win32com.client)
            finally:
                # Balance CoInitialize even when polling setup fails.
                pythoncom.CoUninitialize()
        except Exception as e:
            print(f" ❌ Outlook watcher error: {e}")

    def _poll_inbox(self, com_client):
        """Poll the inbox item count and queue events while ``running`` is set."""
        self.outlook = com_client.Dispatch("Outlook.Application")
        namespace = self.outlook.GetNamespace("MAPI")
        inbox = namespace.GetDefaultFolder(self._OL_FOLDER_INBOX)
        last_count = inbox.Items.Count
        print(" 📧 Outlook watcher startet")

        last_error = None
        while self.running:
            try:
                current_count = inbox.Items.Count
                if current_count > last_count:
                    self._queue_new_mail(inbox.Items, current_count - last_count)
                # Track decreases as well: if mail was deleted or moved, the
                # baseline has to drop or later arrivals stay below the old
                # count and are never reported.
                last_count = current_count
                last_error = None
            except Exception as exc:
                # Report each distinct failure once instead of hiding it, then
                # back off before retrying.
                message = str(exc)
                if message != last_error:
                    print(f" ⚠️ Outlook poll failed, retrying: {message}")
                    last_error = message
                time.sleep(self._ERROR_BACKOFF_SECONDS)
                continue
            time.sleep(self._POLL_SECONDS)

    def _queue_new_mail(self, items, new_count: int):
        """Queue a WatchEvent for each of the newest mail items.

        At most ``_MAX_ITEMS_PER_POLL`` items are read per call. An item that
        cannot be read is skipped so it does not block the remaining ones.
        """
        items.Sort("[ReceivedTime]", True)  # second argument requests descending order
        for position in range(1, min(new_count, self._MAX_ITEMS_PER_POLL) + 1):
            try:
                item = items.Item(position)  # the collection is addressed from 1
                if item.Class != self._OL_MAIL_ITEM_CLASS:
                    continue
                subject = str(item.Subject or "")
                body = str(item.Body or "")[:2000]
                self.event_queue.put(WatchEvent(
                    event_type="new_email",
                    path=f"Inbox/{subject[:50]}",
                    source="outlook",
                    timestamp=datetime.now().isoformat(),
                    keywords=self._match_keywords(f"{subject} {body}"),
                    content_preview=body[:500],
                ))
            except Exception:
                continue
# ============================================================
# NEO4J SYNC
# ============================================================
class Neo4jSync:
    """Persist watch events and their keyword links in Neo4j."""

    # One statement writes the event node, its data-source link and all
    # keyword links. UNWIND over an empty list yields no rows, so an event
    # without keywords is still stored by the clauses that precede it.
    _SAVE_EVENT_QUERY = """
        MERGE (e:M365RealtimeEvent {contentHash: $hash})
        ON CREATE SET
            e.eventType = $eventType,
            e.path = $path,
            e.source = $source,
            e.timestamp = $timestamp,
            e.keywords = $keywords,
            e.contentPreview = $preview,
            e.processedAt = datetime()
        MERGE (ds:DataSource {name: $dsName})
        MERGE (e)-[:DETECTED_BY]->(ds)
        WITH e
        UNWIND $keywords AS kw
        MERGE (k:SearchKeyword {name: kw})
        MERGE (e)-[:MATCHES_KEYWORD]->(k)
    """

    def __init__(self):
        """Create the driver from the module-level NEO4J_* settings."""
        self.driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))

    def save_event(self, event: WatchEvent):
        """Store *event* and link it to its data source and matched keywords.

        The node is keyed by an MD5 digest of source, path and timestamp. The
        digest is used only as an identifier, not for security. The write is
        sent as a single statement so an event is never left with only part
        of its keyword links, and the result is consumed so that a failure
        is raised here, before the caller counts the event as saved.
        """
        identity = f"{event.source}:{event.path}:{event.timestamp}"
        content_hash = hashlib.md5(identity.encode()).hexdigest()
        with self.driver.session() as session:
            session.run(
                self._SAVE_EVENT_QUERY,
                hash=content_hash,
                eventType=event.event_type,
                path=event.path,
                source=event.source,
                timestamp=event.timestamp,
                keywords=event.keywords,
                preview=event.content_preview[:1000],
                dsName=f"M365_Watcher_{event.source}",
            ).consume()

    def close(self):
        """Close the underlying driver."""
        self.driver.close()
# ============================================================
# MAIN WATCHER
# ============================================================
class M365RealtimeWatcher:
    """Run all M365 watchers and persist keyword-matching events to Neo4j.

    File-system handlers and the Outlook watcher put ``WatchEvent`` objects on
    a shared queue; the calling thread drains that queue, prints each event
    and saves those that matched at least one keyword.
    """

    # Console icon per event source; unknown sources fall back to a box.
    _SOURCE_ICONS = {"onedrive": "☁️", "teams": "💬", "outlook": "📧"}

    def __init__(self):
        self.event_queue = Queue()
        self.neo4j = Neo4jSync()
        self.observers = []
        self.outlook_watcher = None
        self.running = False
        self.stats = {
            "events_detected": 0,
            "events_with_keywords": 0,
            "events_saved": 0
        }

    def start(self, watch_onedrive: bool = True, watch_teams: bool = True, watch_outlook: bool = True):
        """Start the selected watchers and block processing events until Ctrl+C.

        Returns immediately, after printing a hint, when the watchdog package
        is not available. A file-system source is skipped when its directory
        does not exist.
        """
        print("\n" + "=" * 60)
        print("👁️ M365 REALTIME WATCHER")
        print("=" * 60)
        if not WATCHDOG_AVAILABLE:
            print("❌ watchdog module ikke tilgængelig")
            print(" Installer med: pip install watchdog")
            return
        self.running = True

        if watch_onedrive and WATCH_PATHS["onedrive"].exists():
            self._start_file_watcher("onedrive", "☁️ OneDrive")
        if watch_teams and WATCH_PATHS["teams"].exists():
            self._start_file_watcher("teams", "💬 Teams")

        if watch_outlook:
            print("\n📧 Outlook")
            self.outlook_watcher = OutlookWatcher(self.event_queue, SEARCH_KEYWORDS)
            self.outlook_watcher.start()

        print("\n" + "=" * 60)
        print("🔄 Lytter efter ændringer... (Ctrl+C for at stoppe)")
        print("=" * 60 + "\n")
        self._process_events()

    def _start_file_watcher(self, source: str, label: str):
        """Start a recursive observer for ``WATCH_PATHS[source]`` and track it."""
        watch_path = WATCH_PATHS[source]
        print(f"\n{label}: {watch_path}")
        handler = M365FileHandler(source, self.event_queue, SEARCH_KEYWORDS)
        observer = Observer()
        observer.schedule(handler, str(watch_path), recursive=True)
        observer.start()
        self.observers.append(observer)
        print(" ✅ Watcher startet")

    def _process_events(self):
        """Drain the event queue until stopped; Ctrl+C triggers ``stop()``.

        Only the queue timeout and ordinary processing errors are absorbed
        inside the loop. KeyboardInterrupt is not an ``Exception`` subclass,
        so it reaches the outer handler and actually stops the watcher.
        """
        from queue import Empty

        try:
            while self.running:
                try:
                    # The timeout lets the loop re-check self.running regularly.
                    event = self.event_queue.get(timeout=1)
                except Empty:
                    continue
                try:
                    self._handle_event(event)
                except Exception as exc:
                    # One failing event (e.g. Neo4j unreachable) must not end
                    # the watcher, but it should not vanish silently either.
                    print(f" ❌ Could not process event: {exc}")
        except KeyboardInterrupt:
            self.stop()

    def _handle_event(self, event):
        """Print one event, update the counters and save it if it has keywords."""
        self.stats["events_detected"] += 1

        icon = self._SOURCE_ICONS.get(event.source, "📦")
        kw_str = f" [{', '.join(event.keywords[:3])}]" if event.keywords else ""
        print(f"{icon} [{event.event_type}] {Path(event.path).name}{kw_str}")

        if not event.keywords:
            return
        self.stats["events_with_keywords"] += 1
        self.neo4j.save_event(event)
        self.stats["events_saved"] += 1
        print(f" 💾 Saved to Neo4j")

    def stop(self):
        """Stop every watcher, close the Neo4j driver and print the counters."""
        print("\n\n" + "=" * 60)
        print("🛑 Stopper watchers...")
        print("=" * 60)
        self.running = False
        for observer in self.observers:
            observer.stop()
            observer.join()
        if self.outlook_watcher:
            self.outlook_watcher.stop()
        self.neo4j.close()
        print(f"\n📊 STATISTIK:")
        print(f" Events detected: {self.stats['events_detected']}")
        print(f" Events med keywords: {self.stats['events_with_keywords']}")
        print(f" Events saved: {self.stats['events_saved']}")
        print("=" * 60)
# ============================================================
# CLI
# ============================================================
def main():
    """Parse the command-line switches and run the watcher until interrupted."""
    import argparse

    cli = argparse.ArgumentParser(description="M365 Realtime Watcher")
    # One opt-out switch per source; every source is watched by default.
    for flag, label in (
        ("--no-onedrive", "OneDrive"),
        ("--no-teams", "Teams"),
        ("--no-outlook", "Outlook"),
    ):
        cli.add_argument(flag, action="store_true", help=f"Skip {label} watching")
    options = cli.parse_args()

    enabled = {
        "watch_onedrive": not options.no_onedrive,
        "watch_teams": not options.no_teams,
        "watch_outlook": not options.no_outlook,
    }
    realtime_watcher = M365RealtimeWatcher()
    realtime_watcher.start(**enabled)


if __name__ == "__main__":
    main()