Spaces:
Runtime error
Runtime error
File size: 3,296 Bytes
7db5397 763bbfb 7db5397 763bbfb c78e680 7db5397 822f166 763bbfb e800c46 b3e7432 9f8fe28 e800c46 9f8fe28 b3e7432 e800c46 763bbfb e800c46 763bbfb e800c46 763bbfb e800c46 9f8fe28 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 | import requests
import json
import pdfplumber
import pandas as pd
import time
from cnocr import CnOcr
from sentence_transformers import SentenceTransformer, models, util
# import mysql.connector
from fastapi import FastAPI, UploadFile, File
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
# Application object; the route decorators further down register on it.
app = FastAPI()

# Origins handed to the CORS middleware below: three localhost variants
# plus the deployed front-end URL.
origins = [
    "http://localhost",
    "http://localhost:8080",
    "http://localhost:3000",
    "https://invoice-pdf-xi.vercel.app",
]

# Install CORS handling restricted to the origins above, with credentials
# enabled and no restriction on methods or request headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
)
@app.get("/")
def home():
    """Serve the landing page.

    Reads ``index.html`` (a relative path, so it is resolved against the
    process's current working directory) and returns its contents as an
    HTML response with status code 200.

    Raises:
        OSError: if ``index.html`` cannot be opened.
        UnicodeDecodeError: if the file is not valid UTF-8.
    """
    # Use a context manager so the file handle is closed on every request
    # instead of being left for the garbage collector, and decode explicitly
    # as UTF-8 rather than relying on the platform's locale default.
    with open('index.html', encoding='utf-8') as page:
        html_content = page.read()
    return HTMLResponse(content=html_content, status_code=200)
def _ocr_page_images(page, page_index, ocr, work_dir):
    """Return one space-joined OCR string for each image on *page* that yields text.

    Each image's raw stream is written to a file inside *work_dir* and that
    file's path is handed to ``ocr.ocr``. Images for which OCR raises or
    returns nothing contribute no entry.
    """
    import os

    texts = []
    for image_index, image in enumerate(page.images):
        file_name = '{}-{}-{}.png'.format(
            str(time.time()), str(page_index), str(image_index)
        )
        image_path = os.path.join(work_dir, file_name)
        with open(image_path, mode='wb') as f:
            f.write(image['stream'].get_data())
        try:
            res = ocr.ocr(image_path)
        except Exception:
            # Deliberate best effort: an image that cannot be OCR'd is
            # skipped rather than failing the whole upload.
            res = []
        if len(res) > 0:
            texts.append(' '.join([item['text'] for item in res]))
    return texts


def _table_to_lines(table):
    """Return one JSON string per data row of *table*, keyed by its first row.

    Falls back to a single entry holding the DataFrame's string form when
    the JSON conversion fails.
    """
    if not table:
        # Nothing to use as a header row; avoids IndexError on table[0].
        return []
    # The first row is used as the header; remaining rows are the records.
    df = pd.DataFrame(table[1:], columns=table[0])
    try:
        records = json.loads(df.to_json(orient="records", force_ascii=False))
        return [json.dumps(rec, ensure_ascii=False) for rec in records]
    except Exception:
        return [str(df)]


@app.post("/file")
async def up_file(file: UploadFile = File(...)):
    """Extract text, image OCR text and table rows from an uploaded PDF.

    For every page, in order, the result list receives:
      1. the lines of the page's extracted text,
      2. one space-joined string per embedded image that OCR returns text for,
      3. one JSON string per table data row (or the table's string form if
         JSON conversion fails).

    Args:
        file: the uploaded PDF; its underlying file object is passed to
            ``pdfplumber.open``.

    Returns:
        A flat list of strings covering all pages. The list is also printed.
    """
    import tempfile

    # NOTE(review): a new CnOcr instance is created on every request, as in
    # the original; consider sharing one instance if construction is costly.
    ocr = CnOcr()
    doc_text_list = []
    # Image files go into a temporary directory that is removed when the
    # block exits, so uploads no longer leave .png files in the working
    # directory.
    with tempfile.TemporaryDirectory() as work_dir, pdfplumber.open(file.file) as pdf:
        for page_index, page in enumerate(pdf.pages):
            # Guard against extract_text() returning None for a page
            # without text.
            page_text = page.extract_text() or ''
            # NOTE(review): the [:-1] slice drops the final line of each
            # page's text; kept unchanged - confirm whether this is intended.
            res_list = page_text.split('\n')[:-1]
            res_list.extend(_ocr_page_images(page, page_index, ocr, work_dir))
            for table in page.extract_tables():
                res_list.extend(_table_to_lines(table))
            doc_text_list += res_list
    # Disabled in the original (would strip entries and drop blank ones):
    # doc_text_list = [str(text).strip() for text in doc_text_list if len(str(text).strip()) > 0]
    print(doc_text_list)
    return doc_text_list
# The route registration stays disabled, as in the original; re-enable the
# decorator below to expose this lookup over HTTP.
# @app.get("/{provider_id}")
def get_provider(provider_id):
    """Return every profile row for *provider_id* as a list of dicts.

    Connects to the local MySQL database ``cg_app``, selects all columns
    from ``email_automation_invoice_provider_profile_updated`` where
    ``provider_id`` matches, and maps each row to ``{column_name: value}``.

    Args:
        provider_id: value compared against the ``provider_id`` column.

    Returns:
        A list with one dict per matching row (empty if none match).
    """
    # Imported locally because the module-level import is commented out;
    # this keeps the module importable when the MySQL driver is absent.
    import mysql.connector

    # NOTE(review): connection settings, including the password, are
    # hard-coded; consider loading them from configuration.
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="1234",
        database="cg_app",
        auth_plugin='mysql_native_password'
    )
    try:
        if conn.is_connected():
            print('Connected to MySQL database')
        cursor = conn.cursor()
        try:
            # Bind provider_id as a query parameter instead of interpolating
            # it into the SQL text, so caller-supplied input cannot alter
            # the statement.
            query = (
                "SELECT * FROM email_automation_invoice_provider_profile_updated "
                "WHERE provider_id=%s"
            )
            cursor.execute(query, (provider_id,))
            # Get column names from cursor description
            column_names = [col[0] for col in cursor.description]
            # Create list of dictionaries representing rows
            rows = [dict(zip(column_names, row)) for row in cursor.fetchall()]
        finally:
            cursor.close()
    finally:
        # Release the connection even when the query fails.
        conn.close()
    # Return result as JSON response
    return rows