Arabic_NLP / updated_api.py
rakib72642's picture
updated pattern
493d4f4
import asyncio
import json
import re
from typing import List, Union
import aiofiles
import uvicorn
import nltk
import httpx
from fastapi import FastAPI
from pydantic import BaseModel
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import logging
import pytz
from datetime import datetime
from api_secrets import API_KEY_ASSEMBLYAI
# logging.basicConfig(filename0="BAT_NLP_Campaign.log",
# filemode='w')
# logger = logging.getLogger("BAT")
# logger.setLevel(logging.DEBUG)
# file_handler = logging.FileHandler("BAT_NLP_Campaign.log")
# logger.addHandler(file_handler)
# total_done = 0
# total_error = 0
def get_bd_time():
bd_timezone = pytz.timezone("Asia/Dhaka")
time_now = datetime.now(bd_timezone)
current_time = time_now.strftime("%I:%M:%S %p")
return current_time
app = FastAPI()
CHUNK_SIZE = 5_242_880 # 5MB
upload_endpoint = 'https://api.assemblyai.com/v2/upload'
transcript_endpoint = 'https://api.assemblyai.com/v2/transcript'
headers_auth_only = {'authorization': API_KEY_ASSEMBLYAI}
headers = {
"authorization": API_KEY_ASSEMBLYAI,
"content-type": "application/json"
}
class Item(BaseModel):
url: str
async def lemmatize_and_clean(text):
words = nltk.word_tokenize(text.lower())
words = [word for word in words if word.isalpha() and word not in set(stopwords.words('english'))]
lemmatizer = WordNetLemmatizer()
words = [await asyncio.to_thread(lemmatizer.lemmatize, word) for word in words]
return ' '.join(words)
patterns = {
"Clear":r"\b(((c|k)(?:(l..r|lir|lar|il))))",
"Confidence":r"\b((((f|t|th)(u|i))(?:(|c|q|k|ck)(?:(a|e|o))))|(((f|t|th)(u|i))(?:(|c|q|k|ck)(?:a|.a))))",
"Revival":r"\b(((a)(?:(sh|yush|rch))))",
"Anti-Dandruff":r"\b(((al)(?:.*?(k|q|kh)(?:a|i|o|u)(?:(s|sh))))|((k|q|kh)(?:a|i|o|u)(?:(s|sh)((?:a|o))))|((k|q|kh)(?:a|i|o|u)(?:(s|sh)(r(?:a|o|u)))))",
}
async def nlp_bat(text):
results = {}
all_match = {}
for name, pattern in patterns.items():
matches = re.findall(pattern, text, re.IGNORECASE)
all_match[name] = matches
results[name] = len(matches)
print(all_match)
return results
async def read_file(filename):
async with aiofiles.open(filename, 'rb') as f:
while True:
data = await f.read(CHUNK_SIZE)
if not data:
break
yield data
async def upload(filename):
async with httpx.AsyncClient() as client:
async for data in read_file(filename):
upload_response = await client.post(upload_endpoint, headers=headers_auth_only, data=data)
return upload_response.json()['upload_url']
async def transcribe(audio_url):
transcript_request = {'audio_url': audio_url}
async with httpx.AsyncClient() as client:
transcript_response = await client.post(transcript_endpoint, json=transcript_request, headers=headers)
return transcript_response.json()['id']
async def poll(transcript_id):
polling_endpoint = f'{transcript_endpoint}/{transcript_id}'
async with httpx.AsyncClient() as client:
polling_response = await client.get(polling_endpoint, headers=headers)
return polling_response.json()
async def get_transcription_result_url(url):
transcribe_id = await transcribe(url)
while True:
data = await poll(transcribe_id)
if data['status'] == 'completed':
return data, None
elif data['status'] == 'error':
return data, data['error']
print("Processing Audio")
await asyncio.sleep(2)
async def detect_audio(url, title):
data, error = await get_transcription_result_url(url)
text_det = data['text']
print("main text : ", text_det)
lmtz = await lemmatize_and_clean(text_det)
print("Clean text : ", lmtz)
txt = lmtz.lower()
r = await nlp_bat(txt)
return r
async def process_item(item: Item):
try:
print(item.url)
result = await detect_audio(item.url, title="file")
result = json.dumps(result)
return json.loads(result)
finally:
pass
async def process_items(items: Union[Item, List[Item]]):
if isinstance(items, list):
coroutines = [process_item(item) for item in items]
results_dict = await asyncio.gather(*coroutines)
results = {}
for item in results_dict:
results.update(item)
else:
results = await process_item(items)
return results
@app.post("/nlp")
async def create_items(items: Union[Item, List[Item]]):
try:
results = await process_items(items)
print("Result Sent to User:", results)
return results
except Exception as e:
# global total_error
# total_error += 1
# logger.info(f"Time:{get_bd_time()}, Execution Failed and Total Failed Execution : {total_error}, Payload:{items}, Response : {results}")
# logger.error(str(e))
return {"AI": f"Error: {str(e)}"}
finally:
# global total_done
# total_done +=1
# logger.info(f"Time:{get_bd_time()}, Execution Done and Total Successfull Execution : {total_done}, Payload:{items}, Response : {results}")
pass
if __name__ == "__main__":
try:
uvicorn.run(app, host="127.0.0.1", port=1111)
finally:
pass