# pdf-api / app.py
# stephenz007's picture
# Uncomments the code
# 7db5397
import json
import os
import tempfile
import time

import pandas as pd
import pdfplumber
import requests
from cnocr import CnOcr
from fastapi import FastAPI, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from sentence_transformers import SentenceTransformer, models, util
# import mysql.connector
# The FastAPI application object; every route in this module is registered on it.
app = FastAPI()

# Browser origins that are allowed to make cross-origin requests to this API:
# local development hosts plus the deployed front-end.
origins = [
    "http://localhost",
    "http://localhost:8080",
    "http://localhost:3000",
    "https://invoice-pdf-xi.vercel.app"
]

# Enable CORS for the origins above. Credentials are allowed, and no
# restriction is placed on HTTP methods or request headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
@app.get("/")
def home():
    """Serve the contents of ``index.html`` as the landing page.

    The file is looked up relative to the process's current working
    directory and is re-read on every request.

    Returns:
        HTMLResponse: the file contents with HTTP status 200.

    Raises:
        OSError: if ``index.html`` cannot be opened or read.
    """
    # Use a context manager so the file handle is closed deterministically
    # instead of being left for the garbage collector.
    with open('index.html') as page_file:
        html_content = page_file.read()
    return HTMLResponse(content=html_content, status_code=200)
def _ocr_page_images(ocr, page, page_idx, tmp_dir):
    """OCR every image embedded in ``page`` and return the recognised text.

    Each image's stream data is written to a scratch file inside ``tmp_dir``
    and that file path is handed to ``ocr.ocr``. OCR is best-effort: an image
    that cannot be recognised contributes nothing to the result.

    Args:
        ocr: object exposing ``ocr(path)`` that returns a list of dicts, each
            with a ``'text'`` entry (here a ``CnOcr`` instance).
        page: pdfplumber page whose ``images`` are processed.
        page_idx: zero-based page index, used only to name scratch files.
        tmp_dir: existing directory that receives the scratch files; the
            caller is responsible for removing it.

    Returns:
        list[str]: one space-joined string per image that produced OCR output.
    """
    texts = []
    for img_idx, img in enumerate(page.images):
        # The stream bytes are stored under a .png name regardless of how the
        # image is actually encoded, exactly as before.
        file_name = os.path.join(tmp_dir, '{}-{}.png'.format(page_idx, img_idx))
        with open(file_name, mode='wb') as f:
            f.write(img['stream'].get_data())
        try:
            res = ocr.ocr(file_name)
        except Exception as exc:
            # Deliberate best-effort: one unreadable image must not fail the
            # whole upload, but make the failure visible in the server output.
            print('OCR failed for image {} on page {}: {}'.format(img_idx, page_idx, exc))
            res = []
        if res:
            texts.append(' '.join(item['text'] for item in res))
    return texts


def _table_rows_as_text(table):
    """Convert one extracted table into a list of strings.

    The first row of ``table`` is used as the header. Every following row
    becomes a JSON object string keyed by the header cells. If the table
    cannot be converted to JSON records, the string form of the whole
    DataFrame is returned as a single entry instead.

    Args:
        table: list of rows (each a list of cell values) as produced by
            ``page.extract_tables()``.

    Returns:
        list[str]: JSON strings (one per data row), a single fallback string,
        or an empty list when ``table`` is empty.
    """
    if not table:
        # Nothing to use as a header; previously this raised IndexError.
        return []
    df = pd.DataFrame(table[1:], columns=table[0])
    try:
        records = json.loads(df.to_json(orient="records", force_ascii=False))
    except Exception:
        # Best-effort fallback kept from the original behaviour.
        return [str(df)]
    return [json.dumps(rec, ensure_ascii=False) for rec in records]


@app.post("/file")
async def up_file(file: UploadFile = File(...)):
    """Extract text, OCR'd image text and table rows from an uploaded PDF.

    For every page, in order, the result contains: the page's extracted text
    lines, one string per embedded image that OCR could read, and one string
    per table row (JSON-encoded, keyed by the table's first row).

    Args:
        file: the uploaded PDF.

    Returns:
        list[str]: all collected strings for the document, page by page.
    """
    # NOTE(review): a new CnOcr instance is created on every request; if
    # construction is expensive, consider sharing one instance.
    ocr = CnOcr()
    doc_text_list = []
    # Image dumps go into a private temporary directory that is removed when
    # the request finishes, so they no longer pile up in the working
    # directory or clash between concurrent requests.
    with tempfile.TemporaryDirectory() as tmp_dir, pdfplumber.open(file.file) as pdf:
        for page_idx, page in enumerate(pdf.pages):
            # extract_text() may yield None for a page without text.
            page_text = page.extract_text() or ''
            # NOTE(review): [:-1] discards the last text line of every page;
            # kept as-is -- confirm whether that is intentional.
            res_list = page_text.split('\n')[:-1]
            res_list.extend(_ocr_page_images(ocr, page, page_idx, tmp_dir))
            for table in page.extract_tables():
                res_list.extend(_table_rows_as_text(table))
            doc_text_list += res_list
    # doc_text_list = [str(text).strip() for text in doc_text_list if len(str(text).strip()) > 0]
    print(doc_text_list)
    return doc_text_list
@app.get("/{provider_id}")
def get_provider(provider_id):
    """Return all profile rows stored for ``provider_id``.

    Queries the ``email_automation_invoice_provider_profile_updated`` table
    of the ``cg_app`` MySQL database on localhost.

    Args:
        provider_id: value from the URL path, matched against the
            ``provider_id`` column.

    Returns:
        list[dict]: one dict per matching row, mapping column name to value.
        Empty when nothing matches.
    """
    # Imported here rather than at module level: the top-of-file import is
    # commented out, and a local import keeps the rest of the service
    # importable when the MySQL connector is not installed.
    import mysql.connector

    # NOTE(review): connection settings (including the root password) are
    # hard-coded; they should come from configuration or the environment.
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="1234",
        database="cg_app",
        auth_plugin='mysql_native_password'
    )
    try:
        if conn.is_connected():
            print('Connected to MySQL database')
        cursor = conn.cursor()
        try:
            # provider_id comes straight from the request URL, so it is bound
            # as a query parameter instead of being formatted into the SQL.
            query = (
                "SELECT * FROM email_automation_invoice_provider_profile_updated "
                "WHERE provider_id=%s"
            )
            cursor.execute(query, (provider_id,))
            # Get column names from cursor description
            column_names = [col[0] for col in cursor.description]
            # Create list of dictionaries representing rows
            rows = [dict(zip(column_names, row)) for row in cursor.fetchall()]
        finally:
            cursor.close()
    finally:
        # Always release the connection, even if the query fails.
        conn.close()
    # Return result as JSON response
    return rows