| | import asyncio |
| | import os |
| | from typing import Dict |
| | import random |
| | import datetime |
| | import string |
| |
|
| | from src.domain.doc import Doc |
| | from src.domain.wikidoc import WikiPage |
| | from src.view.log_msg import create_msg_from |
| | import src.tools.semantic_db as semantic_db |
| | from src.tools.wiki import Wiki |
| | from src.tools.llm_tools import get_wikilist, get_public_paragraph, get_private_paragraph |
| | from src.tools.semantic_db import add_texts_to_collection, query_collection |
| |
|
| |
|
| | class Controller: |
| |
|
| | def __init__(self, config: Dict): |
| | self.templates_path = config['templates_path'] |
| | self.generated_docs_path = config['generated_docs_path'] |
| | self.styled_docs_path = config['styled_docs_path'] |
| | self.new_docs = [] |
| | self.gen_docs = [] |
| |
|
| | template_path = config['templates_path'] + '/' + config['templates'][config['default_template_index']] |
| | self.default_template = Doc(template_path) |
| | self.template = self.default_template |
| | self.log = [] |
| | self.differences = [] |
| |
|
| | def copy_docs(self, temp_docs: []): |
| | get_name = lambda doc: doc.name.split('/')[-1].split('.')[0] |
| | doc_names = [get_name(doc) for doc in temp_docs] |
| | docs = [Doc(path=doc.name) for doc in temp_docs] |
| | style_paths = [f"{self.generated_docs_path}/{dn}_.docx" for dn in doc_names] |
| | gen_paths = [f"{self.generated_docs_path}/{dn}_e.docx" for dn in doc_names] |
| | for doc, style_path, gen_path in zip(docs, style_paths, gen_paths): |
| | new_doc = doc.copy(style_path) |
| | self.new_docs.append(new_doc) |
| | new_doc.check_document() |
| |
|
| | def clear_docs(self): |
| | for new_doc in self.new_docs: |
| | if os.path.exists(new_doc.path): |
| | new_doc.clear() |
| | for gen_doc in self.gen_docs: |
| | if os.path.exists(gen_doc.path): |
| | gen_doc.clear() |
| | self.new_docs = [] |
| | self.gen_docs = [] |
| | self.log = [] |
| | path_to_clear = os.path.abspath(self.generated_docs_path) |
| | [os.remove(f"{path_to_clear}/{doc}") for doc in os.listdir(path_to_clear)] |
| |
|
| | def set_template(self, template_name: str = ""): |
| | if not template_name: |
| | self.template = self.default_template |
| | else: |
| | template_path = f"{self.templates_path}/{template_name}" |
| | self.template = Doc(template_path) |
| |
|
| | def get_difference_with_template(self): |
| | self.differences = [] |
| | for new_doc in self.new_docs: |
| | diff_styles = new_doc.get_different_styles_with_template(template=self.template) |
| | diff_dicts = [{'doc': new_doc, 'style': s} for s in diff_styles] |
| | self.differences += diff_dicts |
| | template_styles = [name for name in self.template.styles.names] |
| | return self.differences, template_styles |
| |
|
| | def map_style(self, this_style_index: int, template_style_name: str): |
| | """ |
| | maps a style from 'this' document into a style from the template |
| | """ |
| | diff_dict = self.differences[this_style_index] |
| | doc = diff_dict['doc'] |
| | this_style_name = diff_dict['style'] |
| | log = doc.copy_one_style(this_style_name, template_style_name, self.template) |
| | self.log.append({doc.name: log}) |
| |
|
| | def apply_template(self, options_list): |
| | for new_doc in self.new_docs: |
| | log = new_doc.apply_template(template=self.template, options_list=options_list) |
| | if log: |
| | self.log.append({new_doc.name: log}) |
| |
|
| | def reset(self): |
| | for new_doc in self.new_docs: |
| | new_doc.delete() |
| | for gen_doc in self.gen_docs: |
| | gen_doc.delete() |
| | self.new_docs = [] |
| | self.gen_docs = [] |
| |
|
| |
|
| | def get_log(self): |
| | msg_log = create_msg_from(self.log, self.new_docs) |
| | return msg_log |
| |
|
| | """ |
| | Source Control |
| | """ |
| |
|
| | def get_or_create_collection(self, id_: str) -> str: |
| | """ |
| | generates a new id if needed |
| | """ |
| | if id_ != '-1': |
| | return id_ |
| | else: |
| | now = datetime.datetime.now().strftime("%m%d%H%M") |
| | letters = string.ascii_lowercase + string.digits |
| | id_ = now + '-' + ''.join(random.choice(letters) for _ in range(10)) |
| | semantic_db.get_or_create_collection(id_) |
| | return id_ |
| |
|
| | async def wiki_fetch(self) -> [str]: |
| | """ |
| | returns the title of the wikipages corresponding to the tasks described in the input text |
| | """ |
| | all_tasks = [] |
| | for new_doc in self.new_docs: |
| | all_tasks += new_doc.tasks |
| | async_tasks = [asyncio.create_task(get_wikilist(task)) for task in all_tasks] |
| | wiki_lists = await asyncio.gather(*async_tasks) |
| | flatten_wiki_list = list(set().union(*[set(w) for w in wiki_lists])) |
| | return flatten_wiki_list |
| |
|
| | async def wiki_upload_and_store(self, wiki_title: str, collection_name: str): |
| | """ |
| | uploads one wikipage and stores them into the right collection |
| | """ |
| | wikipage = Wiki().fetch(wiki_title) |
| | wiki_title = wiki_title |
| | if type(wikipage) != str: |
| | texts = WikiPage(wikipage.page_content).get_paragraphs() |
| | add_texts_to_collection(coll_name=collection_name, texts=texts, file=wiki_title, source='wiki') |
| | else: |
| | print(wikipage) |
| |
|
| | """ |
| | Generate Control |
| | """ |
| |
|
| |
|
| | async def generate_doc_from_db(self, collection_name: str, from_files: [str]) -> [str]: |
| |
|
| | def query_from_task(task): |
| | return get_public_paragraph(task) |
| |
|
| | async def retrieve_text_and_generate(t, collection_name: str, from_files: [str]): |
| | """ |
| | retreives the texts from the database and generates the documents |
| | """ |
| | |
| | task_query = query_from_task(t) |
| | texts = query_collection(coll_name=collection_name, query=task_query, from_files=from_files) |
| | task_resolutions = get_private_paragraph(task=t, texts=texts) |
| | return task_resolutions |
| | |
| | async def real_doc_generation(new_doc): |
| | async_task_resolutions = [asyncio.create_task(retrieve_text_and_generate(t=task, collection_name=collection_name, from_files=from_files)) |
| | for task in new_doc.tasks] |
| | tasks_resolutions = await asyncio.gather(*async_task_resolutions) |
| | gen_path = f"{self.generated_docs_path}/{new_doc.name}e.docx" |
| | gen_doc = new_doc.copy(gen_path) |
| | gen_doc.replace_tasks(tasks_resolutions) |
| | gen_doc.save_as_docx() |
| | gen_paths.append(gen_doc.path) |
| | self.gen_docs.append(gen_doc) |
| | return gen_paths |
| | |
| | gen_paths = [] |
| | gen_paths = await asyncio.gather(*[asyncio.create_task(real_doc_generation(new_doc)) for new_doc in self.new_docs]) |
| | gen_paths = [path for sublist in gen_paths for path in sublist] |
| | return gen_paths |
| | |
| | |
| | def update_style(self,index,style_to_modify): |
| | return self.map_style(index,style_to_modify) if style_to_modify else None |