teoat's picture
Upload folder using huggingface_hub
4ae946d verified
"""
CRDT (Conflict-free Replicated Data Type) Service
Enables real-time collaborative editing without conflicts
"""
import uuid
from datetime import datetime, timezone
from typing import Any
class Operation:
    """Common fields shared by every CRDT operation."""

    def __init__(self, op_id: str, user_id: str, timestamp: float):
        """Store the operation id, the issuing user's id and the timestamp."""
        self.op_id = op_id
        self.user_id = user_id
        self.timestamp = timestamp

    def to_dict(self) -> dict[str, Any]:
        """Return the base fields as a new dict (op_id, user_id, timestamp)."""
        field_names = ("op_id", "user_id", "timestamp")
        return {name: getattr(self, name) for name in field_names}
class InsertOperation(Operation):
    """Operation that inserts ``content`` at ``position``."""

    def __init__(
        self, op_id: str, user_id: str, timestamp: float, position: int, content: str
    ):
        """Store the base operation fields plus the insert position and content."""
        super().__init__(op_id, user_id, timestamp)
        self.position = position
        self.content = content

    def to_dict(self) -> dict[str, Any]:
        """Return the base fields followed by type, position and content."""
        payload = super().to_dict()
        payload.update(
            type="insert",
            position=self.position,
            content=self.content,
        )
        return payload
class DeleteOperation(Operation):
    """Operation that deletes ``length`` items starting at ``position``."""

    def __init__(
        self, op_id: str, user_id: str, timestamp: float, position: int, length: int
    ):
        """Store the base operation fields plus the delete position and length."""
        super().__init__(op_id, user_id, timestamp)
        self.position = position
        self.length = length

    def to_dict(self) -> dict[str, Any]:
        """Return the base fields followed by type, position and length."""
        base_fields = super().to_dict()
        delete_fields = {
            "type": "delete",
            "position": self.position,
            "length": self.length,
        }
        return base_fields | delete_fields
