File size: 7,090 Bytes
c626d10
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import asyncio
import os
from typing import Dict
import random
import datetime
import string

from src.domain.doc import Doc
from src.domain.wikidoc import WikiPage
from src.view.log_msg import create_msg_from
import src.tools.semantic_db as semantic_db
from src.tools.wiki import Wiki
from src.tools.llm_tools import get_wikilist, get_public_paragraph, get_private_paragraph
from src.tools.semantic_db import add_texts_to_collection, query_collection


class Controller:
    """
    Coordinates the three workflows exposed by this module: restyling uploaded
    documents against a template, collecting wiki sources into a semantic-db
    collection, and generating documents from the content of a collection.

    State kept on the instance:
      - new_docs:    working copies of the uploaded documents
      - gen_docs:    documents produced by generate_doc_from_db
      - template:    the template currently used for styling
      - log:         list of {doc name: log} entries produced by styling calls
      - differences: list of {'doc': ..., 'style': ...} entries from the last
                     call to get_difference_with_template
    """

    def __init__(self, config: Dict):
        """
        Reads 'templates_path', 'generated_docs_path', 'styled_docs_path',
        'templates' and 'default_template_index' from config and loads the
        default template as the current one.
        """
        self.templates_path = config['templates_path']
        self.generated_docs_path = config['generated_docs_path']
        self.styled_docs_path = config['styled_docs_path']
        self.new_docs = []
        self.gen_docs = []

        default_template_name = config['templates'][config['default_template_index']]
        self.default_template = Doc(f"{self.templates_path}/{default_template_name}")
        self.template = self.default_template
        self.log = []
        self.differences = []

    def copy_docs(self, temp_docs: list):
        """
        creates a working copy of every uploaded document and checks it

        temp_docs: objects whose `name` attribute is the path of the uploaded file.
        Each copy is written to '<generated_docs_path>/<file stem>_.docx', appended
        to self.new_docs and then passed through check_document().
        """
        # Load every source before copying anything, so that a file which cannot
        # be opened leaves self.new_docs untouched.
        source_docs = [Doc(path=temp_doc.name) for temp_doc in temp_docs]
        for temp_doc, source_doc in zip(temp_docs, source_docs):
            # file stem = last path component, cut at its first '.'
            stem = temp_doc.name.split('/')[-1].split('.')[0]
            # NOTE(review): copies land in generated_docs_path; styled_docs_path is
            # stored by __init__ but not used here - confirm this is intended.
            new_doc = source_doc.copy(f"{self.generated_docs_path}/{stem}_.docx")
            self.new_docs.append(new_doc)
            new_doc.check_document()

    def clear_docs(self):
        """
        clears every tracked document, forgets them and the log, then removes
        the files left in the generated-docs directory
        """
        for doc in self.new_docs + self.gen_docs:
            if os.path.exists(doc.path):
                doc.clear()
        self.new_docs = []
        self.gen_docs = []
        self.log = []

        path_to_clear = os.path.abspath(self.generated_docs_path)
        for entry in os.listdir(path_to_clear):
            entry_path = f"{path_to_clear}/{entry}"
            # os.remove cannot delete a directory; skip real sub-directories instead
            # of aborting the clean-up half-way (symlinks are still removed).
            if os.path.isdir(entry_path) and not os.path.islink(entry_path):
                continue
            os.remove(entry_path)

    def set_template(self, template_name: str = ""):
        """
        selects the template used for styling; an empty name restores the default one
        """
        if template_name:
            self.template = Doc(f"{self.templates_path}/{template_name}")
        else:
            self.template = self.default_template

    def get_difference_with_template(self):
        """
        recomputes self.differences for all the working documents

        returns (differences, template_styles) where differences is a list of
        {'doc': document, 'style': style} dicts and template_styles is the list of
        the style names of the current template
        """
        self.differences = []
        for new_doc in self.new_docs:
            diff_styles = new_doc.get_different_styles_with_template(template=self.template)
            self.differences.extend({'doc': new_doc, 'style': style} for style in diff_styles)
        template_styles = list(self.template.styles.names)
        return self.differences, template_styles

    def map_style(self, this_style_index: int, template_style_name: str):
        """
        maps a style from 'this' document into a style from the template

        this_style_index is an index into self.differences, as filled by
        get_difference_with_template; the resulting log is appended to self.log
        """
        diff_dict = self.differences[this_style_index]
        doc = diff_dict['doc']
        this_style_name = diff_dict['style']
        log = doc.copy_one_style(this_style_name, template_style_name, self.template)
        self.log.append({doc.name: log})

    def apply_template(self, options_list):
        """
        applies the current template to every working document and records the
        non-empty logs in self.log
        """
        for new_doc in self.new_docs:
            log = new_doc.apply_template(template=self.template, options_list=options_list)
            if log:
                self.log.append({new_doc.name: log})

    def reset(self):
        """
        deletes every working and generated document and forgets them
        (self.log and self.differences are left as they are)
        """
        for doc in self.new_docs + self.gen_docs:
            doc.delete()
        self.new_docs = []
        self.gen_docs = []

    def get_log(self):
        """
        returns the message built by create_msg_from out of the log and the working documents
        """
        return create_msg_from(self.log, self.new_docs)

    # ------------------------------------------------------------------
    # Source Control
    # ------------------------------------------------------------------

    def get_or_create_collection(self, id_: str) -> str:
        """
        generates a new id if needed

        Any id other than '-1' is returned unchanged. For '-1' a new id of the form
        '<MMDDHHMM>-<10 random lowercase letters or digits>' is built, the matching
        collection is requested from semantic_db, and the new id is returned.
        """
        if id_ != '-1':
            return id_
        now = datetime.datetime.now().strftime("%m%d%H%M")
        letters = string.ascii_lowercase + string.digits
        # NOTE(review): `random` is fine for a collection name, but it is not
        # suitable if this id is ever relied upon as a secret.
        id_ = now + '-' + ''.join(random.choice(letters) for _ in range(10))
        semantic_db.get_or_create_collection(id_)
        return id_

    async def wiki_fetch(self) -> list:
        """
        returns the title of the wikipages corresponding to the tasks described in the input text

        The tasks of all the working documents are sent to get_wikilist concurrently.
        The titles are returned as a list without duplicates, in first-seen order.
        """
        all_tasks = []
        for new_doc in self.new_docs:
            all_tasks += new_doc.tasks
        async_tasks = [asyncio.create_task(get_wikilist(task)) for task in all_tasks]
        wiki_lists = await asyncio.gather(*async_tasks)
        # dict.fromkeys de-duplicates while keeping a stable order; a set would
        # return the titles in an arbitrary order that changes between runs.
        return list(dict.fromkeys(title for wiki_list in wiki_lists for title in wiki_list))

    async def wiki_upload_and_store(self, wiki_title: str, collection_name: str):
        """
        uploads one wikipage and stores its paragraphs into the right collection

        When Wiki().fetch returns a string, nothing is stored and the string is
        printed (presumably an error message - confirm against Wiki.fetch).
        """
        wikipage = Wiki().fetch(wiki_title)
        if isinstance(wikipage, str):
            print(wikipage)
            return
        texts = WikiPage(wikipage.page_content).get_paragraphs()
        add_texts_to_collection(coll_name=collection_name, texts=texts, file=wiki_title, source='wiki')

    # ------------------------------------------------------------------
    # Generate Control
    # ------------------------------------------------------------------

    async def generate_doc_from_db(self, collection_name: str, from_files: list) -> list:
        """
        generates one document per working document, resolving each of its tasks
        with the texts retrieved from the collection

        collection_name: name of the collection to query
        from_files:      file names passed to query_collection as `from_files`

        Every generated document is saved to '<generated_docs_path>/<doc name>e.docx'
        and appended to self.gen_docs. Returns the list of the generated paths, one
        per working document, in the order of self.new_docs.
        """

        async def resolve_task(task):
            """
            retrieves the texts from the database and generates the resolution of one task
            """
            # NOTE(review): the three helpers below are called without `await`; if
            # they block, the tasks of a document will not overlap - to be checked.
            task_query = get_public_paragraph(task)
            texts = query_collection(coll_name=collection_name, query=task_query, from_files=from_files)
            return get_private_paragraph(task=task, texts=texts)

        async def generate_one(new_doc):
            """
            builds and saves the generated document of one working document, returns its path
            """
            pending = [asyncio.create_task(resolve_task(task)) for task in new_doc.tasks]
            tasks_resolutions = await asyncio.gather(*pending)
            gen_doc = new_doc.copy(f"{self.generated_docs_path}/{new_doc.name}e.docx")
            gen_doc.replace_tasks(tasks_resolutions)
            gen_doc.save_as_docx()
            self.gen_docs.append(gen_doc)
            # Return only this document's path: returning a list shared by all the
            # coroutines made every path appear once per document in the result.
            return gen_doc.path

        pending_docs = [asyncio.create_task(generate_one(new_doc)) for new_doc in self.new_docs]
        return list(await asyncio.gather(*pending_docs))

    def update_style(self, index, style_to_modify):
        """
        maps the style at `index` of self.differences onto `style_to_modify`;
        does nothing and returns None when `style_to_modify` is empty
        """
        if not style_to_modify:
            return None
        return self.map_style(index, style_to_modify)