Spaces:
Sleeping
Sleeping
feat: Implement core gazette parsing, data extraction, and foundational AI auditor system components.
f0c339c | # Dashboard Generator - Issue at a Glance | |
| from typing import Dict, List, Any, Optional | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy import func | |
| from datetime import datetime | |
| from models import ( | |
| GazetteIssue, Decree, BankruptcyCase, Tender, | |
| GeneralAssembly, CommercialAgency, CompanyAnnouncement | |
| ) | |
def _count_for_issue(session: Session, model: Any, issue_id: int, *criteria: Any) -> int:
    """Count the rows of ``model`` that belong to ``issue_id``.

    Any extra ``criteria`` are ANDed onto the issue filter.  A ``None``
    scalar is normalised to ``0`` so callers always receive an int.
    """
    total = (
        session.query(func.count(model.id))
        .filter(model.issue_id == issue_id, *criteria)
        .scalar()
    )
    return total or 0


def get_issue_summary(session: Session, issue_id: int) -> Dict[str, Any]:
    """Generate an 'Issue at a Glance' summary for a gazette issue.

    Args:
        session: SQLAlchemy session used for all queries.
        issue_id: Primary key of the ``GazetteIssue`` to summarise.

    Returns:
        ``{"error": "Issue not found"}`` when no issue has that id.
        Otherwise a dict with three keys:

        * ``issue_info`` - basic issue fields, with dates as ISO strings
          (or ``None`` when unset).
        * ``counts`` - one entry per entity type that has at least one row,
          plus ``total_entities`` (the sum of those entries).
        * ``highlights`` - one human-readable line per non-empty entity type.
    """
    issue = session.query(GazetteIssue).filter(GazetteIssue.id == issue_id).first()
    if not issue:
        return {"error": "Issue not found"}

    counts: Dict[str, int] = {}
    highlights: List[str] = []
    summary = {
        "issue_info": {
            "issue_number": issue.issue_number,
            "issue_date": issue.issue_date.isoformat() if issue.issue_date else None,
            "hijri_date": issue.hijri_date,
            "year": issue.year,
            "total_pages": issue.total_pages,
            "processed_at": issue.processed_at.isoformat() if issue.processed_at else None,
        },
        "counts": counts,
        "highlights": highlights,
    }

    # NOTE(review): the leading glyphs in the highlight strings below look like
    # mis-decoded emoji; confirm against the upstream file before changing them.

    # Decrees
    decree_count = _count_for_issue(session, Decree, issue_id)
    if decree_count:
        counts["decrees"] = decree_count
        highlights.append(f"π {decree_count} Decrees & Ministerial Decisions")

    # Bankruptcy cases
    bankruptcy_count = _count_for_issue(session, BankruptcyCase, issue_id)
    if bankruptcy_count:
        counts["bankruptcy_cases"] = bankruptcy_count
        highlights.append(f"βοΈ {bankruptcy_count} Bankruptcy Cases")

    # Tenders: the highlight also reports how many were postponed, if any.
    tender_count = _count_for_issue(session, Tender, issue_id)
    if tender_count:
        counts["tenders"] = tender_count
        # ``== True`` builds a SQL expression here; it is not a Python comparison.
        postponed = _count_for_issue(
            session, Tender, issue_id, Tender.is_postponed == True  # noqa: E712
        )
        if postponed:
            highlights.append(f"π {tender_count} Tenders ({postponed} postponed)")
        else:
            highlights.append(f"π {tender_count} Tenders & Practices")

    # General assemblies
    assembly_count = _count_for_issue(session, GeneralAssembly, issue_id)
    if assembly_count:
        counts["general_assemblies"] = assembly_count
        highlights.append(f"π’ {assembly_count} General Assembly Minutes")

    # Commercial agencies
    agency_count = _count_for_issue(session, CommercialAgency, issue_id)
    if agency_count:
        counts["commercial_agencies"] = agency_count
        highlights.append(f"π€ {agency_count} Commercial Agency Registrations")

    # Company announcements
    announcement_count = _count_for_issue(session, CompanyAnnouncement, issue_id)
    if announcement_count:
        counts["company_announcements"] = announcement_count
        highlights.append(f"π’ {announcement_count} Company Announcements")

    # Sum before inserting the total so it is not counted twice.
    counts["total_entities"] = sum(counts.values())
    return summary
def get_ministry_breakdown(session: Session, issue_id: int) -> Dict[str, int]:
    """Get the number of tenders per ministry for one gazette issue.

    Tenders whose ministry is NULL or empty are reported under "Unknown".
    """
    rows = (
        session.query(Tender.ministry, func.count(Tender.id))
        .filter(Tender.issue_id == issue_id)
        .group_by(Tender.ministry)
        .all()
    )

    breakdown: Dict[str, int] = {}
    # Unpack positionally instead of reading ``row.count``: on result rows that
    # behave like tuples, ``count`` can resolve to the tuple-style method
    # rather than the aggregate column.
    for ministry, total in rows:
        # NULL and "" are separate SQL groups but share one label, so add
        # their counts instead of letting one overwrite the other.
        label = ministry or "Unknown"
        breakdown[label] = breakdown.get(label, 0) + total
    return breakdown
def get_agency_countries(session: Session, issue_id: int) -> Dict[str, int]:
    """Get the number of commercial agencies per principal country for one issue.

    Agencies whose principal country is NULL or empty are reported under
    "Unknown".
    """
    rows = (
        session.query(
            CommercialAgency.principal_country,
            func.count(CommercialAgency.id),
        )
        .filter(CommercialAgency.issue_id == issue_id)
        .group_by(CommercialAgency.principal_country)
        .all()
    )

    breakdown: Dict[str, int] = {}
    # Unpack positionally instead of reading ``row.count``: on result rows that
    # behave like tuples, ``count`` can resolve to the tuple-style method
    # rather than the aggregate column.
    for country, total in rows:
        # NULL and "" are separate SQL groups but share one label, so add
        # their counts instead of letting one overwrite the other.
        label = country or "Unknown"
        breakdown[label] = breakdown.get(label, 0) + total
    return breakdown
