Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import os | |
| import json | |
| import shutil | |
| import tempfile | |
| from app_kit.config import load_app_config | |
| from app_kit.demo_packs import load_demo_pack | |
| from app_kit.logging_utils import setup_logging | |
| from app_kit.model_registry import load_model_registry | |
| from app_kit.project import ProjectSpec | |
| from app_kit.storage import SQLiteStore | |
| from app_kit.tracing import utc_now, write_trace_artifact | |
| THEME_CSS_PATH = Path(__file__).resolve().parents[2] / "assets" / "theme.css" | |
| class AppRuntime: | |
| spec: ProjectSpec | |
| config: object | |
| store: SQLiteStore | |
| registry: dict | |
| def run_pack_with_trace(spec: ProjectSpec, store: SQLiteStore, config: object, path: str): | |
| demo_pack = load_demo_pack(path) | |
| started_at = utc_now() | |
| output = spec.run_pack(demo_pack, store, config) | |
| finished_at = utc_now() | |
| trace_payload = { | |
| 'kind': 'app-load', | |
| 'project': spec.key, | |
| 'pack_id': demo_pack.pack_id, | |
| 'pack_path': str(path), | |
| 'started_at': started_at, | |
| 'finished_at': finished_at, | |
| 'result': output, | |
| } | |
| if isinstance(output, dict): | |
| for key in ('model_name', 'model_id', 'adapter_name', 'generation_stats'): | |
| if key in output and output[key] not in (None, '', [], {}, ()): | |
| trace_payload[key] = output[key] | |
| trace_path = write_trace_artifact( | |
| config.artifact_dir, | |
| trace_payload, | |
| ) | |
| return output, f'β Loaded **{demo_pack.pack_id}** successfully! Trace artifact written.', trace_path | |
| def _format_pipeline_result(output: dict | list | None) -> str: | |
| """Format pipeline result as readable Markdown instead of raw JSON.""" | |
| if not output: | |
| return "" | |
| if isinstance(output, list): | |
| output = output[0] if output else {} | |
| lines = [] | |
| # Triage badge | |
| triage = output.get('triage', '') | |
| triage_icons = {'urgent': 'π΄ URGENT', 'important': 'π‘ IMPORTANT', 'FYI': 'π’ FYI'} | |
| triage_display = triage_icons.get(triage, triage.upper()) | |
| lines.append(f"### {triage_display}") | |
| lines.append("") | |
| # Summary | |
| summary = output.get('summary', '') | |
| if summary: | |
| lines.append(f"**Summary:** {summary}") | |
| lines.append("") | |
| # Q&A Section | |
| qa = output.get('qa', []) | |
| if qa: | |
| lines.append("---") | |
| lines.append("### π Document Analysis") | |
| for item in qa: | |
| q = item.get('question', '') | |
| a = item.get('answer', 'not stated') | |
| icon = 'β ' if a != 'not stated' else 'β' | |
| lines.append(f"- {icon} **{q}**") | |
| lines.append(f" > {a}") | |
| lines.append("") | |
| # File info | |
| file_type = output.get('file_type', '') | |
| source_file = output.get('source_file', '') | |
| title = output.get('title', '') | |
| if title or file_type: | |
| lines.append("---") | |
| lines.append(f"π **Document:** {title} ({file_type})") | |
| # Inbox items | |
| inbox_items = output.get('inbox_items', []) | |
| if inbox_items and len(inbox_items) > 1: | |
| lines.append("") | |
| lines.append("### π₯ Processed Documents") | |
| for item in inbox_items: | |
| t = item.get('triage', '') | |
| badge = triage_icons.get(t, t) | |
| lines.append(f"- {badge} **{item.get('title', 'Untitled')}** β {item.get('summary', '')[:120]}") | |
| return "\n".join(lines) | |
| def _format_search_results(results: list | None) -> str: | |
| """Format search results as readable Markdown.""" | |
| if not results: | |
| return "*No results found. Try a different search query.*" | |
| lines = ["### π Search Results", ""] | |
| for i, result in enumerate(results, 1): | |
| title = result.get('title', 'Untitled') | |
| text = result.get('primary_text', '')[:200] | |
| status = result.get('status', '') | |
| lines.append(f"**{i}. {title}** `{status}`") | |
| lines.append(f"> {text}") | |
| lines.append("") | |
| return "\n".join(lines) | |
| def _format_history(records: list | None) -> str: | |
| """Format history/inbox as readable Markdown.""" | |
| if not records: | |
| return "*No records yet. Upload a document to get started.*" | |
| lines = ["### π₯ Document History", ""] | |
| triage_icons = {'urgent': 'π΄', 'important': 'π‘', 'FYI': 'π’'} | |
| for record in records: | |
| title = record.get('title', 'Untitled') | |
| created = record.get('created_at', '')[:19] | |
| try: | |
| blob = json.loads(record.get('json_blob', '{}')) if isinstance(record.get('json_blob'), str) else record.get('json_blob', {}) | |
| except Exception: | |
| blob = {} | |
| triage = blob.get('triage', '') | |
| icon = triage_icons.get(triage, 'π') | |
| summary = blob.get('summary', record.get('primary_text', ''))[:150] | |
| lines.append(f"{icon} **{title}** β `{created}`") | |
| lines.append(f"> {summary}") | |
| lines.append("") | |
| return "\n".join(lines) | |
| def _process_uploaded_files(files, spec, store, config): | |
| """Process uploaded files through the pipeline.""" | |
| from app_kit.demo_packs import DemoPack | |
| if not files: | |
| return "β οΈ No files uploaded.", "Please upload one or more documents." | |
| # Copy uploaded files to a temp directory that looks like a demo pack | |
| temp_dir = Path(tempfile.mkdtemp(prefix="upload_")) | |
| file_paths = [] | |
| for f in files: | |
| src = Path(f) | |
| dst = temp_dir / src.name | |
| shutil.copy2(src, dst) | |
| file_paths.append(dst) | |
| manifest_path = temp_dir / "manifest.json" | |
| if not any(f.name in ("manifest.json", "manifest.yaml", "manifest.yml") for f in file_paths): | |
| import json | |
| manifest_data = { | |
| "project": spec.key, | |
| "pack_id": f"upload_{temp_dir.name.split('_')[-1]}", | |
| "inputs": [{"path": p.name, "kind": "document"} for p in file_paths] | |
| } | |
| manifest_path.write_text(json.dumps(manifest_data), encoding='utf-8') | |
| try: | |
| # Build a minimal DemoPack-like structure | |
| demo_pack = load_demo_pack(str(temp_dir)) | |
| started_at = utc_now() | |
| output = spec.run_pack(demo_pack, store, config) | |
| finished_at = utc_now() | |
| trace_payload = { | |
| 'kind': 'app-upload', | |
| 'project': spec.key, | |
| 'pack_id': demo_pack.pack_id, | |
| 'file_count': len(file_paths), | |
| 'started_at': started_at, | |
| 'finished_at': finished_at, | |
| 'result': output, | |
| } | |
| if isinstance(output, dict): | |
| for key in ('model_name', 'model_id', 'adapter_name', 'generation_stats'): | |
| if key in output and output[key] not in (None, '', [], {}, ()): | |
| trace_payload[key] = output[key] | |
| write_trace_artifact( | |
| config.artifact_dir, | |
| trace_payload, | |
| ) | |
| formatted = _format_pipeline_result(output) | |
| status = f"β Processed {len(file_paths)} document(s) successfully." | |
| return formatted, status | |
| except Exception as e: | |
| return f"β **Error processing documents:** {e}", f"Error: {e}" | |
| def run_app(spec: ProjectSpec) -> int: | |
| import gradio as gr | |
| config = load_app_config(spec.key) | |
| logger = setup_logging(spec.key) | |
| registry = load_model_registry(config.model_registry_path) | |
| logger.info('%s app listening', spec.key.upper()) | |
| store = SQLiteStore(config.sqlite_path, config.artifact_dir) | |
| # Friendly titles | |
| display_titles = { | |
| 'p1': ('Elder Care Document Assistant', 'Upload documents to get instant triage, summaries, and action items for elderly care paperwork.'), | |
| 'p4': ('Household Food Waste Tracker', 'Upload receipts and fridge notes to generate waste analysis reports.'), | |
| } | |
| display_title = display_titles.get(spec.key, (spec.title, spec.description)) | |
| with gr.Blocks(title=display_title[0], css_paths=THEME_CSS_PATH) as demo: | |
| # Header | |
| gr.Markdown(f"""# {display_title[0]} | |
| {display_title[1]}""") | |
| with gr.Tabs(): | |
| with gr.Tab("π App Workspace"): | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| # File upload area | |
| file_upload = gr.File( | |
| label="π Upload Documents", | |
| file_count="multiple", | |
| file_types=[".pdf", ".png", ".jpg", ".jpeg", ".txt", ".md", ".json", ".csv"], | |
| type="filepath", | |
| elem_classes=["upload-area"], | |
| ) | |
| status_display = gr.Markdown( | |
| value="*Upload documents above to get started.*", | |
| elem_classes=["status-box"], | |
| ) | |
| upload_btn = gr.Button("π€ Process Documents", variant="primary", size="lg") | |
| with gr.Column(scale=3): | |
| # Pipeline result display | |
| result_display = gr.Markdown( | |
| value="### π Welcome\nUpload a PDF, image, or text document to see the AI-powered triage and analysis.", | |
| elem_classes=["result-card"], | |
| ) | |
| gr.Markdown("---") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| search_query = gr.Textbox( | |
| label="π Search Documents", | |
| placeholder="Type a keyword to search your document history...", | |
| elem_classes=["search-box"], | |
| ) | |
| search_btn = gr.Button("Search", variant="secondary") | |
| with gr.Column(scale=2): | |
| search_result_display = gr.Markdown( | |
| value="*Enter a search query to find documents.*", | |
| elem_classes=["result-card"], | |
| ) | |
| gr.Markdown("---") | |
| # History section | |
| gr.Markdown("### π Document History") | |
| history_display = gr.Markdown( | |
| value="*No documents processed yet.*", | |
| elem_classes=["history-card"], | |
| ) | |
| refresh_btn = gr.Button("π Refresh History", variant="secondary") | |
| with gr.Tab("π How It Works"): | |
| if spec.key == 'p1': | |
| gr.Markdown( | |
| """ | |
| ### How to use the Elder Care Document Assistant | |
| 1. **Upload Documents:** Drag and drop or click the **Upload Documents** area to upload paperwork, medical receipts, invoices, or letters related to elder care (supports PDF, images, text). | |
| 2. **Process:** Click the **Process Documents** button. The local AI agent will parse the text, assign a triage level (e.g., `π΄ URGENT`, `π‘ IMPORTANT`, `π’ FYI`), extract a concise summary, and answer relevant clinical or administrative questions. | |
| 3. **View Results:** The AI output will be displayed immediately as a formatted card. | |
| 4. **Search and Reference:** Use the **Search Documents** feature to search past logs by query keyword. Click **Refresh History** to fetch the full database history of processed files. | |
| *All data is stored and processed locally on your offline device for compliance and privacy.* | |
| """ | |
| ) | |
| else: # p4 | |
| gr.Markdown( | |
| """ | |
| ### How to use the Household Food Waste Tracker | |
| 1. **Upload Grocery Data:** Drag and drop or browse shopping receipts, food inventory CSVs, or daily logs of discarded food. | |
| 2. **Analyze Waste:** Click the **Process Documents** button to analyze purchases, flag high-risk perishables, estimate shelf-lives, and generate a household food conservation summary. | |
| 3. **View Diagnostics:** Review the formatted report detailing waste trends, warnings, and sustainability tips. | |
| 4. **Search & History:** Retrieve previous inventory reviews using the **Search** box, and click **Refresh History** to list your cumulative food waste entries. | |
| *Processes data locally to ensure household privacy and secure offline storage.* | |
| """ | |
| ) | |
| # Event handlers | |
| def handle_upload(files): | |
| if not files: | |
| return "### π Welcome\nUpload a PDF, image, or text document to see the AI-powered triage and analysis.", "*Please upload at least one file.*" | |
| formatted, status = _process_uploaded_files(files, spec, store, config) | |
| return formatted, status | |
| def refresh_history(_=None): | |
| records = store.history(spec.key) | |
| return _format_history(records) | |
| def search_history(query: str): | |
| if not spec.search_enabled: | |
| return "*Search is not enabled for this project.*" | |
| if not query.strip(): | |
| return "*Enter a search query to find documents.*" | |
| results = store.search_records(spec.key, query) | |
| return _format_search_results(results) | |
| upload_btn.click( | |
| handle_upload, | |
| inputs=[file_upload], | |
| outputs=[result_display, status_display], | |
| ) | |
| refresh_btn.click(refresh_history, inputs=[], outputs=[history_display]) | |
| search_btn.click(search_history, inputs=[search_query], outputs=[search_result_display]) | |
| server_name = os.environ.get('GRADIO_SERVER_NAME', '0.0.0.0') | |
| server_port = int(os.environ.get('PORT', '7860')) | |
| demo.launch( | |
| server_name=server_name, | |
| show_error=True, | |
| share=False, | |
| ) | |
| return 0 | |