# NOTE(review): removed extraction artifacts (file-size banner, git-blame hash
# gutter and line-number gutter) that were not Python and broke the module.
import asyncio
import os
import shutil
import json
from typing import Dict
import random
import datetime
import string
import docx
import pandas as pd
from src.domain.block import Block
from src.tools.doc_tools import get_title
from src.domain.doc import Doc
from src.domain.wikidoc import WikiPage
from src.view.log_msg import create_msg_from
import src.tools.semantic_db as semantic_db
from src.tools.wiki import Wiki
from src.llm.llm_tools import generate_response_to_exigence
from src.llm.llm_tools import get_wikilist, get_public_paragraph, get_private_paragraph
from src.tools.semantic_db import add_texts_to_collection, query_collection
from src.tools.excel_tools import excel_to_dict
import gradio as gr
from src.retriever.retriever import Retriever
class Controller:
    def __init__(self, config: Dict, client_db, retriever):
        """Wire up paths, the default template and the working state from *config*."""
        # Directory layout, taken verbatim from the configuration.
        self.templates_path = config['templates_path']
        self.generated_docs_path = config['generated_docs_path']
        self.styled_docs_path = config['styled_docs_path']
        self.excel_doc_path = config['excel_doc_path']
        # Working documents: freshly copied inputs and generated outputs.
        self.new_docs = []
        self.gen_docs = []
        self.input_csv = ""
        # The default template is the one designated by 'default_template_index'.
        default_name = config['templates'][config['default_template_index']]
        self.default_template = Doc(f"{config['templates_path']}/{default_name}")
        self.template = self.default_template
        # Accumulated per-document log entries and cached style differences.
        self.log = []
        self.differences = []
        self.list_differences = []
        self.client_db = client_db
        self.retriever = retriever
