File size: 14,043 Bytes
498db6b
 
4955081
f343031
498db6b
 
 
 
 
3ca15d8
 
498db6b
 
 
 
 
 
3ca15d8
498db6b
 
3ca15d8
498db6b
 
 
 
 
 
 
 
 
4955081
498db6b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4955081
498db6b
4955081
498db6b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4955081
 
 
498db6b
 
 
 
 
 
 
 
 
 
 
 
 
 
8e58322
 
 
498db6b
 
 
 
 
 
4955081
 
3ca15d8
 
8e58322
3ca15d8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8e58322
3ca15d8
 
4955081
3ca15d8
 
 
 
 
 
 
 
 
 
8e58322
3ca15d8
 
 
4955081
 
 
 
 
3ca15d8
 
 
 
4955081
 
 
3ca15d8
4955081
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
import asyncio
import os
import shutil
import json
from typing import Dict
import random
import datetime
import string
import docx
import pandas as pd
from src.domain.block import Block
from src.tools.doc_tools import get_title
from src.domain.doc import Doc
from src.domain.wikidoc import WikiPage
from src.view.log_msg import create_msg_from
import src.tools.semantic_db as semantic_db
from src.tools.wiki import Wiki
from src.llm.llm_tools import generate_response_to_exigence
from src.llm.llm_tools import get_wikilist, get_public_paragraph, get_private_paragraph
from src.tools.semantic_db import add_texts_to_collection, query_collection
from src.tools.excel_tools import excel_to_dict
import gradio as gr
from src.retriever.retriever import Retriever