class CRDTDocument:
    """
    Shared document edited through insert and delete operations.

    The text is held as an ordered list of element dicts, one per applied
    insert. Operations address elements by their index in that list, and
    every applied operation increases ``version`` by one.

    NOTE(review): this was originally described as a simplified Logoot/LSEQ
    CRDT, but elements are placed by plain list index rather than by a
    replica-independent position identifier -- confirm how callers order
    concurrent operations before relying on convergence.
    """

    def __init__(self, document_id: str):
        """Create an empty document identified by ``document_id``."""
        self.document_id = document_id
        # Ordered elements; each one is the dict built in ``_apply_insert``.
        self.content: list[dict[str, Any]] = []
        # Declared operation log; nothing in this class appends to it.
        self.operations: list[Operation] = []
        self.version = 0
        # user_id -> participant info (see ``add_participant``).
        self.participants: dict[str, dict[str, Any]] = {}

    def apply_operation(self, operation: dict[str, Any]) -> bool:
        """
        Apply an operation to the document.

        Args:
            operation: Mapping whose ``"type"`` is ``"insert"`` or
                ``"delete"``, plus the fields the matching handler reads.

        Returns:
            True if the document was modified; False for an unknown type,
            an invalid position/length, or a malformed operation.
        """
        try:
            op_type = operation.get("type")
            if op_type == "insert":
                return self._apply_insert(operation)
            if op_type == "delete":
                return self._apply_delete(operation)
            return False
        except Exception as e:
            # Best-effort boundary: a malformed payload (for example one that
            # is not a mapping) is reported and rejected rather than raised.
            print(f"Error applying operation: {e}")
            return False

    @staticmethod
    def _is_index(value: Any) -> bool:
        """Return True when ``value`` is a non-negative integer."""
        return isinstance(value, int) and value >= 0

    def _apply_insert(self, operation: dict[str, Any]) -> bool:
        """
        Insert a new element at ``operation["position"]`` (default 0).

        Returns True and bumps ``version`` when the position lies in
        ``0..len(content)``; returns False otherwise.
        """
        position = operation.get("position", 0)
        # Negative indexes are rejected explicitly: list.insert() would
        # otherwise count them from the end and misplace the element.
        if not self._is_index(position) or position > len(self.content):
            return False

        # Each element gets its own unique id, independent of the operation.
        element = {
            "id": str(uuid.uuid4()),
            "content": operation.get("content", ""),
            "user_id": operation.get("user_id"),
            "timestamp": operation.get("timestamp"),
            "position": position,
        }
        self.content.insert(position, element)
        self.version += 1
        return True

    def _apply_delete(self, operation: dict[str, Any]) -> bool:
        """
        Delete ``operation["length"]`` elements (default 1) starting at
        ``operation["position"]`` (default 0).

        A range that runs past the end removes everything up to the end.
        Returns True and bumps ``version`` when at least one element is
        addressed; returns False for an out-of-range position or a length
        below 1.
        """
        position = operation.get("position", 0)
        length = operation.get("length", 1)
        # A negative position would make the slice wrap around and remove
        # the wrong elements (or none) while still reporting success.
        if not self._is_index(position) or position >= len(self.content):
            return False
        # Zero or negative lengths delete nothing, so they must not create
        # a new document version.
        if not isinstance(length, int) or length < 1:
            return False

        del self.content[position : position + length]
        self.version += 1
        return True

    def get_text(self) -> str:
        """Return the document text: every element's content joined in order."""
        return "".join(elem.get("content", "") for elem in self.content)

    def add_participant(self, user_id: str, user_data: dict[str, Any]):
        """
        Register (or replace) a participant.

        The stored entry is ``user_data`` plus ``joined_at`` (naive UTC time
        as an ISO-8601 string) and ``cursor_position`` set to 0; these two
        keys override same-named keys in ``user_data``.
        """
        # datetime.utcnow() is deprecated since Python 3.12. Dropping tzinfo
        # from an aware "now" yields the same naive-UTC value, so the stored
        # string keeps its offset-free format.
        joined_at = datetime.now(timezone.utc).replace(tzinfo=None)
        self.participants[user_id] = {
            **user_data,
            "joined_at": joined_at.isoformat(),
            "cursor_position": 0,
        }

    def remove_participant(self, user_id: str):
        """Remove a participant; unknown ids are ignored."""
        self.participants.pop(user_id, None)

    def update_cursor(self, user_id: str, position: int):
        """Set a known participant's cursor position; unknown ids are ignored."""
        if user_id in self.participants:
            self.participants[user_id]["cursor_position"] = position

    def to_dict(self) -> dict[str, Any]:
        """Return the document state: id, text, version, participants, element count."""
        return {
            "document_id": self.document_id,
            "content": self.get_text(),
            "version": self.version,
            "participants": self.participants,
            "element_count": len(self.content),
        }
class CRDTManager:
    """Registry of CRDT documents keyed by document id."""

    def __init__(self):
        """Start with no documents registered."""
        self.documents: dict[str, CRDTDocument] = {}

    def create_document(self, document_id: str) -> CRDTDocument:
        """Return the document for ``document_id``, creating it if absent."""
        try:
            return self.documents[document_id]
        except KeyError:
            created = CRDTDocument(document_id)
            self.documents[document_id] = created
            return created

    def get_document(self, document_id: str) -> CRDTDocument | None:
        """Return the registered document, or None when the id is unknown."""
        return self.documents.get(document_id, None)

    def delete_document(self, document_id: str):
        """Drop a document from the registry; unknown ids are ignored."""
        self.documents.pop(document_id, None)

    def apply_operation(self, document_id: str, operation: dict[str, Any]) -> bool:
        """Apply ``operation`` to the named document, creating it when missing."""
        target = self.get_document(document_id) or self.create_document(document_id)
        return target.apply_operation(operation)

    def get_stats(self) -> dict[str, Any]:
        """Return document and participant totals plus a per-document summary."""
        summaries = []
        participant_total = 0
        for doc_id, doc in self.documents.items():
            member_count = len(doc.participants)
            participant_total += member_count
            summaries.append(
                {
                    "document_id": doc_id,
                    "version": doc.version,
                    "participants": member_count,
                    "content_length": len(doc.get_text()),
                }
            )
        return {
            "total_documents": len(self.documents),
            "total_participants": participant_total,
            "documents": summaries,
        }
# Global CRDT manager: one module-level instance, created when this module is imported
crdt_manager = CRDTManager()