"""FastAPI service that turns an uploaded PDF into a flat list of text lines.

For every page the service collects the extracted text, OCR output for each
embedded image, and one JSON string per table row.
"""

import functools
import json
import logging
import os
import tempfile
import time

import pandas as pd
import pdfplumber
import requests
from cnocr import CnOcr
from fastapi import FastAPI, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from sentence_transformers import SentenceTransformer, models, util

# import mysql.connector

logger = logging.getLogger(__name__)

app = FastAPI()

# Browser origins that are allowed to call this API.
origins = [
    "http://localhost",
    "http://localhost:8080",
    "http://localhost:3000",
    "https://invoice-pdf-xi.vercel.app",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


@app.get("/")
def home():
    """Serve ``index.html`` (resolved against the working directory)."""
    # Context manager so the file handle is closed instead of leaked.
    with open('index.html') as html_file:
        html_content = html_file.read()
    return HTMLResponse(content=html_content, status_code=200)


@functools.lru_cache(maxsize=1)
def _get_ocr():
    """Return the shared ``CnOcr`` instance, creating it on first use."""
    return CnOcr()


def _ocr_page_images(page, page_index: int, ocr, work_dir: str) -> list:
    """Return one space-joined OCR line per image on *page* that yields text.

    Each image stream is dumped to a file inside *work_dir* and handed to
    the OCR engine by path. A failing OCR call is logged and treated as
    "no text" so that one bad image does not fail the whole upload.
    """
    lines = []
    for image_index, img in enumerate(page.images):
        file_name = os.path.join(work_dir, f"{page_index}-{image_index}.png")
        with open(file_name, mode='wb') as f:
            f.write(img['stream'].get_data())
        try:
            res = ocr.ocr(file_name)
        except Exception:
            # Best effort: keep going, but leave a trace of what went wrong.
            logger.exception(
                "OCR failed for image %d on page %d", image_index, page_index
            )
            res = []
        if res:
            lines.append(' '.join(item['text'] for item in res))
    return lines


def _table_to_lines(table) -> list:
    """Return one JSON string per data row of *table*.

    The first row is used as the header. If the frame cannot be serialised
    to JSON records, the frame's plain string form is returned as a single
    line instead.
    """
    if not table:
        # No header row to build columns from.
        return []
    df = pd.DataFrame(table[1:], columns=table[0])
    try:
        records = json.loads(df.to_json(orient="records", force_ascii=False))
    except Exception:
        logger.exception("Could not serialise table to JSON records")
        return [str(df)]
    return [json.dumps(rec, ensure_ascii=False) for rec in records]


@app.post("/file")
async def up_file(file: UploadFile = File(...)):
    """Extract text, image OCR and table rows from an uploaded PDF.

    Args:
        file: The uploaded PDF.

    Returns:
        A list of strings in page order. Per page: the text lines, then one
        OCR line per image that produced text, then one JSON string per
        table row.
    """
    ocr = _get_ocr()
    doc_text_list = []
    # Image dumps live in a per-request temporary directory that is removed
    # on exit, so they no longer pile up in the working directory.
    with tempfile.TemporaryDirectory() as work_dir, \
            pdfplumber.open(file.file) as pdf:
        for page_index, page in enumerate(pdf.pages):
            # Guard against a page with no text layer yielding None.
            page_text = page.extract_text() or ""
            # NOTE(review): [:-1] drops the final text line of every page,
            # exactly as before - confirm this is intended.
            res_list = page_text.split('\n')[:-1]
            res_list.extend(_ocr_page_images(page, page_index, ocr, work_dir))
            for table in page.extract_tables():
                res_list.extend(_table_to_lines(table))
            doc_text_list += res_list
    # doc_text_list = [str(text).strip() for text in doc_text_list if len(str(text).strip()) > 0]
    print(doc_text_list)
    return doc_text_list


# Disabled endpoint, kept for reference only. It depends on
# ``mysql.connector``, whose import near the top of this file is also
# commented out.
# NOTE(review): before re-enabling, move the connection credentials out of
# the source. The query below is parameterized; do not go back to building
# it with an f-string from the path parameter.
#
# @app.get("/{provider_id}")
# def get_provider(provider_id):
#     conn = mysql.connector.connect(
#         host="localhost",
#         user="root",
#         passwd="1234",
#         database="cg_app",
#         auth_plugin='mysql_native_password'
#     )
#     if conn.is_connected():
#         print('Connected to MySQL database')
#     cursor = conn.cursor()
#     query = "SELECT * FROM email_automation_invoice_provider_profile_updated WHERE provider_id=%s"
#     cursor.execute(query, (provider_id,))
#     # Get column names from cursor description
#     column_names = [col[0] for col in cursor.description]
#     # Create list of dictionaries representing rows
#     rows = []
#     for row in cursor.fetchall():
#         row_dict = {}
#         for i, value in enumerate(row):
#             row_dict[column_names[i]] = value
#         rows.append(row_dict)
#     cursor.close()
#     conn.close()
#     # Return result as JSON response
#     return rows