Spaces:
Running
Running
| import streamlit as st | |
| from transformers import T5ForConditionalGeneration, T5Tokenizer | |
| import torch | |
| from torch.utils.data import Dataset, DataLoader | |
| import json | |
| import os | |
| from datetime import datetime | |
| import tempfile | |
| # Custom dataset for fine-tuning | |
| class TextHumanizerDataset(Dataset): | |
| def __init__(self, data, tokenizer, max_length=512): | |
| self.data = data | |
| self.tokenizer = tokenizer | |
| self.max_length = max_length | |
| def __len__(self): | |
| return len(self.data) | |
| def __getitem__(self, idx): | |
| item = self.data[idx] | |
| input_encoding = self.tokenizer( | |
| f"reescreva em português natural, mantendo todas as informações: {item['input_text']}", | |
| max_length=self.max_length, | |
| padding='max_length', | |
| truncation=True, | |
| return_tensors='pt' | |
| ) | |
| target_encoding = self.tokenizer( | |
| item['output_text'], | |
| max_length=self.max_length, | |
| padding='max_length', | |
| truncation=True, | |
| return_tensors='pt' | |
| ) | |
| return { | |
| 'input_ids': input_encoding['input_ids'].squeeze(), | |
| 'attention_mask': input_encoding['attention_mask'].squeeze(), | |
| 'labels': target_encoding['input_ids'].squeeze() | |
| } | |
| def get_storage_path(): | |
| """Retorna o caminho correto para armazenamento no Hugging Face Spaces""" | |
| if os.environ.get('SPACE_ID'): # Verifica se está rodando no Spaces | |
| return '/data' # Diretório persistente no Spaces | |
| else: | |
| # Fallback para desenvolvimento local | |
| temp_dir = tempfile.gettempdir() | |
| feedback_dir = os.path.join(temp_dir, 'feedback_data') | |
| os.makedirs(feedback_dir, exist_ok=True) | |
| return feedback_dir | |
| def save_feedback(input_text, output_text, rating): | |
| """Salva o feedback do usuário para futuro treinamento""" | |
| feedback_data = { | |
| 'input_text': input_text, | |
| 'output_text': output_text, | |
| 'rating': rating, | |
| 'timestamp': datetime.now().isoformat() | |
| } | |
| storage_path = get_storage_path() | |
| feedback_file = os.path.join(storage_path, 'feedback.json') | |
| try: | |
| # Cria arquivo se não existir | |
| if not os.path.exists(feedback_file): | |
| with open(feedback_file, 'w') as f: | |
| f.write('') | |
| # Append do novo feedback | |
| with open(feedback_file, 'a') as f: | |
| f.write(json.dumps(feedback_data) + '\n') | |
| return True | |
| except Exception as e: | |
| st.error(f"Erro ao salvar feedback: {str(e)}") | |
| return False | |
| def fine_tune_model(): | |
| """Realiza fine-tuning do modelo com dados de feedback positivo""" | |
| storage_path = get_storage_path() | |
| feedback_file = os.path.join(storage_path, 'feedback.json') | |
| if not os.path.exists(feedback_file): | |
| return | |
| try: | |
| # Carrega dados de feedback | |
| positive_examples = [] | |
| with open(feedback_file, 'r') as f: | |
| for line in f: | |
| if line.strip(): # Ignora linhas vazias | |
| feedback = json.loads(line) | |
| if feedback['rating'] >= 4: # Usa apenas feedback positivo | |
| positive_examples.append({ | |
| 'input_text': feedback['input_text'], | |
| 'output_text': feedback['output_text'] | |
| }) | |
| if not positive_examples: | |
| return | |
| # Cria dataset e dataloader | |
| dataset = TextHumanizerDataset(positive_examples, st.session_state.tokenizer) | |
| dataloader = DataLoader(dataset, batch_size=4, shuffle=True) | |
| # Configura otimizador | |
| optimizer = torch.optim.AdamW(st.session_state.model.parameters(), lr=1e-5) | |
| # Fine-tuning | |
| st.session_state.model.train() | |
| for batch in dataloader: | |
| optimizer.zero_grad() | |
| outputs = st.session_state.model( | |
| input_ids=batch['input_ids'], | |
| attention_mask=batch['attention_mask'], | |
| labels=batch['labels'] | |
| ) | |
| loss = outputs.loss | |
| loss.backward() | |
| optimizer.step() | |
| st.session_state.model.eval() | |
| return True | |
| except Exception as e: | |
| st.error(f"Erro durante o fine-tuning: {str(e)}") | |
| return False | |
| def clean_generated_text(text): | |
| """Remove comandos e limpa o texto gerado""" | |
| text = text.strip() | |
| # Lista de prefixos de comando para remover | |
| prefixes = [ | |
| "reescreva o seguinte texto", | |
| "reescreva este texto", | |
| "reescreva o texto", | |
| "traduza", | |
| "humanize:", | |
| "humanizar:", | |
| "em português", | |
| "de forma mais natural" | |
| ] | |
| # Remove os prefixos de comando | |
| text_lower = text.lower() | |
| for prefix in prefixes: | |
| if text_lower.startswith(prefix): | |
| text = text[len(prefix):].strip() | |
| text_lower = text.lower() | |
| # Capitaliza a primeira letra | |
| if text: | |
| text = text[0].upper() + text[1:] | |
| return text | |
| def humanize_text(text): | |
| """Humaniza o texto mantendo coerência e tamanho""" | |
| prompt = f"reescreva em português natural, mantendo todas as informações: {text}" | |
| # Tokenização com padding | |
| inputs = st.session_state.tokenizer( | |
| prompt, | |
| return_tensors="pt", | |
| max_length=512, # Reduzido para evitar problemas de memória | |
| padding=True, | |
| truncation=True | |
| ) | |
| # Parâmetros mais conservadores para geração | |
| try: | |
| outputs = st.session_state.model.generate( | |
| inputs.input_ids, | |
| max_length=512, # Reduzido para maior estabilidade | |
| min_length=int(len(text.split()) * 0.8), # Garante pelo menos 80% do tamanho original | |
| do_sample=False, # Desativa amostragem para maior estabilidade | |
| num_beams=2, # Reduzido para evitar problemas de memória | |
| repetition_penalty=1.1, # Reduzido para evitar instabilidades | |
| length_penalty=1.0, # Valor neutro | |
| early_stopping=True, # Ativa early stopping | |
| no_repeat_ngram_size=2 # Evita repetições de bigramas | |
| ) | |
| result = st.session_state.tokenizer.decode(outputs[0], skip_special_tokens=True) | |
| result = clean_generated_text(result) | |
| # Garante tamanho mínimo de forma mais suave | |
| if len(result.split()) < len(text.split()): | |
| missing_words = len(text.split()) - len(result.split()) | |
| original_words = text.split()[-missing_words:] | |
| result = result + " " + " ".join(original_words) | |
| return result | |
| except Exception as e: | |
| st.error(f"Erro durante a geração: {str(e)}") | |
| # Fallback: retorna o texto original em caso de erro | |
| return text | |
| # Initialize session state | |
| if 'model_loaded' not in st.session_state: | |
| st.session_state.tokenizer = T5Tokenizer.from_pretrained("t5-base") | |
| st.session_state.model = T5ForConditionalGeneration.from_pretrained("t5-base") | |
| st.session_state.model_loaded = True | |
| # UI Components | |
| st.set_page_config(page_title="Advanced Text Humanizer", page_icon="🤖") | |
| st.title("🤖 → 🧑 Humanizador de Texto Avançado") | |
| st.markdown(""" | |
| Este aplicativo transforma textos robotizados em linguagem mais natural e humana, | |
| mantendo todas as informações originais e incluindo sistema de feedback para melhoria contínua. | |
| """) | |
| # Input area | |
| input_text = st.text_area( | |
| "Cole seu texto de robô aqui:", | |
| height=150, | |
| help="Cole seu texto aqui para transformá-lo em uma versão mais natural e humana." | |
| ) | |
| # Process button and results | |
| if st.button("Humanizar", type="primary"): | |
| if not input_text: | |
| st.warning("⚠️ Por favor, cole um texto primeiro!") | |
| else: | |
| with st.spinner("Processando o texto..."): | |
| try: | |
| final_text = humanize_text(input_text) | |
| # Display results | |
| st.success("✨ Texto humanizado:") | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| st.text("Original:") | |
| st.info(input_text) | |
| st.write(f"Palavras: {len(input_text.split())}") | |
| with col2: | |
| st.text("Resultado:") | |
| st.info(final_text) | |
| st.write(f"Palavras: {len(final_text.split())}") | |
| # Feedback section | |
| st.markdown("### Feedback") | |
| rating = st.slider( | |
| "Como você avalia a qualidade do texto humanizado?", | |
| min_value=1, | |
| max_value=5, | |
| value=3, | |
| help="1 = Muito ruim, 5 = Excelente" | |
| ) | |
| if st.button("Enviar Feedback"): | |
| if save_feedback(input_text, final_text, rating): | |
| st.success("Feedback salvo com sucesso! Obrigado pela contribuição.") | |
| # Trigger fine-tuning if we have enough positive feedback | |
| if rating >= 4: | |
| with st.spinner("Atualizando modelo com seu feedback..."): | |
| if fine_tune_model(): | |
| st.success("Modelo atualizado com sucesso!") | |
| else: | |
| st.warning("Não foi possível atualizar o modelo neste momento.") | |
| else: | |
| st.error("Não foi possível salvar o feedback. Tente novamente mais tarde.") | |
| except Exception as e: | |
| st.error(f"❌ Erro no processamento: {str(e)}") | |
| # Footer | |
| st.markdown("---") | |
| st.markdown( | |
| """ | |
| <div style='text-align: center'> | |
| <small>Desenvolvido com ❤️ usando Streamlit e Transformers</small> | |
| </div> | |
| """, | |
| unsafe_allow_html=True | |
| ) |