Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, Response | |
| from flask_socketio import SocketIO,emit | |
| import cv2 | |
| import base64 | |
| import numpy as np | |
| import firebase_admin | |
| from firebase_admin import credentials,firestore | |
| from datetime import datetime | |
| import pytz | |
| app = Flask(__name__) | |
| app.config['SECRET_KEY'] = 'secret!' | |
| socket = SocketIO(app,async_mode="eventlet") | |
| image_ = None | |
| time = None | |
| cred = credentials.Certificate("./cognitveaid-firebase-adminsdk-j24nr-49a0ff10ce.json") | |
| firebase_admin.initialize_app(cred) | |
| fire = firestore.client() | |
| def base64_to_image(base64_string): | |
| # Extract the base64 encoded binary data from the input string | |
| base64_data = base64_string.split(",")[1] | |
| # Decode the base64 data to bytes | |
| image_bytes = base64.b64decode(base64_data) | |
| # Convert the bytes to numpy array | |
| image_array = np.frombuffer(image_bytes, dtype=np.uint8) | |
| # Decode the numpy array as an image using OpenCV | |
| image = cv2.imdecode(image_array, cv2.IMREAD_COLOR) | |
| return image | |
| def test_connect(): | |
| print("Connected") | |
| emit("my response", {"data": "Connected"}) | |
| def receive_image(image): | |
| # Decode the base64-encoded image data | |
| try: | |
| global time | |
| #handel time | |
| time = datetime.now(pytz.timezone('Asia/Kolkata')) | |
| time.strftime("%Y-%m-%d %H:%M:%S") | |
| global image_ | |
| #handel image | |
| fire.collection("video").document(str(time)).set({"data":image}) | |
| image_ = base64_to_image(image) | |
| emit('result',{"success":True}) | |
| except: | |
| emit('result',{"success":False}) | |
| def gen_frame(): | |
| global image_ | |
| global time | |
| if image_ is None: | |
| # Create a placeholder image if image_ is not yet initialized | |
| # placeholder_image = np.zeros((480, 640, 3), dtype=np.uint8) | |
| # ret, buffer = cv2.imencode('.jpg', placeholder_image) | |
| # print("fail") | |
| # yield (b'--frame\r\n' | |
| # b'Content-Type: image/jpeg\r\n\r\n' + buffer.tobytes() + b'\r\n' | |
| # b'Timestamp: ' + str(time).encode() + b'\r\n') | |
| yield (b'no_input') | |
| else: | |
| try: | |
| ret, buffer = cv2.imencode('.jpg', image_) | |
| frame = buffer.tobytes() | |
| yield (b'--frame\r\n' | |
| b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n' | |
| b'Timestamp: ' + str(time).encode() + b'\r\n') | |
| finally: | |
| image_ = None | |
| def video(): | |
| return Response(gen_frame(), mimetype='multipart/x-mixed-replace; boundary=frame') | |
| def index(): | |
| return render_template('index.html') | |
| if __name__ == '__main__': | |
| socket.run(app,port="7860",host="0.0.0.0") | |