Spaces:
Sleeping
Sleeping
"""Report service for managing report operations."""
| from typing import Dict, List, Any, Optional | |
| from .metadata import get_report_metadata, get_available_sources, get_source_subtypes | |
class ReportService:
    """Service class for report operations.

    Holds a list of chunk dictionaries and answers questions about the
    reports those chunks belong to. Each chunk is read through its
    ``"metadata"`` mapping (keys ``"source"``, ``"filename"`` and ``"year"``)
    and its ``"content"`` value; chunks that lack these keys are tolerated.
    """

    def __init__(self, chunks: Optional[List[Dict[str, Any]]] = None):
        """
        Initialize report service.

        Args:
            chunks: List of chunk dictionaries. ``None`` or an empty list
                yields a service with no chunks and empty metadata.
        """
        self.chunks = chunks or []
        # Summary metadata is only computed when there is something to
        # summarise; the query methods below read it with ``.get`` using the
        # "sources" and "filenames" keys.
        self.metadata = get_report_metadata(self.chunks) if self.chunks else {}

    @staticmethod
    def _metadata_of(chunk: Dict[str, Any]) -> Dict[str, Any]:
        """Return the chunk's metadata mapping, or ``{}`` if missing or None."""
        # ``chunk.get("metadata", {})`` would still hand back None for an
        # explicit ``"metadata": None`` entry, so normalise with ``or``.
        return chunk.get("metadata") or {}

    def get_available_sources(self) -> List[str]:
        """Get available report sources.

        Returns:
            The ``"sources"`` entry of the computed metadata when metadata is
            present, otherwise the result of the module-level
            ``get_available_sources()`` fallback.
        """
        if self.metadata:
            return self.metadata.get("sources", [])
        return get_available_sources()

    def get_available_reports(self) -> List[str]:
        """Get available report filenames.

        Returns:
            The ``"filenames"`` entry of the computed metadata, or an empty
            list when it is absent.
        """
        return self.metadata.get("filenames", [])

    def get_source_subtypes(self) -> Dict[str, List[str]]:
        """Get source to subtype mapping."""
        # For now, use the placeholder function.
        # In a full implementation, this would be derived from actual data.
        return get_source_subtypes()

    def get_reports_by_source(self, source: str) -> List[str]:
        """
        Get reports filtered by source.

        Args:
            source: Source category

        Returns:
            Sorted list of distinct report filenames whose chunk metadata
            has this source. Chunks without a filename are ignored.
        """
        filenames = set()
        for chunk in self.chunks or []:
            meta = self._metadata_of(chunk)
            if meta.get("source") != source:
                continue
            filename = meta.get("filename")
            if filename:
                filenames.add(filename)
        return sorted(filenames)

    def get_years_by_source(self, source: str) -> List[str]:
        """
        Get years available for a specific source.

        Args:
            source: Source category

        Returns:
            Sorted list of distinct years found on chunks with this source.
            Chunks with a missing or falsy year are ignored.
        """
        years = set()
        for chunk in self.chunks or []:
            meta = self._metadata_of(chunk)
            if meta.get("source") != source:
                continue
            year = meta.get("year")
            if year:
                years.add(year)
        return sorted(years)

    def search_reports(self, query: str) -> List[str]:
        """
        Search for reports by name.

        The match is a case-insensitive substring test against the filename.

        Args:
            query: Search query

        Returns:
            Sorted list of distinct matching report filenames. An empty
            query matches every report that has a filename.
        """
        if not self.chunks:
            return []

        needle = query.lower()
        matches = set()
        for chunk in self.chunks:
            filename = self._metadata_of(chunk).get("filename")
            # Skip chunks with no usable filename: otherwise an empty query
            # would report "" as a filename, and a None filename would crash
            # on ``.lower()``.
            if not filename or not isinstance(filename, str):
                continue
            if needle in filename.lower():
                matches.add(filename)
        return sorted(matches)

    def get_report_info(self, filename: str) -> Dict[str, Any]:
        """
        Get information about a specific report.

        Args:
            filename: Report filename

        Returns:
            Dictionary with report information: ``"filename"``,
            ``"chunk_count"``, ``"sources"``, ``"years"`` and
            ``"total_content_length"``. When the service holds no chunks an
            empty dictionary is returned; when chunks exist but none match
            ``filename`` the dictionary is returned with zero counts and
            empty lists.
        """
        if not self.chunks:
            return {}

        chunk_count = 0
        total_content_length = 0
        sources = set()
        years = set()

        for chunk in self.chunks:
            meta = self._metadata_of(chunk)
            if meta.get("filename") != filename:
                continue
            chunk_count += 1
            # ``or ""`` treats an explicit ``"content": None`` as empty.
            total_content_length += len(chunk.get("content") or "")
            if "source" in meta:
                sources.add(meta["source"])
            if "year" in meta:
                years.add(meta["year"])

        # Sets have no stable order across runs; sort for reproducible output.
        # ``key=str`` keeps the sort from failing on mixed or None values.
        return {
            "filename": filename,
            "chunk_count": chunk_count,
            "sources": sorted(sources, key=str),
            "years": sorted(years, key=str),
            "total_content_length": total_content_length,
        }