| import torch |
| from flask import Flask, render_template, request, jsonify, redirect |
| import json |
| import os |
| from transformers import pipeline |
| from gtts import gTTS |
| from pydub import AudioSegment |
| from pydub.silence import detect_nonsilent |
| from transformers import AutoConfig |
| import time |
| from waitress import serve |
| from simple_salesforce import Salesforce |
| import requests |
|
|
# Flask application object; the route decorators below register against it.
app = Flask(__name__)

# Use the GPU when PyTorch reports CUDA as available, otherwise the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Load the Whisper-small model configuration and attach a "timeout" entry
# of 60 to it.
config = AutoConfig.from_pretrained("openai/whisper-small")
timeout_override = {"timeout": 60}
config.update(timeout_override)
|
|
| |
# Connect to Salesforce once at import time; the route handlers share `sf`.
# Credentials are read from the environment (SF_USERNAME, SF_PASSWORD,
# SF_SECURITY_TOKEN) so that secrets are not stored in source control.
# NOTE(review): any credentials previously committed in this file should be
# treated as exposed and rotated.
sf = None  # remains None when the connection cannot be established
try:
    print("Attempting to connect to Salesforce...")
    _sf_credentials = {
        "username": os.environ.get("SF_USERNAME"),
        "password": os.environ.get("SF_PASSWORD"),
        "security_token": os.environ.get("SF_SECURITY_TOKEN"),
    }
    # Fail with a readable message instead of sending an incomplete login.
    _missing = [key for key, value in _sf_credentials.items() if not value]
    if _missing:
        raise RuntimeError(
            f"missing Salesforce credentials: {', '.join(_missing)}"
        )
    sf = Salesforce(**_sf_credentials)
    print("Connected to Salesforce successfully!")
    print("User Info:", sf.UserInfo)
except Exception as e:
    # Best effort: the app still starts, but Salesforce-backed routes will
    # fail until the connection problem is fixed.
    print(f"Failed to connect to Salesforce: {str(e)}")
|
|
|
|
| |
def create_salesforce_record(sf, name, email, phone_number):
    """Create a ``Customer_Login__c`` record in Salesforce.

    Args:
        sf: Salesforce client exposing ``Customer_Login__c.create``.
        name: Value stored in the ``Name`` field.
        email: Value stored in the ``Email__c`` field.
        phone_number: Value stored in the ``Phone_Number__c`` field.

    Returns:
        Whatever ``sf.Customer_Login__c.create`` returns.

    Raises:
        Exception: If the create call fails; the original error is attached
            as ``__cause__``.
    """
    try:
        return sf.Customer_Login__c.create({
            'Name': name,
            'Email__c': email,
            'Phone_Number__c': phone_number,
        })
    except Exception as e:
        # Chain the original error so the traceback keeps the root cause.
        raise Exception(f"Failed to create record: {str(e)}") from e
|
|
|
|
def get_menu_items(sf):
    """Fetch every ``Menu_Item__c`` record from Salesforce.

    Args:
        sf: Salesforce client exposing ``query_all``.

    Returns:
        The ``records`` entry of the query response.
    """
    query = "SELECT Name, Price__c, Ingredients__c, Category__c FROM Menu_Item__c"
    # query_all follows result pagination, so a large menu is not silently
    # cut off at the first batch as it would be with a single query() call.
    result = sf.query_all(query)
    return result['records']
|
|
| |
def generate_audio_prompt(text, filename, retries=3, delay=5):
    """Synthesize *text* to speech with gTTS and save it under ``static/``.

    Args:
        text: The text to speak.
        filename: Output file name, joined onto the ``static`` directory.
        retries: Total number of attempts before giving up (at least one
            attempt is always made).
        delay: Seconds to sleep between attempts.

    Raises:
        gTTSError: If every attempt fails.
    """
    # The module imports only the gTTS class, so the package name `gtts` is
    # not bound at module level; import the exception type explicitly.
    from gtts.tts import gTTSError

    output_path = os.path.join("static", filename)
    attempts = max(1, retries)
    # A bounded loop replaces unbounded recursion, so a persistent outage
    # cannot exhaust the stack or block the request forever.
    for attempt in range(1, attempts + 1):
        try:
            gTTS(text).save(output_path)
            return
        except gTTSError as e:
            print(f"Error: {e}")
            if attempt == attempts:
                raise
            print(f"Retrying after {delay} seconds...")
            time.sleep(delay)
