# FiberDBMS: a small file-backed text database with keyword search
# (supports both English and Chinese content via jieba segmentation).
| import re | |
| from typing import List, Dict | |
| from datetime import datetime | |
| from collections import Counter | |
| import jieba # For Chinese word segmentation | |
class FiberDBMS:
    """A tiny in-memory document store persisted as tab-separated lines.

    Each entry is a dict with ``name``, ``timestamp``, ``content`` and
    ``tags`` keys.  An inverted index (token -> entry indices) supports
    keyword queries over both English text (regex word split) and Chinese
    text (jieba segmentation).
    """

    def __init__(self) -> None:
        # Ordered list of entries; list positions are the ids stored in the index.
        self.database: List[Dict[str, str]] = []
        # Inverted index: token -> indices of entries whose content has the token.
        self.content_index: Dict[str, List[int]] = {}

    def add_entry(self, name: str, content: str, tags: str) -> None:
        """Append a new entry stamped with the current local time and index it.

        ``tags`` is a comma-separated string, matching the on-disk format.
        """
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        self.database.append({
            "name": name,
            "timestamp": timestamp,
            "content": content,
            "tags": tags,
        })
        self._index_content(len(self.database) - 1, content)

    def _index_content(self, entry_index: int, content: str) -> None:
        """Register every distinct token of *content* under *entry_index*.

        Deduplicating with ``set`` fixes the original behavior of appending
        the same index once per repeated token, which only bloated the index
        (lookups collect indices into a set anyway).
        """
        for word in set(self._tokenize(content)):
            self.content_index.setdefault(word, []).append(entry_index)

    def load_or_create(self, filename: str) -> None:
        """Load the database from *filename*, or start empty if it is missing."""
        try:
            self.load_from_file(filename)
        except FileNotFoundError:
            # Bug fix: these messages previously printed a literal placeholder
            # instead of interpolating the filename.
            print(f"{filename} not found. Creating a new database.")
        else:
            print(f"Loaded {len(self.database)} entries from {filename}.")

    def query(self, query: str, top_n: int) -> List[Dict[str, object]]:
        """Return up to *top_n* matching entries, best-scoring first.

        Each result dict carries the entry name, a query-focused content
        snippet, tags augmented with relevant keywords, and the entry's
        integer index in ``self.database``.
        """
        query_words = self._tokenize(query)
        # Union of all entries containing at least one query token.
        matching_indices = set()
        for word in query_words:
            matching_indices.update(self.content_index.get(word, ()))
        ranked = sorted(
            matching_indices,
            key=lambda idx: self._rate_result(self.database[idx], query_words),
            reverse=True,
        )
        results = []
        for idx in ranked[:top_n]:
            entry = self.database[idx]
            results.append({
                'name': entry['name'],
                'content': self._get_snippet(entry['content'], query_words),
                'tags': self._update_tags(entry['tags'], entry['content'], query_words),
                'index': idx,
            })
        return results

    def save(self, filename: str) -> None:
        """Persist all entries to *filename*, one tab-separated line each.

        Tabs and newlines inside a field would corrupt the line-oriented
        format, so they are replaced with spaces before writing.
        """
        def _clean(field: str) -> str:
            return field.replace('\t', ' ').replace('\n', ' ')

        with open(filename, 'w', encoding='utf-8') as f:
            for entry in self.database:
                f.write('\t'.join(
                    _clean(entry[key])
                    for key in ('name', 'timestamp', 'content', 'tags')
                ) + '\n')
        print(f"Updated database saved to {filename}.")

    def _rate_result(self, entry: Dict[str, str], query_words: List[str]) -> float:
        """Heuristic relevance score for *entry* against *query_words*.

        Combines term frequency, name matches (+3 each), an all-words bonus
        (+5), distinct-match bonus (+10 each) and tag matches (+2 each),
        scaled down for documents shorter than 100 tokens.
        """
        content_tokens = self._tokenize(entry['content'])
        name_tokens = self._tokenize(entry['name'])
        # One Counter pass replaces repeated O(n) list.count() scans.
        token_counts = Counter(content_tokens)

        unique_matches = sum(1 for word in set(query_words) if token_counts[word])
        content_score = sum(token_counts[word] for word in query_words)
        name_score = sum(3 for word in query_words if word in name_tokens)
        # Bonus when every query word occurs somewhere in the content.
        phrase_score = 5 if all(token_counts[word] for word in query_words) else 0
        unique_match_score = unique_matches * 10
        tag_score = sum(
            2 for tag in entry['tags'].split(',')
            if any(word in self._tokenize(tag) for word in query_words)
        )
        # Penalize very short documents proportionally (full weight at >= 100 tokens).
        length_penalty = min(1, len(content_tokens) / 100)
        return (content_score + name_score + phrase_score
                + unique_match_score + tag_score) * length_penalty

    def _tokenize(self, text: str) -> List[str]:
        """Split *text* into tokens: jieba words for Chinese, \\w+ runs otherwise."""
        # Any CJK Unified Ideograph triggers the Chinese segmentation path.
        if re.search(r'[\u4e00-\u9fff]', text):
            return list(jieba.cut(text))
        return re.findall(r'\w+', text.lower())

    def _get_snippet(self, content: str, query_words: List[str],
                     max_length: int = 200) -> str:
        """Return the most query-relevant window of up to *max_length* tokens.

        Scores every window by query-word frequency weighted by sqrt(word
        length), favoring longer (more specific) terms.
        """
        content_tokens = self._tokenize(content)
        best_start = 0
        max_score = 0.0
        # "+ 1" fixes an off-by-one: the original skipped the final window and
        # evaluated no window at all when the content fit within max_length.
        for start in range(max(0, len(content_tokens) - max_length) + 1):
            window = content_tokens[start:start + max_length]
            score = sum(window.count(word) * (len(word) ** 0.5)
                        for word in query_words)
            if score > max_score:
                max_score = score
                best_start = start
        # English tokens need a space separator; the original ''.join glued
        # words together.  Chinese words are conventionally joined unspaced.
        separator = '' if re.search(r'[\u4e00-\u9fff]', content) else ' '
        snippet = separator.join(content_tokens[best_start:best_start + max_length])
        return snippet + "..." if len(content) > max_length else snippet

    def _update_tags(self, original_tags: str, content: str,
                     query_words: List[str]) -> str:
        """Return *original_tags* plus relevant keywords drawn from *content*.

        Adds query words that actually occur in the content, then the five
        most frequent content words, skipping anything already tagged.
        """
        tags = original_tags.split(',')
        word_counts = Counter(self._tokenize(content))
        new_keywords = [w for w in query_words if w in word_counts and w not in tags]
        new_keywords += [w for w, _ in word_counts.most_common(5)
                         if w not in tags and w not in query_words]
        # dict.fromkeys dedupes while preserving order, so repeated queries
        # no longer pile up duplicate keywords in the tag list.
        return ','.join(dict.fromkeys(tags + new_keywords))

    def load_from_file(self, filename: str) -> None:
        """Replace the in-memory database with the contents of *filename*.

        Raises FileNotFoundError when the file does not exist.  Malformed
        rows (fewer than four tab-separated fields) are skipped rather than
        aborting the whole load; maxsplit=3 keeps any stray tabs inside the
        final tags field from shifting columns.
        """
        self.database.clear()
        self.content_index.clear()
        with open(filename, 'r', encoding='utf-8') as f:
            for line in f:
                fields = line.rstrip('\n').split('\t', 3)
                if len(fields) != 4:
                    continue  # skip corrupt rows instead of crashing the load
                name, timestamp, content, tags = fields
                self.database.append({
                    "name": name,
                    "timestamp": timestamp,
                    "content": content,
                    "tags": tags,
                })
                # len-1 (not the file line number) stays correct when rows are skipped.
                self._index_content(len(self.database) - 1, content)
def main():
    """Run an interactive search session against the Celsiaaa database.

    Loads (or creates) the database file, answers queries until the user
    types 'quit', then writes the database — including any tags updated
    during querying — back to disk.
    """
    database_path = "Celsiaaa.txt"
    dbms = FiberDBMS()
    dbms.load_or_create(database_path)

    while True:
        user_query = input("\nEnter your search query (or 'quit' to exit): ")
        if user_query.lower() == 'quit':
            break

        raw_count = input("Enter the number of top results to display: ")
        try:
            result_limit = int(raw_count)
        except ValueError:
            print("Invalid input. Using default value of 5.")
            result_limit = 5

        hits = dbms.query(user_query, result_limit)
        if not hits:
            print(f"No results found for '{user_query}'.")
        else:
            print(f"\nTop {len(hits)} results for '{user_query}':")
            for rank, hit in enumerate(hits, 1):
                print(f"\nResult {rank}:")
                print(f"Name: {hit['name']}")
                print(f"Content: {hit['content']}")
                print(f"Tags: {hit['tags']}")

    # Persist tag updates accumulated while querying.
    dbms.save(database_path)
# Run the interactive CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()