Spaces:
Runtime error
Runtime error
| import requests | |
| import json | |
| import pdfplumber | |
| import pandas as pd | |
| import time | |
| from cnocr import CnOcr | |
| from sentence_transformers import SentenceTransformer, models, util | |
| # import mysql.connector | |
| from fastapi import FastAPI, UploadFile, File | |
| from fastapi.responses import HTMLResponse | |
| from fastapi.middleware.cors import CORSMiddleware | |
# ASGI application object; the CORS policy below is attached to it.
app = FastAPI()

# Browser origins that are allowed to call this API cross-origin.
origins = [
    "http://localhost",
    "http://localhost:8080",
    "http://localhost:3000",
    "https://invoice-pdf-xi.vercel.app",
]

# Credentials are permitted, and every HTTP method and request header is
# accepted, for the origins listed above.
_cors_options = {
    "allow_origins": origins,
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
def home():
    """Serve the contents of ``index.html`` as an HTML page.

    The file is opened relative to the process's current working directory
    and is re-read on every call.

    Returns:
        HTMLResponse: the file's text with status code 200.

    Raises:
        OSError: if ``index.html`` cannot be opened or read.
    """
    # NOTE(review): no route decorator is visible above this function in the
    # reviewed source - confirm how it is registered on ``app``.
    # The context manager closes the handle deterministically, and the
    # explicit encoding keeps the read independent of the host's locale.
    with open('index.html', encoding='utf-8') as html_file:
        html_content = html_file.read()
    return HTMLResponse(content=html_content, status_code=200)
async def up_file(file: UploadFile = File(...)):
    """Extract text lines, OCR'd image text and table rows from an uploaded PDF.

    For each page, in order, the result receives:
      1. the page's extracted text split on newlines, minus the final piece
         (NOTE(review): the ``[:-1]`` drops the last line of every page -
         kept as in the original; confirm this is intended);
      2. one space-joined string per embedded image for which OCR returned
         text;
      3. one JSON string per table body row, or the DataFrame's string form
         when the table cannot be serialised to JSON.

    Args:
        file: the uploaded PDF; its underlying file object is handed to
            pdfplumber.

    Returns:
        A flat list of strings covering every page. The list is also printed
        to stdout. Blank entries are not filtered out.
    """
    # Function-scope import: only this block needs it.
    import tempfile

    # NOTE(review): a new CnOcr instance is constructed on every call.
    ocr = CnOcr()
    doc_text_list = []
    # Embedded images have to be written to disk for OCR. Putting them in a
    # temporary directory guarantees they are removed afterwards instead of
    # piling up in the working directory.
    with tempfile.TemporaryDirectory() as work_dir, \
            pdfplumber.open(file.file) as pdf:
        for page_index, page in enumerate(pdf.pages):
            # Guard against a falsy result (no text on the page).
            page_text = page.extract_text() or ''
            res_list = page_text.split('\n')[:-1]
            res_list.extend(_ocr_page_images(ocr, page, page_index, work_dir))
            for table in page.extract_tables():
                res_list.extend(_table_to_lines(table))
            doc_text_list += res_list
    print(doc_text_list)
    return doc_text_list


def _ocr_page_images(ocr, page, page_index, work_dir):
    """Return one space-joined OCR string per image on *page* that yields text."""
    import os

    ocr_lines = []
    for image_index, img in enumerate(page.images):
        file_name = os.path.join(
            work_dir,
            '{}-{}-{}.png'.format(time.time(), page_index, image_index),
        )
        # Dump the image's binary stream so the OCR engine can read it by path.
        with open(file_name, mode='wb') as f:
            f.write(img['stream'].get_data())
        try:
            res = ocr.ocr(file_name)
        except Exception as exc:
            # Best effort: an image that cannot be OCR'd is reported and
            # skipped rather than failing the whole upload.
            print('OCR failed for image {} on page {}: {}'.format(
                image_index, page_index, exc))
            res = []
        if res:
            ocr_lines.append(' '.join(item['text'] for item in res))
    return ocr_lines


def _table_to_lines(table):
    """Return one JSON string per body row of *table*, keyed by its header row."""
    if not table:
        # Nothing to use as a header; an empty table contributes no lines.
        return []
    # The first row supplies the column names; the remaining rows are data.
    df = pd.DataFrame(table[1:], columns=table[0])
    try:
        records = json.loads(df.to_json(orient="records", force_ascii=False))
    except Exception:
        # Tables that cannot be serialised to JSON fall back to the
        # DataFrame's plain string rendering, as a single entry.
        return [str(df)]
    return [json.dumps(rec, ensure_ascii=False) for rec in records]
# The route registration is left disabled, exactly as in the original file;
# uncomment the decorator to expose this lookup over HTTP.
# @app.get("/{provider_id}")
def get_provider(provider_id):
    """Fetch the provider-profile rows matching *provider_id* from MySQL.

    Args:
        provider_id: value compared against the ``provider_id`` column.

    Returns:
        A list with one dict per matching row, mapping column name to value.
    """
    # Imported here because the module-level ``import mysql.connector`` is
    # commented out.
    import mysql.connector

    # NOTE(review): connection settings, including the password, are
    # hard-coded - consider moving them to configuration.
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="1234",
        database="cg_app",
        auth_plugin='mysql_native_password'
    )
    try:
        if conn.is_connected():
            print('Connected to MySQL database')
        cursor = conn.cursor()
        try:
            # Parameterised so that the id is never spliced into the SQL text.
            query = (
                "SELECT * FROM email_automation_invoice_provider_profile_updated "
                "WHERE provider_id=%s"
            )
            cursor.execute(query, (provider_id,))
            # Column names come from the cursor description.
            column_names = [col[0] for col in cursor.description]
            # One dict per row, keyed by column name.
            rows = [dict(zip(column_names, row)) for row in cursor.fetchall()]
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    return rows