|
|
from io import BytesIO |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import requests |
|
|
from PIL import Image |
|
|
from flask import Flask, request, jsonify |
|
|
import ddddocr |
|
|
import logging |
|
|
import re |
|
|
import base64 |
|
|
|
|
|
app = Flask(__name__) |
|
|
|
|
|
|
|
|
logging.basicConfig(filename='app.log', level=logging.DEBUG, |
|
|
format='%(asctime)s %(levelname)s %(name)s %(threadName)s : %(message)s') |
|
|
|
|
|
|
|
|
def get_image_bytes(image_data): |
|
|
if isinstance(image_data, bytes): |
|
|
return image_data |
|
|
elif image_data.startswith('http'): |
|
|
response = requests.get(image_data, verify=False) |
|
|
response.raise_for_status() |
|
|
return response.content |
|
|
elif isinstance(image_data, str): |
|
|
return base64.b64decode(image_data) |
|
|
else: |
|
|
raise ValueError("Unsupported image data type") |
|
|
|
|
|
def image_to_base64(image, format='PNG'): |
|
|
buffered = BytesIO() |
|
|
image.save(buffered, format=format) |
|
|
img_str = base64.b64encode(buffered.getvalue()).decode('utf-8') |
|
|
return img_str |
|
|
|
|
|
class CAPTCHA: |
|
|
def __init__(self): |
|
|
|
|
|
self.ocr = ddddocr.DdddOcr() |
|
|
self.det = ddddocr.DdddOcr(det=True) |
|
|
|
|
|
|
|
|
def capcode(self, sliding_image, back_image, simple_target): |
|
|
try: |
|
|
sliding_bytes = get_image_bytes(sliding_image) |
|
|
back_bytes = get_image_bytes(back_image) |
|
|
res = self.ocr.slide_match(sliding_bytes, back_bytes, simple_target=simple_target) |
|
|
return res['target'][0] |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def slideComparison(self, sliding_image, back_image): |
|
|
try: |
|
|
sliding_bytes = get_image_bytes(sliding_image) |
|
|
back_bytes = get_image_bytes(back_image) |
|
|
res = self.ocr.slide_comparison(sliding_bytes, back_bytes) |
|
|
return res['target'][0] |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def classification(self, image): |
|
|
try: |
|
|
image_bytes = get_image_bytes(image) |
|
|
res = self.ocr.classification(image_bytes) |
|
|
return res |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def detection(self, image): |
|
|
try: |
|
|
image_bytes = get_image_bytes(image) |
|
|
poses = self.det.detection(image_bytes) |
|
|
return poses |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return None |
|
|
|
|
|
|
|
|
def calculate(self, image): |
|
|
try: |
|
|
image_bytes = get_image_bytes(image) |
|
|
expression = self.ocr.classification(image_bytes) |
|
|
expression = re.sub('=.*$', '', expression) |
|
|
expression = re.sub('[^0-9+\-*/()]', '', expression) |
|
|
result = eval(expression) |
|
|
return result |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
app.logger.error(f"错误类型: {type(e)}") |
|
|
app.logger.error(f"错误详细信息: {e.args}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
def crop(self, image, y_coordinate): |
|
|
try: |
|
|
image = Image.open(BytesIO(requests.get(image).content)) |
|
|
|
|
|
upper_half = image.crop((0, 0, image.width, y_coordinate)) |
|
|
middle_half = image.crop((0, y_coordinate, image.width, y_coordinate*2)) |
|
|
lower_half = image.crop((0, y_coordinate*2, image.width, image.height)) |
|
|
|
|
|
slidingImage = image_to_base64(upper_half) |
|
|
backImage = image_to_base64(lower_half) |
|
|
return jsonify({'slidingImage': slidingImage, 'backImage': backImage}) |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
app.logger.error(f"错误类型: {type(e)}") |
|
|
app.logger.error(f"错误详细信息: {e.args}") |
|
|
return None |
|
|
|
|
|
|
|
|
def select(self, image): |
|
|
try: |
|
|
image_bytes = get_image_bytes(image) |
|
|
|
|
|
image_array = np.frombuffer(image_bytes, dtype=np.uint8) |
|
|
|
|
|
im = cv2.imdecode(image_array, cv2.IMREAD_COLOR) |
|
|
bboxes = self.det.detection(image_bytes) |
|
|
json = [] |
|
|
for bbox in bboxes: |
|
|
x1, y1, x2, y2 = bbox |
|
|
x1, y1, x2, y2 = int(x1), int(y1), int(x2), int(y2) |
|
|
cropped_image = im[y1:y2, x1:x2] |
|
|
|
|
|
_, buffer = cv2.imencode('.png', cropped_image) |
|
|
|
|
|
image_base64 = base64.b64encode(buffer).decode('utf-8') |
|
|
result = self.ocr.classification(image_base64) |
|
|
json.append({result: bbox}) |
|
|
|
|
|
return json |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
app.logger.error(f"错误类型: {type(e)}") |
|
|
app.logger.error(f"错误详细信息: {e.args}") |
|
|
return None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
captcha = CAPTCHA() |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/capcode', methods=['POST']) |
|
|
def capcode(): |
|
|
try: |
|
|
data = request.get_json() |
|
|
sliding_image = data['slidingImage'] |
|
|
back_image = data['backImage'] |
|
|
simple_target = data.get('simpleTarget', True) |
|
|
result = captcha.capcode(sliding_image, back_image, simple_target) |
|
|
if result is None: |
|
|
app.logger.error('处理过程中出现错误.') |
|
|
return jsonify({'error': '处理过程中出现错误.'}), 500 |
|
|
return jsonify({'result': result}) |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return jsonify({'error': f"出现错误: {e}"}), 400 |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/slideComparison', methods=['POST']) |
|
|
def slideComparison(): |
|
|
try: |
|
|
data = request.get_json() |
|
|
sliding_image = data['slidingImage'] |
|
|
back_image = data['backImage'] |
|
|
|
|
|
result = captcha.slideComparison(sliding_image, back_image) |
|
|
if result is None: |
|
|
app.logger.error('处理过程中出现错误.') |
|
|
return jsonify({'error': '处理过程中出现错误.'}), 500 |
|
|
return jsonify({'result': result}) |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return jsonify({'error': f"出现错误: {e}"}), 400 |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/classification', methods=['POST']) |
|
|
def classification(): |
|
|
try: |
|
|
data = request.get_json() |
|
|
image = data['image'] |
|
|
result = captcha.classification(image) |
|
|
if result is None: |
|
|
app.logger.error('处理过程中出现错误.') |
|
|
return jsonify({'error': '处理过程中出现错误.'}), 500 |
|
|
return jsonify({'result': result}) |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return jsonify({'error': f"出现错误: {e}"}), 400 |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/detection', methods=['POST']) |
|
|
def detection(): |
|
|
try: |
|
|
data = request.get_json() |
|
|
image = data['image'] |
|
|
result = captcha.detection(image) |
|
|
if result is None: |
|
|
app.logger.error('处理过程中出现错误.') |
|
|
return jsonify({'error': '处理过程中出现错误.'}), 500 |
|
|
return jsonify({'result': result}) |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return jsonify({'error': f"出现错误: {e}"}), 400 |
|
|
|
|
|
|
|
|
|
|
|
@app.route('/calculate', methods=['POST']) |
|
|
def calculate(): |
|
|
try: |
|
|
data = request.get_json() |
|
|
image = data['image'] |
|
|
result = captcha.calculate(image) |
|
|
if result is None: |
|
|
app.logger.error('处理过程中出现错误.') |
|
|
return jsonify({'error': '处理过程中出现错误.'}), 500 |
|
|
return jsonify({'result': result}) |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return jsonify({'error': f"出现错误: {e}"}), 400 |
|
|
|
|
|
|
|
|
@app.route('/crop', methods=['POST']) |
|
|
def crop(): |
|
|
try: |
|
|
data = request.get_json() |
|
|
image = data['image'] |
|
|
y_coordinate = data['y_coordinate'] |
|
|
result = captcha.crop(image, y_coordinate) |
|
|
if result is None: |
|
|
app.logger.error('处理过程中出现错误.') |
|
|
return jsonify({'error': '处理过程中出现错误.'}), 500 |
|
|
return result |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return jsonify({'error': f"出现错误: {e}"}), 400 |
|
|
|
|
|
|
|
|
@app.route('/select', methods=['POST']) |
|
|
def select(): |
|
|
try: |
|
|
data = request.get_json() |
|
|
image = data['image'] |
|
|
|
|
|
result = captcha.select(image) |
|
|
if result is None: |
|
|
app.logger.error('处理过程中出现错误.') |
|
|
return jsonify({'error': '处理过程中出现错误.'}), 500 |
|
|
return result |
|
|
except Exception as e: |
|
|
app.logger.error(f"出现错误: {e}") |
|
|
return jsonify({'error': f"出现错误: {e}"}), 400 |
|
|
|
|
|
|
|
|
@app.route('/') |
|
|
def hello_world(): |
|
|
return 'API运行成功!' |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
app.run(host='::', threaded=True, debug=True, port=7777) |