class Controller:

    def __init__(self, config: Dict, client_db, retriever):
        """Wire up output paths, the default template, and collaborators from config."""
        self.templates_path = config['templates_path']
        self.generated_docs_path = config['generated_docs_path']
        self.styled_docs_path = config['styled_docs_path']
        self.excel_doc_path = config['excel_doc_path']
        self.new_docs = []
        self.gen_docs = []
        self.input_csv = ""
        # The default template is the one named at `default_template_index`.
        default_name = config['templates'][config['default_template_index']]
        self.default_template = Doc(f"{config['templates_path']}/{default_name}")
        self.template = self.default_template
        self.log = []
        self.differences = []
        self.list_differences = []
        self.client_db = client_db
        self.retriever = retriever

    def copy_docs(self, temp_docs: list):
        """
        Initial copy of the incoming documents into the generated-docs folder
        and creation of the collection used for requirements retrieval.

        :param temp_docs: uploaded file handles; each exposes a ``name`` path.

        TODO: Rename or refactor the function -> 1 mission / function
        TODO: To be tested on several documents
        TODO: Rename create_collection in create_requirement_collection
        """
        doc_names = [self._doc_stem(doc.name) for doc in temp_docs]
        docs = [Doc(path=doc.name) for doc in temp_docs]
        self.create_collection(docs)
        for doc, doc_name in zip(docs, doc_names):
            # The working copy gets a trailing underscore before the extension.
            style_path = f"{self.generated_docs_path}/{doc_name}_.docx"
            self.new_docs.append(doc.copy(style_path))

    @staticmethod
    def _doc_stem(path: str) -> str:
        """Strip leading directories (either separator) and the file extension."""
        name = path.split('/')[-1]
        name = name.split('\\')[-1]
        return name.split('.')[0]

    def clear_docs(self):
        for new_doc in self.new_docs:
            if os.path.exists(new_doc.path):
                new_doc.clear()
        for gen_doc in self.gen_docs:
            if os.path.exists(gen_doc.path):
                gen_doc.clear()
        self.new_docs = []
        self.gen_docs = []
        self.log = []
        path_to_clear = os.path.abspath(self.generated_docs_path)
        second_path_to_clear = os.path.abspath(self.excel_doc_path)
        [os.remove(f"{path_to_clear}/{doc}") for doc in os.listdir(path_to_clear)]
        [os.remove(f"{second_path_to_clear}/{doc}") for doc in os.listdir(second_path_to_clear)]

    def set_template(self, template_name: str = ""):
        if not template_name:
            self.template = self.default_template
        else:
            template_path = f"{self.templates_path}/{template_name}"
            self.template = Doc(template_path)

    def add_template(self, template_path: str):
        """
        Register an uploaded .docx file as a new template.

        Silently ignores an empty upload; warns (and skips) on a non-.docx file.

        TODO: message to be put in config
        """
        if not template_path:
            return
        if not template_path.name.endswith(".docx"):
            gr.Warning("Seuls les fichiers .docx sont acceptés")
            return
        uploaded = docx.Document(template_path.name)
        uploaded.save(f"{self.templates_path}/{get_title(template_path.name)}")

    def delete_curr_template(self, template_name: str):
        if not template_name:
            return
        os.remove(f"{self.templates_path}/{template_name}")

    def retrieve_number_of_misapplied_styles(self):
        """
        not used: buggy !!
        """
        res = {}
        for new_doc in self.new_docs:
            res[new_doc] = new_doc.retrieve_number_of_misapplied_styles()
        return res
     
    def get_difference_with_template(self):
        self.differences = []
        for new_doc in self.new_docs:
            diff_styles = new_doc.get_different_styles_with_template(template=self.template)
            diff_dicts = [{'doc': new_doc, 'style': s} for s in diff_styles]
            self.differences += diff_dicts
        template_styles = self.template.xdoc.styles
        template_styles = [style for style in template_styles if style.name in self.template.styles.names]
        return self.differences, template_styles
    
    def get_list_styles(self):
        self.list_differences = []
        for new_doc in self.new_docs:
            list_styles = new_doc.get_list_styles()
            all_lists_styles = [{'doc': new_doc, 'list_style': s} for s in list_styles]
            self.list_differences += all_lists_styles
        return self.list_differences

    def map_style(self, this_style_index: int, template_style_name: str):
        """
        maps a style from 'this' document into a style from the template
        """
        #dont make any change if the style is already the same
        diff_dict = self.differences[this_style_index]
        doc = diff_dict['doc']
        this_style_name = diff_dict['style']
        log = doc.copy_one_style(this_style_name, template_style_name, self.template)
        if log:
            self.log.append({doc.name: log})

    def update_list_style(self, this_style_index: int, template_style_name: str):
        """
        maps a style from 'this' document into a style from the template
        """
        #dont make any change if the style is already the same
        diff_dict = self.list_differences[this_style_index]
        doc = diff_dict['doc']
        this_style_name = diff_dict['list_style']
        log = doc.change_bullet_style(this_style_name, template_style_name, self.template)
        if log:
            self.log.append({doc.name: log})

    def update_style(self,index,style_to_modify):
        return self.map_style(index, style_to_modify) if style_to_modify else None

    def apply_template(self, options_list):
        for new_doc in self.new_docs:
            log = new_doc.apply_template(template=self.template, options_list=options_list)
            if log:
                self.log.append({new_doc.name: log})

    def reset(self):
        for new_doc in self.new_docs:
            new_doc.delete()
        for gen_doc in self.gen_docs:
            gen_doc.delete()
        self.new_docs = []
        self.gen_docs = []


    def get_log(self):
        """Render the accumulated change log into user-facing messages."""
        return create_msg_from(self.log, self.new_docs)

    """
    Source Control
    """

    def get_or_create_collection(self, id_: str) -> str:
        """
        generates a new id if needed
        TODO: rename into get_or_create_generation_collection
        TODO: have a single DB with separate collections, one for requirements, one for generation
        """
        if id_ != '-1':
            return id_
        else:
            now = datetime.datetime.now().strftime("%m%d%H%M")
            letters = string.ascii_lowercase + string.digits
            id_ = now + '-' + ''.join(random.choice(letters) for _ in range(10))
            semantic_db.get_or_create_collection(id_)
        return id_

    async def wiki_fetch(self) -> [str]:
        """
        Return the titles of the wiki pages matching the tasks found in the
        working documents, duplicates removed.
        """
        tasks = [task for doc in self.new_docs for task in doc.tasks]
        pending = [asyncio.create_task(get_wikilist(task)) for task in tasks]
        wiki_lists = await asyncio.gather(*pending)
        titles = set()
        for wiki_list in wiki_lists:
            titles.update(wiki_list)
        return list(titles)

    async def wiki_upload_and_store(self, wiki_title: str, collection_name: str):
        """
        Fetch one wiki page and store its paragraphs into the given collection.

        ``Wiki().fetch`` returns a plain string on failure (presumably an error
        message — TODO confirm against Wiki.fetch); in that case it is printed
        and nothing is stored.
        """
        wikipage = Wiki().fetch(wiki_title)
        if isinstance(wikipage, str):
            # Fetch failed: nothing to index.
            print(wikipage)
            return
        texts = WikiPage(wikipage.page_content).get_paragraphs()
        add_texts_to_collection(coll_name=collection_name, texts=texts, file=wiki_title, source='wiki')

    """
    Generate Control
    """


    async def generate_doc_from_db(self, collection_name: str, from_files: [str]) -> [str]:
        """
        For every working document, generate a companion document whose task
        placeholders are replaced by LLM answers grounded in the collection.

        :param collection_name: semantic-db collection to query for context.
        :param from_files: restrict retrieval to these source files.
        :return: de-duplicated list of generated .docx paths.
        """

        def query_from_task(task):
            # Turn a task description into a retrieval query via the LLM.
            return get_public_paragraph(task)

        async def retrieve_text_and_generate(t, collection_name: str, from_files: [str]):
            """
            Retrieve the relevant texts from the database and generate the
            resolution text for one task.
            """
            # retrieve the texts from the database
            task_query = query_from_task(t)
            texts = query_collection(coll_name=collection_name, query=task_query, from_files=from_files)
            task_resolutions = get_private_paragraph(task=t, texts=texts) 
            return task_resolutions
        
        async def real_doc_generation(new_doc):
            # Resolve all tasks of this document concurrently.
            async_task_resolutions = [asyncio.create_task(retrieve_text_and_generate(t=task, collection_name=collection_name, from_files=from_files))
                    for task in new_doc.tasks]
            tasks_resolutions = await asyncio.gather(*async_task_resolutions) # TO REVIEW
            gen_path = f"{self.generated_docs_path}/{new_doc.name}e.docx"
            gen_doc = new_doc.copy(gen_path)
            gen_doc.replace_tasks(tasks_resolutions)
            gen_doc.save_as_docx()
            # NOTE(review): every coroutine appends to — and returns — the SAME
            # outer `gen_paths` list, so gather() below yields N aliases of it;
            # the flatten + set() then removes the duplicates. Works, but fragile.
            gen_paths.append(gen_doc.path)
            self.gen_docs.append(gen_doc)
            return gen_paths
        
        gen_paths = []
        gen_paths = await asyncio.gather(*[asyncio.create_task(real_doc_generation(new_doc)) for new_doc in self.new_docs])
        gen_paths = [path for sublist in gen_paths for path in sublist]
        gen_paths = list(set(gen_paths))
        return gen_paths
    


    """
    Requirements
    """
    def clear_input_csv(self):
        self.input_csv = ""
        [os.remove(f"{self.excel_doc_path}/{doc}") for doc in os.listdir(self.excel_doc_path)]

    def set_input_csv(self, csv_path: str):
        """
        Remember the path of the uploaded requirements file.

        TODO: rename to set_requirements_file
        """
        self.input_csv = csv_path

    def create_collection(self, docs: [Doc]):
        """
        TODO: rename to create_requirements_collection
        TODO: merge with semantic tool to have only one DB Object
        """
        coll_name = "collection_for_docs"
        collection = self.client_db.get_or_create_collection(coll_name)
        if collection.count() == 0:
            for doc in docs:
                self.fill_collection(doc, collection)
        self.retriever.collection = collection

    def fill_collection(self, doc: Doc, collection: str):
        """
        Fill the collection with the blocks of the given document.
        """
        # Retriever's constructor is used purely for its side effect here —
        # presumably it indexes the document's blocks into `collection`; the
        # instance itself is discarded. TODO confirm against Retriever.__init__.
        Retriever(doc=doc, collection=collection)


    @staticmethod
    def  _select_best_sources(sources: [Block], delta_1_2=0.15, delta_1_n=0.3, absolute=1.2, alpha=0.9, max_blocks=3) -> [Block]:
        """
        Select the best sources: not far from the very best, not far from the last selected, and not too bad per se
        """
        best_sources = []
        for idx, s in enumerate(sources):
            if idx == 0 \
                    or (s.distance - sources[idx - 1].distance < delta_1_2
                        and s.distance - sources[0].distance < delta_1_n) \
                    or s.distance < absolute:
                best_sources.append(s)
                delta_1_2 *= alpha
                delta_1_n *= alpha
                absolute *= alpha
            else:
                break
        best_sources = sorted(best_sources, key=lambda x: x.distance)[:max_blocks]
        return best_sources

    def generate_response_to_requirements(self):
        """
        For every requirement in the input file, retrieve the best matching
        blocks, ask the LLM for a conformity answer, and write the enriched
        table as an excel file.

        :return: path of the written .xlsx file.
        """
        requirements = self.get_requirements_from_csv()
        for exigence in requirements:
            # `exigence` IS the row dict, so update it in place. The original
            # `list.index(exigence)` lookup was O(n) per row and wrote to the
            # wrong row whenever two requirements were identical.
            blocks_sources = self.retriever.similarity_search(queries=exigence["Exigence"])
            best_sources = self._select_best_sources(blocks_sources)
            sources_contents = [
                f"Paragraph title : {s.title}\n-----\n{s.content}" if s.title
                else f"Paragraph {s.index}\n-----\n{s.content}"
                for s in best_sources
            ]
            context = '\n'.join(sources_contents)
            # Drop sources from the end until the context fits the prompt budget.
            i = 1
            while (len(context) > 15000) and i < len(sources_contents):
                context = "\n".join(sources_contents[:-i])
                i += 1
            exigence["Conformité"] = generate_response_to_exigence(
                exigence=exigence["Exigence"], titre_exigence=exigence["Titre"], content=context)
            exigence["Document"] = best_sources[0].doc
            exigence["Paragraphes"] = "; ".join(block.index for block in best_sources)
        excel_name = self.input_csv
        if '/' in excel_name:
            excel_name = excel_name.split('/')[-1]
        elif '\\' in excel_name:
            excel_name = excel_name.split('\\')[-1]
        out_path = f"{self.excel_doc_path}/{excel_name}"
        pd.DataFrame(data=requirements).to_excel(out_path, index=False)
        return out_path


    def get_requirements_from_csv(self):
        """Load the requirements file (``self.input_csv``) via ``excel_to_dict``."""
        return excel_to_dict(self.input_csv)