def get_company_types(session: Session, issue_id: int) -> Dict[str, int]:
    """Get the number of general assemblies per company type for one issue.

    Assemblies whose company type is NULL or empty are reported under
    "Unknown".
    """
    rows = (
        session.query(
            GeneralAssembly.company_type,
            func.count(GeneralAssembly.id),
        )
        .filter(GeneralAssembly.issue_id == issue_id)
        .group_by(GeneralAssembly.company_type)
        .all()
    )

    breakdown: Dict[str, int] = {}
    # Unpack positionally instead of reading ``row.count``: on result rows that
    # behave like tuples, ``count`` can resolve to the tuple-style method
    # rather than the aggregate column.
    for company_type, total in rows:
        # NULL and "" are separate SQL groups but share one label, so add
        # their counts instead of letting one overwrite the other.
        label = company_type or "Unknown"
        breakdown[label] = breakdown.get(label, 0) + total
    return breakdown
def generate_dashboard_data(session: Session, issue_id: int) -> Dict[str, Any]:
    """Assemble every dashboard section for one issue into a single dict.

    The result has four keys: ``summary``, ``ministry_breakdown``,
    ``agency_countries`` and ``company_types``, each filled by the
    corresponding query function in this module.
    """
    dashboard: Dict[str, Any] = {}
    dashboard["summary"] = get_issue_summary(session, issue_id)
    dashboard["ministry_breakdown"] = get_ministry_breakdown(session, issue_id)
    dashboard["agency_countries"] = get_agency_countries(session, issue_id)
    dashboard["company_types"] = get_company_types(session, issue_id)
    return dashboard
| # Query helpers for chat interface | |
def count_entities(
    session: Session,
    entity_type: str,
    filters: Optional[Dict[str, Any]] = None,
) -> int:
    """Count entities of a specific type with optional equality filters.

    Args:
        session: SQLAlchemy session used for the query.
        entity_type: One of "decree", "bankruptcy", "tender", "assembly",
            "agency" or "announcement" (case-insensitive).
        filters: Optional mapping of column name to the exact value it must
            equal.  Keys that are not columns of the model are ignored.

    Returns:
        The number of matching rows, or 0 for an unknown ``entity_type``.
    """
    model_map = {
        "decree": Decree,
        "bankruptcy": BankruptcyCase,
        "tender": Tender,
        "assembly": GeneralAssembly,
        "agency": CommercialAgency,
        "announcement": CompanyAnnouncement,
    }
    model = model_map.get(entity_type.lower())
    if model is None:
        return 0

    query = session.query(func.count(model.id))
    if filters:
        # Only real table columns are filterable.  A bare ``hasattr`` check
        # would also accept relationships, methods and ``metadata``, which do
        # not produce a meaningful SQL comparison.
        column_names = {column.name for column in model.__table__.columns}
        for key, value in filters.items():
            if key in column_names and hasattr(model, key):
                query = query.filter(getattr(model, key) == value)

    # Guard against a None scalar so the declared int return type holds.
    return query.scalar() or 0
def list_entities(
    session: Session,
    entity_type: str,
    filters: Optional[Dict[str, Any]] = None,
    limit: int = 50,
) -> List[Dict[str, Any]]:
    """List entities of a specific type with optional filters.

    Args:
        session: SQLAlchemy session used for the query.
        entity_type: One of "decree", "bankruptcy", "tender", "assembly",
            "agency" or "announcement" (case-insensitive).
        filters: Optional mapping of column name to filter value.  String
            columns use a case-insensitive substring match (``%`` and ``_``
            inside the value still act as LIKE wildcards).  All other
            columns, and any ``None`` value, use an equality comparison.
            Keys that are not columns of the model are ignored.
        limit: Maximum number of rows to return.

    Returns:
        One dict per row, keyed by column name, with date/datetime values
        converted to ISO-8601 strings.  Empty for an unknown ``entity_type``.
    """
    # Imported locally because the module header only brings in ``datetime``
    # and ``func``.
    from datetime import date
    from sqlalchemy import String

    model_map = {
        "decree": Decree,
        "bankruptcy": BankruptcyCase,
        "tender": Tender,
        "assembly": GeneralAssembly,
        "agency": CommercialAgency,
        "announcement": CompanyAnnouncement,
    }
    model = model_map.get(entity_type.lower())
    if model is None:
        return []

    table_columns = list(model.__table__.columns)
    query = session.query(model)

    if filters:
        columns_by_name = {column.name: column for column in table_columns}
        for key, value in filters.items():
            column = columns_by_name.get(key)
            if column is None or not hasattr(model, key):
                continue
            attribute = getattr(model, key)
            if value is not None and isinstance(column.type, String):
                query = query.filter(attribute.ilike(f"%{value}%"))
            else:
                # A substring match on a numeric/boolean/date column is wrong:
                # issue_id=5 would also match 15 and 50 on SQLite, and the
                # query errors on stricter backends.
                query = query.filter(attribute == value)

    # Convert to dicts (basic serialization)
    entities: List[Dict[str, Any]] = []
    for row in query.limit(limit).all():
        entity: Dict[str, Any] = {}
        for column in table_columns:
            value = getattr(row, column.name)
            # ``datetime`` subclasses ``date``, so this covers both.
            if isinstance(value, date):
                value = value.isoformat()
            entity[column.name] = value
        entities.append(entity)
    return entities