def copy_docs(self, temp_docs: []):
"""
Initial copy of the incoming document
+
create collection for requirments retrieval
+
Initiate paths
TODO: Rename or refactor the function -> 1 mission / function
TODO: To be tested on several documents
TODO: Rename create_collection in create_requirement_collection
"""
doc_names = [doc.name for doc in temp_docs]
for i in range(len(doc_names)):
if '/' in doc_names[i]:
doc_names[i] = doc_names[i].split('/')[-1]
elif '\\' in doc_names[i]:
doc_names[i] = doc_names[i].split('\\')[-1]
doc_names[i] = doc_names[i].split('.')[0]
docs = [Doc(path=doc.name) for doc in temp_docs]
self.create_collection(docs)
style_paths = [f"{self.generated_docs_path}/{dn}_.docx" for dn in doc_names]
gen_paths = [f"{self.generated_docs_path}/{dn}_e.docx" for dn in doc_names]
for doc, style_path, gen_path in zip(docs, style_paths, gen_paths):
new_doc = doc.copy(style_path)
self.new_docs.append(new_doc)
def clear_docs(self):
for new_doc in self.new_docs:
if os.path.exists(new_doc.path):
new_doc.clear()
for gen_doc in self.gen_docs:
if os.path.exists(gen_doc.path):
gen_doc.clear()
self.new_docs = []
self.gen_docs = []
self.log = []
path_to_clear = os.path.abspath(self.generated_docs_path)
second_path_to_clear = os.path.abspath(self.excel_doc_path)
[os.remove(f"{path_to_clear}/{doc}") for doc in os.listdir(path_to_clear)]
[os.remove(f"{second_path_to_clear}/{doc}") for doc in os.listdir(second_path_to_clear)]
def set_template(self, template_name: str = ""):
if not template_name:
self.template = self.default_template
else:
template_path = f"{self.templates_path}/{template_name}"
self.template = Doc(template_path)
def add_template(self, template_path: str):
"""
TODO: message to be but in config
"""
if not template_path:
return
elif not template_path.name.endswith(".docx"):
gr.Warning("Seuls les fichiers .docx sont acceptés")
return
doc = docx.Document(template_path.name)
doc.save(self.templates_path + '/' + get_title(template_path.name))
def delete_curr_template(self, template_name: str):
if not template_name:
return
os.remove(f"{self.templates_path}/{template_name}")
def retrieve_number_of_misapplied_styles(self):
"""
not used: buggy !!
"""
res = {}
for new_doc in self.new_docs:
res[new_doc] = new_doc.retrieve_number_of_misapplied_styles()
return res
def get_difference_with_template(self):
self.differences = []
for new_doc in self.new_docs:
diff_styles = new_doc.get_different_styles_with_template(template=self.template)
diff_dicts = [{'doc': new_doc, 'style': s} for s in diff_styles]
self.differences += diff_dicts
template_styles = self.template.xdoc.styles
template_styles = [style for style in template_styles if style.name in self.template.styles.names]
return self.differences, template_styles
def get_list_styles(self):
self.list_differences = []
for new_doc in self.new_docs:
list_styles = new_doc.get_list_styles()
all_lists_styles = [{'doc': new_doc, 'list_style': s} for s in list_styles]
self.list_differences += all_lists_styles
return self.list_differences
def map_style(self, this_style_index: int, template_style_name: str):
"""
maps a style from 'this' document into a style from the template
"""
#dont make any change if the style is already the same
diff_dict = self.differences[this_style_index]
doc = diff_dict['doc']
this_style_name = diff_dict['style']
log = doc.copy_one_style(this_style_name, template_style_name, self.template)
if log:
self.log.append({doc.name: log})
def update_list_style(self, this_style_index: int, template_style_name: str):
"""
maps a style from 'this' document into a style from the template
"""
#dont make any change if the style is already the same
diff_dict = self.list_differences[this_style_index]
doc = diff_dict['doc']
this_style_name = diff_dict['list_style']
log = doc.change_bullet_style(this_style_name, template_style_name, self.template)
if log:
self.log.append({doc.name: log})
def update_style(self,index,style_to_modify):
return self.map_style(index, style_to_modify) if style_to_modify else None
def apply_template(self, options_list):
for new_doc in self.new_docs:
log = new_doc.apply_template(template=self.template, options_list=options_list)
if log:
self.log.append({new_doc.name: log})
def reset(self):
for new_doc in self.new_docs:
new_doc.delete()
for gen_doc in self.gen_docs:
gen_doc.delete()
self.new_docs = []
self.gen_docs = []
def get_log(self):
msg_log = create_msg_from(self.log, self.new_docs)
return msg_log
"""
Source Control
"""
def get_or_create_collection(self, id_: str) -> str:
"""
generates a new id if needed
TODO: rename into get_or_create_generation_collection
TODO: have a single DB with separate collections, one for requirements, one for generation
"""
if id_ != '-1':
return id_
else:
now = datetime.datetime.now().strftime("%m%d%H%M")
letters = string.ascii_lowercase + string.digits
id_ = now + '-' + ''.join(random.choice(letters) for _ in range(10))
semantic_db.get_or_create_collection(id_)
return id_
async def wiki_fetch(self) -> [str]:
"""
returns the title of the wikipages corresponding to the tasks described in the input text
"""
all_tasks = []
for new_doc in self.new_docs:
all_tasks += new_doc.tasks
async_tasks = [asyncio.create_task(get_wikilist(task)) for task in all_tasks]
wiki_lists = await asyncio.gather(*async_tasks)
flatten_wiki_list = list(set().union(*[set(w) for w in wiki_lists]))
return flatten_wiki_list
async def wiki_upload_and_store(self, wiki_title: str, collection_name: str):
"""
uploads one wikipage and stores them into the right collection
"""
wikipage = Wiki().fetch(wiki_title)
wiki_title = wiki_title
if type(wikipage) != str:
texts = WikiPage(wikipage.page_content).get_paragraphs()
add_texts_to_collection(coll_name=collection_name, texts=texts, file=wiki_title, source='wiki')
else:
print(wikipage)
"""
Generate Control
"""
    async def generate_doc_from_db(self, collection_name: str, from_files: [str]) -> [str]:
        """
        Answer every task of every working document from the semantic DB and
        write the answers into a generated copy of each document.

        :param collection_name: semantic-db collection to query.
        :param from_files: restrict retrieval to these source files.
        :return: deduplicated list of generated document paths.
        """
        def query_from_task(task):
            # Turn the task wording into a public query paragraph for retrieval.
            return get_public_paragraph(task)
        async def retrieve_text_and_generate(t, collection_name: str, from_files: [str]):
            """
            retrieves the texts from the database and generates the resolution of one task
            """
            # retrieve the texts from the database
            task_query = query_from_task(t)
            texts = query_collection(coll_name=collection_name, query=task_query, from_files=from_files)
            task_resolutions = get_private_paragraph(task=t, texts=texts)
            return task_resolutions
        async def real_doc_generation(new_doc):
            # One retrieval/generation coroutine per task found in the document.
            async_task_resolutions = [asyncio.create_task(retrieve_text_and_generate(t=task, collection_name=collection_name, from_files=from_files))
            for task in new_doc.tasks]
            tasks_resolutions = await asyncio.gather(*async_task_resolutions)  # TO REVIEW (was: "A VOIR")
            gen_path = f"{self.generated_docs_path}/{new_doc.name}e.docx"
            gen_doc = new_doc.copy(gen_path)
            gen_doc.replace_tasks(tasks_resolutions)
            gen_doc.save_as_docx()
            # NOTE(review): appends to the shared outer `gen_paths` list AND
            # returns that same list, so the gather() below collects it once
            # per document — hence the flatten + set() dedup afterwards.
            gen_paths.append(gen_doc.path)
            self.gen_docs.append(gen_doc)
            return gen_paths
        gen_paths = []
        gen_paths = await asyncio.gather(*[asyncio.create_task(real_doc_generation(new_doc)) for new_doc in self.new_docs])
        # Flatten the list-of-lists and drop the duplicates introduced above.
        gen_paths = [path for sublist in gen_paths for path in sublist]
        gen_paths = list(set(gen_paths))
        return gen_paths
