File size: 6,026 Bytes
d2cc651
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
# files after part 2
import requests
import time
from api_secrets import API_KEY_ASSEMBLYAI
import re
from fastapi import FastAPI
from pydantic import BaseModel
import asyncio
from typing import List, Union
import uvicorn
import json
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import string

# nltk.download('punkt')
# nltk.download('stopwords')
# nltk.download('wordnet')



app = FastAPI()

class Item(BaseModel):
    url: str

upload_endpoint = 'https://api.assemblyai.com/v2/upload'
transcript_endpoint = 'https://api.assemblyai.com/v2/transcript'

headers_auth_only = {'authorization': API_KEY_ASSEMBLYAI}

headers = {
    "authorization": API_KEY_ASSEMBLYAI,
    "content-type": "application/json"
}

CHUNK_SIZE = 5_242_880  # 5MB

def lemmatize_and_clean(text):
    # Tokenize the text into words
    words = nltk.word_tokenize(text)

    # Remove punctuation and convert to lowercase
    words = [word.lower() for word in words if word.isalpha()]

    # Remove stopwords
    stop_words = set(stopwords.words('english'))
    words = [word for word in words if word not in stop_words]

    # Lemmatize the words
    lemmatizer = WordNetLemmatizer()
    words = [lemmatizer.lemmatize(word) for word in words]

    # Join the words back into a cleaned text
    cleaned_text = ' '.join(words)

    return cleaned_text

# Patterns
# patterns = {
#     'smoker': r"sm.k.r|s.m.k.r\b",
#     'dhumpai': r"d.m.a.|d..mp..|.om.a.|umpa.\b",
#     'alchemy': r"al.k.m|.lch.m.\b",
#     'benson': r"..ns.n\b",
#     'goldleaf': r"go.lb|gol..lea.|g.l...|g.l../b",
#     'dunhil': r"d.n.h.l|d.nh.l|.an.i.l|.an.i.l\b",
#     'smooth': r".m..th|sm.d\b",
#     'thanda_flvr': r"th.nd..fl.v|t.nd...fl.v|th.nd...fl.v|t.nd..fl.v|..de.fl.v|.and..fl.v|..anda.fl..\b",
#     'best_tobacco': r".est.t.b..|.est..a.o|.est.o.a.o|.est.o.\b"
# }
# patterns = {
#     'Unique Capsule': r"unique capsul|unit capsul|uniq...capsul|uni..capsul\b",
#     'Refreshing Taste and Smell': r"refreshing taste smell|refreshing taste milk\b",
#     'Benson & Hadges Breeze': r"benson he.es breez|benson hess breez|benson he..e breez|benson haze breez|benson hezes bee|banson breez|banson hedge breathe|banson hedge bridge|benson hedge bre|benson hedge bridge\b"
# }

# patterns = {
#     'Unique Capsule': r"unique capsul|unit capsul|uniq...capsul|uni..capsul\b",
#     'Refreshing Taste and Smell': r"refreshing taste smell|refreshing taste milk|refreshing test smell|ripe singh taste|repressing taste smell\b",
#     'Benson & Hadges Breeze': r"benson.hage.bree|benson.hage..bree|banson.hage.bree|banson.hage..bree|benson he.es breez|benson hess breez|benson he..e breez|benson haze breez|benson hezes bee|banson breez|banson hedge breathe|banson hedge bridge|benson hedge bre|benson hedge bridge| benson haze brie|banson haze breeze|banson hedge breez\b"
# }
patterns = {
    'Unique Capsule': r'\b(?:uni(?:que)?|unit|uniq\.+|uni\.+)\s*capsul',
    'Refreshing Taste and Smell': r'\b(?:refreshing|ripe|repressing)\s+(?:taste\s+(?:smell|milk)|test\s+smell)\b',
    'Benson & Hadges Breeze': r'\b(?:benson\s+h(?:ess|aze|ezes|edge)\s+breez|banson\s+(?:haze\s+breez|hedge\s+(?:breez|bre))|benson\s+h(?:aze\s+brie|edge\s+bridge))\b',
}
# Find and count matches for each pattern
def nlp_bat(text):
    results = {}
    all_match = {}
    for name, pattern in patterns.items():
        matches = re.findall(pattern, text, re.IGNORECASE)
        m = {name:matches}
        all_match.update(m)
        count = len(matches)
        results[name] = count
    
    
    print(all_match)    

    return results






def upload(filename):
    def read_file(filename):
        with open(filename, 'rb') as f:
            while True:
                data = f.read(CHUNK_SIZE)
                if not data:
                    break
                yield data

    upload_response = requests.post(upload_endpoint, headers=headers_auth_only, data=read_file(filename))
    return upload_response.json()['upload_url']


def transcribe(audio_url):
    transcript_request = {
        'audio_url': audio_url
    }

    transcript_response = requests.post(transcript_endpoint, json=transcript_request, headers=headers)
    return transcript_response.json()['id']

        
def poll(transcript_id):
    polling_endpoint = transcript_endpoint + '/' + transcript_id
    polling_response = requests.get(polling_endpoint, headers=headers)
    return polling_response.json()


def get_transcription_result_url(url):
    transcribe_id = transcribe(url)
    while True:
        data = poll(transcribe_id)
        if data['status'] == 'completed':
            return data, None
        elif data['status'] == 'error':
            return data, data['error']
            
        print("Processing Audio")
        time.sleep(2)
        
        
def detect_audio(url, title):
    data, error = get_transcription_result_url(url)
    text_det = data['text']
    lmtz = lemmatize_and_clean(text_det)
    print(lmtz)
    txt = lmtz.lower()
    r = nlp_bat(txt)
    # print(txt)
    # print(r)
    return r


async def process_item(item: Item):
    try:
        print(item.url)
        result = detect_audio(item.url,title="file")
        result = json.dumps(result)
        res = json.loads(result)
        return res
    finally:
        pass

async def process_items(items: Union[Item, List[Item]]):
    if isinstance(items, list):
        coroutines = [process_item(item) for item in items]
        results_dict = await asyncio.gather(*coroutines)
        results = {}
        for item in results_dict:
            results.update(item)
    else:
        results = await process_item(items)
    return results
 
@app.post("/nlp")
async def create_items(items: Union[Item, List[Item]]):
    try:
        results = await process_items(items)
        print("Result Sent to User:", results)
        return results
    finally:
        pass

if __name__ == "__main__":
    try:
        uvicorn.run(app, host="127.0.0.1", port=8020)
    finally:
        pass