# akryldigital's picture
# Pilot (#2)
# 92633a7 verified
"""Report service for managing report operations."""
from typing import Dict, List, Any, Optional
from .metadata import get_report_metadata, get_available_sources, get_source_subtypes
class ReportService:
    """Service class for report operations.

    Wraps a list of chunk dictionaries and answers questions about the
    reports they belong to. Each chunk is read as a mapping with an optional
    ``"metadata"`` mapping (keys used here: ``"source"``, ``"filename"`` and
    ``"year"``) and an optional ``"content"`` value.
    """

    def __init__(self, chunks: Optional[List[Dict[str, Any]]] = None):
        """
        Initialize report service.

        Args:
            chunks: List of chunk dictionaries. ``None`` or an empty list
                yields a service with no chunks and empty metadata.
        """
        self.chunks = chunks or []
        # Metadata is only computed when there is something to summarise.
        self.metadata = get_report_metadata(self.chunks) if self.chunks else {}

    @staticmethod
    def _metadata_of(chunk: Dict[str, Any]) -> Dict[str, Any]:
        """Return the chunk's metadata mapping, or ``{}`` if absent/not a dict."""
        metadata = chunk.get("metadata")
        return metadata if isinstance(metadata, dict) else {}

    @staticmethod
    def _sorted(values: set) -> List[Any]:
        """Return ``values`` as a deterministically ordered list."""
        try:
            return sorted(values)
        except TypeError:
            # Mixed, mutually unorderable types (e.g. int and str years):
            # fall back to a stable ordering instead of raising.
            return sorted(values, key=lambda v: (type(v).__name__, str(v)))

    def _values_for_source(self, source: str, field: str) -> List[Any]:
        """Collect the distinct truthy ``field`` values of chunks from ``source``."""
        values = set()
        for chunk in self.chunks:
            metadata = self._metadata_of(chunk)
            if metadata.get("source") != source:
                continue
            value = metadata.get(field)
            if value:
                values.add(value)
        return self._sorted(values)

    def get_available_sources(self) -> List[str]:
        """Get available report sources.

        Returns:
            The ``"sources"`` entry of the computed metadata when metadata is
            present; otherwise the result of the module-level
            ``get_available_sources()`` fallback.
        """
        if self.metadata:
            return self.metadata.get("sources", [])
        return get_available_sources()

    def get_available_reports(self) -> List[str]:
        """Get available report filenames.

        Returns:
            The ``"filenames"`` entry of the computed metadata, or an empty
            list when it is missing.
        """
        return self.metadata.get("filenames", [])

    def get_source_subtypes(self) -> Dict[str, List[str]]:
        """Get source to subtype mapping."""
        # For now, use the placeholder function.
        # In a full implementation, this would be derived from actual data.
        return get_source_subtypes()

    def get_reports_by_source(self, source: str) -> List[str]:
        """
        Get reports filtered by source.

        Args:
            source: Source category

        Returns:
            Sorted list of distinct report filenames
        """
        return self._values_for_source(source, "filename")

    def get_years_by_source(self, source: str) -> List[str]:
        """
        Get years available for a specific source.

        Args:
            source: Source category

        Returns:
            Sorted list of distinct years
        """
        return self._values_for_source(source, "year")

    def search_reports(self, query: str) -> List[str]:
        """
        Search for reports by name (case-insensitive substring match).

        Chunks without a usable string filename are skipped, so an empty
        query returns every known filename and never an empty-string entry.

        Args:
            query: Search query

        Returns:
            Sorted list of matching report filenames
        """
        if not self.chunks:
            return []

        needle = query.lower()
        matches = set()
        for chunk in self.chunks:
            filename = self._metadata_of(chunk).get("filename")
            if not isinstance(filename, str) or not filename:
                continue
            if needle in filename.lower():
                matches.add(filename)
        return self._sorted(matches)

    def get_report_info(self, filename: str) -> Dict[str, Any]:
        """
        Get information about a specific report.

        Args:
            filename: Report filename

        Returns:
            Empty dictionary when the service holds no chunks. Otherwise a
            dictionary with keys ``"filename"``, ``"chunk_count"``,
            ``"sources"``, ``"years"`` and ``"total_content_length"``;
            counts are zero and lists empty when no chunk matches.
        """
        if not self.chunks:
            return {}

        chunk_count = 0
        total_content_length = 0
        sources = set()
        years = set()

        for chunk in self.chunks:
            metadata = self._metadata_of(chunk)
            if metadata.get("filename") != filename:
                continue
            chunk_count += 1
            # ``or ""`` tolerates chunks whose content is explicitly None.
            total_content_length += len(chunk.get("content") or "")
            if "source" in metadata:
                sources.add(metadata["source"])
            if "year" in metadata:
                years.add(metadata["year"])

        # Sets become sorted lists so the output is deterministic.
        return {
            "filename": filename,
            "chunk_count": chunk_count,
            "sources": self._sorted(sources),
            "years": self._sorted(years),
            "total_content_length": total_content_length,
        }