Spaces:
Runtime error
Runtime error
| import os | |
| import nest_asyncio | |
| from io import BytesIO | |
| from typing import List | |
| from dotenv import load_dotenv | |
| from fastapi import UploadFile | |
| from llama_index.core.schema import Document | |
| from script.get_metadata import Metadata | |
| from core.prompt import PARSER_INSTRUCTION | |
| from service.llamaparse import S3ImageSaver | |
| from service.llamaparse import LlamaParseWithS3 | |
| from utils.error_handlers import handle_error, handle_exception | |
| from fastapi.responses import JSONResponse | |
| load_dotenv() | |
| nest_asyncio.apply() | |
def get_documents(json_list: List[dict]):
    """Convert parsed page dicts into llama-index ``Document`` objects.

    Args:
        json_list: Parsed pages; each dict is expected to carry ``"md"``
            (markdown text) and ``"page"`` (page number) keys.

    Returns:
        A list of ``Document`` objects, or a ``JSONResponse`` error payload
        (via ``handle_error``) if a page dict is malformed.
    """
    try:
        # One Document per page; the page number is kept in metadata so
        # downstream processing can trace text back to its source page.
        # (The original loop used enumerate() but never used the index.)
        return [
            Document(text=page["md"], metadata={"page": page["page"]})
            for page in json_list
        ]
    except Exception as e:
        return handle_error(
            e, "Error processing file in get_documents", status_code=400
        )
def parse_journal(title, content: bytes, file_name: str, lang: str = "en"):
    """Parse the journal using LlamaParse.

    Uploads page images to S3 via ``S3ImageSaver`` and returns the parsed
    page list together with the collected image links.

    Args:
        title: Journal title, used when naming/collecting page images.
        content: Raw bytes of the uploaded file.
        file_name: Original file name, forwarded to the parser as metadata.
        lang: Parsing language passed to LlamaParse (default ``"en"``).

    Returns:
        ``(json_list, image_urls)`` where ``json_list`` is the parser's page
        dicts and ``image_urls`` is a list of ``{"page_number", "image_link"}``
        dicts — or, if image extraction failed, the ``JSONResponse`` error
        object itself in the second slot. On any other failure, a single
        ``JSONResponse`` via ``handle_error``.
    """
    try:
        saver = S3ImageSaver(
            bucket_name=os.getenv("S3_BUCKET_NAME"),
            access_key=os.getenv("AWS_ACCESS_KEY_ID"),
            secret_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
            region_name="us-west-2",
        )
        print("s3 image saver", saver)

        # API key can also be set in the env as LLAMA_CLOUD_API_KEY.
        parser = LlamaParseWithS3(
            api_key=os.getenv("LLAMA_PARSE_API_KEY"),
            parsing_instruction=PARSER_INSTRUCTION,
            result_type="markdown",  # "markdown" and "text" are available
            verbose=True,
            language=lang,
            s3_image_saver=saver,
        )

        parse_result = parser.get_json_result(
            content, extra_info={"file_name": file_name}
        )
        json_list = parse_result[0]["pages"]

        image_dicts = parser.get_images(parse_result, title)
        if isinstance(image_dicts, JSONResponse):
            # Image extraction failed — hand the error response back to the
            # caller in the image_urls slot; the caller checks for it.
            image_urls = image_dicts
        else:
            image_urls = []
            for img in image_dicts:
                if img["image_link"] is None:
                    continue  # skip pages whose image upload produced no link
                image_urls.append(
                    {
                        "page_number": img["page_number"],
                        "image_link": img["image_link"],
                    }
                )
        return json_list, image_urls
    except Exception as e:
        return handle_error(
            e, "Error processing file in parse_journal", status_code=400
        )
async def upload_file(reference, file: UploadFile, lang: str = "en"):
    """Read an uploaded file, parse it, and attach reference metadata.

    Args:
        reference: Mapping with at least a ``"title"`` key; forwarded to
            ``Metadata`` for metadata generation.
        file: The incoming FastAPI upload.
        lang: Parsing language forwarded to ``parse_journal``.

    Returns:
        ``(documents_with_metadata, file_stream)`` on success, a
        ``JSONResponse`` error if parsing failed, or ``handle_exception``'s
        response on any unexpected error.
    """
    try:
        # Read the binary content of the uploaded file once.
        content = await file.read()
        # Keep the bytes in a BytesIO stream so callers can reuse them.
        file_stream = BytesIO(content)

        title = reference["title"]
        result = parse_journal(title, content, file.filename, lang)
        # parse_journal returns a bare JSONResponse (via handle_error) on
        # failure; unpacking it as a tuple would raise. Check first.
        if isinstance(result, JSONResponse):
            return result
        json_list, image_urls = result

        # Bug fix: the error check must run BEFORE get_parsed_documents —
        # previously a JSONResponse error was passed in as image_links.
        if isinstance(image_urls, JSONResponse):
            return image_urls

        parsed_documents = get_parsed_documents(json_list, image_urls)
        metadata_gen = Metadata(reference)
        documents_with_metadata = metadata_gen.apply_metadata(parsed_documents)
        print("Banyak documents : \n", len(documents_with_metadata))
        # Return both parsed documents and the reusable file stream.
        return documents_with_metadata, file_stream
    except Exception as e:
        print("error ", e)
        return handle_exception(e)
def get_parsed_documents(json_dicts=None, image_links=None):
    """Build one ``Document`` per parsed page, attaching its image links.

    Args:
        json_dicts: Parsed page dicts, each carrying an ``"md"`` key with
            the page's markdown text. ``None`` yields an empty list.
        image_links: Optional list of ``{"page_number", "image_link"}``
            dicts produced by ``parse_journal``.

    Returns:
        A list of ``Document`` objects whose metadata carries
        ``page_number`` (1-indexed) and ``image_links`` (possibly empty),
        or a ``JSONResponse`` error via ``handle_error``.
    """
    try:
        parsed_documents = []

        # Group image links by page number so each page's Document gets
        # every image extracted from that page.
        image_link_dict = {}
        if image_links:
            for item in image_links:
                image_link_dict.setdefault(item["page_number"], []).append(
                    item["image_link"]
                )

        # Bug fix: md_texts was computed twice — the second, unguarded pass
        # raised TypeError whenever json_dicts was None. Compute it once and
        # fall back to an empty list for None input.
        md_texts = [d["md"] for d in json_dicts] if json_dicts is not None else []

        for idx, md_text in enumerate(md_texts):
            page_number = idx + 1  # pages are 1-indexed
            parsed_documents.append(
                Document(
                    text=md_text,
                    metadata={
                        "page_number": page_number,
                        # Empty list when the page has no extracted images.
                        "image_links": image_link_dict.get(page_number, []),
                    },
                )
            )
        return parsed_documents
    except Exception as e:
        return handle_error(
            e, "Error processing documents in get_text_documents", status_code=400
        )