Rsnarsna's picture
Update app.py
5b913e9 verified
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
import threading
import asyncio
import mysql.connector
import json
import logging
import pandas as pd
from llama_cpp import Llama
from transformers import pipeline
app = FastAPI()
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Email and database configuration
DB_CONFIG = {
'host': '0.tcp.in.ngrok.io',
'port': 13890,
'user': 'root',
'password': '',
'database': 'shipment_details'
}
output_format = {
"origin": None,
"destination": None,
"expected_shipment_datetime": None,
"types_of_service": None,
"warehouse": None,
"description": None,
"quantities": None,
"carrier_details": None
}
# System prompt for LLM
prompt = f"""
System prompt: You will be provided with an email containing shipment details. Your task is to extract specific information based on the given instructions.
Instructions:
1. The input email may contain irrelevant information. Focus only on extracting details about future shipments.
2. The output should be in JSON format with all values to use only double quotes. If a type of information is not found, it should be marked as null.
3. Extract the following information:
- origin: The origin location of the consignment.
- destination: The destination location of the consignment.
- expected_shipment_datetime: The expected date and time of delivery to the warehouse (format: yyyy-mm-dd hh:mm:ss).
- types_of_service: The type of service (AIR, LCL, FCL).
- warehouse: The name of the warehouse.
- description: A brief description of the email (ASN).
- quantities: The number of items in the shipment.
- carrier_details: The details of the carrier.
4. The output extracted information must be in this format:
{{
"origin": "",
"destination": "",
"expected_shipment_datetime": "",
"types_of_service": "",
"warehouse": "",
"description": "",
"quantities": "",
"carrier_details": ""
}}
Examples:
1. Email: We are pleased to inform you of an upcoming shipment originating from Hamburg...
Extracted Information:
origin: Hamburg,
destination: New York,
expected_shipment_datetime: 2024-08-15 00:00:00,
types_of_service: AIR,
warehouse: Sky Logistics,
description: We are pleased to inform you of an upcoming shipment...
quantities: 200 units,
carrier_details: Sky Logistics
Output: {output_format}
"""
# Function to insert extracted shipment details into MySQL database
def insert_data(extracted_details):
try:
mydb = mysql.connector.connect(**DB_CONFIG)
cursor = mydb.cursor()
# Skip insertion if all required fields are empty
required_fields = ['origin', 'destination', 'expected_shipment_datetime',
'types_of_service', 'warehouse', 'description',
'quantities', 'carrier_details']
if all(extracted_details.get(field) in [None, ""] for field in required_fields):
logger.info("Skipping insertion: All extracted values are empty.")
return
sql = """
INSERT INTO shipment_details (
origin, destination, expected_shipment_datetime, types_of_service,
warehouse, description, quantities, carrier_details
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (
extracted_details.get('origin'),
extracted_details.get('destination'),
extracted_details.get('expected_shipment_datetime'),
extracted_details.get('types_of_service'),
extracted_details.get('warehouse'),
extracted_details.get('description'),
extracted_details.get('quantities'),
extracted_details.get('carrier_details')
)
cursor.execute(sql, values)
mydb.commit()
logger.info("Data inserted successfully.")
except mysql.connector.Error as db_err:
logger.error(f"Database error: {db_err}")
except Exception as ex:
logger.error(f"Error inserting data: {ex}")
# Function to read and process emails
def read_email():
logger.info("Loading Llama model...")
# llm = Llama.from_pretrained(
# repo_id="microsoft/Phi-3-mini-4k-instruct-gguf",
# filename="Phi-3-mini-4k-instruct-fp16.gguf", n_ctx=2048, n_threads=8
# )
llm = Llama(
model_path="./Phi-3-mini-4k-instruct-q4.gguf", # path to GGUF file
n_ctx=4096, # The max sequence length to use - note that longer sequence lengths require much more resources
n_threads=8, # The number of CPU threads to use, tailor to your system and the resulting performance
# n_gpu_layers=35, # The number of layers to offload to GPU, if you have GPU acceleration available. Set to 0 if no GPU acceleration is available on your system.
)
logger.info("Llama model loaded.")
logger.info("Reading emails from CSV...")
df = pd.read_csv('./emails.csv')
for i in df['Body']:
logger.info(f"Processing email: {i}")
output = llm(
f"<|system|>\n{prompt}<|end|><|user|>\n{i}<|end|>\n<|assistant|>",
max_tokens=256,
stop=["<|end|>"],
echo=False
)
logger.info("Extracting details...")
t = output['choices'][0]['text']
extracted_details = json.loads(t[t.find('{'):t.find('}') + 1].replace("'", '"'))
extracted_details = {key.lower().replace(" ", "_"): value for key, value in extracted_details.items()}
# Add meta data placeholders
meta_data = {
'sender': None,
'receiver': None,
'cc': None,
'bcc': None,
'subject': None
}
extracted_details.update(meta_data)
logger.info(f"Full extracted data: {extracted_details}")
insert_data(extracted_details)
# Global variable to control the email processing loop
running = False
# HTML content for the web interface
html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Email Processing</title>
<style>
body { font-family: Arial, sans-serif; margin: 50px; }
h1 { color: #333; }
button {
padding: 10px 20px;
margin: 10px;
background-color: #4CAF50;
color: white;
border: none;
cursor: pointer;
}
button.stop { background-color: #f44336; }
#status { font-weight: bold; }
</style>
<script>
async function startLoop() {
const response = await fetch('/start', { method: 'POST' });
const result = await response.text();
document.getElementById("status").innerHTML = result;
}
async function stopLoop() {
const response = await fetch('/stop', { method: 'POST' });
const result = await response.text();
document.getElementById("status").innerHTML = result;
}
</script>
</head>
<body>
<h1>Email Processing Status: <span id="status">{{ status }}</span></h1>
<button onclick="startLoop()">Start</button>
<button class="stop" onclick="stopLoop()">Stop</button>
</body>
</html>
"""
# Function to process emails in a loop asynchronously
async def email_processing_loop():
global running
logger.info("Starting email processing loop...")
while running:
logger.info("Processing emails...")
read_email()
await asyncio.sleep(10) # Non-blocking delay for the loop
# Endpoint to display the current email processor status
@app.get("/", response_class=HTMLResponse)
async def home():
global running
status = "Running" if running else "Stopped"
return HTMLResponse(content=html_content.replace("{{ status }}", status), status_code=200)
# Endpoint to start the email processing loop
@app.post("/start")
async def start_email_loop():
global running
if not running:
running = True
asyncio.ensure_future(email_processing_loop())
logger.info("Email processing loop started.")
return "Running"
else:
return "Already running"
# Endpoint to stop the email processing loop
@app.post("/stop")
async def stop_email_loop():
global running
if running:
running = False
logger.info("Email processing loop stopped.")
return "Stopped"
else:
return "Already stopped"
if __name__ == "__main__":
logger.info("Starting FastAPI server...")
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)