# Page-status residue captured together with this file (not part of the module):
#   Spaces: Runtime error
#   Runtime error
"""
Temporal Application Tracker

Implements time-aware tracking of job applications with versioned history.
Based on the Temporal AI Agents pattern for maintaining historical context.
"""
| import json | |
| import logging | |
| from typing import Dict, List, Tuple, Optional, Any | |
| from datetime import datetime, timedelta | |
| from dataclasses import dataclass, field | |
| from pathlib import Path | |
| import hashlib | |
| from models.schemas import JobPosting, OrchestrationResult | |
| logger = logging.getLogger(__name__) | |
@dataclass
class Triplet:
    """
    A time-stamped fact in subject-predicate-object format
    Example: (JobID123, status, applied, 2025-01-15)

    The class must be a dataclass: callers construct it positionally
    (``Triplet(subject, predicate, object, valid_at)``) and ``metadata``
    relies on ``field(default_factory=dict)`` to get a fresh dict per
    instance.
    """

    subject: str
    predicate: str
    object: Any
    # Moment from which the fact holds.
    valid_at: datetime
    # Moment at which the fact stopped holding; None while still current.
    expired_at: Optional[datetime] = None
    confidence: float = 1.0
    source: str = "user"
    metadata: Dict = field(default_factory=dict)

    def to_dict(self) -> Dict:
        """Return a JSON-serializable dict describing this triplet.

        ``object`` is written through ``str()``, so a non-string object comes
        back as a string after a ``to_dict`` / ``from_dict`` round trip.
        Datetimes are written in ISO-8601 form.
        """
        return {
            'subject': self.subject,
            'predicate': self.predicate,
            'object': str(self.object),
            'valid_at': self.valid_at.isoformat(),
            'expired_at': self.expired_at.isoformat() if self.expired_at else None,
            'confidence': self.confidence,
            'source': self.source,
            'metadata': self.metadata,
        }

    @classmethod
    def from_dict(cls, data: Dict) -> 'Triplet':
        """Build a triplet from a dict produced by :meth:`to_dict`.

        Raises:
            KeyError: if ``subject``, ``predicate``, ``object`` or
                ``valid_at`` is missing.
            ValueError: if a timestamp is not a valid ISO-8601 string.
        """
        expired_raw = data.get('expired_at')
        return cls(
            subject=data['subject'],
            predicate=data['predicate'],
            object=data['object'],
            valid_at=datetime.fromisoformat(data['valid_at']),
            expired_at=datetime.fromisoformat(expired_raw) if expired_raw else None,
            confidence=data.get('confidence', 1.0),
            source=data.get('source', 'user'),
            # Tolerate an explicit null and copy so the new instance never
            # shares its metadata dict with the input mapping.
            metadata=dict(data.get('metadata') or {}),
        )
