|
|
import asyncio |
|
|
import json |
|
|
import re |
|
|
from typing import List, Union |
|
|
|
|
|
import aiofiles |
|
|
import uvicorn |
|
|
import nltk |
|
|
import httpx |
|
|
from fastapi import FastAPI |
|
|
from pydantic import BaseModel |
|
|
from nltk.corpus import stopwords |
|
|
from nltk.stem import WordNetLemmatizer |
|
|
import logging |
|
|
import pytz |
|
|
from datetime import datetime |
|
|
from api_secrets import API_KEY_ASSEMBLYAI |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_bd_time(): |
|
|
bd_timezone = pytz.timezone("Asia/Dhaka") |
|
|
time_now = datetime.now(bd_timezone) |
|
|
current_time = time_now.strftime("%I:%M:%S %p") |
|
|
return current_time |
|
|
|
|
|
app = FastAPI() |
|
|
|
|
|
CHUNK_SIZE = 5_242_880 |
|
|
|
|
|
upload_endpoint = 'https://api.assemblyai.com/v2/upload' |
|
|
transcript_endpoint = 'https://api.assemblyai.com/v2/transcript' |
|
|
|
|
|
headers_auth_only = {'authorization': API_KEY_ASSEMBLYAI} |
|
|
|
|
|
headers = { |
|
|
"authorization": API_KEY_ASSEMBLYAI, |
|
|
"content-type": "application/json" |
|
|
} |
|
|
|
|
|
|
|
|
class Item(BaseModel): |
|
|
url: str |
|
|
|
|
|
|
|
|
async def lemmatize_and_clean(text): |
|
|
words = nltk.word_tokenize(text.lower()) |
|
|
words = [word for word in words if word.isalpha() and word not in set(stopwords.words('english'))] |
|
|
lemmatizer = WordNetLemmatizer() |
|
|
words = [await asyncio.to_thread(lemmatizer.lemmatize, word) for word in words] |
|
|
return ' '.join(words) |
|
|
|
|
|
|
|
|
patterns = { |
|
|
"Clear":r"\b(((c|k)(?:(l..r|lir|lar|il))))", |
|
|
"Confidence":r"\b((((f|t|th)(u|i))(?:(|c|q|k|ck)(?:(a|e|o))))|(((f|t|th)(u|i))(?:(|c|q|k|ck)(?:a|.a))))", |
|
|
"Revival":r"\b(((a)(?:(sh|yush|rch))))", |
|
|
"Anti-Dandruff":r"\b(((al)(?:.*?(k|q|kh)(?:a|i|o|u)(?:(s|sh))))|((k|q|kh)(?:a|i|o|u)(?:(s|sh)((?:a|o))))|((k|q|kh)(?:a|i|o|u)(?:(s|sh)(r(?:a|o|u)))))", |
|
|
} |
|
|
|
|
|
async def nlp_bat(text): |
|
|
results = {} |
|
|
all_match = {} |
|
|
for name, pattern in patterns.items(): |
|
|
matches = re.findall(pattern, text, re.IGNORECASE) |
|
|
all_match[name] = matches |
|
|
results[name] = len(matches) |
|
|
|
|
|
print(all_match) |
|
|
return results |
|
|
|
|
|
|
|
|
async def read_file(filename): |
|
|
async with aiofiles.open(filename, 'rb') as f: |
|
|
while True: |
|
|
data = await f.read(CHUNK_SIZE) |
|
|
if not data: |
|
|
break |
|
|
yield data |
|
|
|
|
|
|
|
|
async def upload(filename): |
|
|
async with httpx.AsyncClient() as client: |
|
|
async for data in read_file(filename): |
|
|
upload_response = await client.post(upload_endpoint, headers=headers_auth_only, data=data) |
|
|
return upload_response.json()['upload_url'] |
|
|
|
|
|
|
|
|
async def transcribe(audio_url): |
|
|
transcript_request = {'audio_url': audio_url} |
|
|
async with httpx.AsyncClient() as client: |
|
|
transcript_response = await client.post(transcript_endpoint, json=transcript_request, headers=headers) |
|
|
return transcript_response.json()['id'] |
|
|
|
|
|
|
|
|
async def poll(transcript_id): |
|
|
polling_endpoint = f'{transcript_endpoint}/{transcript_id}' |
|
|
async with httpx.AsyncClient() as client: |
|
|
polling_response = await client.get(polling_endpoint, headers=headers) |
|
|
return polling_response.json() |
|
|
|
|
|
|
|
|
async def get_transcription_result_url(url): |
|
|
transcribe_id = await transcribe(url) |
|
|
while True: |
|
|
data = await poll(transcribe_id) |
|
|
if data['status'] == 'completed': |
|
|
return data, None |
|
|
elif data['status'] == 'error': |
|
|
return data, data['error'] |
|
|
print("Processing Audio") |
|
|
await asyncio.sleep(2) |
|
|
|
|
|
|
|
|
async def detect_audio(url, title): |
|
|
data, error = await get_transcription_result_url(url) |
|
|
text_det = data['text'] |
|
|
print("main text : ", text_det) |
|
|
lmtz = await lemmatize_and_clean(text_det) |
|
|
print("Clean text : ", lmtz) |
|
|
txt = lmtz.lower() |
|
|
r = await nlp_bat(txt) |
|
|
return r |
|
|
|
|
|
|
|
|
async def process_item(item: Item): |
|
|
try: |
|
|
print(item.url) |
|
|
result = await detect_audio(item.url, title="file") |
|
|
result = json.dumps(result) |
|
|
return json.loads(result) |
|
|
finally: |
|
|
pass |
|
|
|
|
|
|
|
|
async def process_items(items: Union[Item, List[Item]]): |
|
|
if isinstance(items, list): |
|
|
coroutines = [process_item(item) for item in items] |
|
|
results_dict = await asyncio.gather(*coroutines) |
|
|
results = {} |
|
|
for item in results_dict: |
|
|
results.update(item) |
|
|
else: |
|
|
results = await process_item(items) |
|
|
return results |
|
|
|
|
|
|
|
|
@app.post("/nlp") |
|
|
async def create_items(items: Union[Item, List[Item]]): |
|
|
try: |
|
|
results = await process_items(items) |
|
|
print("Result Sent to User:", results) |
|
|
return results |
|
|
except Exception as e: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return {"AI": f"Error: {str(e)}"} |
|
|
finally: |
|
|
|
|
|
|
|
|
|
|
|
pass |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
try: |
|
|
uvicorn.run(app, host="127.0.0.1", port=1111) |
|
|
finally: |
|
|
pass |
|
|
|