|
|
| |
def convert_to_wav(input_path, output_path):
    """Re-encode an audio file as WAV at a 16000 Hz frame rate, one channel.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path the WAV file is written to.

    Raises:
        Exception: If loading, resampling or exporting fails; the original
            error is attached as ``__cause__``.
    """
    try:
        audio = AudioSegment.from_file(input_path)
        audio = audio.set_frame_rate(16000).set_channels(1)
        audio.export(output_path, format="wav")
    except Exception as e:
        print(f"Error: {str(e)}")
        # Chain the original error so the root cause stays in the traceback.
        raise Exception(f"Audio conversion failed: {str(e)}") from e
|
|
def is_silent_audio(audio_path):
    """Return True when no non-silent section is found in a WAV file.

    The silence threshold passed to ``detect_nonsilent`` is the clip's own
    ``dBFS`` value minus 16, with ``min_silence_len`` set to 500.
    """
    clip = AudioSegment.from_wav(audio_path)
    threshold = clip.dBFS - 16
    nonsilent_parts = detect_nonsilent(
        clip,
        min_silence_len=500,
        silence_thresh=threshold,
    )
    print(f"Detected nonsilent parts: {nonsilent_parts}")
    return not nonsilent_parts
|
|
| |
@app.route("/")
def index():
    """Serve the landing page."""
    page = render_template("index.html")
    return page
|
|
@app.route("/dashboard", methods=["GET"])
def dashboard():
    """Serve the dashboard page."""
    page = render_template("dashboard.html")
    return page
|
|
@app.route('/submit', methods=['POST'])
def submit():
    """Create a ``Customer_Login__c`` record from a JSON form submission.

    Expects a JSON object with ``name``, ``email`` and ``phone``.

    Returns:
        A JSON response: 200 on success, 400 for a missing/invalid body or
        missing fields, 500 when Salesforce or the server fails.
    """
    try:
        # silent=True returns None for a malformed or non-JSON body instead
        # of raising, so bad client input gets the 400 below rather than
        # being caught by the generic handler and reported as a 500.
        data = request.get_json(silent=True)
        print("Received Data:", data)

        # A JSON array or scalar has no .get(); treat it as invalid input.
        if not data or not isinstance(data, dict):
            return jsonify({"success": False, "message": "Invalid or empty JSON received"}), 400

        name = data.get("name")
        email = data.get("email")
        phone = data.get("phone")

        if not name or not email or not phone:
            return jsonify({"success": False, "message": "Missing required fields"}), 400

        salesforce_data = {
            "Name": name,
            "Email__c": email,
            "Phone_Number__c": phone,
        }

        try:
            result = sf.Customer_Login__c.create(salesforce_data)
        except Exception as e:
            print(f"Salesforce Insertion Error: {str(e)}")
            return jsonify({"success": False, "message": "Salesforce submission failed", "error": str(e)}), 500

        print(f"Salesforce Response: {result}")
        return jsonify({"success": True, "message": "Data submitted successfully"})

    except Exception as e:
        # Last-resort boundary so the client always receives JSON.
        print(f"Server Error: {str(e)}")
        return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500
| |
@app.route("/menu", methods=["GET"])
def menu_page():
    """Render the menu page from the records returned by get_menu_items."""
    menu_data = []
    for record in get_menu_items(sf):
        # Map the Salesforce field names onto the keys passed to the template.
        menu_data.append({
            "name": record['Name'],
            "price": record['Price__c'],
            "ingredients": record['Ingredients__c'],
            "category": record['Category__c'],
        })
    return render_template("menu_page.html", menu_items=menu_data)
|
|
| |
@app.route("/order", methods=["POST"])
def place_order():
    """Create an ``Order__c`` record from a JSON order request.

    Expects a JSON object with ``item_name`` and ``quantity``.

    Returns:
        A JSON response: 200 on success, 400 for a missing/invalid body or
        missing fields, 500 when the Salesforce call fails.
    """
    # silent=True returns None for a malformed or non-JSON body, letting the
    # check below answer with a JSON 400.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"success": False, "message": "Invalid or empty JSON received"}), 400

    item_name = payload.get('item_name')
    quantity = payload.get('quantity')
    # Reject incomplete orders up front instead of sending empty fields
    # to Salesforce.
    if not item_name or quantity is None:
        return jsonify({"success": False, "message": "Missing required fields"}), 400

    order_data = {"Item__c": item_name, "Quantity__c": quantity}
    try:
        sf.Order__c.create(order_data)
    except Exception as e:
        # Report failures as JSON, matching the /submit endpoint.
        print(f"Salesforce Insertion Error: {str(e)}")
        return jsonify({"success": False, "message": "Salesforce submission failed", "error": str(e)}), 500

    return jsonify({"success": True, "message": f"Order for {item_name} placed successfully."})
