Spaces:
Runtime error
Runtime error
| """Utility helpers used across the Naexya Docs AI application. | |
| The project pulls together configuration, database persistence, and a Gradio | |
| interface. This module keeps shared helper functions in one place so they can | |
| be reused by both the UI and background processes. Each function includes | |
| extensive documentation that explains the intended behaviour, common edge | |
| cases, and recommended extension points. | |
| """ | |
| from __future__ import annotations | |
| import html | |
| import json | |
| import logging | |
| import re | |
| import sqlite3 | |
| from collections import defaultdict | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Mapping, Optional, Sequence, Set, Tuple | |
| try: # ``jinja2`` is optional at runtime but recommended for template rendering. | |
| from jinja2 import Template | |
| except ImportError: # pragma: no cover - executed only when dependency missing. | |
| Template = None # type: ignore[misc] | |
| from config import AI_PROVIDERS, EXPORT_TEMPLATES, SPECIFICATION_TYPES | |
| from database import DATABASE_PATH | |
| # Configure a dedicated logger so user action tracking and validation warnings | |
| # can be filtered or redirected by the application-wide logging configuration. | |
| LOGGER = logging.getLogger(__name__) | |
| # Resolve repository root and template directory. ``BASE_DIR`` allows us to | |
| # construct absolute paths for template loading even when the application is | |
| # executed from a different working directory (e.g. when packaged as a module). | |
| BASE_DIR = Path(__file__).resolve().parent | |
| TEMPLATES_DIR = BASE_DIR / "templates" | |
| # Mapping of canonical specification categories to Markdown template placeholders. | |
| MARKDOWN_SECTION_KEYS: Dict[str, str] = { | |
| "User Stories": "user_stories_section", | |
| "Features": "features_section", | |
| "API Endpoints": "api_endpoints_section", | |
| "Database Design": "database_design_section", | |
| "System Architecture": "system_architecture_section", | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Input validation helpers | |
| # --------------------------------------------------------------------------- | |
| def validate_api_key(provider: str, api_key: str) -> bool: | |
| """Perform lightweight validation of an API key for a given provider. | |
| The function does **not** make external HTTP requests. Instead it checks | |
| that the provider exists in :mod:`config`, ensures a key was supplied, and | |
| verifies that it is of a plausible length. Applications can call this when | |
| a user enters credentials to provide immediate feedback before attempting a | |
| full API call. | |
| Args: | |
| provider: Provider identifier matching a key in ``AI_PROVIDERS``. | |
| api_key: Raw string provided by the end user. | |
| Returns: | |
| ``True`` when the key passes basic validation. ``False`` is returned | |
| when validation fails but the caller prefers to handle messaging | |
| without exceptions. | |
| Raises: | |
| ValueError: If the provider is unknown or the key is blank. | |
| """ | |
| if provider not in AI_PROVIDERS: | |
| raise ValueError( | |
| f"Provider '{provider}' is not recognised. Please choose one of: " | |
| f"{', '.join(sorted(AI_PROVIDERS))}." | |
| ) | |
| if not isinstance(api_key, str) or not api_key.strip(): | |
| raise ValueError("API key must be a non-empty string.") | |
| cleaned = api_key.strip() | |
| if len(cleaned) < 8: | |
| LOGGER.warning( | |
| "API key for provider '%s' appears unusually short.", provider | |
| ) | |
| return False | |
| # Many providers rely on Authorization headers containing ``{api_key}``. | |
| header_format = AI_PROVIDERS[provider]["headers"].get("Authorization", "") | |
| if "{api_key}" not in header_format: | |
| LOGGER.debug( | |
| "Provider '%s' does not use a standard Authorization header template.", | |
| provider, | |
| ) | |
| return True | |
| # --------------------------------------------------------------------------- | |
| # Conversation formatting | |
| # --------------------------------------------------------------------------- | |
| def format_conversation_history( | |
| messages: Sequence[Mapping[str, Any]] | |
| ) -> str: | |
| """Create a readable transcript from stored conversation messages. | |
| Each message mapping should contain at minimum ``role`` and ``content`` | |
| keys, with ``timestamp`` being optional. ``format_conversation_history`` | |
| sorts the messages chronologically (when timestamps are available) and | |
| returns a newline-delimited string ready for display in the Gradio | |
| interface or export templates. | |
| Args: | |
| messages: Iterable of dictionary-like objects representing chat turns. | |
| Returns: | |
| A human-friendly string. When no messages are provided a helpful | |
| placeholder message is returned instead of an empty string. | |
| """ | |
| if not messages: | |
| return "No conversation history available yet." | |
| def _sort_key(message: Mapping[str, Any]): | |
| ts = message.get("timestamp") | |
| if isinstance(ts, datetime): | |
| return (0, ts) | |
| if isinstance(ts, str): | |
| try: | |
| parsed = datetime.fromisoformat(ts) | |
| return (0, parsed) | |
| except ValueError: | |
| return (1, ts) | |
| return (2, "") | |
| sorted_messages = sorted(messages, key=_sort_key) | |
| formatted_lines = [] | |
| for entry in sorted_messages: | |
| role = str(entry.get("role", "unknown")).title() | |
| timestamp = entry.get("timestamp") | |
| human_time = f" [{timestamp}]" if timestamp else "" | |
| content = entry.get("content", "") | |
| if not isinstance(content, str): | |
| content = str(content) | |
| formatted_lines.append(f"{role}{human_time}:\n{content.strip()}\n") | |
| return "\n".join(formatted_lines).strip() | |
| # --------------------------------------------------------------------------- | |
| # Export helpers | |
| # --------------------------------------------------------------------------- | |
| def _render_template(path: Path, context: Mapping[str, Any]) -> str: | |
| """Render a template file using Jinja2 when available. | |
| This private helper keeps file reading and template rendering consistent for | |
| both HTML and Markdown exports. When :mod:`jinja2` is unavailable the | |
| function gracefully falls back to Python's :py:meth:`str.format` syntax, | |
| ensuring the application still works albeit without advanced templating | |
| features like loops or conditionals. | |
| """ | |
| if not path.exists(): | |
| raise FileNotFoundError(f"Template file '{path}' was not found.") | |
| template_text = path.read_text(encoding="utf-8") | |
| if Template is None: | |
| LOGGER.warning( | |
| "jinja2 is not installed; falling back to basic placeholder replacement for %s", | |
| path, | |
| ) | |
| if "{%" in template_text or "%}" in template_text: | |
| raise RuntimeError( | |
| "The export template requires jinja2 for conditional rendering. Install jinja2 to continue." | |
| ) | |
| rendered = template_text | |
| for key, value in context.items(): | |
| rendered = rendered.replace(f"{{{{{key}}}}}", str(value)) | |
| return rendered | |
| return Template(template_text).render(**context) | |
| def format_prompt(prompt: str) -> str: | |
| """Normalise user prompts before sending them to a provider.""" | |
| if not isinstance(prompt, str): | |
| raise ValueError("Prompt must be provided as a string.") | |
| cleaned = sanitize_input(prompt) | |
| if not cleaned: | |
| raise ValueError("Prompt cannot be empty after sanitisation.") | |
| return cleaned | |
| def render_export(template_name: str, context: Mapping[str, Any]) -> str: | |
| """Load a template from ``templates/`` and render it with ``context``.""" | |
| if not template_name or not isinstance(template_name, str): | |
| raise ValueError("Template name must be a non-empty string.") | |
| template_path = Path(template_name) | |
| if not template_path.is_absolute(): | |
| template_path = TEMPLATES_DIR / template_path | |
| return _render_template(template_path, context) | |
| def _group_specifications( | |
| specifications: Sequence[Mapping[str, Any]] | |
| ) -> Dict[str, List[Mapping[str, Any]]]: | |
| """Organise specification rows by their ``spec_type`` value.""" | |
| grouped: Dict[str, List[Mapping[str, Any]]] = {} | |
| for spec in specifications: | |
| spec_type = str(spec.get("spec_type") or "Uncategorised") | |
| grouped.setdefault(spec_type, []).append(spec) | |
| return grouped | |
| def _prepare_html_export_context( | |
| project_data: Mapping[str, Any], | |
| specifications: Sequence[Mapping[str, Any]], | |
| ) -> Dict[str, Any]: | |
| """Assemble template context for the HTML export.""" | |
| brand_name = str(project_data.get("brand_name") or "Naexya Docs AI").strip() or "Naexya Docs AI" | |
| project_name = str(project_data.get("name") or "Untitled Project").strip() or "Untitled Project" | |
| description_raw = project_data.get("description") | |
| project_description = ( | |
| html.escape(str(description_raw).strip()) if description_raw else "" | |
| ) | |
| project_created_at = _format_datetime_for_display(project_data.get("created_at")) | |
| project_identifier = html.escape(str(project_data.get("id") or "N/A")) | |
| grouped = _group_specifications(specifications) | |
| ordered_types: List[str] = list(SPECIFICATION_TYPES) | |
| for spec_type in grouped.keys(): | |
| if spec_type not in ordered_types: | |
| ordered_types.append(spec_type) | |
| counts_by_type: Dict[str, int] = {spec_type: len(grouped.get(spec_type, [])) for spec_type in ordered_types} | |
| total_specs = sum(counts_by_type.values()) | |
| status_counts: Dict[str, int] = defaultdict(int) | |
| timestamp_candidates: List[datetime] = [] | |
| for items in grouped.values(): | |
| for spec in items: | |
| status = str(spec.get("status") or "pending").strip().lower() | |
| status_counts[status] += 1 | |
| parsed = _parse_datetime(spec.get("created_at")) | |
| if parsed is not None: | |
| timestamp_candidates.append(parsed) | |
| latest_activity = _format_datetime_for_display(max(timestamp_candidates)) if timestamp_candidates else "Not available" | |
| table_of_contents = _build_table_of_contents(ordered_types, counts_by_type) | |
| conversation_base_url = str(project_data.get("conversation_base_url") or "").strip() | |
| sections_html, conversation_ids = _build_specification_sections( | |
| grouped, ordered_types, conversation_base_url | |
| ) | |
| statistics_block = _build_statistics_block(total_specs, counts_by_type, status_counts, latest_activity) | |
| conversation_references = _build_conversation_reference_section( | |
| conversation_ids, conversation_base_url | |
| ) | |
| return { | |
| "brand_name": brand_name, | |
| "project_name": html.escape(project_name), | |
| "project_description": project_description, | |
| "project_created_at": project_created_at, | |
| "project_identifier": project_identifier, | |
| "specification_total": total_specs, | |
| "table_of_contents": table_of_contents, | |
| "specification_sections": sections_html, | |
| "statistics_block": statistics_block, | |
| "conversation_references": conversation_references, | |
| "latest_activity": latest_activity, | |
| } | |
| def _build_table_of_contents(spec_types: Sequence[str], counts: Mapping[str, int]) -> str: | |
| """Create an ordered list linking to each specification section.""" | |
| if not spec_types: | |
| return ( | |
| "<p class=\"empty-state\">No specification categories are configured. " | |
| "Update SPECIFICATION_TYPES to populate the table of contents.</p>" | |
| ) | |
| lines = ["<ol class=\"toc-list\">"] | |
| for spec_type in spec_types: | |
| slug = _slugify(spec_type) | |
| count = counts.get(spec_type, 0) | |
| lines.append( | |
| " <li>" | |
| f"<a href=\"#{slug}\">" | |
| f"<span class=\"toc-title\">{html.escape(spec_type)}</span>" | |
| f"<span class=\"toc-count\">{count}</span>" | |
| "</a>" | |
| "</li>" | |
| ) | |
| lines.append("</ol>") | |
| return "\n".join(lines) | |
| def _build_specification_sections( | |
| grouped: Mapping[str, Sequence[Mapping[str, Any]]], | |
| ordered_types: Sequence[str], | |
| conversation_base_url: str, | |
| ) -> Tuple[str, Set[str]]: | |
| """Render each specification category into HTML sections.""" | |
| sections: List[str] = [] | |
| conversation_ids: Set[str] = set() | |
| if not grouped: | |
| sections.append( | |
| "<section class=\"spec-section empty\">" | |
| "<p>No specifications have been captured yet. Approve drafts to populate this report.</p>" | |
| "</section>" | |
| ) | |
| return "\n".join(sections), conversation_ids | |
| for spec_type in ordered_types: | |
| items = list(grouped.get(spec_type, [])) | |
| slug = _slugify(spec_type) | |
| sections.append(f"<section id=\"{slug}\" class=\"spec-section\">") | |
| header_html = ( | |
| " <header class=\"section-header\">" | |
| f"<h2>{html.escape(spec_type)}</h2>" | |
| f"<span class=\"badge\">{len(items)} items</span>" | |
| "</header>" | |
| ) | |
| sections.append(header_html) | |
| if not items: | |
| sections.append( | |
| " <p class=\"empty-state\">No specifications approved for this category yet.</p>" | |
| ) | |
| sections.append("</section>") | |
| continue | |
| for spec in items: | |
| title = html.escape(str(spec.get("title") or "Untitled").strip() or "Untitled") | |
| raw_status = str(spec.get("status") or "pending").strip() or "pending" | |
| status_label = html.escape(raw_status.replace("_", " ").title()) | |
| status_class = _slugify(raw_status) | |
| created_display = _format_datetime_for_display(spec.get("created_at")) | |
| conversation_id = spec.get("conversation_id") | |
| conversation_link = "" | |
| if conversation_id is not None: | |
| identifier = str(conversation_id) | |
| conversation_ids.add(identifier) | |
| link_href = _build_conversation_link(conversation_base_url, identifier) | |
| link_text = html.escape(identifier) | |
| conversation_link = ( | |
| f'<a class="conversation-link" href="{link_href}">' | |
| f'View source conversation #{link_text}</a>' | |
| ) | |
| body_html = _render_rich_text(str(spec.get("content") or "")) | |
| sections.append(" <article class=\"spec-card\">") | |
| sections.append(f" <h3>{title}</h3>") | |
| sections.append(" <div class=\"spec-meta\">") | |
| sections.append( | |
| f" <span class=\"status-pill status-{status_class}\">{status_label}</span>" | |
| ) | |
| if created_display: | |
| sections.append( | |
| f" <span class=\"timestamp\">Captured: {created_display}</span>" | |
| ) | |
| if conversation_link: | |
| sections.append(f" {conversation_link}") | |
| sections.append(" </div>") | |
| sections.append(f" <div class=\"spec-body\">{body_html}</div>") | |
| sections.append(" </article>") | |
| sections.append("</section>") | |
| return "\n".join(sections), conversation_ids | |
| def _build_statistics_block( | |
| total_specs: int, | |
| counts_by_type: Mapping[str, int], | |
| status_counts: Mapping[str, int], | |
| latest_activity: str, | |
| ) -> str: | |
| """Summarise key metrics for the exported project.""" | |
| cards: List[str] = ["<div class=\"statistics-grid\">"] | |
| cards.append( | |
| " <div class=\"stat-card\">" | |
| "<span class=\"stat-label\">Total Specifications</span>" | |
| f"<span class=\"stat-value\">{total_specs}</span>" | |
| "</div>" | |
| ) | |
| for spec_type, count in counts_by_type.items(): | |
| cards.append( | |
| " <div class=\"stat-card\">" | |
| f"<span class=\"stat-label\">{html.escape(spec_type)}</span>" | |
| f"<span class=\"stat-value\">{count}</span>" | |
| "</div>" | |
| ) | |
| if status_counts: | |
| status_items: List[str] = [] | |
| for status, count in sorted(status_counts.items()): | |
| status_items.append( | |
| "<li>" | |
| f"<span class=\"status-name\">{html.escape(status.replace('_', ' ').title())}</span>" | |
| f"<span class=\"status-count\">{count}</span>" | |
| "</li>" | |
| ) | |
| cards.append( | |
| " <div class=\"stat-card span-2\">" | |
| "<span class=\"stat-label\">By Status</span>" | |
| f"<ul class=\"status-list\">{''.join(status_items)}</ul>" | |
| "</div>" | |
| ) | |
| cards.append( | |
| " <div class=\"stat-card span-2\">" | |
| "<span class=\"stat-label\">Last Updated</span>" | |
| f"<span class=\"stat-value\">{latest_activity}</span>" | |
| "</div>" | |
| ) | |
| cards.append("</div>") | |
| return "\n".join(cards) | |
| def _build_conversation_reference_section( | |
| conversation_ids: Set[str], conversation_base_url: str | |
| ) -> str: | |
| """Generate a section listing links back to the originating conversations.""" | |
| header = "<section id=\"conversation-references\" class=\"conversation-section\">" | |
| header += "<h2>Conversation References</h2>" | |
| if not conversation_ids: | |
| return ( | |
| header | |
| + "<p>No linked conversations were captured for these specifications. Continue collaborating to enrich this section.</p>" | |
| + "</section>" | |
| ) | |
| items: List[str] = [] | |
| for identifier in sorted(conversation_ids, key=lambda value: (len(value), value)): | |
| href = _build_conversation_link(conversation_base_url, identifier) | |
| items.append( | |
| f"<li><a href=\"{href}\">Conversation #{html.escape(identifier)}</a></li>" | |
| ) | |
| return header + "<ul class=\"conversation-list\">" + "".join(items) + "</ul></section>" | |
| def _build_conversation_link(base_url: str, conversation_id: Any) -> str: | |
| """Return a safe hyperlink for a conversation reference.""" | |
| identifier = str(conversation_id) | |
| if base_url: | |
| href = f"{base_url.rstrip('/')}/{identifier}" | |
| else: | |
| href = f"#conversation-{identifier}" | |
| return html.escape(href, quote=True) | |
| def _render_rich_text(content: str) -> str: | |
| """Convert plain text into minimal HTML while preserving structure.""" | |
| stripped = content.strip() | |
| if not stripped: | |
| return "<p><em>No additional details provided.</em></p>" | |
| escaped = html.escape(stripped) | |
| paragraphs = [para for para in escaped.split("\n\n") if para] | |
| if not paragraphs: | |
| paragraphs = [escaped] | |
| formatted: List[str] = [] | |
| for paragraph in paragraphs: | |
| formatted.append("<p>" + paragraph.replace("\n", "<br />") + "</p>") | |
| return "\n".join(formatted) | |
| def _parse_datetime(value: Any) -> Optional[datetime]: | |
| """Safely parse a datetime from various formats used in the database.""" | |
| if isinstance(value, datetime): | |
| dt = value | |
| elif isinstance(value, str): | |
| candidate = value.strip() | |
| if not candidate: | |
| return None | |
| try: | |
| dt = datetime.fromisoformat(candidate) | |
| except ValueError: | |
| return None | |
| else: | |
| return None | |
| if dt.tzinfo is None: | |
| dt = dt.replace(tzinfo=timezone.utc) | |
| return dt | |
| def _format_datetime_for_display(value: Any) -> str: | |
| """Render a datetime value in a human-friendly string.""" | |
| parsed = _parse_datetime(value) | |
| if parsed is None: | |
| if isinstance(value, str) and value.strip(): | |
| return html.escape(value.strip()) | |
| return "" | |
| display = parsed.astimezone(timezone.utc).strftime("%d %B %Y %H:%M %Z") | |
| return html.escape(display) | |
| def _slugify(value: str) -> str: | |
| """Convert arbitrary text into an anchor-friendly slug.""" | |
| slug = re.sub(r"[^a-z0-9]+", "-", value.lower()) | |
| slug = slug.strip("-") | |
| return slug or "section" | |
| def _build_markdown_context( | |
| project_data: Mapping[str, Any], | |
| specifications: Sequence[Mapping[str, Any]], | |
| generated_at: str, | |
| ) -> Dict[str, Any]: | |
| """Assemble structured context for the Markdown export template.""" | |
| project_name = ( | |
| str(project_data.get("name") or "Untitled Project").strip() or "Untitled Project" | |
| ) | |
| description_raw = project_data.get("description") | |
| project_description = ( | |
| str(description_raw).strip() if description_raw and str(description_raw).strip() else "Not provided." | |
| ) | |
| conversation_base_url = str(project_data.get("conversation_base_url") or "").strip() | |
| project_identifier = str(project_data.get("id") or "N/A") | |
| created_dt = _parse_datetime(project_data.get("created_at")) | |
| project_created_at = ( | |
| created_dt.astimezone(timezone.utc).isoformat() if created_dt else "not_recorded" | |
| ) | |
| grouped = _group_specifications(specifications) | |
| ordered_types: List[str] = list(SPECIFICATION_TYPES) | |
| for spec_type in grouped: | |
| if spec_type not in ordered_types: | |
| ordered_types.append(spec_type) | |
| spec_counts = [ | |
| {"type": spec_type, "count": len(grouped.get(spec_type, []))} | |
| for spec_type in ordered_types | |
| ] | |
| total_specs = sum(entry["count"] for entry in spec_counts) | |
| status_totals: Dict[str, int] = defaultdict(int) | |
| conversation_links: List[Dict[str, str]] = [] | |
| seen_conversations: Set[str] = set() | |
| latest_candidates: List[datetime] = [] | |
| sections: Dict[str, str] = { | |
| placeholder: f"_No {category} documented yet._" | |
| for category, placeholder in MARKDOWN_SECTION_KEYS.items() | |
| } | |
| additional_section_blocks: List[str] = [] | |
| for spec_type in ordered_types: | |
| items = list(grouped.get(spec_type, [])) | |
| for spec in items: | |
| status = str(spec.get("status") or "pending").strip() or "pending" | |
| status_totals[status] += 1 | |
| created = _parse_datetime(spec.get("created_at")) | |
| if created is not None: | |
| latest_candidates.append(created) | |
| conversation_id = spec.get("conversation_id") | |
| if conversation_id is not None: | |
| identifier = str(conversation_id) | |
| if identifier not in seen_conversations: | |
| seen_conversations.add(identifier) | |
| if conversation_base_url: | |
| link_url = f"{conversation_base_url.rstrip('/')}/{identifier}" | |
| else: | |
| link_url = f"#conversation-{identifier}" | |
| conversation_links.append({"id": identifier, "url": link_url}) | |
| section_text = _format_markdown_section(spec_type, items, conversation_base_url) | |
| placeholder = MARKDOWN_SECTION_KEYS.get(spec_type) | |
| if placeholder: | |
| sections[placeholder] = section_text | |
| elif section_text: | |
| additional_section_blocks.append(f"### {spec_type}\n{section_text}") | |
| status_counts = [ | |
| {"status": status.replace("_", " ").title(), "count": count} | |
| for status, count in sorted(status_totals.items()) | |
| ] | |
| conversation_links.sort(key=lambda item: (len(item["id"]), item["id"])) | |
| latest_activity = ( | |
| max(latest_candidates).astimezone(timezone.utc).isoformat() | |
| if latest_candidates | |
| else "not_recorded" | |
| ) | |
| implementation_notes = str(project_data.get("implementation_notes") or "").strip() | |
| if not implementation_notes: | |
| implementation_notes = "_No implementation notes provided yet._" | |
| additional_sections = "\n\n".join(additional_section_blocks) | |
| metadata = { | |
| "project_id": project_identifier, | |
| "project_created_at": project_created_at, | |
| "spec_counts": spec_counts, | |
| "status_counts": status_counts, | |
| "conversation_links": conversation_links, | |
| "latest_activity": latest_activity, | |
| } | |
| context: Dict[str, Any] = { | |
| "project_name": project_name, | |
| "project_description": project_description, | |
| "generation_date": generated_at, | |
| "spec_count": total_specs, | |
| "implementation_notes": implementation_notes, | |
| "additional_sections": additional_sections, | |
| "metadata": metadata, | |
| } | |
| context.update(sections) | |
| return context | |
| def _format_markdown_section( | |
| spec_type: str, | |
| items: Sequence[Mapping[str, Any]], | |
| conversation_base_url: str, | |
| ) -> str: | |
| """Render a specification collection as a YAML-like Markdown block.""" | |
| if not items: | |
| return f"_No {spec_type} documented yet._" | |
| lines: List[str] = [] | |
| for spec in items: | |
| lines.extend(_format_markdown_entry(spec, conversation_base_url)) | |
| return "\n".join(lines) | |
| def _format_markdown_entry( | |
| spec: Mapping[str, Any], conversation_base_url: str | |
| ) -> List[str]: | |
| """Create a structured bullet list representation for a specification.""" | |
| title = str(spec.get("title") or "Untitled").strip() or "Untitled" | |
| status = str(spec.get("status") or "pending").strip() or "pending" | |
| status_label = status.replace("_", " ").title() | |
| entry: List[str] = [f"- title: {json.dumps(title)}", f" status: {json.dumps(status_label)}"] | |
| spec_id = spec.get("id") | |
| if spec_id is not None: | |
| entry.append(f" specification_id: {json.dumps(str(spec_id))}") | |
| conversation_id = spec.get("conversation_id") | |
| if conversation_id is not None: | |
| identifier = str(conversation_id) | |
| entry.append(f" conversation_id: {json.dumps(identifier)}") | |
| if conversation_base_url: | |
| url = f"{conversation_base_url.rstrip('/')}/{identifier}" | |
| else: | |
| url = f"#conversation-{identifier}" | |
| entry.append(f" conversation_url: {json.dumps(url)}") | |
| created = _parse_datetime(spec.get("created_at")) | |
| if created is not None: | |
| entry.append(f" captured_at: {json.dumps(created.astimezone(timezone.utc).isoformat())}") | |
| body = str(spec.get("content") or "").strip() | |
| if body: | |
| entry.append(" details: |") | |
| for line in body.splitlines(): | |
| entry.append(f" {line}") | |
| else: | |
| entry.append(" details: _No additional narrative provided._") | |
| return entry | |
| def generate_export_html( | |
| project_data: Mapping[str, Any], | |
| specifications: Sequence[Mapping[str, Any]], | |
| ) -> str: | |
| """Generate an HTML report for a project and its specifications. | |
| Args: | |
| project_data: Metadata describing the project (name, description, etc.). | |
| specifications: Collection of specification records to include. | |
| Returns: | |
| Rendered HTML string ready for download. | |
| Raises: | |
| ValueError: When the HTML template configuration is missing. | |
| """ | |
| html_template_meta = EXPORT_TEMPLATES.get("html") | |
| if not html_template_meta: | |
| raise ValueError("HTML export template is not configured.") | |
| template_path = Path(html_template_meta["path"]) | |
| if not template_path.is_absolute(): | |
| template_path = BASE_DIR / template_path | |
| context = _prepare_html_export_context(project_data, specifications) | |
| context["generated_at"] = get_current_timestamp() | |
| return _render_template(template_path, context) | |
| def generate_export_markdown( | |
| project_data: Mapping[str, Any], | |
| specifications: Sequence[Mapping[str, Any]], | |
| ) -> str: | |
| """Generate a Markdown report mirroring the HTML export.""" | |
| md_template_meta = EXPORT_TEMPLATES.get("markdown") | |
| if not md_template_meta: | |
| raise ValueError("Markdown export template is not configured.") | |
| template_path = Path(md_template_meta["path"]) | |
| if not template_path.is_absolute(): | |
| template_path = BASE_DIR / template_path | |
| generated_at = get_current_timestamp() | |
| context = _build_markdown_context(project_data, specifications, generated_at) | |
| return _render_template(template_path, context) | |
| # --------------------------------------------------------------------------- | |
| # Security and auditing helpers | |
| # --------------------------------------------------------------------------- | |
| def sanitize_input(text: Optional[str]) -> str: | |
| """Escape potentially dangerous user input before rendering. | |
| This helper strips leading/trailing whitespace, normalises line endings, and | |
| escapes HTML-sensitive characters. It does **not** attempt to remove | |
| Markdown formatting or SQL injection vectors; those concerns should be | |
| handled by parameterised queries and additional context-specific checks. | |
| """ | |
| if text is None: | |
| return "" | |
| cleaned = text.replace("\r\n", "\n").replace("\r", "\n").strip() | |
| return html.escape(cleaned) | |
| def log_user_action(action: str, details: Optional[Mapping[str, Any]] = None) -> None: | |
| """Record high-level user events to aid debugging and auditing. | |
| Args: | |
| action: Short description of the operation (e.g. ``"create_project"``). | |
| details: Optional mapping of additional metadata for structured logs. | |
| """ | |
| if not action: | |
| raise ValueError("Action description must be provided for logging.") | |
| if details is None: | |
| details = {} | |
| LOGGER.info("User action: %s | Details: %s", action, details) | |
| # --------------------------------------------------------------------------- | |
| # Time helpers | |
| # --------------------------------------------------------------------------- | |
| def get_current_timestamp() -> str: | |
| """Return the current UTC timestamp in ISO 8601 format.""" | |
| return datetime.now(timezone.utc).isoformat() | |
| # --------------------------------------------------------------------------- | |
| # Analytics helpers | |
| # --------------------------------------------------------------------------- | |
| def calculate_project_stats(project_id: int) -> Dict[str, Any]: | |
| """Compute aggregate metrics for a single project. | |
| The resulting dictionary includes counts of conversations, messages, pending | |
| specifications, approved specifications, and the timestamp of the most | |
| recent activity. These metrics power dashboards or can be surfaced in the | |
| "Specifications" tab to give users a quick overview of project health. | |
| """ | |
| if not isinstance(project_id, int) or project_id <= 0: | |
| raise ValueError("project_id must be a positive integer.") | |
| stats = { | |
| "total_conversations": 0, | |
| "total_messages": 0, | |
| "pending_specifications": 0, | |
| "approved_specifications": 0, | |
| "last_activity": None, | |
| } | |
| try: | |
| with sqlite3.connect(DATABASE_PATH) as conn: | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| cursor.execute( | |
| "SELECT COUNT(*) AS count FROM conversations WHERE project_id = ?", | |
| (project_id,), | |
| ) | |
| stats["total_conversations"] = cursor.fetchone()["count"] | |
| cursor.execute( | |
| """ | |
| SELECT COUNT(*) AS count | |
| FROM messages m | |
| JOIN conversations c ON c.id = m.conversation_id | |
| WHERE c.project_id = ? | |
| """, | |
| (project_id,), | |
| ) | |
| stats["total_messages"] = cursor.fetchone()["count"] | |
| cursor.execute( | |
| "SELECT COUNT(*) AS count FROM specifications WHERE project_id = ? AND status = 'pending'", | |
| (project_id,), | |
| ) | |
| stats["pending_specifications"] = cursor.fetchone()["count"] | |
| cursor.execute( | |
| "SELECT COUNT(*) AS count FROM specifications WHERE project_id = ? AND status = 'approved'", | |
| (project_id,), | |
| ) | |
| stats["approved_specifications"] = cursor.fetchone()["count"] | |
| cursor.execute( | |
| """ | |
| SELECT MAX(ts) AS last_activity | |
| FROM ( | |
| SELECT MAX(created_at) AS ts FROM conversations WHERE project_id = ? | |
| UNION ALL | |
| SELECT MAX(timestamp) AS ts FROM messages m JOIN conversations c ON c.id = m.conversation_id WHERE c.project_id = ? | |
| UNION ALL | |
| SELECT MAX(created_at) AS ts FROM specifications WHERE project_id = ? | |
| ) | |
| """, | |
| (project_id, project_id, project_id), | |
| ) | |
| row = cursor.fetchone() | |
| stats["last_activity"] = row["last_activity"] if row else None | |
| except sqlite3.DatabaseError as error: | |
| LOGGER.exception("Failed to calculate stats for project %s: %s", project_id, error) | |
| raise | |
| return stats | |
| __all__ = [ | |
| "validate_api_key", | |
| "format_prompt", | |
| "format_conversation_history", | |
| "render_export", | |
| "generate_export_html", | |
| "generate_export_markdown", | |
| "sanitize_input", | |
| "log_user_action", | |
| "get_current_timestamp", | |
| "calculate_project_stats", | |
| ] | |