"""
Requirements
"""
def clear_input_csv(self):
self.input_csv = ""
[os.remove(f"{self.excel_doc_path}/{doc}") for doc in os.listdir(self.excel_doc_path)]
def set_input_csv(self, csv_path: str):
"""
TODO: rename to set_requirements_file
"""
self.input_csv = csv_path
def create_collection(self, docs: [Doc]):
"""
TODO: rename to create_requirements_collection
TODO: merge with semantic tool to have only one DB Object
"""
coll_name = "collection_for_docs"
collection = self.client_db.get_or_create_collection(coll_name)
if collection.count() == 0:
for doc in docs:
self.fill_collection(doc, collection)
self.retriever.collection = collection
def fill_collection(self, doc: Doc, collection: str):
"""
fills the collection with the blocks of the documents
"""
Retriever(doc=doc, collection=collection)
@staticmethod
def _select_best_sources(sources: [Block], delta_1_2=0.15, delta_1_n=0.3, absolute=1.2, alpha=0.9, max_blocks=3) -> [Block]:
"""
Select the best sources: not far from the very best, not far from the last selected, and not too bad per se
"""
best_sources = []
for idx, s in enumerate(sources):
if idx == 0 \
or (s.distance - sources[idx - 1].distance < delta_1_2
and s.distance - sources[0].distance < delta_1_n) \
or s.distance < absolute:
best_sources.append(s)
delta_1_2 *= alpha
delta_1_n *= alpha
absolute *= alpha
else:
break
best_sources = sorted(best_sources, key=lambda x: x.distance)[:max_blocks]
return best_sources
def generate_response_to_requirements(self):
dict_of_excel_content = self.get_requirements_from_csv()
for exigence in dict_of_excel_content:
blocks_sources = self.retriever.similarity_search(queries = exigence["Exigence"])
best_sources = self._select_best_sources(blocks_sources)
sources_contents = [f"Paragraph title : {s.title}\n-----\n{s.content}" if s.title else f"Paragraph {s.index}\n-----\n{s.content}" for s in best_sources]
context = '\n'.join(sources_contents)
i = 1
while (len(context) > 15000) and i < len(sources_contents):
context = "\n".join(sources_contents[:-i])
i += 1
reponse_exigence = generate_response_to_exigence(exigence = exigence["Exigence"], titre_exigence = exigence["Titre"], content = context)
dict_of_excel_content[dict_of_excel_content.index(exigence)]["Conformité"] = reponse_exigence
dict_of_excel_content[dict_of_excel_content.index(exigence)]["Document"] = best_sources[0].doc
dict_of_excel_content[dict_of_excel_content.index(exigence)]["Paragraphes"] = "; ".join([block.index for block in best_sources])
excel_name = self.input_csv
if '/' in excel_name:
excel_name = excel_name.split('/')[-1]
elif '\\' in excel_name:
excel_name = excel_name.split('\\')[-1]
df = pd.DataFrame(data=dict_of_excel_content)
df.to_excel(f"{self.excel_doc_path}/{excel_name}", index=False)
return f"{self.excel_doc_path}/{excel_name}"
def get_requirements_from_csv(self):
excel_content = excel_to_dict(self.input_csv)
return excel_content |