class TemporalKnowledgeGraph:
    """
    Knowledge graph that tracks changes over time
    Maintains history of all application states and changes

    Triplets are kept in memory in insertion order (``self.triplets``) and
    written to the JSON file at ``storage_path`` after every
    :meth:`add_triplet`.
    """

    def __init__(self, storage_path: str = "temporal_graph.json"):
        """Create the graph and load any triplets already stored on disk."""
        self.storage_path = Path(storage_path)
        self.triplets: List[Triplet] = []
        self.load()

    def add_triplet(self, triplet: Triplet) -> None:
        """Add a new fact to the graph and persist it.

        If a currently valid triplet with the same subject and predicate
        holds a different object, that triplet is expired at the new
        triplet's ``valid_at``. A triplet with an equal object is left
        untouched and the new one is appended alongside it.
        """
        # Check for contradictions
        existing = self.find_current(triplet.subject, triplet.predicate)
        if existing is not None and existing.object != triplet.object:
            # Invalidate old triplet
            existing.expired_at = triplet.valid_at
            logger.info(f"Invalidated old triplet: {existing.subject}-{existing.predicate}")
        self.triplets.append(triplet)
        self.save()

    def find_current(
        self,
        subject: str,
        predicate: str,
        at_time: Optional[datetime] = None
    ) -> Optional[Triplet]:
        """Find the valid triplet for a subject-predicate pair.

        Args:
            subject: Subject to match.
            predicate: Predicate to match.
            at_time: Point in time to evaluate validity at; defaults to
                ``datetime.now()``.

        Returns:
            The most recently added triplet whose validity window
            (``valid_at <= at_time < expired_at``) contains ``at_time``,
            or ``None`` when there is none.
        """
        at_time = at_time or datetime.now()
        # Walk newest-first so the latest insertion wins.
        for candidate in reversed(self.triplets):
            if candidate.subject != subject or candidate.predicate != predicate:
                continue
            if candidate.valid_at > at_time:
                continue
            if candidate.expired_at is None or candidate.expired_at > at_time:
                return candidate
        return None

    def get_history(
        self,
        subject: str,
        predicate: Optional[str] = None
    ) -> List[Triplet]:
        """Get full history for a subject, ordered by ``valid_at``.

        When ``predicate`` is given, only triplets with that predicate are
        returned. Expired triplets are included.
        """
        matching = [
            t for t in self.triplets
            if t.subject == subject
            and (predicate is None or t.predicate == predicate)
        ]
        return sorted(matching, key=lambda t: t.valid_at)

    def query_timerange(
        self,
        start_date: datetime,
        end_date: datetime,
        predicate: Optional[str] = None
    ) -> List[Triplet]:
        """Return triplets whose ``valid_at`` lies in ``[start_date, end_date]``.

        Only the start of validity is compared; ``expired_at`` is ignored.
        Results keep insertion order.
        """
        return [
            t for t in self.triplets
            if start_date <= t.valid_at <= end_date
            and (predicate is None or t.predicate == predicate)
        ]

    def save(self) -> None:
        """Save graph to disk.

        The JSON is written to a sibling ``<name>.tmp`` file first and then
        moved over ``storage_path``. Writing the target in place would
        truncate the existing history before serialization finishes, so a
        failure in ``json.dump`` (e.g. a non-serializable metadata value)
        would destroy the stored data.

        Raises:
            OSError: if the file cannot be written or replaced.
            TypeError: if a triplet holds a value ``json`` cannot serialize.
        """
        data = {
            'triplets': [t.to_dict() for t in self.triplets],
            'last_updated': datetime.now().isoformat()
        }
        tmp_path = self.storage_path.with_name(self.storage_path.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
        except Exception:
            # Do not leave a half-written temp file behind; the previous
            # storage file is still intact at this point.
            if tmp_path.exists():
                tmp_path.unlink()
            raise
        tmp_path.replace(self.storage_path)

    def load(self) -> None:
        """Load graph from disk.

        A missing file leaves the graph empty. Any failure while reading or
        parsing is logged and swallowed (deliberate best-effort), leaving
        ``self.triplets`` unchanged.
        """
        if not self.storage_path.exists():
            return
        try:
            with open(self.storage_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
            # Build the full list before assigning so a bad entry cannot
            # leave the graph partially populated.
            self.triplets = [
                Triplet.from_dict(t) for t in data.get('triplets', [])
            ]
            logger.info(f"Loaded {len(self.triplets)} triplets from storage")
        except Exception as e:
            logger.error(f"Error loading temporal graph: {e}")
class TemporalApplicationTracker:
    """
    Track job applications with full temporal history
    Maintains versioned states and changes over time

    Every application is a subject (its job ID) in a
    ``TemporalKnowledgeGraph`` stored in ``application_history.json``.
    """

    # Statuses after which an application is no longer considered active.
    _INACTIVE_STATUSES = ('rejected', 'withdrawn', 'archived')
    # Statuses that count as a response to an application.
    _RESPONSE_STATUSES = ('interview_scheduled', 'rejected', 'offer')

    def __init__(self):
        self.graph = TemporalKnowledgeGraph("application_history.json")

    def track_application(
        self,
        job: JobPosting,
        status: str,
        metadata: Optional[Dict] = None
    ) -> None:
        """Track a new application or status change.

        Records company, position and status, plus location, salary and url
        when the job provides them. Each ``metadata`` entry is stored under
        the predicate ``meta_<key>``. ``applied_date`` is written only the
        first time a job is tracked, so re-tracking a job for a status
        change does not overwrite the original application date.
        """
        job_id = self._get_job_id(job)
        now = datetime.now()
        # Core application triplets
        triplets = [
            Triplet(job_id, "company", job.company, now),
            Triplet(job_id, "position", job.title, now),
            Triplet(job_id, "status", status, now),
        ]
        if self.graph.find_current(job_id, "applied_date") is None:
            triplets.append(Triplet(job_id, "applied_date", now.isoformat(), now))
        # Optional fields
        if job.location:
            triplets.append(Triplet(job_id, "location", job.location, now))
        if job.salary:
            triplets.append(Triplet(job_id, "salary", job.salary, now))
        if job.url:
            triplets.append(Triplet(job_id, "url", job.url, now))
        # Add metadata as triplets
        if metadata:
            triplets.extend(
                Triplet(job_id, f"meta_{key}", value, now, metadata={'source': 'metadata'})
                for key, value in metadata.items()
            )
        # Add all triplets
        for triplet in triplets:
            self.graph.add_triplet(triplet)
        logger.info(f"Tracked application for {job.company} - {job.title}")

    def update_status(
        self,
        job_id: str,
        new_status: str,
        notes: Optional[str] = None
    ) -> None:
        """Update application status.

        Adds the new status, an optional ``status_notes`` fact and a
        ``status_changed`` event, all stamped with the same time.
        """
        now = datetime.now()
        # Add new status triplet (old one auto-invalidated)
        self.graph.add_triplet(
            Triplet(job_id, "status", new_status, now)
        )
        # Add notes if provided
        if notes:
            self.graph.add_triplet(
                Triplet(job_id, "status_notes", notes, now, metadata={'type': 'note'})
            )
        # Track status change event
        self.graph.add_triplet(
            Triplet(
                job_id,
                "status_changed",
                f"Changed to {new_status}",
                now,
                metadata={'event_type': 'status_change'}
            )
        )

    def add_interview(
        self,
        job_id: str,
        interview_date: datetime,
        interview_type: str,
        notes: Optional[str] = None
    ) -> None:
        """Track interview scheduling.

        Stores the interview date (ISO string) with the interview type in
        the triplet metadata, optional notes, and then moves the
        application to the ``interview_scheduled`` status.
        """
        now = datetime.now()
        self.graph.add_triplet(
            Triplet(
                job_id,
                "interview_scheduled",
                interview_date.isoformat(),
                now,
                metadata={'type': interview_type}
            )
        )
        if notes:
            self.graph.add_triplet(
                Triplet(job_id, "interview_notes", notes, now)
            )
        # Auto-update status
        self.update_status(job_id, "interview_scheduled")

    def get_application_timeline(self, job_id: str) -> List[Dict]:
        """Get complete timeline for an application.

        Returns one dict per recorded fact, ordered by ``valid_at``, with
        keys ``date``, ``event`` and ``expired``.
        """
        return [
            {
                'date': triplet.valid_at.isoformat(),
                'event': f"{triplet.predicate}: {triplet.object}",
                'expired': triplet.expired_at is not None
            }
            for triplet in self.graph.get_history(job_id)
        ]

    def get_active_applications(self) -> List[Dict]:
        """Get all currently active applications.

        An application is any subject that has a ``status`` fact; it is
        active unless its current status is rejected, withdrawn or
        archived. Jobs are identified by their status facts rather than by
        a ``JOB_`` prefix because ``_get_job_id`` returns caller-supplied
        IDs verbatim, and those need not carry the prefix. Results follow
        the order in which jobs were first tracked.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order.
        job_ids = dict.fromkeys(
            t.subject for t in self.graph.triplets if t.predicate == "status"
        )
        active = []
        for job_id in job_ids:
            status = self.graph.find_current(job_id, "status")
            if status is None or status.object in self._INACTIVE_STATUSES:
                continue
            company = self.graph.find_current(job_id, "company")
            position = self.graph.find_current(job_id, "position")
            active.append({
                'job_id': job_id,
                'company': company.object if company else 'Unknown',
                'position': position.object if position else 'Unknown',
                'status': status.object,
                'last_updated': status.valid_at.isoformat()
            })
        return active

    def analyze_patterns(self) -> Dict[str, Any]:
        """Analyze application patterns over time.

        All figures count distinct jobs, not status triplets, so a job that
        moves through several statuses is counted once:

        - ``applications_this_week`` / ``applications_this_month``: jobs
          with an ``applied`` status recorded in the last 7 / 30 days.
        - ``status_distribution``: jobs per current (unexpired) status.
        - ``total_applications``: jobs that ever had the ``applied`` status.
        - ``response_rate``: share of those jobs that also reached
          ``interview_scheduled``, ``rejected`` or ``offer``, formatted as
          a percentage string (never above 100%).
        """
        now = datetime.now()
        week_ago = now - timedelta(days=7)
        month_ago = now - timedelta(days=30)

        def applied_jobs(triplets):
            return {t.subject for t in triplets if t.object == "applied"}

        week_jobs = applied_jobs(self.graph.query_timerange(week_ago, now, "status"))
        month_jobs = applied_jobs(self.graph.query_timerange(month_ago, now, "status"))

        status_triplets = [t for t in self.graph.triplets if t.predicate == "status"]

        # Status distribution: one entry per job; later insertions win.
        current_status = {}
        for triplet in status_triplets:
            if triplet.expired_at is None:
                current_status[triplet.subject] = triplet.object
        status_counts = {}
        for status in current_status.values():
            status_counts[status] = status_counts.get(status, 0) + 1

        # Response rate: only jobs that were actually applied to can respond.
        applied = applied_jobs(status_triplets)
        responded = {
            t.subject for t in status_triplets
            if t.object in self._RESPONSE_STATUSES
        } & applied
        total_apps = len(applied)
        response_rate = (len(responded) / total_apps * 100) if total_apps > 0 else 0
        return {
            'applications_this_week': len(week_jobs),
            'applications_this_month': len(month_jobs),
            'status_distribution': status_counts,
            'response_rate': f"{response_rate:.1f}%",
            'total_applications': total_apps
        }

    def _get_job_id(self, job: JobPosting) -> str:
        """Generate consistent job ID.

        Returns ``job.id`` when it is set; otherwise ``JOB_`` followed by
        the first 8 hex digits of the MD5 of the lower-cased
        ``company_title`` key (used as a stable fingerprint, not for
        security).
        """
        if job.id:
            return job.id
        # Generate ID from company and title
        key = f"{job.company}_{job.title}".lower().replace(' ', '_')
        hash_val = hashlib.md5(key.encode()).hexdigest()[:8]
        return f"JOB_{hash_val}"
class TemporalInvalidationAgent:
    """
    Agent that checks for and invalidates outdated information
    Based on the invalidation pattern from the article
    """

    def __init__(self, graph: TemporalKnowledgeGraph):
        self.graph = graph

    def check_contradictions(
        self,
        new_triplet: Triplet,
        threshold: float = 0.8
    ) -> Optional[Triplet]:
        """Check if new triplet contradicts existing ones.

        Args:
            new_triplet: Candidate fact to compare against the graph.
            threshold: Confidence that must be exceeded before the existing
                fact is reported for invalidation.

        Returns:
            The currently valid triplet with the same subject and predicate
            when its object differs, the new triplet is more recent, and the
            age-based confidence exceeds ``threshold``; otherwise ``None``.
            Nothing is modified here; the caller decides what to invalidate.
        """
        # Find existing triplets with same subject-predicate
        existing = self.graph.find_current(
            new_triplet.subject,
            new_triplet.predicate
        )
        if not existing:
            return None
        if existing.object == new_triplet.object:
            return None  # Same value: no contradiction
        time_diff = (new_triplet.valid_at - existing.valid_at).total_seconds()
        if time_diff <= 0:
            return None  # New triplet is not more recent
        # More recent info is more likely to be correct;
        # confidence grows linearly and maxes out after 1 day.
        confidence = min(1.0, time_diff / (24 * 3600))
        if confidence > threshold:
            return existing  # Return triplet to invalidate
        return None

    def cleanup_expired(self, days_old: int = 90) -> int:
        """Archive triplets that expired more than ``days_old`` days ago.

        Archiving only sets ``metadata['archived'] = True``; triplets stay
        in the graph. Triplets already flagged are skipped, so repeated
        calls neither re-count them nor rewrite the storage file.

        Returns:
            Number of triplets newly archived by this call.
        """
        cutoff = datetime.now() - timedelta(days=days_old)
        archived = 0
        for triplet in self.graph.triplets:
            if not triplet.expired_at or triplet.expired_at >= cutoff:
                continue
            if triplet.metadata.get('archived'):
                continue  # Archived by an earlier run
            # Move to archive (in real implementation)
            triplet.metadata['archived'] = True
            archived += 1
        if archived > 0:
            self.graph.save()
            logger.info(f"Archived {archived} expired triplets")
        return archived
| # Usage example | |
def demo_temporal_tracking():
    """Demonstrate temporal tracking.

    Walks one sample job through application, screening and interview
    scheduling, then prints its timeline, the number of active
    applications and the aggregate patterns.
    """
    import time

    sample_id = "JOB_001"
    app_tracker = TemporalApplicationTracker()

    # Create sample job
    posting = JobPosting(
        id=sample_id,
        title="Senior Software Engineer",
        company="TechCorp",
        location="San Francisco",
        salary="$150k-$200k",
        url="https://techcorp.com/jobs/123"
    )

    # Track initial application
    document_versions = {
        'cover_letter_version': 'v1',
        'resume_version': 'v2'
    }
    app_tracker.track_application(posting, "applied", document_versions)

    # Simulate status updates over time (pauses keep the timestamps apart)
    time.sleep(1)
    app_tracker.update_status(sample_id, "screening", "Passed initial ATS scan")
    time.sleep(1)
    interview_at = datetime.now() + timedelta(days=7)
    app_tracker.add_interview(
        sample_id,
        interview_at,
        "phone_screen",
        "30 min call with hiring manager"
    )

    # Get timeline
    print("Application Timeline:")
    for entry in app_tracker.get_application_timeline(sample_id):
        print(f" {entry['date']}: {entry['event']}")

    # Get active applications
    active_apps = app_tracker.get_active_applications()
    print(f"\nActive Applications: {len(active_apps)}")

    # Analyze patterns
    summary = app_tracker.analyze_patterns()
    print(f"\nPatterns: {summary}")


# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    demo_temporal_tracking()