Spaces:
Runtime error
Runtime error
| import cv2 | |
| import numpy as np | |
| import requests | |
| import base64 | |
| import time | |
| import websockets | |
| import asyncio | |
| import cv2 | |
| import json | |
| import os | |
| import imutils | |
| class CaesarSendWeb: | |
| def send_video_https(self,uri = "http://192.168.0.10:7860/caesarobjectdetect"): | |
| cap = cv2.VideoCapture(0) | |
| while True: | |
| _, image = cap.read() | |
| #uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect" | |
| response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]}) | |
| valresp = response.json() | |
| imagebase64 = np.array(valresp["frame"]) | |
| image = np.frombuffer(base64.b64decode(imagebase64),dtype="uint8").reshape(valresp["shape"][0],valresp["shape"][1],3) | |
| cv2.imshow("image", image) | |
| if ord("q") == cv2.waitKey(1): | |
| break | |
| cap.release() | |
| cv2.destroyAllWindows() | |
| def send_video_websocket(self,uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws',jsondata=None,phone=False): | |
| if phone == False: | |
| camera = cv2.VideoCapture(0) | |
| async def main(): | |
| # Connect to the server | |
| #uri = 'wss://palondomus-caesarai.hf.space/sendvideows' | |
| #uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws' | |
| async with websockets.connect(uri) as ws: | |
| while True: | |
| if phone == False: | |
| success, frame = camera.read() | |
| elif phone == True: | |
| img_resp = requests.get("http://192.168.0.14:8080/shot.jpg") | |
| frame_arr = np.array(bytearray(img_resp.content),dtype=np.uint8) | |
| frame = cv2.imdecode(frame_arr,-1) | |
| frame = imutils.resize(frame,width=500,height=600) | |
| success = True | |
| #print(frame.shape) | |
| #1080, 1920, 3) | |
| #print(success) | |
| #print(frame.shape) | |
| if not success: | |
| break | |
| else: | |
| #print("hi") | |
| ret, buffer = cv2.imencode('.png', frame) | |
| await ws.send(buffer.tobytes()) | |
| if jsondata: | |
| await ws.send(json.dumps(jsondata)) | |
| contents = await ws.recv() | |
| if type(contents) == bytes: | |
| arr = np.frombuffer(contents, np.uint8) | |
| frameobj = cv2.imdecode(arr, cv2.IMREAD_UNCHANGED) | |
| cv2.imshow('frame',frameobj) | |
| cv2.waitKey(1) | |
| if type(contents) == str: | |
| print(json.loads(contents)) | |
| asyncio.run(main()) | |
| def send_image_recieve_text(self,uri ="http://192.168.0.10:7860/caesarocr",showimage=False): | |
| cap = cv2.VideoCapture(0) | |
| _, image = cap.read() | |
| #uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect" | |
| response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]}) | |
| messageresp = response.json() | |
| if showimage == True: | |
| cv2.imshow('frame',image) | |
| cv2.waitKey(0) | |
| # closing all open windows | |
| cv2.destroyAllWindows() | |
| return messageresp | |
| def send_image_recieve_image(self,uri ="http://192.168.0.10:7860/caesarfacesnap",showimage=False,saveimage=False): | |
| cap = cv2.VideoCapture(0) | |
| _, image = cap.read() | |
| #uri = "http://palondomus-caesarai.hf.space/caesarobjectdetect" | |
| response = requests.post(uri,json={"frame":base64.b64encode(image).decode(),"shape":[image.shape[0],image.shape[1]]}) | |
| valresp = response.json() | |
| if valresp["frame"] == "no face was detected.": | |
| print("No face was detected.") | |
| if valresp["frame"] != "no face was detected.": | |
| imagebase64 = np.array(valresp["frame"]) | |
| image = np.frombuffer(base64.b64decode(imagebase64),dtype="uint8").reshape(valresp["shape"][0],valresp["shape"][1],3) | |
| if showimage == True: | |
| cv2.imshow('frame',image) | |
| cv2.waitKey(0) | |
| # closing all open windows | |
| cv2.destroyAllWindows() | |
| if saveimage == True: | |
| filename = "croppedimage.png" | |
| count = 0 | |
| while filename in os.listdir("CaesarFaceDetection/FaceDataPoints/"): | |
| filename = f"{filename.replace('.png','').replace(f'{count-1}','')}{count}.png" | |
| count +=1 | |
| cv2.imwrite(f"CaesarFaceDetection/FaceDataPoints/{filename}", image) | |
| return image | |
| if __name__ == "__main__": | |
| #success, frame = camera.read() | |
| #yolo_uri = 'wss://palondomus-caesarai.hf.space/caesarobjectdetectws' | |
| #CaesarSendWeb.send_video_websocket(uri = yolo_uri) | |
| #video_uri = 'wss://palondomus-caesarai.hf.space/sendvideows' | |
| #CaesarSendWeb.send_video_websocket(uri = video_uri) | |
| #extraction_uri = "wss://palondomus-caesarai.hf.space/caesarocrextractionws" | |
| #CaesarSendWeb.send_video_websocket(uri = extraction_uri,jsondata={"target_words":["your","brain","power"]}) | |
| #ocrws_uri = "wss://palondomus-caesarai.hf.space/caesarocrws" | |
| #CaesarSendWeb.send_video_websocket(uri = ocrws_uri) | |
| #ocr_uri = "http://192.168.0.10:7860/caesarocr" | |
| #message = CaesarSendWeb.send_image_recieve_text(uri = ocr_uri,showimage=True) | |
| #print(message) | |
| facedetect_uri = "wss://palondomus-caesarai.hf.space/caesarfacedetectws"#"wss://palondomus-caesarai.hf.space/caesarfacedetectws" | |
| CaesarSendWeb.send_video_websocket(uri = facedetect_uri,phone=True) | |
| #facesnap_uri = "http://192.168.0.10:7860/caesarfacesnap" | |
| #cropped_image = CaesarSendWeb.send_image_recieve_image(uri = facesnap_uri,saveimage=True) | |
| #print(cropped_image) | |
| #https_uri = 'http://192.168.0.10:7860/caesarobjectdetect' | |
| #CaesarSendWeb.send_video_https(uri = https_uri) | |