qcrypt-rng/app/api/v2/endpoints/generate.py
rocRevyAreGoals15's picture
fixing ui
86deab0
"""
QCrypt RNG API - Generate Endpoints
API endpoints for quantum random generation
"""
from fastapi import APIRouter, HTTPException, Depends, BackgroundTasks
from typing import List
import asyncio
from datetime import datetime, timedelta
import time
from app.quantum.qrng import get_quantum_rng
from app.api.v2.models.requests import (
GenerateBytesRequest,
GenerateKeyRequest,
GenerateTokenRequest,
GenerateUUIDRequest,
GeneratePasswordRequest,
BatchGenerateRequest
)
from app.api.v2.models.responses import (
GenerateBytesResponse,
GenerateKeyResponse,
GenerateTokenResponse,
GenerateUUIDResponse,
GeneratePasswordResponse,
BatchGenerateResponse,
ResponseStatus
)
from app.utils.logging import logger, log_quantum_generation
from app.config import settings
from app.monitoring import QRNGMetrics
# Router on which every generation endpoint in this module is registered
# (via the @router.post decorators below).
router = APIRouter()
@router.post("/bytes", response_model=GenerateBytesResponse)
async def generate_bytes(request: GenerateBytesRequest) -> GenerateBytesResponse:
    """
    Generate quantum random bytes

    This endpoint generates cryptographically secure random bytes using quantum superposition.
    Each byte is generated through quantum measurement of qubits in superposition state.

    - **length**: Number of bytes to generate (1-1024, enterprise up to 10240)
    - **format**: Output format (hex, base64, array, raw)
    - **quantum_bits**: Number of qubits to use (1-16, higher = more entropy)

    Errors:

    - **400**: the generator rejected the request parameters
    - **500**: any other failure during generation
    """
    try:
        qrng = get_quantum_rng()

        # perf_counter() is monotonic, so the measured duration cannot jump or
        # go negative if the system wall clock is adjusted mid-request.
        start_time = time.perf_counter()

        # Generate quantum random bytes
        result = await qrng.generate_bytes(
            request.length,
            request.quantum_bits,
            request.format.value,
        )

        # Record metrics
        duration = time.perf_counter() - start_time
        QRNGMetrics.record_bytes_generated(
            result.quantum_backend,
            result.format,
            result.length,
        )
        QRNGMetrics.record_generation_duration(result.quantum_backend, duration)

        # Log generation
        log_quantum_generation(
            bytes_generated=result.length,
            qubits=result.qubits_used,
            backend=result.quantum_backend,
            time_ms=result.generation_time_ms,
        )

        return GenerateBytesResponse(
            status=ResponseStatus.SUCCESS,
            request_id=result.request_id,
            data={
                "bytes": result.data,
                "format": result.format,
                "length": result.length,
                "entropy_bits": result.entropy_bits,
            },
            metadata={
                "generation_time_ms": result.generation_time_ms,
                "quantum_backend": result.quantum_backend,
                "qubits_used": result.qubits_used,
                "measurement_count": result.measurement_count,
            },
        )
    except ValueError as e:
        # ValueError is surfaced to the client as a 400 with its message.
        logger.error(f"Validation error in generate_bytes: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is logged with a traceback; details are not exposed.
        logger.error(f"Error generating bytes: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.post("/key", response_model=GenerateKeyResponse)
async def generate_key(request: GenerateKeyRequest) -> GenerateKeyResponse:
    """
    Generate cryptographic key

    Generate quantum-random cryptographic keys for various algorithms:

    - **AES**: 128, 192, or 256-bit keys
    - **RSA**: 2048, 3072, or 4096-bit keys
    - **ECDSA**: 256, 384, or 521-bit keys

    Errors:

    - **400**: the generator rejected the key size / algorithm combination
    - **500**: any other failure during generation
    """
    try:
        qrng = get_quantum_rng()

        # Generate cryptographic key
        result = await qrng.generate_key(
            request.key_size,
            request.algorithm.value,
        )

        # Log generation. Bits are converted to bytes with ceiling division so
        # key sizes that are not a multiple of 8 (e.g. 521-bit ECDSA -> 66
        # bytes) are not under-reported.
        log_quantum_generation(
            bytes_generated=(request.key_size + 7) // 8,
            qubits=16,  # Keys use maximum qubits
            backend=result.quantum_backend,
            time_ms=result.generation_time_ms,
        )

        return GenerateKeyResponse(
            status=ResponseStatus.SUCCESS,
            request_id=result.request_id,
            data=result.data,
            metadata={
                "generation_time_ms": result.generation_time_ms,
                "quantum_backend": result.quantum_backend,
                "entropy_source": "quantum_superposition",
            },
        )
    except ValueError as e:
        # ValueError is surfaced to the client as a 400 with its message.
        logger.error(f"Validation error in generate_key: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is logged with a traceback; details are not exposed.
        logger.error(f"Error generating key: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.post("/token", response_model=GenerateTokenResponse)
async def generate_token(request: GenerateTokenRequest) -> GenerateTokenResponse:
    """
    Generate session token

    Generate secure session tokens with quantum randomness.
    Tokens are URL-safe by default and can include expiration times.

    When `expires_in` (seconds) is supplied, the response also carries
    `expires_at` as an ISO-8601 UTC timestamp without an offset suffix.

    Errors:

    - **400**: the generator rejected the request parameters
    - **500**: any other failure during generation
    """
    # Function-scope import: the module header only imports datetime/timedelta.
    from datetime import timezone

    try:
        qrng = get_quantum_rng()

        # Generate token
        result = await qrng.generate_token(
            request.length,
            request.url_safe,
        )

        # Calculate expiration. Compare against None so that an explicit
        # expires_in of 0 still yields a matching expires_at instead of being
        # silently treated as "no expiry".
        expires_at = None
        if request.expires_in is not None:
            # datetime.utcnow() is deprecated; take an aware UTC "now" and drop
            # the tzinfo so isoformat() keeps the same offset-less output.
            now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
            expires_at = now_utc + timedelta(seconds=request.expires_in)

        # Log generation
        log_quantum_generation(
            bytes_generated=request.length,
            qubits=12,
            backend=result.quantum_backend,
            time_ms=result.generation_time_ms,
        )

        return GenerateTokenResponse(
            status=ResponseStatus.SUCCESS,
            request_id=result.request_id,
            data={
                "token": result.data,
                "token_type": "Bearer",
                "expires_in": request.expires_in,
                "expires_at": expires_at.isoformat() if expires_at is not None else None,
            },
            metadata={
                "generation_time_ms": result.generation_time_ms,
                "quantum_backend": result.quantum_backend,
                "url_safe": request.url_safe,
            },
        )
    except ValueError as e:
        # ValueError is surfaced to the client as a 400 with its message.
        logger.error(f"Validation error in generate_token: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is logged with a traceback; details are not exposed.
        logger.error(f"Error generating token: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error") from e
@router.post("/uuid", response_model=GenerateUUIDResponse)
async def generate_uuid(request: GenerateUUIDRequest) -> GenerateUUIDResponse:
    """
    Generate quantum UUID

    Produces RFC4122-style UUIDs whose random bits come from the quantum RNG.
    A `count` of 1 returns a single value; any other count returns a list.
    The `urn` format prefixes each value with `urn:uuid:` and the `raw`
    format strips the dashes.
    """
    try:
        generator = get_quantum_rng()

        if request.count != 1:
            # Several UUIDs: run the generations concurrently.
            batch = await asyncio.gather(
                *(generator.generate_uuid(request.version) for _ in range(request.count))
            )
            payload = [item.data for item in batch]
            # The first generation supplies request id / backend / timing.
            first = batch[0]
        else:
            first = await generator.generate_uuid(request.version)
            payload = first.data

        # Apply the requested output format to a single value or to each item.
        if request.format == "urn":
            if isinstance(payload, str):
                payload = f"urn:uuid:{payload}"
            elif isinstance(payload, list):
                payload = [f"urn:uuid:{u}" for u in payload]
        elif request.format == "raw":
            payload = (
                payload.replace("-", "")
                if isinstance(payload, str)
                else [u.replace("-", "") for u in payload]
            )

        # Log generation (16 bytes per UUID).
        log_quantum_generation(
            bytes_generated=16 * request.count,
            qubits=16,
            backend=first.quantum_backend,
            time_ms=first.generation_time_ms * request.count,
        )

        response_metadata = {
            "generation_time_ms": first.generation_time_ms * request.count,
            "quantum_backend": first.quantum_backend,
            "version": request.version,
            "format": request.format,
            "count": request.count,
        }
        return GenerateUUIDResponse(
            status=ResponseStatus.SUCCESS,
            request_id=first.request_id,
            data=payload,
            metadata=response_metadata,
        )
    except ValueError as e:
        # ValueError is reported to the client as a 400 with its message.
        logger.error(f"Validation error in generate_uuid: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e))
    except Exception as e:
        # Everything else is logged with a traceback and hidden behind a 500.
        logger.error(f"Error generating UUID: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error")
# Character classes available for password generation.
_PASSWORD_UPPERCASE = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
_PASSWORD_LOWERCASE = "abcdefghijklmnopqrstuvwxyz"
_PASSWORD_NUMBERS = "0123456789"
_PASSWORD_SYMBOLS = "!@#$%^&*()_+-=[]{}|;:,.<>?"
# Characters dropped from every class when exclude_ambiguous is set.
_PASSWORD_AMBIGUOUS = "0O1lI"


class _QuantumBytePool:
    """Hand out quantum random bytes one at a time, refilling from the QRNG when empty."""

    def __init__(self, qrng, initial_bytes, refill_size: int) -> None:
        self._qrng = qrng
        # Copy so the caller's result object is never mutated.
        self._bytes = list(initial_bytes)
        self._cursor = 0
        self._refill_size = max(1, refill_size)

    async def _next_byte(self) -> int:
        """Return the next unused byte, fetching a fresh batch if the pool is exhausted."""
        if self._cursor >= len(self._bytes):
            refill = await self._qrng.generate_bytes(self._refill_size, 8, "array")
            self._bytes = list(refill.data)
            self._cursor = 0
            if not self._bytes:
                # Without this guard an empty refill would surface as an IndexError.
                raise RuntimeError("Quantum RNG returned no bytes")
        value = self._bytes[self._cursor]
        self._cursor += 1
        return value

    async def below(self, upper: int) -> int:
        """Return a uniformly distributed integer in ``[0, upper)``.

        Uses rejection sampling: a plain ``byte % upper`` favours small values
        whenever ``upper`` does not divide the sample space evenly, and cannot
        reach values >= 256 at all.
        """
        if upper <= 1:
            return 0
        # Use as many bytes as needed to cover `upper` (more than one only
        # when upper > 256). Assumes each pool entry is an int in 0..255, as
        # the original code did for the "array" format -- TODO confirm.
        width = ((upper - 1).bit_length() + 7) // 8
        space = 1 << (8 * width)
        # Largest multiple of `upper` that fits in the sample space; samples
        # at or above it are discarded to keep the result unbiased.
        limit = space - (space % upper)
        while True:
            value = 0
            for _ in range(width):
                value = (value << 8) | await self._next_byte()
            if value < limit:
                return value % upper


async def _pick_characters(pool: _QuantumBytePool, alphabet: str, count: int) -> List[str]:
    """Draw ``count`` characters uniformly (with replacement) from ``alphabet``."""
    return [alphabet[await pool.below(len(alphabet))] for _ in range(count)]


async def _shuffle_in_place(pool: _QuantumBytePool, items: List[str]) -> None:
    """Fisher-Yates shuffle of ``items`` driven by unbiased indices from ``pool``."""
    for i in range(len(items) - 1, 0, -1):
        j = await pool.below(i + 1)
        items[i], items[j] = items[j], items[i]


@router.post("/password", response_model=GeneratePasswordResponse)
async def generate_password(request: GeneratePasswordRequest) -> GeneratePasswordResponse:
    """
    Generate secure password

    Generate cryptographically strong passwords with quantum randomness.
    Supports various character sets and complexity requirements.

    - Characters are selected with rejection sampling, so every character of
      the chosen set is equally likely.
    - `exclude_ambiguous` removes `0 O 1 l I` from all characters, including
      those added to satisfy the `min_*` requirements.
    - The reported `entropy_bits` is `length * log2(size of character set)`.

    Errors:

    - **400**: no character set enabled, the `min_*` requirements add up to
      more than `length`, or the generator rejected the request
    - **500**: any other failure during generation
    """
    import math

    try:
        qrng = get_quantum_rng()

        excluded = _PASSWORD_AMBIGUOUS if request.exclude_ambiguous else ""

        def usable(chars: str) -> str:
            # Strip ambiguous characters from a class when requested.
            return "".join(c for c in chars if c not in excluded)

        # (label, enabled for general fill, alphabet, minimum required count)
        classes = [
            ("uppercase", request.include_uppercase, usable(_PASSWORD_UPPERCASE), request.min_uppercase),
            ("lowercase", request.include_lowercase, usable(_PASSWORD_LOWERCASE), request.min_lowercase),
            ("numbers", request.include_numbers, usable(_PASSWORD_NUMBERS), request.min_numbers),
            ("symbols", request.include_symbols, usable(_PASSWORD_SYMBOLS), request.min_symbols),
        ]

        # Build character set used to fill the non-mandatory positions
        charset = "".join(alphabet for _, enabled, alphabet, _ in classes if enabled)
        if not charset:
            raise ValueError("At least one character set must be enabled")

        # Minimums apply whenever they are set, independent of the include_*
        # flags (same as before); falsy or negative values count as zero.
        required = [(alphabet, max(minimum or 0, 0)) for _, _, alphabet, minimum in classes]
        if sum(minimum for _, minimum in required) > request.length:
            # Previously this produced a password longer than the reported
            # length, or an IndexError once the byte buffer ran out.
            raise ValueError("Minimum character requirements exceed password length")

        # Generate random bytes for password; the pool fetches more on demand
        # when rejection sampling or the shuffle uses up the initial batch.
        bytes_needed = request.length * 2
        result = await qrng.generate_bytes(bytes_needed, 8, "array")
        pool = _QuantumBytePool(qrng, result.data, bytes_needed)

        # Add minimum required characters first
        characters: List[str] = []
        for alphabet, minimum in required:
            characters.extend(await _pick_characters(pool, alphabet, minimum))

        # Fill remaining length with random characters
        characters.extend(
            await _pick_characters(pool, charset, request.length - len(characters))
        )

        # Shuffle password to mix minimum requirements
        await _shuffle_in_place(pool, characters)
        password = "".join(characters)

        # Calculate entropy
        entropy_bits = request.length * math.log2(len(charset))

        # Determine strength
        if entropy_bits >= 128:
            strength = "very_strong"
        elif entropy_bits >= 96:
            strength = "strong"
        elif entropy_bits >= 64:
            strength = "moderate"
        else:
            strength = "weak"

        return GeneratePasswordResponse(
            status=ResponseStatus.SUCCESS,
            request_id=result.request_id,
            data={
                "password": password,
                "length": request.length,
                "strength": strength,
                "entropy_bits": round(entropy_bits, 2),
            },
            metadata={
                # Timing and backend describe the initial byte request only.
                "generation_time_ms": result.generation_time_ms,
                "quantum_backend": result.quantum_backend,
                "character_set": ",".join(label for label, enabled, _, _ in classes if enabled),
            },
        )
    except ValueError as e:
        # ValueError is surfaced to the client as a 400 with its message.
        logger.error(f"Validation error in generate_password: {str(e)}")
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Anything else is logged with a traceback; details are not exposed.
        logger.error(f"Error generating password: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error") from e
def _batch_success_entry(index: int, result) -> dict:
    """Build the per-item payload for a generation that succeeded."""
    return {
        "index": index,
        "status": "success",
        "bytes": result.data,
        "format": result.format,
        "length": result.length,
    }


def _batch_error_entry(index: int, error: BaseException) -> dict:
    """Log a failed batch item and build its per-item error payload."""
    # Without this log line item failures would only be visible to the client.
    logger.error(f"Batch item {index} failed: {str(error)}")
    return {
        "index": index,
        "status": "error",
        "error": str(error),
    }


@router.post("/batch", response_model=BatchGenerateResponse)
async def batch_generate(request: BatchGenerateRequest) -> BatchGenerateResponse:
    """
    Batch generate random data

    Process multiple generation requests in a single API call.
    Supports parallel processing for improved performance.

    Each entry in `data` carries the `index` of the originating request and a
    `status` of `success` or `error`; a failing item does not abort the batch.

    Errors:

    - **500**: a failure outside the per-item handling
    """
    try:
        qrng = get_quantum_rng()

        # Monotonic clock for the elapsed-time statistic.
        started = time.perf_counter()
        results = []

        if request.parallel:
            # Process in parallel
            pending = [
                qrng.generate_bytes(req.length, req.quantum_bits, req.format.value)
                for req in request.requests
            ]
            # return_exceptions=True keeps one failing item from cancelling
            # the others; results come back in request order.
            outcomes = await asyncio.gather(*pending, return_exceptions=True)
            for idx, outcome in enumerate(outcomes):
                # BaseException also covers asyncio.CancelledError, which
                # gather may return here and which is not an Exception.
                if isinstance(outcome, BaseException):
                    results.append(_batch_error_entry(idx, outcome))
                else:
                    results.append(_batch_success_entry(idx, outcome))
        else:
            # Process sequentially
            for idx, req in enumerate(request.requests):
                try:
                    outcome = await qrng.generate_bytes(
                        req.length, req.quantum_bits, req.format.value
                    )
                except Exception as e:
                    results.append(_batch_error_entry(idx, e))
                else:
                    results.append(_batch_success_entry(idx, outcome))

        # Calculate statistics
        successful = sum(1 for entry in results if entry["status"] == "success")
        failed = len(results) - successful
        total_time = (time.perf_counter() - started) * 1000

        return BatchGenerateResponse(
            status=ResponseStatus.SUCCESS,
            # Wall-clock microseconds are used on purpose here: this is an
            # identifier, not a duration.
            request_id=f"req_batch_{int(time.time()*1000000)}",
            data=results,
            metadata={
                "total_requests": len(request.requests),
                "successful": successful,
                "failed": failed,
                "total_time_ms": round(total_time, 2),
                "parallel": request.parallel,
            },
        )
    except Exception as e:
        # Failures outside the per-item handling are logged and hidden behind a 500.
        logger.error(f"Error in batch generation: {str(e)}", exc_info=True)
        raise HTTPException(status_code=500, detail="Internal server error") from e