Spaces:
Sleeping
Sleeping
fahmiaziz98
Refactor document processing and retrieval workflow; add utility functions for markdown conversion and logging
40ca01e
| import os | |
| import logging | |
| from typing import Any | |
| from pathlib import Path | |
| from markitdown import MarkItDown | |
def setup_logging():
    """Configure application logging and return this module's logger.

    Log records at INFO level and above are formatted as
    ``<time> - <level> - <message>`` and sent to two handlers: a UTF-8
    file handler writing to ``app.log`` and a default stream handler.

    Returns:
        The logger named after this module.
    """
    # Build the handlers first so the basicConfig call stays readable.
    file_sink = logging.FileHandler("app.log", encoding="utf-8")
    stream_sink = logging.StreamHandler()
    logging.basicConfig(
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[file_sink, stream_sink],
        level=logging.INFO,
    )
    return logging.getLogger(__name__)


# Module-level logger, created once when this module is imported.
logger = setup_logging()
def extract_filename(filepath: Path) -> str:
    """Return the final path component of ``filepath`` minus its extension.

    Only the last extension is removed, so ``archive.tar.gz`` yields
    ``archive.tar``.

    Args:
        filepath: The complete path to the file.

    Returns:
        The filename without its directory part and without extension.
    """
    logger.info(f"Extracting filename from {filepath}")
    base_name = os.path.basename(filepath)
    stem, _extension = os.path.splitext(base_name)
    return stem
def convert_document_to_markdown(filepath: Path) -> str:
    """Convert the document at ``filepath`` to markdown using MarkItDown.

    Args:
        filepath: The path to the document file.

    Returns:
        The ``markdown`` attribute of the conversion result.
    """
    logger.info(f"Converting document to markdown: {filepath}")
    # Plugins are disabled here; pass enable_plugins=True to turn them on.
    converter = MarkItDown(enable_plugins=False)
    return converter.convert(filepath).markdown
def save_to_markdown(text: Any, path: Path) -> str:
    """Write ``text`` to a ``.md`` file named after ``path``.

    Only the extension-less filename of ``path`` is used. The output name
    is a bare ``<name>.md`` with no directory component, so the file is
    opened relative to the current working directory, not next to ``path``.

    Args:
        text: The text or markdown content to save.
        path: Path whose filename (without extension) names the output file.

    Returns:
        The name of the saved markdown file as a string.
    """
    output_name = f'{extract_filename(path)}.md'
    with open(output_name, mode='w', encoding='utf-8') as handle:
        handle.write(text)
    logger.info(f"Markdown file saved successfully at {output_name}")
    return output_name
def determine_top_k(num_chunks: int) -> int:
    """Pick the top_k value for a given number of chunks, capped at 5.

    Args:
        num_chunks: The total number of chunks.

    Returns:
        ``num_chunks`` itself when it is 5 or less, otherwise 5.
    """
    # Clamp to the cap instead of branching on the comparison.
    top_k = min(num_chunks, 5)
    logger.info(f"Determined top_k: {top_k} based on num_chunks: {num_chunks}")
    return top_k
def determine_reranking_top_n(top_k: int) -> int:
    """Pick the top_n value for reranking from ``top_k``.

    ``top_k`` is doubled first. When the doubled value is 5 or less, the
    result is half of it (rounded) plus one; otherwise the result is 6.
    NOTE(review): the doubling presumably reflects two result sets of
    ``top_k`` items each being reranked together; confirm against caller.

    Args:
        top_k: The number of top results to consider.

    Returns:
        The appropriate top_n value for reranking.
    """
    doubled = 2 * top_k
    top_n = round(doubled / 2) + 1 if doubled <= 5 else 6
    logger.info(f"Determined top_n: {top_n} based on top_k: {top_k}")
    return top_n