|
|
| |
@app.route("/cart", methods=["GET"])
def cart():
    """Render the cart page; the cart is always passed as an empty list."""
    return render_template("cart_page.html", cart_items=[])
|
|
| |
@app.route("/order-summary", methods=["GET"])
def order_summary():
    """Render the order summary page with an empty list of order details."""
    return render_template("order_summary.html", order_details=[])
|
|
# Speech-recognition pipeline, created on first use and then reused so the
# model is not reloaded for every request.
_ASR_PIPELINE = None


def _get_asr_pipeline():
    """Return the shared Whisper ASR pipeline, building it on first use."""
    global _ASR_PIPELINE
    if _ASR_PIPELINE is None:
        _ASR_PIPELINE = pipeline(
            "automatic-speech-recognition",
            model="openai/whisper-small",
            device=0 if torch.cuda.is_available() else -1,
            config=config,
        )
    return _ASR_PIPELINE


def _parse_contact_details(text):
    """Split text on whitespace into (name, email, phone_number).

    The first word is the name, the second is the email if it contains '@',
    and the third is the phone number; placeholders fill any missing part.
    """
    parts = text.split()
    name = parts[0] if parts else "Unknown Name"
    # Guard the index: a one-word transcript has no second part.
    email = parts[1] if len(parts) > 1 and '@' in parts[1] else "unknown@domain.com"
    phone_number = parts[2] if len(parts) > 2 else "0000000000"
    return name, email, phone_number


@app.route("/transcribe", methods=["POST"])
def transcribe():
    """Transcribe an uploaded audio clip and register the speaker in Salesforce.

    Expects a multipart upload with an ``audio`` file. The clip is converted
    to WAV, checked for silence, transcribed, parsed into name/email/phone,
    and stored through ``create_salesforce_record``.

    Returns:
        A JSON response: 200 with the transcription and Salesforce result,
        400 when no audio or no speech is present, 500 on any failure.
    """
    if "audio" not in request.files:
        print("No audio file provided")
        return jsonify({"error": "No audio file provided"}), 400

    audio_file = request.files["audio"]
    input_audio_path = os.path.join("static", "temp_input.wav")
    output_audio_path = os.path.join("static", "temp.wav")
    audio_file.save(input_audio_path)

    try:
        convert_to_wav(input_audio_path, output_audio_path)

        if is_silent_audio(output_audio_path):
            return jsonify({"error": "No speech detected. Please try again."}), 400
        print("Audio contains speech, proceeding with transcription.")

        result = None
        retry_attempts = 3
        for attempt in range(retry_attempts):
            try:
                # Run the pipeline on the converted file; building the
                # pipeline alone yields no transcription.
                result = _get_asr_pipeline()(output_audio_path)
                print(f"Transcribed text: {result['text']}")
                break
            except requests.exceptions.ReadTimeout:
                print(f"Timeout occurred, retrying attempt {attempt + 1}/{retry_attempts}...")
                # No point sleeping after the final attempt.
                if attempt + 1 < retry_attempts:
                    time.sleep(5)

        if result is None:
            return jsonify({"error": "Unable to transcribe audio after retries."}), 500

        transcribed_text = result["text"].strip().capitalize()
        print(f"Transcribed text: {transcribed_text}")

        name, email, phone_number = _parse_contact_details(transcribed_text)
        print(f"Parsed data - Name: {name}, Email: {email}, Phone Number: {phone_number}")

        confirmation = f"Is this correct? Name: {name}, Email: {email}, Phone: {phone_number}"
        generate_audio_prompt(confirmation, "confirmation.mp3")

        # Placeholder: the user's confirmation is not collected, so the
        # record is always created.
        user_confirms = True

        salesforce_response = None
        if user_confirms:
            # The client is passed explicitly as the helper's first argument.
            salesforce_response = create_salesforce_record(sf, name, email, phone_number)
            print(f"Salesforce record creation response: {salesforce_response}")

            if "error" in salesforce_response:
                print(f"Error creating record in Salesforce: {salesforce_response['error']}")
                return jsonify(salesforce_response), 500

        return jsonify({"text": transcribed_text, "salesforce_record": salesforce_response})

    except Exception as e:
        # Request boundary: report any failure to the client as JSON.
        print(f"Error in transcribing or processing: {str(e)}")
        return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500
|
|
| |
if __name__ == "__main__":
    # Serve the Flask app with waitress on every interface, port 7860.
    bind = {"host": "0.0.0.0", "port": 7860}
    serve(app, **bind)
|
|