Plings Key Management Guide - Three-Tier Implementation Strategy
Created: Sun 13 Jul 2025 12:53:21 CEST
Updated: 17 Jul 2025 - Added 3-tier implementation strategy
Document Version: 2.0 - Three-Tier Key Management Architecture
Security Classification: Internal Technical Documentation
Target Audience: CTOs, Security Teams, Infrastructure Engineers, DevOps Engineers, Backend Developers
Author: Paul Wisén
Executive Summary
This guide presents Plings’ three-tier key management strategy that enables immediate deployment while providing a clear path to enterprise-grade security. The strategy balances rapid time-to-market with long-term security requirements through progressive implementation phases.
Three-Tier Implementation Strategy
| Tier | Solution | Timeline | Cost | Use Case |
|---|---|---|---|---|
| Initial | Vercel Environment Variables | Immediate | $0 | MVP, early customers, rapid deployment |
| Next Level | SoftHSM | 2-4 weeks | $20-50/mo | Growing business, enhanced security |
| Final Level | Hardware HSM | 6-9 months | $1,500-5,000/mo | Enterprise, compliance requirements |
What You’ll Learn
- For Decision Makers: Cost-effective scaling strategy from $0 to enterprise
- For Technical Teams: Implementation guides for each tier with migration paths
- For Security Teams: Progressive security enhancement maintaining business velocity
- For Developers: Immediate deployment with Vercel, future-proof architecture
Why This Strategy Matters
- Problem: Traditional HSM implementations require months of setup and significant upfront investment.
- Solution: Three-tier approach enables immediate deployment with progressive security enhancement.
- Business Impact: Launch in days instead of months, scale security with business growth.
Key Implementation Decisions
- Initial Tier: Vercel Environment Variables for immediate production deployment
- Next Level: SoftHSM on dedicated VPS for enhanced security (2-4 weeks)
- Final Level: AWS CloudHSM or Thales Luna for enterprise compliance (6-9 months)
- Cryptographic Standard: Ed25519 with planned CRYSTALS-Dilithium post-quantum migration
- Key Derivation: BIP32-compatible hierarchical deterministic wallet system
- Performance: 1,000 signatures/second (1000x safety buffer for <1 ops/sec actual usage)
HSM Performance Reality Check
Actual Plings HSM Usage:
- Verification: Happens client-side using public keys (no HSM involvement)
- Key Generation: Only for new organizations/paths (rare administrative tasks)
- Real Performance Need: <1 operation per second
- Specified Requirement: 1,000 operations/second (1000x safety buffer)
Why Performance Isn’t Critical:
# HSM operations (rare)
master_key = hsm.generate_master_key() # 1-4 times per year
anchor_key = hsm.derive_key(master_key, path) # 10-100 times per month
path_signature = hsm.sign(path_data) # 1,000-10,000 times per month
# Client operations (frequent, no HSM)
is_valid = verify_offline(instance_key, public_key) # Millions per day
Cost Impact: Standard HSM configurations are sufficient, avoiding premium high-performance pricing.
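The gap between actual and specified load can be checked with a few lines of arithmetic. The sketch below uses the upper-bound monthly counts quoted above; the 30-day month is an approximation, and the result is an average rate, so short bursts during batch generation would be higher.

```python
# Upper-bound monthly HSM operation counts, taken from the figures above.
MONTHLY_OPS = {
    "master_key_generation": 4 / 12,  # at most 4 per year
    "key_derivation": 100,            # at most 100 per month
    "path_signing": 10_000,           # at most 10,000 per month
}
SECONDS_PER_MONTH = 30 * 24 * 60 * 60  # assumption: 30-day month
SPECIFIED_OPS_PER_SEC = 1_000


def average_ops_per_second(monthly_ops: dict) -> float:
    """Average HSM load in operations per second."""
    return sum(monthly_ops.values()) / SECONDS_PER_MONTH


rate = average_ops_per_second(MONTHLY_OPS)
headroom = SPECIFIED_OPS_PER_SEC / rate
print(f"average load: {rate:.5f} ops/sec")
print(f"headroom at {SPECIFIED_OPS_PER_SEC} ops/sec: {headroom:,.0f}x")
```

Even at the top of every range, the average load stays far below one operation per second, which is why a standard HSM configuration is enough.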
Table of Contents
- Three-Tier Implementation Overview
- Choosing the Right Tier
- Initial Tier: Vercel Environment Variables
- Next Level: SoftHSM Implementation
- Final Level: Hardware HSM
- Migration Paths Between Tiers
- What is an HSM?
- Key Concepts and Terminology
- BIP32 HD Wallet Implementation
- Security Monitoring and Compliance
- Disaster Recovery and Business Continuity
- Implementation Timeline
- Cost Analysis
- Glossary
Three-Tier Implementation Overview
Plings uses a progressive three-tier key management strategy that enables immediate deployment while maintaining a clear upgrade path to enterprise-grade security. Each tier is designed to serve specific business phases and can be seamlessly migrated to the next level as requirements evolve.
Architecture Overview
┌─────────────────────────────────────────────────────────────────┐
│                    Three-Tier Key Management                    │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│   Initial Tier         Next Level           Final Level         │
│  ┌─────────────┐      ┌─────────────┐      ┌─────────────┐      │
│  │ Vercel      │      │ SoftHSM     │      │ Hardware HSM│      │
│  │ Environment │ ───► │ VPS Server  │ ───► │ AWS/Luna    │      │
│  │ Variables   │      │ PKCS#11     │      │ FIPS 140-2  │      │
│  └─────────────┘      └─────────────┘      └─────────────┘      │
│                                                                 │
│  Timeline: Now         2-4 weeks            6-9 months          │
│  Cost:     $0          $20-50/mo            $1,500-5,000/mo     │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘
Tier Comparison Matrix
| Feature | Initial (Vercel) | Next Level (SoftHSM) | Final Level (Hardware) |
|---|---|---|---|
| Setup Time | < 1 day | 2-4 weeks | 6-9 months |
| Monthly Cost | $0 | $20-50 | $1,500-5,000 |
| Security Level | Software | HSM-equivalent | Hardware-certified |
| Key Storage | Encrypted env vars | PKCS#11 token | Tamper-proof hardware |
| Audit Trail | Application logs | PKCS#11 logs | Enterprise audit |
| Backup Method | Env var copy | Token backup | Hardware replication |
| Compliance | Basic | Standard | FIPS 140-2 L3 |
| Performance | 1000+ ops/sec | 1000+ ops/sec | 1000+ ops/sec |
| Migration Effort | N/A | 1 week | 2-4 weeks |
HD Wallet Consistency Across Tiers
All three tiers implement the same HD wallet structure, ensuring seamless migration:
Master Key: m/44'/501'/[wallet_version]'/
├── Manufacturer: m/44'/501'/[wallet]'/[manufacturer]'/
│ ├── Category: m/44'/501'/[wallet]'/[manufacturer]'/[category]'/
│ │ ├── Class: m/44'/501'/[wallet]'/[manufacturer]'/[category]'/[class]'/
│ │ │ ├── Batch: m/44'/501'/[wallet]'/[manufacturer]'/[category]'/[class]'/[batch]'/
│ │ │ │ └── Instance: .../[instance_number]
Key Points:
- Wallet version is stored in the path (supports wallet rotation)
- Same derivation logic across all tiers
- Only the master key storage location changes
- Public keys remain consistent during migration
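To make "same derivation logic across all tiers" concrete, here is a minimal sketch of hierarchical derivation for Ed25519 keys. It assumes SLIP-0010, the common adaptation of BIP32 to Ed25519; the guide only says "BIP32-compatible", so treat that choice as an assumption. SLIP-0010 supports only hardened children for Ed25519, so the sketch hardens every level, including the instance index, and it assumes every path segment is numeric.

```python
import hashlib
import hmac
import struct

HARDENED = 0x80000000


def master_node(seed: bytes) -> tuple:
    """SLIP-0010 master node for Ed25519: returns (private_key, chain_code)."""
    digest = hmac.new(b"ed25519 seed", seed, hashlib.sha512).digest()
    return digest[:32], digest[32:]


def child_node(key: bytes, chain_code: bytes, index: int) -> tuple:
    """Hardened child derivation: HMAC-SHA512 over 0x00 || parent key || index."""
    if not 0 <= index < HARDENED:
        raise ValueError("index must be in [0, 2**31)")
    data = b"\x00" + key + struct.pack(">I", index | HARDENED)
    digest = hmac.new(chain_code, data, hashlib.sha512).digest()
    return digest[:32], digest[32:]


def derive(seed: bytes, path: str) -> bytes:
    """Derive the 32-byte private key for a path such as m/44'/501'/1'/3'."""
    parts = path.rstrip("/").split("/")
    if parts[0] != "m":
        raise ValueError("path must start with 'm'")
    key, chain_code = master_node(seed)
    for part in parts[1:]:
        # The apostrophe is optional here because every level is hardened.
        key, chain_code = child_node(key, chain_code, int(part.rstrip("'")))
    return key


demo_seed = bytes(range(32))  # demo value only, never use a fixed seed
instance_key = derive(demo_seed, "m/44'/501'/1'/3'/2'/5'/1'/1")
print(instance_key.hex())
```

Because derivation depends only on the seed and the path, moving the seed from an environment variable into an HSM does not change any derived key.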
Choosing the Right Tier
Decision Framework
Start with Initial Tier (Vercel) When:
- ✅ Launching MVP or proof of concept
- ✅ Need immediate production deployment
- ✅ Budget constraints (< $100/month)
- ✅ Team size < 10 developers
- ✅ Customer base < 1,000 organizations
Upgrade to Next Level (SoftHSM) When:
- ✅ Customer base > 1,000 organizations
- ✅ Security audit requirements
- ✅ Need key operation audit trails
- ✅ Budget allows $50-100/month
- ✅ Team has dedicated DevOps resources
Move to Final Level (Hardware HSM) When:
- ✅ Enterprise customer requirements
- ✅ Regulatory compliance (FIPS 140-2)
- ✅ Revenue > $1M annually
- ✅ Need geographic redundancy
- ✅ Handling high-value identifiers
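The decision framework can be expressed as a small function, which is handy for internal planning tools. This is a sketch under simplifying assumptions: the `BusinessProfile` fields and the tier names are hypothetical, and only a subset of the criteria above is modeled.

```python
from dataclasses import dataclass


@dataclass
class BusinessProfile:
    """Hypothetical summary of the criteria listed above."""
    organizations: int
    needs_audit_trail: bool = False
    needs_fips_140_2: bool = False
    enterprise_mandate: bool = False


def recommend_tier(profile: BusinessProfile) -> str:
    """Return the lowest tier that satisfies the stated requirements."""
    if profile.needs_fips_140_2 or profile.enterprise_mandate:
        return "hardware-hsm"
    if profile.organizations > 1_000 or profile.needs_audit_trail:
        return "softhsm"
    return "vercel-env"


print(recommend_tier(BusinessProfile(organizations=50)))
print(recommend_tier(BusinessProfile(organizations=2_500)))
print(recommend_tier(BusinessProfile(organizations=200, needs_fips_140_2=True)))
```

Compliance requirements are checked first because they force the hardware tier regardless of customer count.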
Risk Assessment by Tier
| Risk Type | Initial | Next Level | Final Level |
|---|---|---|---|
| Key Compromise | Medium | Low | Very Low |
| Operational Complexity | Low | Medium | High |
| Vendor Lock-in | Medium (Vercel) | Low | Medium (AWS/Thales) |
| Scalability Limits | None | None | None |
| Recovery Time | Minutes | Hours | Hours-Days |
Initial Tier: Vercel Environment Variables
Quick Start (< 1 Hour)
The Vercel environment variable approach enables production deployment in under an hour with zero additional infrastructure costs.
Step 1: Generate Master Key
# Generate a new master key using Node.js
node -e "
const crypto = require('crypto');
const bs58 = require('bs58');
const masterKey = crypto.randomBytes(32);
console.log('PLINGS_MASTER_KEY=' + bs58.encode(masterKey));
"
# Example output (your key will differ): PLINGS_MASTER_KEY=5KYZdUEo39z3FPLjCKpxKkGXstPbqGiELQgSXzFm9ysh
Step 2: Configure Vercel Environment
# Add to Vercel project settings
vercel env add PLINGS_MASTER_KEY production
vercel env add DATABASE_URL production
vercel env add NEO4J_URI production
Step 3: Implement Key Derivation
// api/generate-identifiers.js
import { deriveHDKey } from '@/lib/hd-wallet';
// storeIdentifiers is an application-specific persistence helper (not shown here)

export default async function handler(req, res) {
  const { manufacturer, category, classId, batch, quantity } = req.body;

  // Load master key from environment and fail fast if it is missing
  const masterKey = process.env.PLINGS_MASTER_KEY;
  if (!masterKey) {
    return res.status(500).json({ success: false, error: 'Master key not configured' });
  }

  // Derive keys for the batch
  const identifiers = [];
  for (let i = 1; i <= quantity; i++) {
    const path = `m/44'/501'/1'/${manufacturer}'/${category}'/${classId}'/${batch}'/${i}`;
    const keyPair = await deriveHDKey(masterKey, path);
    identifiers.push({
      path,
      publicKey: keyPair.publicKey,
      // Private key is discarded - never stored
    });
  }

  // Store only public keys in database
  await storeIdentifiers(identifiers);
  return res.json({ success: true, count: identifiers.length });
}
Security Model
What’s Protected:
- Master key encrypted by Vercel’s infrastructure
- Private keys never persisted (derived on-demand)
- Database contains only public keys
Security Boundaries:
- Vercel team members with env access
- API functions have key access during execution
- No key material in application logs
Best Practices:
- Rotate master key quarterly
- Limit Vercel team access
- Enable audit logging
- Use separate keys for dev/staging/prod
- Implement rate limiting on key generation APIs
For Detailed Implementation
See Vercel Key Management Guide for:
- Complete code examples
- Database schema
- API endpoint specifications
- Security hardening
- Monitoring setup
Next Level: SoftHSM Implementation
Overview
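SoftHSM provides HSM-style key handling in software through the standard PKCS#11 interface, enabling audit trails, key ceremonies, and compliance features without hardware costs. Keys are stored in an encrypted token on the server's disk, so it does not offer the physical tamper protection of a hardware HSM.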
Architecture
┌─────────────────────────────────────────────────────┐
│                SoftHSM Architecture                 │
├─────────────────────────────────────────────────────┤
│                                                     │
│  Vercel Functions          VPS/Cloud Server         │
│  ┌─────────────┐           ┌───────────────────┐    │
│  │ API         │   HTTPS   │ HSM Service       │    │
│  │ Handlers    │ ────────► │ ┌─────────────┐   │    │
│  │             │           │ │ SoftHSM2    │   │    │
│  └─────────────┘           │ │ PKCS#11     │   │    │
│                            │ └─────────────┘   │    │
│                            │                   │    │
│                            │ Key Storage       │    │
│                            │ (Encrypted)       │    │
│                            └───────────────────┘    │
│                                                     │
└─────────────────────────────────────────────────────┘
Implementation Timeline (2-4 Weeks)
Week 1: Infrastructure Setup
- Provision VPS ($20-50/month)
- Install SoftHSM2 and dependencies
- Configure network security
Week 2: HSM Service Development
- PKCS#11 integration
- REST API wrapper
- TLS certificate setup
Week 3: Integration & Testing
- Update Vercel functions
- Test key operations
- Performance validation
Week 4: Migration & Go-Live
- Migrate from env variables
- Monitor operations
- Document procedures
Key Features
Enhanced Security:
- Keys never leave HSM boundary
- PKCS#11 audit trail
- Multi-person key ceremonies
- Same PKCS#11 access controls as a hardware HSM (without physical tamper protection)
Operational Benefits:
- Standard HSM interfaces
- Backup/restore procedures
- Key rotation capabilities
- Performance monitoring
For Detailed Implementation
See SoftHSM Migration Guide for:
- Step-by-step setup
- PKCS#11 integration
- API implementation
- Migration procedures
Final Level: Hardware HSM
Enterprise-Grade Security
Hardware HSMs provide the highest level of security with physical tamper protection, compliance certifications, and enterprise support.
Primary Options
AWS CloudHSM
- Certification: FIPS 140-2 Level 3
- Deployment: Managed service in AWS
- Cost: ~$1,500/month per HSM
- Best For: Cloud-native architectures
Thales Luna Network HSM
- Certification: FIPS 140-2 L3, Common Criteria EAL4+
- Deployment: On-premises or cloud
- Cost: $15,000-50,000 purchase + support
- Best For: Multi-cloud, hybrid deployments
Implementation Considerations
Timeline: 6-9 months including:
- Vendor selection (1-2 months)
- Procurement (1-2 months)
- Implementation (2-3 months)
- Testing & certification (2 months)
Team Requirements:
- Security architect
- HSM administrators
- DevOps engineers
- Compliance officer
When Hardware HSM is Required
Regulatory Requirements:
- Financial services compliance
- Government contracts
- Healthcare data protection
- High-value asset protection
Business Triggers:
- Revenue > $10M annually
- Enterprise customer mandates
- International expansion
- IPO preparation
Migration Paths Between Tiers
Initial → Next Level (Vercel to SoftHSM)
Duration: 1-2 weeks with zero downtime
Phase 1: Parallel Operation (3 days)
// Dual-mode key derivation
async function deriveKey(path) {
  if (process.env.USE_SOFTHSM === 'true') {
    return await hsmClient.deriveKey(path);
  } else {
    return await deriveFromEnvVar(process.env.PLINGS_MASTER_KEY, path);
  }
}
Phase 2: Migration (2 days)
- Deploy SoftHSM service
- Import master key to HSM
- Test with subset of traffic
- Monitor performance
Phase 3: Cutover (1 day)
- Switch all traffic to SoftHSM
- Remove env variable access
- Update documentation
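Before cutover it is worth confirming that both backends return identical public keys for the same paths, since the migration relies on that. The sketch below shows one way to run such a parity check. The two backends are stand-ins built from a hash function purely for illustration; in practice they would wrap the environment-variable derivation and the SoftHSM service.

```python
import hashlib
from typing import Callable, Iterable, List

DeriveFn = Callable[[str], bytes]


def find_mismatches(paths: Iterable[str], current: DeriveFn,
                    candidate: DeriveFn) -> List[str]:
    """Return every path whose derived key differs between the two backends."""
    return [path for path in paths if current(path) != candidate(path)]


def make_fake_backend(secret: bytes) -> DeriveFn:
    """Illustrative stand-in: a real backend would perform HD derivation."""
    return lambda path: hashlib.sha256(secret + path.encode()).digest()


env_backend = make_fake_backend(b"demo-master-key")
hsm_backend = make_fake_backend(b"demo-master-key")  # same key after import

sample_paths = [f"m/44'/501'/1'/1'/1'/1'/1'/{i}" for i in range(1, 6)]
mismatches = find_mismatches(sample_paths, env_backend, hsm_backend)
print("safe to cut over" if not mismatches else f"mismatched paths: {mismatches}")
```

Running the check against a sample of real production paths during the parallel-operation phase gives a simple go/no-go signal for Phase 3.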
Next Level → Final Level (SoftHSM to Hardware)
Duration: 2-4 weeks with planned maintenance windows
Key Migration Strategy:
- Generate new master key in hardware HSM
- Create new wallet version (v2)
- Issue new identifiers with v2
- Maintain v1 for existing identifiers
- Gradual migration over time
Technical Approach:
# Multi-wallet support during migration
class WalletManager:
    def __init__(self):
        self.wallets = {
            1: SoftHSMWallet(),      # Existing
            2: HardwareHSMWallet(),  # New
        }

    def get_wallet(self, version):
        return self.wallets[version]
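Because the wallet version is part of every derivation path, a request can be routed to the right backend from the path alone. The following sketch assumes the `m/44'/501'/[wallet_version]'/...` layout shown earlier; the backend names are placeholders standing in for real wallet objects.

```python
from typing import Dict

EXPECTED_PREFIX = ["m", "44'", "501'"]


def wallet_version_from_path(path: str) -> int:
    """Extract the wallet version from m/44'/501'/<version>'/..."""
    parts = path.split("/")
    if len(parts) < 4 or parts[:3] != EXPECTED_PREFIX or not parts[3].endswith("'"):
        raise ValueError(f"not a Plings derivation path: {path!r}")
    return int(parts[3][:-1])


def backend_for_path(path: str, backends: Dict[int, str]) -> str:
    """Pick the backend registered for the wallet version in the path."""
    version = wallet_version_from_path(path)
    if version not in backends:
        raise KeyError(f"no backend registered for wallet version {version}")
    return backends[version]


# Placeholder backend names; real code would map to wallet objects instead.
BACKENDS = {1: "softhsm", 2: "hardware-hsm"}

print(backend_for_path("m/44'/501'/1'/3'/2'/5'/1'/7", BACKENDS))
print(backend_for_path("m/44'/501'/2'/3'/2'/5'/1'/7", BACKENDS))
```

Existing v1 identifiers keep resolving to the old backend while new v2 identifiers go to the hardware HSM, which matches the gradual migration described above.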
What is an HSM?
A Hardware Security Module (HSM) is a dedicated cryptographic device designed to securely generate, store, and manage digital keys and perform cryptographic operations.
Key Characteristics:
Physical Security:
- Tamper-resistant/tamper-evident hardware
- If someone tries to physically break into it, it destroys the keys
- Certified to standards like FIPS 140-2 Level 3 or Common Criteria
Cryptographic Operations:
- Key generation with true random number generators
- Digital signing and verification
- Encryption/decryption operations
- Key derivation (like BIP32 HD wallet operations)
Key Storage:
- Keys are generated and stored inside the HSM
- Keys never leave the HSM in plaintext
- Even administrators cannot extract the actual key material
Why HSMs for Plings?
In the context of our wallet-first architecture:
Without HSM (Risky):
# DANGEROUS - Private key in software
master_private_key = "abc123..." # Stored in database or file
# Anyone with access to this can generate fake Plings identifiers
With HSM (Secure):
# SECURE - Private key never leaves HSM hardware
hsm.generate_signature(message) # Happens inside tamper-proof hardware
# Even if someone hacks our servers, they can't get the master key
Real-World Analogy
Think of an HSM like a high-security bank vault:
Bank Vault: Physically secure, tamper-evident, requires multiple people to open
HSM: Physically secure, tamper-evident, requires authentication to use
Bank Vault: Stores valuable physical assets (gold, cash)
HSM: Stores valuable digital assets (cryptographic keys)
Bank Vault: If someone breaks in, alarms go off
HSM: If someone tampers with it, it destroys the keys
HSM vs Regular Computer Security
| Feature | Regular Server | HSM |
|---|---|---|
| Security Model | Software-based security | Hardware-based security |
| Key Storage | Keys stored in files/database | Keys never leave secure hardware |
| Vulnerability | Vulnerable to malware | Isolated from operating system |
| Remote Access | Can be remotely compromised | Requires physical presence |
| Key Protection | Keys can be copied | Keys cannot be extracted |
Why This Matters for Plings
Since every Plings identifier must cryptographically derive from our master key:
❌ If master key is compromised → Anyone can create fake Plings identifiers
✅ If master key is in HSM → Only authorized operations through secure hardware
The HSM is essentially the Fort Knox for Plings’ cryptographic security! 🏛️
Key Concepts and Terminology
Before diving into implementation, let’s clarify the essential technical concepts:
FIPS 140-2 Certification
FIPS 140-2 is a U.S. government standard for cryptographic modules. It has 4 security levels:
- Level 1: Basic security (software-only)
- Level 2: Role-based authentication
- Level 3: Physical tamper detection and response ⭐ (Plings uses this)
- Level 4: Environmental failure protection
Why Level 3 matters for Plings: If someone tries to physically attack the HSM, it automatically destroys all keys, preventing theft.
PKCS#11
PKCS#11 is the standard interface for communicating with cryptographic tokens and HSMs. Think of it as the “USB driver” for HSMs - it provides a consistent API regardless of the HSM vendor.
# All HSMs use the same PKCS#11 interface
hsm.login(username, password)
private_key = hsm.generateKeyPair(algorithm="Ed25519")
signature = hsm.sign(private_key, message)
Ed25519
Ed25519 is a modern elliptic curve cryptography algorithm. We chose it because:
- Performance: Very fast signing and verification
- Security: Resistant to side-channel attacks
- Simplicity: No parameter choices that could weaken security
- Solana Standard: Native support in Solana blockchain
Ed25519 vs RSA:
| Feature | Ed25519 | RSA-2048 |
|---|---|---|
| Key Size | 32 bytes | 256 bytes |
| Signature Size | 64 bytes | 256 bytes |
| Performance | 10x faster | Slower |
| Quantum Resistance | None (vulnerable to Shor's algorithm) | None (vulnerable to Shor's algorithm) |
BIP32/BIP39 Standards
BIP32 (Hierarchical Deterministic Wallets) and BIP39 (Mnemonic Codes) are Bitcoin standards we use:
BIP32: Allows generating millions of keys from one master key
Master Key → Wallet v1 → Manufacturer → Category → Class → Instance
BIP39: Converts keys to human-readable words
Master Key = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"
Why use Bitcoin standards? They’re battle-tested, widely supported, and have excellent tooling.
Shamir’s Secret Sharing
Shamir’s Secret Sharing splits a secret into multiple shares where you need a threshold to reconstruct it.
Example: Split master key into 5 shares, need any 3 to recover:
- Share 1: Security Officer (offline storage)
- Share 2: CTO (safety deposit box)
- Share 3: External trustee (bank vault)
- Share 4: Compliance officer (secure facility)
- Share 5: Emergency contact (geographic separation)
Business Benefit: No single person can compromise the master key, but legitimate recovery is still possible.
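The 3-of-5 scheme above can be illustrated with a compact implementation over a prime field. This is a teaching sketch only: the prime, the share format, and the function names are choices made for the example, and a real key ceremony should rely on an audited tool rather than hand-written code.

```python
import secrets

# Mersenne prime 2**521 - 1, comfortably larger than any 256-bit secret.
PRIME = 2**521 - 1


def split_secret(secret: int, threshold: int, shares: int) -> list:
    """Split secret into (x, y) points; any `threshold` of them recover it."""
    if not 0 <= secret < PRIME:
        raise ValueError("secret out of range")
    if not 1 <= threshold <= shares:
        raise ValueError("threshold must be between 1 and shares")
    # Random polynomial of degree threshold-1 with the secret as constant term.
    coeffs = [secret] + [secrets.randbelow(PRIME) for _ in range(threshold - 1)]

    def evaluate(x: int) -> int:
        acc = 0
        for coeff in reversed(coeffs):  # Horner's rule
            acc = (acc * x + coeff) % PRIME
        return acc

    return [(x, evaluate(x)) for x in range(1, shares + 1)]


def recover_secret(points: list) -> int:
    """Lagrange interpolation at x = 0 over the prime field."""
    secret = 0
    for i, (xi, yi) in enumerate(points):
        num, den = 1, 1
        for j, (xj, _) in enumerate(points):
            if i != j:
                num = (num * -xj) % PRIME
                den = (den * (xi - xj)) % PRIME
        secret = (secret + yi * num * pow(den, -1, PRIME)) % PRIME
    return secret


master = int.from_bytes(secrets.token_bytes(32), "big")
five_shares = split_secret(master, threshold=3, shares=5)
subset = [five_shares[0], five_shares[2], five_shares[4]]
print("recovered:", recover_secret(subset) == master)
```

Any three of the five shares reconstruct the key, while two shares alone reveal nothing about it.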
Prerequisites and Assumptions
Required Knowledge
Before implementing HSM integration, your team should understand:
Infrastructure Team:
- AWS services (VPC, EC2, IAM, CloudFormation)
- Network security (firewalls, VPNs, SSL/TLS)
- Linux system administration
- Database administration (PostgreSQL)
Development Team:
- Python programming (asyncio, cryptography libraries)
- REST API development (FastAPI, authentication)
- Database operations (SQL, transactions)
- Git version control and CI/CD
Security Team:
- Cryptographic concepts (public/private keys, signatures)
- Security incident response procedures
- Compliance frameworks (SOC 2, GDPR)
- Risk assessment and threat modeling
Infrastructure Requirements
AWS Account Setup:
- Production AWS account with appropriate IAM roles
- VPC with private subnets for HSM deployment
- Network connectivity between HSM and application servers
- Monitoring and logging infrastructure (CloudWatch, CloudTrail)
Security Requirements:
- Multi-factor authentication for all administrative access
- Network segmentation and firewall rules
- Secure backup and disaster recovery procedures
- Incident response team and procedures
Team Roles and Responsibilities
Security Officer:
- Overall security strategy and governance
- HSM key ceremony oversight
- Incident response coordination
- Compliance and audit management
Infrastructure Engineer:
- HSM deployment and configuration
- Network setup and security
- Monitoring and alerting setup
- Disaster recovery testing
Backend Developer:
- HSM client integration
- API development and testing
- Database schema implementation
- Performance optimization
DevOps Engineer:
- CI/CD pipeline integration
- Automated testing and deployment
- Infrastructure as code
- Operational monitoring
Security Clearance and Access Control
HSM Administrator Access:
- Requires security background check
- Multi-person authorization for key operations
- Regular access reviews and rotation
- Audit logging of all activities
Development Access:
- Separate development environment with SoftHSM
- No access to production HSM or keys
- Code review requirements for HSM-related changes
- Security training and awareness
Quick Start for Different Audiences
For CTO/Decision Makers
Business Problem: Plings identifiers must be cryptographically secure to prevent counterfeiting and maintain trust.
Solution: Hardware Security Modules provide tamper-proof key storage and cryptographic operations.
Investment Summary:
- Annual Cost: $20,000-$35,000 (standard tier sufficient)
- Implementation Time: 3-6 months
- Risk Reduction: Prevents identifier counterfeiting (potentially millions in losses)
- Compliance: Meets SOC 2, GDPR, and financial industry standards
Decision Points:
- AWS CloudHSM for primary production (recommended)
- Thales Luna for disaster recovery backup
- SoftHSM for development environments
For DevOps Engineers
Implementation Timeline:
- Month 1: Development environment setup with SoftHSM
- Month 2: AWS CloudHSM infrastructure deployment
- Month 3: Production integration and testing
- Month 4: Disaster recovery setup with Thales Luna
- Month 5: Security monitoring and compliance setup
- Month 6: Full production deployment and team training
Infrastructure Needs:
- AWS CloudHSM cluster with multi-AZ deployment
- VPC with private subnets and security groups
- Application servers with HSM client libraries
- Monitoring and logging infrastructure
- Backup and disaster recovery procedures
For Security Teams
Threat Model:
- Primary Threat: Master key compromise leading to identifier counterfeiting
- Secondary Threats: Insider threats, physical attacks, software vulnerabilities
- Mitigation: HSM provides hardware-level protection with tamper detection
Compliance Requirements:
- FIPS 140-2 Level 3 certification for cryptographic modules
- SOC 2 Type II controls for security and availability
- GDPR compliance for data protection and privacy
- Audit Trail: Complete logging of all HSM operations
Security Controls:
- Multi-factor authentication for HSM access
- Role-based access control with principle of least privilege
- Network segmentation and encryption
- Regular security assessments and penetration testing
For Developers
API Integration: The HSM service provides simple REST APIs for common operations:
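import requests

# Placeholder base URL: replace with the address of your HSM service
BASE_URL = 'https://hsm.example.internal'

# Generate identifier batch
response = requests.post(f'{BASE_URL}/api/v1/identifiers/generate', json={
    'wallet_version': 1,
    'paths': ['1.1.C1.1.1', '1.1.C1.1.2'],
    'allocation_type': 'generic'
}, timeout=10)
response.raise_for_status()

# Check HSM health
health = requests.get(f'{BASE_URL}/api/v1/health', timeout=10)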
Development Environment:
- Use SoftHSM for local development
- Full API compatibility with production HSMs
- Automated testing with CI/CD integration
- Performance benchmarking and optimization
HSM Vendor Selection and Comparison
Recommended HSM Solutions
1. AWS CloudHSM (Primary Production)
Why CloudHSM for Plings:
- FIPS 140-2 Level 3 certification for regulatory compliance
- Adequate Performance: >1,000 Ed25519 signatures/second (exceeds Plings needs)
- Seamless AWS Integration: Natural fit with existing cloud infrastructure
- High Availability: Multi-AZ deployment with automatic failover
- Managed Service: AWS handles hardware maintenance and security updates
Technical Specifications:
hsm_specification:
  vendor: "AWS CloudHSM"
  model: "AWS CloudHSM (Cavium SafeNet Luna)"
  certification: "FIPS 140-2 Level 3"
  performance:
    ed25519_signatures_per_second: 1000+ # Exceeds Plings requirements
    concurrent_sessions: 100
    actual_plings_usage: "<1 operation/second"
  availability:
    sla: "99.9%"
    multi_az: true
    automatic_failover: true
  integration:
    api: "PKCS#11, OpenSSL, JCE"
    sdk: "AWS SDK with HSM extensions"
    languages: ["Python", "Java", "C++", "JavaScript"]
Cost Structure:
- Initialization: $3,000 one-time setup fee
- Monthly: $1,500 per HSM instance (standard configuration sufficient)
- Usage: $0.01 per 1,000 operations (~$1/month for Plings usage)
- Estimated Annual Cost: ~$20,000 for production setup (standard tier)
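The annual estimate follows from the line items above. The sketch below only reproduces that arithmetic with the figures quoted in this guide; it is not a pricing reference, and the operation count is taken from the upper usage bound given earlier.

```python
SETUP_FEE = 3_000          # one-time, as listed above
MONTHLY_PER_HSM = 1_500    # per HSM instance, as listed above
USAGE_PER_1000_OPS = 0.01  # as listed above


def first_year_cost(hsm_count: int, ops_per_month: int) -> float:
    """First-year cost in USD using the line items quoted in this guide."""
    usage = ops_per_month / 1_000 * USAGE_PER_1000_OPS * 12
    return SETUP_FEE + hsm_count * MONTHLY_PER_HSM * 12 + usage


for count in (1, 2):
    total = first_year_cost(hsm_count=count, ops_per_month=10_000)
    print(f"{count} HSM(s): ${total:,.2f}")
```

With one HSM the total lands near the quoted figure; a multi-AZ cluster with two HSM instances roughly doubles the monthly component, so budget accordingly.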
2. Thales Luna Network HSM (Backup/DR)
Why Luna for Disaster Recovery:
- Geographic Independence: Non-AWS vendor for risk diversification
- Proven Track Record: Widely used in financial services
- High Availability: Network-attached HSMs with failover clustering
- Compliance: FIPS 140-2 Level 3 and Common Criteria EAL4+
Technical Specifications:
hsm_specification:
  vendor: "Thales"
  model: "Luna Network HSM 7"
  certification: "FIPS 140-2 Level 3, Common Criteria EAL4+"
  performance:
    ed25519_signatures_per_second: 1000+ # Standard model sufficient
    concurrent_sessions: 50
    actual_plings_usage: "<1 operation/second"
  availability:
    clustering: "High Availability with automatic failover"
    load_balancing: true
  integration:
    api: "PKCS#11, Microsoft CNG, OpenSSL"
    management: "Luna HSM Client and utilities"
3. SoftHSM (Development/Testing)
Why SoftHSM for Development:
- Cost Effective: Free open-source software-based HSM
- Easy Setup: Simple installation and configuration
- Full API Compatibility: PKCS#11 interface identical to hardware HSMs
- Development Speed: Rapid iteration without hardware constraints
Technical Specifications:
hsm_specification:
  vendor: "OpenDNSSEC"
  model: "SoftHSM 2.x"
  certification: "Software-based (no hardware certification)"
  performance:
    ed25519_signatures_per_second: 1000+ # More than adequate for development
    memory_based: true
    thread_safe: true
    actual_plings_usage: "<1 operation/second"
  integration:
    api: "PKCS#11"
    platforms: ["Linux", "macOS", "Windows"]
    languages: ["Python", "C++", "Java"]
HSM Comparison Matrix
| Feature | AWS CloudHSM | Thales Luna | SoftHSM |
|---|---|---|---|
| Certification | FIPS 140-2 L3 | FIPS 140-2 L3 | None |
| Performance | 1,000+ sig/sec | 1,000+ sig/sec | 1,000+ sig/sec |
| Plings Usage | <1 sig/sec | <1 sig/sec | <1 sig/sec |
| Cost | $20K/year | $35K/year | Free |
| Availability | 99.9% | 99.95% | Software-dependent |
| Use Case | Production | DR/Backup | Development |
| Geographic | AWS regions | On-premises | Any |
| Compliance | SOC 2, FedRAMP | SOC 2, CC EAL4+ | None |
Production HSM Implementation
AWS CloudHSM Setup
1. Infrastructure Setup
#!/bin/bash
# AWS CloudHSM cluster setup for Plings production environment
# Create HSM cluster
aws cloudhsmv2 create-cluster \
  --hsm-type hsm1.medium \
  --subnet-ids subnet-12345678 subnet-87654321 \
  --tag-list Key=Project,Value=Plings Key=Environment,Value=Production
# Create HSM instances in multiple AZs
aws cloudhsmv2 create-hsm \
  --cluster-id cluster-1234567890abcdef0 \
  --availability-zone us-west-2a
aws cloudhsmv2 create-hsm \
  --cluster-id cluster-1234567890abcdef0 \
  --availability-zone us-west-2b
# Initialize cluster
# --signed-cert is the cluster certificate signed by your CA (sign the cluster CSR first);
# --trust-anchor is the CA certificate itself
aws cloudhsmv2 initialize-cluster \
  --cluster-id cluster-1234567890abcdef0 \
  --signed-cert file://cluster-1234567890abcdef0_CustomerHsmCertificate.crt \
  --trust-anchor file://customerCA.crt
2. HSM Client Configuration
# hsm_client.py - Production HSM client implementation
# Assumes a PyKCS11 build and a PKCS#11 library that expose the v3.0 EdDSA
# mechanisms; verify Ed25519 support for your HSM before relying on this.
import time

import boto3
import PyKCS11

# DER encoding of the Ed25519 curve OID (1.3.101.112), used as CKA_EC_PARAMS
ED25519_EC_PARAMS = bytes([0x06, 0x03, 0x2B, 0x65, 0x70])


class PlingsHSMClient:
    """Production HSM client for Plings wallet operations"""

    def __init__(self, cluster_id: str):
        self.cluster_id = cluster_id
        self.hsm_client = boto3.client('cloudhsmv2')
        self.pkcs11_lib = '/opt/cloudhsm/lib/libcloudhsm_pkcs11.so'
        self.pkcs11 = None
        self.session = None

    def initialize_session(self, username: str, password: str):
        """Initialize PKCS#11 session with HSM"""
        self.pkcs11 = PyKCS11.PyKCS11Lib()
        self.pkcs11.load(self.pkcs11_lib)
        # Get first available slot
        slots = self.pkcs11.getSlotList(tokenPresent=True)
        if not slots:
            raise RuntimeError("No HSM slots available")
        # Open a read/write session so token objects can be created
        self.session = self.pkcs11.openSession(
            slots[0], PyKCS11.CKF_SERIAL_SESSION | PyKCS11.CKF_RW_SESSION
        )
        # Login with CU (Crypto User) credentials.
        # The CloudHSM PKCS#11 library expects the PIN as "<username>:<password>".
        self.session.login(f"{username}:{password}")
        print(f"✅ HSM session initialized for slot {slots[0]}")

    def generate_master_key(self, key_label: str, wallet_version: int) -> str:
        """Generate master key for specific wallet version"""
        # key_id doubles as CKA_ID so the key can be found again when signing
        key_id = f"plings_master_v{wallet_version}"
        label = f"{key_label}_v{wallet_version}"
        public_template = [
            (PyKCS11.CKA_TOKEN, True),
            (PyKCS11.CKA_VERIFY, True),
            (PyKCS11.CKA_EC_PARAMS, ED25519_EC_PARAMS),
            (PyKCS11.CKA_LABEL, label),
            (PyKCS11.CKA_ID, key_id.encode()),
        ]
        private_template = [
            (PyKCS11.CKA_TOKEN, True),
            (PyKCS11.CKA_PRIVATE, True),
            (PyKCS11.CKA_SENSITIVE, True),
            (PyKCS11.CKA_EXTRACTABLE, False),
            (PyKCS11.CKA_SIGN, True),
            (PyKCS11.CKA_LABEL, label),
            (PyKCS11.CKA_ID, key_id.encode()),
        ]
        # Generate Ed25519 key pair (templates first, mechanism last)
        public_key, private_key = self.session.generateKeyPair(
            public_template,
            private_template,
            mecha=PyKCS11.Mechanism(PyKCS11.CKM_EC_EDWARDS_KEY_PAIR_GEN, None),
        )
        # Read back the public point so it can be distributed for verification
        public_point = self.session.getAttributeValue(
            public_key, [PyKCS11.CKA_EC_POINT]
        )[0]
        print(f"✅ Generated master key: {key_id} ({len(public_point)}-byte EC point)")
        return key_id

    def sign_with_master_key(self, key_id: str, message: bytes) -> bytes:
        """Sign message with master key"""
        # Find private key by ID
        keys = self.session.findObjects([
            (PyKCS11.CKA_ID, key_id.encode()),
            (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY),
        ])
        if not keys:
            raise KeyError(f"No private key found for id {key_id}")
        # Sign message with EdDSA (the PyKCS11 default mechanism is RSA)
        signature = self.session.sign(
            keys[0], message, PyKCS11.Mechanism(PyKCS11.CKM_EDDSA, None)
        )
        return bytes(signature)

    def derive_wallet_key(self, master_key_id: str, derivation_path: str) -> bytes:
        """Derive wallet-specific key from master key"""
        # This would implement BIP32 derivation within HSM
        # For security, derivation happens inside HSM hardware
        raise NotImplementedError("HSM-side key derivation is vendor-specific")

    def backup_key_material(self, key_id: str) -> dict:
        """Create encrypted backup of key material"""
        # Implementation depends on HSM vendor APIs
        # Returns encrypted key material for disaster recovery
        raise NotImplementedError("Key backup depends on HSM vendor APIs")

    def health_check(self) -> dict:
        """Check HSM health and performance"""
        try:
            # Test signature operation
            test_message = b"health_check_test"
            start_time = time.time()
            # Find any available private key for testing
            test_keys = self.session.findObjects([
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY),
                (PyKCS11.CKA_SIGN, True),
            ])
            if not test_keys:
                return {
                    'status': 'warning',
                    'message': 'No test keys available',
                }
            self.session.sign(
                test_keys[0], test_message,
                PyKCS11.Mechanism(PyKCS11.CKM_EDDSA, None),
            )
            response_time = time.time() - start_time
            return {
                'status': 'healthy',
                'response_time_ms': response_time * 1000,
                'available_keys': len(test_keys),
                'cluster_id': self.cluster_id,
            }
        except Exception as e:
            return {
                'status': 'error',
                'message': str(e),
            }
3. HSM Service Integration
# hsm_service.py - Service layer for HSM operations
import asyncio
import logging
from typing import Dict, Optional
from dataclasses import dataclass
@dataclass
class HSMConfig:
primary_cluster_id: str
backup_cluster_id: str
region: str
key_rotation_days: int = 365
backup_schedule: str = "daily"
class PlingsHSMService:
"""High-level HSM service for Plings wallet operations"""
def __init__(self, config: HSMConfig):
self.config = config
self.primary_hsm = PlingsHSMClient(config.primary_cluster_id)
self.backup_hsm = PlingsHSMClient(config.backup_cluster_id)
self.logger = logging.getLogger(__name__)
async def initialize_production_environment(self):
"""Initialize production HSM environment"""
try:
# Initialize primary HSM
await self.primary_hsm.initialize_session(
username=os.environ['HSM_PRIMARY_USER'],
password=os.environ['HSM_PRIMARY_PASSWORD']
)
# Initialize backup HSM
await self.backup_hsm.initialize_session(
username=os.environ['HSM_BACKUP_USER'],
password=os.environ['HSM_BACKUP_PASSWORD']
)
self.logger.info("✅ Production HSM environment initialized")
except Exception as e:
self.logger.error(f"❌ HSM initialization failed: {e}")
raise
async def create_new_wallet_version(self, wallet_version: int,
description: str) -> str:
"""Create new wallet version with master key generation"""
try:
# Generate master key in primary HSM
master_key_id = await self.primary_hsm.generate_master_key(
key_label=f"plings_master",
wallet_version=wallet_version
)
# Backup key material to secondary HSM
await self.backup_key_to_secondary_hsm(master_key_id)
# Update database with new wallet version
await self.register_wallet_version(wallet_version, master_key_id, description)
self.logger.info(f"✅ Created wallet version {wallet_version} with key {master_key_id}")
return master_key_id
except Exception as e:
self.logger.error(f"❌ Wallet creation failed: {e}")
raise
async def sign_identifier_batch(self, wallet_version: int,
paths: List[str]) -> List[str]:
"""Sign batch of identifier paths"""
try:
master_key_id = await self.get_master_key_id(wallet_version)
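signatures = []
# Use one timestamp per batch and return it with each signature,
# otherwise a verifier cannot rebuild the signed message
timestamp = int(time.time())
for path in paths:
# Create message to sign (path + timestamp)
message = f"{path}:{timestamp}".encode()
# Sign with master key
signature = await self.primary_hsm.sign_with_master_key(
master_key_id, message
)
signatures.append(f"{timestamp}:{signature.hex()}")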
return signatures
except Exception as e:
self.logger.error(f"❌ Batch signing failed: {e}")
raise
async def emergency_key_rotation(self, compromised_wallet_version: int,
incident_id: str) -> int:
"""Emergency key rotation during security incident"""
try:
# Generate new wallet version
new_version = await self.get_next_wallet_version()
# Create emergency master key
emergency_key_id = await self.create_new_wallet_version(
new_version, f"Emergency rotation - Incident {incident_id}"
)
# Mark old wallet as compromised
await self.mark_wallet_compromised(compromised_wallet_version, incident_id)
self.logger.info(f"✅ Emergency rotation complete: v{compromised_wallet_version} → v{new_version}")
return new_version
except Exception as e:
self.logger.error(f"❌ Emergency rotation failed: {e}")
raise
async def health_monitoring(self) -> Dict[str, object]:
"""Comprehensive HSM health monitoring"""
try:
# Check primary HSM
primary_health = await self.primary_hsm.health_check()
# Check backup HSM
backup_health = await self.backup_hsm.health_check()
# Performance metrics
performance_metrics = await self.collect_performance_metrics()
return {
'primary_hsm': primary_health,
'backup_hsm': backup_health,
'performance': performance_metrics,
'overall_status': 'healthy' if primary_health['status'] == 'healthy' else 'degraded'
}
except Exception as e:
self.logger.error(f"❌ Health monitoring failed: {e}")
return {'overall_status': 'error', 'message': str(e)}
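The service above calls `get_next_wallet_version`, `register_wallet_version`, and `mark_wallet_compromised` without showing them. The following is a minimal in-memory sketch of what such a registry might look like, assuming wallet versions are consecutive integers; `WalletRecord`, `InMemoryWalletRegistry`, and `emergency_rotate` are hypothetical names, and a real deployment would back this with the database.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class WalletRecord:
    version: int
    master_key_id: str
    description: str
    compromised_incident: Optional[str] = None  # set once the wallet is retired


@dataclass
class InMemoryWalletRegistry:
    """Hypothetical stand-in for the wallet-version table."""
    records: Dict[int, WalletRecord] = field(default_factory=dict)

    def get_next_wallet_version(self) -> int:
        # Assumption: versions are consecutive integers starting at 1
        return max(self.records, default=0) + 1

    def register_wallet_version(self, version: int, master_key_id: str,
                                description: str) -> None:
        if version in self.records:
            raise ValueError(f"Wallet version {version} already registered")
        self.records[version] = WalletRecord(version, master_key_id, description)

    def mark_wallet_compromised(self, version: int, incident_id: str) -> None:
        self.records[version].compromised_incident = incident_id

    def active_versions(self) -> List[int]:
        return sorted(v for v, r in self.records.items()
                      if r.compromised_incident is None)


def emergency_rotate(registry: InMemoryWalletRegistry,
                     compromised_version: int, incident_id: str) -> int:
    """Mirror emergency_key_rotation(): register a successor, then retire the old wallet."""
    new_version = registry.get_next_wallet_version()
    registry.register_wallet_version(
        new_version,
        f"plings_master_key_v{new_version}",  # label only; no HSM call in this sketch
        f"Emergency rotation - Incident {incident_id}",
    )
    registry.mark_wallet_compromised(compromised_version, incident_id)
    return new_version
```

Registering the successor before retiring the compromised version keeps at least one active wallet available throughout the rotation.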
Key Generation and Ceremony
Master Key Generation Ceremony
1. Pre-Ceremony Setup
#!/bin/bash
# Key generation ceremony setup script
# Create secure ceremony environment
mkdir -p /secure/ceremony
chmod 700 /secure/ceremony
cd /secure/ceremony
# Verify air-gapped environment
if ping -c 1 8.8.8.8 &> /dev/null; then
echo "❌ ERROR: Network connectivity detected. Ensure air-gapped environment."
exit 1
fi
# Hardware verification
echo "📋 Hardware Security Verification Checklist:"
echo "1. Air-gapped environment: ✓"
echo "2. HSM hardware present: $(lsusb | grep -i 'hsm\|safenet' | wc -l) devices"
echo "3. Video recording: [ ] Started"
echo "4. Witnesses present: [ ] Confirmed"
echo "5. Entropy sources: [ ] Hardware RNG, [ ] Atmospheric noise"
2. Ceremony Procedure
# key_ceremony.py - Formal key generation ceremony
import hashlib
import json
import secrets
import time
from datetime import datetime
from typing import List
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519
class KeyGenerationCeremony:
"""Formal key generation ceremony with multiple witnesses"""
def __init__(self, ceremony_id: str, witnesses: List[str]):
self.ceremony_id = ceremony_id
self.witnesses = witnesses
self.ceremony_log = []
self.start_time = datetime.utcnow()
def log_ceremony_event(self, event: str, witness: str = None):
"""Log ceremony events with timestamps"""
timestamp = datetime.utcnow()
log_entry = {
'timestamp': timestamp.isoformat(),
'event': event,
'witness': witness,
'ceremony_id': self.ceremony_id
}
self.ceremony_log.append(log_entry)
print(f"[{timestamp}] {event}" + (f" (Witness: {witness})" if witness else ""))
def collect_entropy_sources(self) -> bytes:
"""Collect entropy from multiple sources"""
self.log_ceremony_event("Starting entropy collection")
# Source 1: Hardware RNG (simulated here with the OS CSPRNG)
hardware_entropy = secrets.token_bytes(32)
self.log_ceremony_event("Hardware RNG entropy collected", self.witnesses[0])
# Source 2: Atmospheric noise (simulated)
atmospheric_entropy = secrets.token_bytes(32)
self.log_ceremony_event("Atmospheric noise entropy collected", self.witnesses[1])
# Source 3: HSM internal entropy (simulated; a real ceremony would request random bytes from the HSM)
hsm_entropy = secrets.token_bytes(32)
self.log_ceremony_event("HSM internal entropy collected", self.witnesses[2])
# Combine entropy sources
combined_entropy = hashlib.sha256(
hardware_entropy + atmospheric_entropy + hsm_entropy
).digest()
self.log_ceremony_event("Entropy sources combined and hashed")
return combined_entropy
def generate_master_key(self, wallet_version: int) -> dict:
"""Generate master key with ceremony validation"""
self.log_ceremony_event(f"Starting master key generation for wallet v{wallet_version}")
# Collect entropy
entropy = self.collect_entropy_sources()
# Build the key label; this example does not call the HSM, which would generate the key in a real ceremony
key_id = f"plings_master_key_v{wallet_version}"
# Witness verification
for witness in self.witnesses:
self.log_ceremony_event(f"Witness verification of key generation", witness)
# In real implementation, witnesses would verify the process
# Create key backup shares (Shamir's Secret Sharing)
backup_shares = self.create_backup_shares(entropy, threshold=3, total_shares=5)
self.log_ceremony_event("Master key generation completed")
return {
'key_id': key_id,
'wallet_version': wallet_version,
'ceremony_id': self.ceremony_id,
'witnesses': self.witnesses,
'backup_shares': backup_shares,
'created_at': datetime.utcnow().isoformat()
}
def create_backup_shares(self, entropy: bytes, threshold: int, total_shares: int) -> List[str]:
"""Create backup shares (placeholder, not real Shamir's Secret Sharing)"""
# WARNING: these hash-derived values ignore `threshold` and cannot be
# combined to reconstruct the secret; use a proven library in production
shares = []
for i in range(total_shares):
share = hashlib.sha256(entropy + i.to_bytes(1, 'big')).hexdigest()
shares.append(share)
return shares
def finalize_ceremony(self, key_result: dict) -> dict:
"""Finalize ceremony and generate reports"""
self.log_ceremony_event("Finalizing key generation ceremony")
ceremony_duration = datetime.utcnow() - self.start_time
ceremony_report = {
'ceremony_id': self.ceremony_id,
'duration_seconds': ceremony_duration.total_seconds(),
'witnesses': self.witnesses,
'key_id': key_result['key_id'],
'wallet_version': key_result['wallet_version'],
'ceremony_log': self.ceremony_log,
'verification_hash': hashlib.sha256(
str(self.ceremony_log).encode()
).hexdigest()
}
self.log_ceremony_event("Ceremony report generated")
return ceremony_report
# Example ceremony execution
def execute_key_ceremony():
"""Execute complete key generation ceremony"""
witnesses = [
"alice@plings.io",
"bob@plings.io",
"charlie@plings.io"
]
ceremony = KeyGenerationCeremony(
ceremony_id="CEREMONY-2025-07-13-001",
witnesses=witnesses
)
# Generate master key for wallet version 2
key_result = ceremony.generate_master_key(wallet_version=2)
# Finalize ceremony
ceremony_report = ceremony.finalize_ceremony(key_result)
# Save ceremony report
with open(f"/secure/ceremony/{ceremony.ceremony_id}-report.json", "w") as f:
json.dump(ceremony_report, f, indent=2)
print("✅ Key generation ceremony completed successfully")
return ceremony_report
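The `create_backup_shares` placeholder above only hashes the entropy, so its output cannot be recombined. For comparison, here is a minimal sketch of actual Shamir's Secret Sharing over a prime field. The function names `split_secret` and `recover_secret` and the choice of the Mersenne prime 2^521 - 1 are assumptions for illustration; a ceremony should still rely on an audited library.

```python
import secrets
from typing import List, Tuple

# 2**521 - 1 is a Mersenne prime, large enough for secrets of up to 65 bytes
PRIME = 2**521 - 1


def split_secret(secret: bytes, threshold: int, total_shares: int) -> List[Tuple[int, int]]:
    """Split `secret` into points on a random polynomial of degree threshold - 1."""
    if not 1 < threshold <= total_shares:
        raise ValueError("Require 1 < threshold <= total_shares")
    secret_int = int.from_bytes(secret, "big")
    if secret_int >= PRIME:
        raise ValueError("Secret too large for the field")
    # Coefficient 0 is the secret; the others are uniformly random field elements
    coeffs = [secret_int] + [secrets.randbelow(PRIME) for _ in range(threshold - 1)]
    shares = []
    for x in range(1, total_shares + 1):
        y = 0
        for c in reversed(coeffs):  # Horner evaluation of the polynomial at x
            y = (y * x + c) % PRIME
        shares.append((x, y))
    return shares


def recover_secret(shares: List[Tuple[int, int]], length: int) -> bytes:
    """Lagrange-interpolate the polynomial at x = 0 to recover the secret."""
    secret_int = 0
    for i, (xi, yi) in enumerate(shares):
        num, den = 1, 1
        for j, (xj, _) in enumerate(shares):
            if i != j:
                num = (num * -xj) % PRIME
                den = (den * (xi - xj)) % PRIME
        # pow(den, -1, PRIME) is the modular inverse (Python 3.8+)
        secret_int = (secret_int + yi * num * pow(den, -1, PRIME)) % PRIME
    return secret_int.to_bytes(length, "big")
```

With `threshold=3` and `total_shares=5`, any three of the five shares passed to `recover_secret(..., length=32)` return the original 32 bytes. The `length` argument is needed because leading zero bytes are lost in the integer conversion.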
3. Post-Ceremony Procedures
#!/bin/bash
# Post-ceremony security procedures
# Secure ceremony artifacts
tar -czf ceremony-artifacts.tar.gz *.json *.log
gpg --encrypt --recipient security@plings.io ceremony-artifacts.tar.gz
# Distribute backup shares to trustees
echo "📋 Backup Share Distribution:"
echo "Share 1: Security Officer (offline storage)"
echo "Share 2: CTO (safety deposit box)"
echo "Share 3: External trustee (bank vault)"
echo "Share 4: Compliance officer (secure facility)"
echo "Share 5: Emergency contact (geographic separation)"
# Verify HSM key installation
echo "🔍 Verifying HSM key installation..."
/opt/cloudhsm/bin/key_mgmt_util listKeys
# Clean up ceremony environment
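# -u removes files after overwriting; also shred the unencrypted archive left behind by gpg
shred -vfzu -n 3 *.tmp *.log ceremony-artifacts.tar.gz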
rm -rf /tmp/ceremony-*
echo "✅ Post-ceremony procedures completed"
Seed Phrase and Mnemonic Management
BIP39 Mnemonic Implementation
1. Mnemonic Generation
# mnemonic_manager.py - BIP39 mnemonic and seed phrase management
import secrets
import hashlib
import hmac
from datetime import datetime
from mnemonic import Mnemonic
from typing import List, Optional
class PlingsMnemonicManager:
"""Secure mnemonic and seed phrase management for Plings wallets"""
def __init__(self, language: str = "english"):
self.mnemonic_generator = Mnemonic(language)
self.language = language
def generate_mnemonic(self, strength: int = 256) -> str:
"""Generate BIP39 mnemonic phrase"""
# Generate cryptographically secure entropy
entropy = secrets.token_bytes(strength // 8)
# Generate mnemonic from entropy
mnemonic = self.mnemonic_generator.to_mnemonic(entropy)
# Validate mnemonic
if not self.mnemonic_generator.check(mnemonic):
raise Exception("Generated mnemonic failed validation")
return mnemonic
def mnemonic_to_seed(self, mnemonic: str, passphrase: str = "") -> bytes:
"""Convert mnemonic to seed for HD wallet derivation"""
# Validate mnemonic
if not self.mnemonic_generator.check(mnemonic):
raise Exception("Invalid mnemonic phrase")
# Generate seed using PBKDF2
seed = self.mnemonic_generator.to_seed(mnemonic, passphrase)
return seed
def create_wallet_seed_phrase(self, wallet_version: int) -> dict:
"""Create wallet-specific seed phrase with HSM backup"""
# Generate mnemonic
mnemonic = self.generate_mnemonic(strength=256)
# Create seed
seed = self.mnemonic_to_seed(mnemonic)
# Create backup shares using Shamir's Secret Sharing
shares = self.create_mnemonic_shares(mnemonic, threshold=3, total_shares=5)
return {
'wallet_version': wallet_version,
'seed_id': f"plings_seed_v{wallet_version}",
'mnemonic_words': len(mnemonic.split()),
'seed_length': len(seed),
'backup_shares': shares,
'created_at': datetime.utcnow().isoformat()
}
def create_mnemonic_shares(self, mnemonic: str, threshold: int, total_shares: int) -> List[str]:
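"""Create backup shares of the mnemonic (placeholder, not real Shamir's Secret Sharing)"""
# WARNING: these hash-derived shares ignore `threshold` and cannot be combined to recover the mnemonic
# In production, use a proven library like `secretsharing`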
shares = []
mnemonic_bytes = mnemonic.encode('utf-8')
for i in range(total_shares):
share_seed = hashlib.sha256(mnemonic_bytes + i.to_bytes(1, 'big')).digest()
share = share_seed.hex()
shares.append(f"share_{i+1}:{share}")
return shares
def recover_from_shares(self, shares: List[str], threshold: int) -> str:
"""Recover mnemonic from Shamir's Secret Sharing shares"""
if len(shares) < threshold:
raise Exception(f"Insufficient shares: need {threshold}, have {len(shares)}")
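# Recovery is not implemented in this example (use a proper SSS library in production);
# fail loudly rather than return a fake mnemonic
raise NotImplementedError("Share recovery requires a real Shamir's Secret Sharing implementation")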
def validate_seed_phrase(self, mnemonic: str) -> dict:
"""Comprehensive seed phrase validation"""
validation_result = {
'valid': False,
'word_count': 0,
'checksum_valid': False,
'entropy_bits': 0,
'errors': []
}
try:
# Split mnemonic into words
words = mnemonic.strip().split()
validation_result['word_count'] = len(words)
# Check word count
if len(words) not in [12, 15, 18, 21, 24]:
validation_result['errors'].append("Invalid word count")
return validation_result
# Validate checksum
if not self.mnemonic_generator.check(mnemonic):
validation_result['errors'].append("Invalid checksum")
return validation_result
validation_result['checksum_valid'] = True
# Calculate entropy bits
validation_result['entropy_bits'] = (len(words) * 11) - (len(words) // 3)
# Check entropy strength
if validation_result['entropy_bits'] < 128:
validation_result['errors'].append("Insufficient entropy")
validation_result['valid'] = len(validation_result['errors']) == 0
except Exception as e:
validation_result['errors'].append(f"Validation error: {str(e)}")
return validation_result
# Example usage
def setup_wallet_mnemonic():
"""Setup mnemonic for new wallet version"""
mnemonic_manager = PlingsMnemonicManager()
# Create seed phrase for wallet version 2
seed_result = mnemonic_manager.create_wallet_seed_phrase(wallet_version=2)
print(f"✅ Created seed phrase for wallet v{seed_result['wallet_version']}")
print(f"📝 Mnemonic words: {seed_result['mnemonic_words']}")
print(f"🔑 Seed length: {seed_result['seed_length']} bytes")
print(f"📋 Backup shares: {len(seed_result['backup_shares'])}")
return seed_result
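`mnemonic_to_seed` delegates the PBKDF2 step to the `mnemonic` package. As specified by BIP39, the seed is PBKDF2-HMAC-SHA512 over the NFKD-normalized mnemonic with the salt `"mnemonic" + passphrase` and 2048 iterations. The sketch below reproduces that step with the standard library only; `bip39_seed` is a hypothetical helper name, and it deliberately skips word-list and checksum validation.

```python
import hashlib
import unicodedata


def bip39_seed(mnemonic: str, passphrase: str = "") -> bytes:
    """Derive the 64-byte BIP39 seed from a mnemonic sentence (no checksum validation)."""
    mnemonic_norm = unicodedata.normalize("NFKD", mnemonic)
    salt = "mnemonic" + unicodedata.normalize("NFKD", passphrase)
    # The output length defaults to the SHA-512 digest size, i.e. 64 bytes
    return hashlib.pbkdf2_hmac(
        "sha512", mnemonic_norm.encode("utf-8"), salt.encode("utf-8"), 2048
    )
```

Because the passphrase is part of the salt, the same mnemonic with a different passphrase yields an unrelated seed, so a lost passphrase cannot be recovered from the words alone.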
2. Secure Mnemonic Storage
# secure_mnemonic_storage.py - Encrypted mnemonic storage
import base64
import json
import os
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
class SecureMnemonicStorage:
"""Secure encrypted storage for mnemonic phrases"""
def __init__(self, storage_path: str = "/secure/mnemonics"):
self.storage_path = storage_path
os.makedirs(storage_path, mode=0o700, exist_ok=True)
def derive_key(self, password: str, salt: bytes) -> bytes:
"""Derive encryption key from password"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100000,
)
key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
return key
def encrypt_mnemonic(self, mnemonic: str, password: str) -> dict:
"""Encrypt mnemonic with password"""
# Generate random salt
salt = os.urandom(16)
# Derive key
key = self.derive_key(password, salt)
# Encrypt mnemonic
fernet = Fernet(key)
encrypted_mnemonic = fernet.encrypt(mnemonic.encode())
return {
'encrypted_mnemonic': encrypted_mnemonic.hex(),
'salt': salt.hex(),
'iterations': 100000,
'algorithm': 'PBKDF2-HMAC-SHA256'
}
def decrypt_mnemonic(self, encrypted_data: dict, password: str) -> str:
"""Decrypt mnemonic with password"""
# Reconstruct salt
salt = bytes.fromhex(encrypted_data['salt'])
# Derive key
key = self.derive_key(password, salt)
# Decrypt mnemonic
fernet = Fernet(key)
encrypted_mnemonic = bytes.fromhex(encrypted_data['encrypted_mnemonic'])
mnemonic = fernet.decrypt(encrypted_mnemonic)
return mnemonic.decode()
def store_encrypted_mnemonic(self, wallet_version: int,
encrypted_data: dict) -> str:
"""Store encrypted mnemonic to disk"""
filename = f"wallet_v{wallet_version}_mnemonic.enc"
filepath = os.path.join(self.storage_path, filename)
# Store encrypted data
with open(filepath, 'w') as f:
json.dump(encrypted_data, f, indent=2)
# Set restrictive permissions
os.chmod(filepath, 0o600)
return filepath
def load_encrypted_mnemonic(self, wallet_version: int) -> dict:
"""Load encrypted mnemonic from disk"""
filename = f"wallet_v{wallet_version}_mnemonic.enc"
filepath = os.path.join(self.storage_path, filename)
with open(filepath, 'r') as f:
encrypted_data = json.load(f)
return encrypted_data
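The stored record carries `salt`, `iterations`, and `algorithm`, yet `decrypt_mnemonic` re-derives the key with a hard-coded iteration count. A minimal sketch of deriving the key from the record's own parameters, using only the standard library, might look like this; `derive_key_from_record` is a hypothetical helper, and the output format (URL-safe base64 of 32 bytes) matches what Fernet expects.

```python
import base64
import hashlib


def derive_key_from_record(password: str, record: dict) -> bytes:
    """Re-derive the Fernet-format key using the parameters stored with the ciphertext."""
    if record["algorithm"] != "PBKDF2-HMAC-SHA256":
        raise ValueError(f"Unsupported KDF: {record['algorithm']}")
    raw = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode(),
        bytes.fromhex(record["salt"]),
        record["iterations"],
        dklen=32,
    )
    # Fernet expects the 32-byte key as URL-safe base64 (44 characters)
    return base64.urlsafe_b64encode(raw)
```

Reading the iteration count from the record lets older files stay decryptable if the default is raised later.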
BIP32 HD Wallet Implementation
Hierarchical Deterministic Wallet System
1. HD Wallet Core Implementation
# hd_wallet.py - BIP32 HD wallet implementation for Plings
import hashlib
import hmac
import secrets
import struct
from typing import Tuple, List
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization
class PlingsHDWallet:
"""BIP32-style HD wallet for Plings (Ed25519 derivation as in SLIP-0010, hardened children only)"""
def __init__(self, seed: bytes):
self.seed = seed
self.master_key = self.generate_master_key(seed)
def generate_master_key(self, seed: bytes) -> dict:
"""Generate master key from seed"""
# HMAC-SHA512 with "ed25519 seed" as key
h = hmac.new(b"ed25519 seed", seed, hashlib.sha512).digest()
# Split into private key and chain code
private_key = h[:32]
chain_code = h[32:]
return {
'private_key': private_key,
'chain_code': chain_code,
'depth': 0,
'parent_fingerprint': b'\x00' * 4,
'child_number': 0
}
def derive_child_key(self, parent_key: dict, child_number: int, hardened: bool = True) -> dict:
"""Derive child key from parent key"""
if not hardened:
# Ed25519 (SLIP-0010) defines hardened derivation only, and Plings uses hardened paths
raise ValueError("Non-hardened derivation is not supported for Ed25519 keys")
child_number |= 0x80000000
# Prepare data for HMAC: 0x00 || parent private key || child index (big-endian)
data = b'\x00' + parent_key['private_key'] + struct.pack('>I', child_number)
# HMAC-SHA512
h = hmac.new(parent_key['chain_code'], data, hashlib.sha512).digest()
# Split result
child_private_key = h[:32]
child_chain_code = h[32:]
return {
'private_key': child_private_key,
'chain_code': child_chain_code,
'depth': parent_key['depth'] + 1,
'parent_fingerprint': self.get_fingerprint(parent_key),
'child_number': child_number
}
def get_fingerprint(self, key: dict) -> bytes:
"""Get key fingerprint for identification"""
# For simplicity, using hash of private key
return hashlib.sha256(key['private_key']).digest()[:4]
def derive_from_path(self, path: str, wallet_version: int = 1) -> dict:
"""Derive key from BIP32 path"""
# Parse path: m/44'/501'/1'/1'/1'/1'/1'/1'
if not path.startswith("m/"):
raise ValueError("Path must start with 'm/'")
path_parts = path[2:].split('/')
current_key = self.master_key
for part in path_parts:
if part.endswith("'"):
# Hardened derivation
child_number = int(part[:-1])
hardened = True
else:
# Non-hardened derivation
child_number = int(part)
hardened = False
current_key = self.derive_child_key(current_key, child_number, hardened)
return current_key
def path_to_hd_derivation(self, path: str, wallet_version: int = 1) -> str:
"""Convert Plings path to HD derivation path"""
# Parse path: "1.1.C1.1.1" -> manufacturer.category.class.batch.instance
parts = path.split('.')
if len(parts) != 5:
raise ValueError("Path must have 5 parts: manufacturer.category.class.batch.instance")
manufacturer = int(parts[0])
category = int(parts[1])
class_str = parts[2] # "C1", "C2", etc.
batch = int(parts[3])
instance = int(parts[4])
# Extract class number from "C1", "C2", etc.
if not class_str.startswith('C'):
raise ValueError("Class must start with 'C'")
class_num = int(class_str[1:])
# Build HD derivation path: m/44'/501'/wallet'/manufacturer'/category'/class'/batch'/instance'
hd_path = f"m/44'/501'/{wallet_version}'/{manufacturer}'/{category}'/{class_num}'/{batch}'/{instance}'"
return hd_path
def generate_identifier_key(self, path: str, wallet_version: int = 1) -> dict:
"""Generate identifier key for specific path"""
# Convert path to HD derivation
hd_path = self.path_to_hd_derivation(path, wallet_version)
# Derive key
derived_key = self.derive_from_path(hd_path, wallet_version)
# Generate Ed25519 key pair
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(derived_key['private_key'])
public_key = private_key.public_key()
# Serialize keys
private_key_bytes = private_key.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption()
)
public_key_bytes = public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw
)
return {
'path': path,
'hd_derivation': hd_path,
'wallet_version': wallet_version,
'private_key': private_key_bytes,
'public_key': public_key_bytes,
'depth': derived_key['depth'],
'fingerprint': self.get_fingerprint(derived_key)
}
def sign_message(self, message: bytes, private_key_bytes: bytes) -> bytes:
"""Sign message with Ed25519 private key"""
private_key = ed25519.Ed25519PrivateKey.from_private_bytes(private_key_bytes)
signature = private_key.sign(message)
return signature
def verify_signature(self, message: bytes, signature: bytes, public_key_bytes: bytes) -> bool:
"""Verify Ed25519 signature"""
try:
public_key = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes)
public_key.verify(signature, message)
return True
except Exception:
return False
# Example usage
def demonstrate_hd_wallet():
"""Demonstrate HD wallet functionality"""
# Create HD wallet from seed
seed = secrets.token_bytes(32) # In production, use mnemonic-derived seed
wallet = PlingsHDWallet(seed)
# Generate identifier key
identifier_key = wallet.generate_identifier_key("1.1.C1.1.1", wallet_version=1)
print(f"✅ Generated identifier key:")
print(f" Path: {identifier_key['path']}")
print(f" HD Derivation: {identifier_key['hd_derivation']}")
print(f" Public Key: {identifier_key['public_key'].hex()}")
print(f" Wallet Version: {identifier_key['wallet_version']}")
# Test signing
message = b"Hello, Plings!"
signature = wallet.sign_message(message, identifier_key['private_key'])
# Verify signature
is_valid = wallet.verify_signature(message, signature, identifier_key['public_key'])
print(f" Signature Valid: {is_valid}")
return identifier_key
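The mapping from a Plings path to a derived key can be condensed into a few functions. The following is a standard-library sketch under the same assumptions as the class above (HMAC-SHA512 keyed with `"ed25519 seed"`, hardened children only, path layout `m/44'/501'/wallet'/manufacturer'/category'/class'/batch'/instance'`); the function names are hypothetical and it stops at the 32-byte private key rather than building an Ed25519 key object.

```python
import hashlib
import hmac
import struct
from typing import List, Tuple

HARDENED = 0x80000000


def plings_path_to_indices(path: str, wallet_version: int = 1) -> List[int]:
    """Map 'manufacturer.category.class.batch.instance' to HD child indices."""
    manufacturer, category, class_str, batch, instance = path.split(".")
    if not class_str.startswith("C"):
        raise ValueError("Class must start with 'C'")
    return [44, 501, wallet_version, int(manufacturer), int(category),
            int(class_str[1:]), int(batch), int(instance)]


def master_node(seed: bytes) -> Tuple[bytes, bytes]:
    """Return (private_key, chain_code) for the master node."""
    digest = hmac.new(b"ed25519 seed", seed, hashlib.sha512).digest()
    return digest[:32], digest[32:]


def hardened_child(key: bytes, chain_code: bytes, index: int) -> Tuple[bytes, bytes]:
    """Derive one hardened child: HMAC over 0x00 || key || (index | 2^31)."""
    data = b"\x00" + key + struct.pack(">I", index | HARDENED)
    digest = hmac.new(chain_code, data, hashlib.sha512).digest()
    return digest[:32], digest[32:]


def derive_private_key(seed: bytes, path: str, wallet_version: int = 1) -> bytes:
    """Walk every level of the HD path and return the leaf private key."""
    key, chain_code = master_node(seed)
    for index in plings_path_to_indices(path, wallet_version):
        key, chain_code = hardened_child(key, chain_code, index)
    return key
```

Since the wallet version is one of the derivation levels, the same Plings path yields unrelated keys under different wallet versions, which is what makes per-version rotation possible.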
2. Multi-Wallet HD Management
# multi_wallet_hd.py - Multi-wallet HD management
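from typing import List
from hd_wallet import PlingsHDWallet
class PlingsMultiWalletHD:
"""Multi-wallet HD management for Plings"""
def __init__(self, hsm_service=None):
self.wallets = {} # wallet_version -> PlingsHDWallet
# Optional PlingsHSMService; it requires an HSMConfig, so the caller constructs it
self.hsm_service = hsm_service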
async def initialize_wallet_version(self, wallet_version: int, seed: bytes):
"""Initialize HD wallet for specific version"""
self.wallets[wallet_version] = PlingsHDWallet(seed)
print(f"✅ Initialized HD wallet v{wallet_version}")
async def generate_identifier_batch(self, wallet_version: int,
paths: List[str]) -> List[dict]:
"""Generate batch of identifier keys"""
if wallet_version not in self.wallets:
raise ValueError(f"Wallet v{wallet_version} not initialized")
wallet = self.wallets[wallet_version]
identifier_keys = []
for path in paths:
key = wallet.generate_identifier_key(path, wallet_version)
identifier_keys.append(key)
return identifier_keys
async def migrate_wallet_identifiers(self, from_version: int, to_version: int,
paths: List[str]) -> dict:
"""Migrate identifiers from old wallet to new wallet"""
if from_version not in self.wallets or to_version not in self.wallets:
raise ValueError("Source and destination wallets must be initialized")
old_wallet = self.wallets[from_version]
new_wallet = self.wallets[to_version]
migration_result = {
'from_version': from_version,
'to_version': to_version,
'migrated_paths': [],
'migration_mapping': {}
}
for path in paths:
# Generate keys in both wallets
old_key = old_wallet.generate_identifier_key(path, from_version)
new_key = new_wallet.generate_identifier_key(path, to_version)
# Store migration mapping
migration_result['migration_mapping'][path] = {
'old_public_key': old_key['public_key'].hex(),
'new_public_key': new_key['public_key'].hex(),
'old_hd_derivation': old_key['hd_derivation'],
'new_hd_derivation': new_key['hd_derivation']
}
migration_result['migrated_paths'].append(path)
return migration_result
async def cross_wallet_verification(self, path: str,
wallet_versions: List[int]) -> dict:
"""Verify same path across multiple wallets"""
verification_result = {
'path': path,
'wallet_keys': {},
'hd_derivations': {},
'consistent_derivation': True
}
for version in wallet_versions:
if version in self.wallets:
wallet = self.wallets[version]
key = wallet.generate_identifier_key(path, version)
verification_result['wallet_keys'][version] = key['public_key'].hex()
verification_result['hd_derivations'][version] = key['hd_derivation']
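# Check derivation consistency (same path structure). The wallet component
# (index 3 after splitting on "/") differs per version by design, so ignore it.
structures = {
tuple(part for i, part in enumerate(d.split('/')) if i != 3)
for d in verification_result['hd_derivations'].values()
}
if len(structures) > 1:
verification_result['consistent_derivation'] = False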
return verification_result
Development and Testing Setup
SoftHSM Development Environment
1. SoftHSM Installation and Configuration
#!/bin/bash
# softhsm_setup.sh - SoftHSM installation for development
# Install SoftHSM
if [[ "$OSTYPE" == "darwin"* ]]; then
# macOS
brew install softhsm
elif [[ "$OSTYPE" == "linux-gnu"* ]]; then
# Ubuntu/Debian
sudo apt-get update
sudo apt-get install softhsm2
# CentOS/RHEL
# sudo yum install softhsm
fi
# Create SoftHSM configuration
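# SoftHSM does not expand "~" in its config, so create the token directory
# up front and write an absolute path via $HOME
mkdir -p ~/.config/softhsm2 ~/.softhsm2/tokens
cat > ~/.config/softhsm2/softhsm2.conf << EOF
# SoftHSM configuration for Plings development
directories.tokendir = $HOME/.softhsm2/tokens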
objectstore.backend = file
log.level = INFO
EOF
# Initialize token
softhsm2-util --init-token --slot 0 --label "Plings-Dev" --pin 1234 --so-pin 1234
# Verify installation
softhsm2-util --show-slots
echo "✅ SoftHSM setup complete"
echo "Token: Plings-Dev"
echo "User PIN: 1234"
echo "SO PIN: 1234"
2. Development HSM Client
# dev_hsm_client.py - SoftHSM client for development
import PyKCS11
from typing import Dict, List, Optional
class PlingsDevHSMClient:
"""Development HSM client using SoftHSM"""
def __init__(self, pkcs11_lib: str = None):
# Default SoftHSM library paths
if pkcs11_lib is None:
import platform
if platform.system() == "Darwin":
pkcs11_lib = "/usr/local/lib/softhsm/libsofthsm2.so"
else:
pkcs11_lib = "/usr/lib/softhsm/libsofthsm2.so"
self.pkcs11_lib = pkcs11_lib
self.pkcs11 = None
self.session = None
def initialize_session(self, pin: str = "1234"):
"""Initialize SoftHSM session"""
self.pkcs11 = PyKCS11.PyKCS11Lib()
self.pkcs11.load(self.pkcs11_lib)
# Find token
slots = self.pkcs11.getSlotList(tokenPresent=True)
if not slots:
raise Exception("No SoftHSM tokens found")
slot = slots[0]
# Open session
self.session = self.pkcs11.openSession(slot)
# Login
self.session.login(pin)
print(f"✅ Connected to SoftHSM slot {slot}")
def generate_test_key(self, key_label: str) -> str:
"""Generate test key in SoftHSM"""
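# Key generation templates: PKCS#11 takes separate public and private attribute sets
ed25519_params = bytes.fromhex("06032b6570") # DER-encoded OID 1.3.101.112 (Ed25519)
public_template = [
(PyKCS11.CKA_TOKEN, True),
(PyKCS11.CKA_VERIFY, True),
(PyKCS11.CKA_EC_PARAMS, ed25519_params),
(PyKCS11.CKA_LABEL, key_label),
(PyKCS11.CKA_ID, key_label.encode()),
]
private_template = [
(PyKCS11.CKA_TOKEN, True),
(PyKCS11.CKA_PRIVATE, True),
(PyKCS11.CKA_SENSITIVE, True),
(PyKCS11.CKA_EXTRACTABLE, True), # Allow extraction in dev environment
(PyKCS11.CKA_SIGN, True),
(PyKCS11.CKA_LABEL, key_label),
(PyKCS11.CKA_ID, key_label.encode()),
]
# Generate Ed25519 key pair (requires a SoftHSM build with EdDSA support)
try:
public_key, private_key = self.session.generateKeyPair(
public_template,
private_template,
mecha=PyKCS11.Mechanism(PyKCS11.CKM_EC_EDWARDS_KEY_PAIR_GEN, None)
)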
print(f"✅ Generated test key: {key_label}")
return key_label
except Exception as e:
print(f"❌ Key generation failed: {e}")
raise
def list_keys(self) -> List[str]:
"""List all keys in SoftHSM"""
keys = self.session.findObjects([
(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)
])
key_labels = []
for key in keys:
label = self.session.getAttributeValue(key, [PyKCS11.CKA_LABEL])[0]
key_labels.append(label)
return key_labels
def sign_test_message(self, key_label: str, message: bytes) -> bytes:
"""Sign test message with SoftHSM key"""
# Find key
keys = self.session.findObjects([
(PyKCS11.CKA_LABEL, key_label),
(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)
])
if not keys:
raise Exception(f"Key not found: {key_label}")
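# Sign message (PyKCS11 defaults to an RSA mechanism, so request EdDSA explicitly)
signature = self.session.sign(
keys[0], message, PyKCS11.Mechanism(PyKCS11.CKM_EDDSA, None)
)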
return bytes(signature)
def cleanup_test_keys(self):
"""Clean up test keys"""
keys = self.session.findObjects([
(PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)
])
for key in keys:
self.session.destroyObject(key)
print("✅ Test keys cleaned up")
# Development testing utilities
def run_development_tests():
"""Run development HSM tests"""
print("🧪 Starting SoftHSM development tests...")
# Initialize client
client = PlingsDevHSMClient()
client.initialize_session()
# Generate test keys
test_keys = [
"test_master_key_v1",
"test_master_key_v2",
"test_manufacturer_key"
]
for key_label in test_keys:
client.generate_test_key(key_label)
# List keys
keys = client.list_keys()
print(f"📋 Generated keys: {keys}")
# Test signing
test_message = b"Plings development test message"
signature = client.sign_test_message("test_master_key_v1", test_message)
print(f"✅ Signature generated: {len(signature)} bytes")
# Cleanup
client.cleanup_test_keys()
print("✅ Development tests completed")
if __name__ == "__main__":
run_development_tests()
3. Development Testing Scripts
#!/bin/bash
# test_hd_wallet_dev.sh - Development testing for HD wallet
echo "🧪 Starting HD wallet development tests..."
# Test 1: SoftHSM functionality
echo "1. Testing SoftHSM basic operations..."
python3 dev_hsm_client.py
# Test 2: HD wallet key derivation
echo "2. Testing HD wallet key derivation..."
python3 -c "
from hd_wallet import PlingsHDWallet
import secrets
# Test HD wallet
seed = secrets.token_bytes(32)
wallet = PlingsHDWallet(seed)
# Test paths
test_paths = [
'1.1.C1.1.1',
'1.1.C1.1.2',
'2.1.C3.2024.158'
]
for path in test_paths:
key = wallet.generate_identifier_key(path)
print(f'✅ {path} -> {key[\"hd_derivation\"]}')
"
# Test 3: Multi-wallet operations
echo "3. Testing multi-wallet operations..."
python3 -c "
from multi_wallet_hd import PlingsMultiWalletHD
import secrets
import asyncio
async def test_multi_wallet():
multi_wallet = PlingsMultiWalletHD()
# Initialize two wallet versions
await multi_wallet.initialize_wallet_version(1, secrets.token_bytes(32))
await multi_wallet.initialize_wallet_version(2, secrets.token_bytes(32))
# Test cross-wallet verification
result = await multi_wallet.cross_wallet_verification('1.1.C1.1.1', [1, 2])
print(f'✅ Cross-wallet verification: {result[\"consistent_derivation\"]}')
asyncio.run(test_multi_wallet())
"
# Test 4: Performance benchmarking
echo "4. Running performance benchmarks..."
python3 -c "
import time
from hd_wallet import PlingsHDWallet
import secrets
# Performance test
wallet = PlingsHDWallet(secrets.token_bytes(32))
start_time = time.time()
# Generate 1000 keys
for i in range(1000):
path = f'1.1.C1.1.{i:05d}'
key = wallet.generate_identifier_key(path)
elapsed = time.time() - start_time
print(f'✅ Generated 1000 keys in {elapsed:.2f} seconds ({1000/elapsed:.0f} keys/sec)')
"
echo "✅ Development tests completed"
Integration Architecture
HSM Service Architecture
1. HSM Service Layer
# hsm_service_layer.py - Complete HSM service integration
import asyncio
import logging
import os
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
class HSMEnvironment(Enum):
PRODUCTION = "production"
STAGING = "staging"
DEVELOPMENT = "development"
@dataclass
class HSMConfig:
environment: HSMEnvironment
primary_hsm_config: Dict
backup_hsm_config: Optional[Dict] = None
performance_requirements: Optional[Dict] = None
class PlingsHSMServiceLayer:
"""Complete HSM service layer for Plings"""
def __init__(self, config: HSMConfig):
self.config = config
self.environment = config.environment
self.logger = logging.getLogger(__name__)
# Initialize HSM clients based on environment
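if self.environment == HSMEnvironment.PRODUCTION:
self.primary_hsm = PlingsHSMClient(config.primary_hsm_config['cluster_id'])
# backup_hsm_config is optional, so guard against None
self.backup_hsm = (
PlingsHSMClient(config.backup_hsm_config['cluster_id'])
if config.backup_hsm_config else None
)
elif self.environment == HSMEnvironment.DEVELOPMENT:
self.primary_hsm = PlingsDevHSMClient()
self.backup_hsm = None
else:
# No client is defined for other environments (e.g. staging) in this guide
raise ValueError(f"No HSM client configured for environment: {self.environment.value}")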
# Initialize multi-wallet HD management
self.multi_wallet_hd = PlingsMultiWalletHD()
# Performance monitoring
self.performance_metrics = {
'signatures_per_second': 0,
'key_generation_time': 0,
'error_rate': 0
}
async def initialize_service(self):
"""Initialize complete HSM service"""
try:
self.logger.info(f"Initializing HSM service for {self.environment.value}")
# Initialize HSM clients
if self.environment == HSMEnvironment.PRODUCTION:
await self.primary_hsm.initialize_session(
username=os.environ['HSM_PRIMARY_USER'],
password=os.environ['HSM_PRIMARY_PASSWORD']
)
if self.backup_hsm:
await self.backup_hsm.initialize_session(
username=os.environ['HSM_BACKUP_USER'],
password=os.environ['HSM_BACKUP_PASSWORD']
)
else:
self.primary_hsm.initialize_session()
# Load existing wallet versions
await self.load_existing_wallets()
# Start performance monitoring
await self.start_performance_monitoring()
self.logger.info("✅ HSM service initialized successfully")
except Exception as e:
self.logger.error(f"❌ HSM service initialization failed: {e}")
raise
async def load_existing_wallets(self):
"""Load existing wallet versions from database"""
# Get wallet versions from database
wallet_versions = await self.get_wallet_versions_from_db()
for wallet_version in wallet_versions:
# Load wallet seed (this would be retrieved securely)
seed = await self.get_wallet_seed(wallet_version['version_id'])
# Initialize HD wallet
await self.multi_wallet_hd.initialize_wallet_version(
wallet_version['version_id'], seed
)
self.logger.info(f"✅ Loaded wallet v{wallet_version['version_id']}")
async def create_new_wallet_version(self, wallet_version: int,
description: str) -> str:
"""Create new wallet version with complete ceremony"""
try:
self.logger.info(f"Creating wallet version {wallet_version}")
# Generate master key in HSM
master_key_id = await self.primary_hsm.generate_master_key(
key_label="plings_master",
wallet_version=wallet_version
)
# Create HD wallet seed. create_wallet_seed_phrase() does not return the
# mnemonic itself, so generate it directly and derive the seed from it.
mnemonic_manager = PlingsMnemonicManager()
mnemonic = mnemonic_manager.generate_mnemonic(strength=256)
# Initialize HD wallet
seed = mnemonic_manager.mnemonic_to_seed(mnemonic)
await self.multi_wallet_hd.initialize_wallet_version(wallet_version, seed)
# Backup to secondary HSM if available
if self.backup_hsm:
await self.backup_key_to_secondary_hsm(master_key_id)
# Register in database
await self.register_wallet_in_database(wallet_version, master_key_id, description)
self.logger.info(f"✅ Created wallet version {wallet_version}")
return master_key_id
except Exception as e:
self.logger.error(f"❌ Wallet creation failed: {e}")
raise
async def generate_identifier_batch(self, wallet_version: int,
paths: List[str]) -> List[dict]:
"""Generate batch of identifiers with HSM signing"""
try:
# Generate HD keys
identifier_keys = await self.multi_wallet_hd.generate_identifier_batch(
wallet_version, paths
)
# Sign with HSM master key
master_key_id = await self.get_master_key_id(wallet_version)
for key_data in identifier_keys:
# Create message to sign
message = f"{key_data['path']}:{key_data['public_key'].hex()}".encode()
# Sign with HSM
signature = await self.primary_hsm.sign_with_master_key(
master_key_id, message
)
key_data['hsm_signature'] = signature.hex()
return identifier_keys
except Exception as e:
self.logger.error(f"❌ Batch generation failed: {e}")
raise
async def migrate_wallet_during_incident(self, compromised_version: int,
incident_id: str) -> dict:
"""Complete wallet migration during security incident"""
try:
self.logger.info(f"Starting emergency migration for wallet v{compromised_version}")
# Create new wallet version
new_version = await self.get_next_wallet_version()
new_master_key = await self.create_new_wallet_version(
new_version, f"Emergency migration - Incident {incident_id}"
)
# Get all paths from compromised wallet
compromised_paths = await self.get_wallet_paths(compromised_version)
# Migrate paths to new wallet
migration_result = await self.multi_wallet_hd.migrate_wallet_identifiers(
compromised_version, new_version, compromised_paths
)
# Update database
await self.update_wallet_migration_status(
compromised_version, new_version, incident_id
)
# Mark old wallet as compromised
await self.mark_wallet_compromised(compromised_version, incident_id)
result = {
'old_version': compromised_version,
'new_version': new_version,
'new_master_key': new_master_key,
'migrated_paths': len(compromised_paths),
'migration_mapping': migration_result['migration_mapping'],
'incident_id': incident_id
}
self.logger.info(f"✅ Emergency migration completed: v{compromised_version} → v{new_version}")
return result
except Exception as e:
self.logger.error(f"❌ Emergency migration failed: {e}")
raise
async def comprehensive_health_check(self) -> dict:
"""Comprehensive HSM and wallet health check"""
health_result = {
'overall_status': 'healthy',
'hsm_status': {},
'wallet_status': {},
'performance_metrics': self.performance_metrics,
'timestamp': datetime.utcnow().isoformat()
}
try:
# Check primary HSM
primary_health = await self.primary_hsm.health_check()
health_result['hsm_status']['primary'] = primary_health
# Check backup HSM if available
if self.backup_hsm:
backup_health = await self.backup_hsm.health_check()
health_result['hsm_status']['backup'] = backup_health
# Check wallet versions
wallet_versions = await self.get_wallet_versions_from_db()
for wallet in wallet_versions:
version_id = wallet['version_id']
wallet_health = await self.check_wallet_health(version_id)
health_result['wallet_status'][f'v{version_id}'] = wallet_health
# Determine overall status
if primary_health['status'] != 'healthy':
health_result['overall_status'] = 'degraded'
return health_result
except Exception as e:
health_result['overall_status'] = 'error'
health_result['error'] = str(e)
return health_result
async def start_performance_monitoring(self):
"""Start background performance monitoring"""
async def monitor_performance():
while True:
try:
# Test signature performance
start_time = time.perf_counter()
test_message = b"performance_test"
# Test with available key
await self.primary_hsm.sign_with_master_key(
"test_key", test_message
)
signature_time = time.perf_counter() - start_time
# Guard against a zero interval so the division cannot fail
if signature_time > 0:
self.performance_metrics['signatures_per_second'] = 1.0 / signature_time
# Sleep for monitoring interval
await asyncio.sleep(60) # Monitor every minute
except Exception as e:
self.logger.error(f"Performance monitoring error: {e}")
await asyncio.sleep(60)
# Start monitoring task; keep a reference so the task is not garbage-collected
self._monitor_task = asyncio.create_task(monitor_performance())
self.logger.info("✅ Performance monitoring started")
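The health check above downgrades `overall_status` only when the primary HSM is unhealthy; backup and wallet reports are collected but do not affect the result. A minimal sketch of a stricter aggregation follows, assuming each component report is a dict with a `status` key as in `comprehensive_health_check`. The function name `derive_overall_status` and the rule set are illustrative assumptions, not part of the service layer.

```python
from typing import Dict, Optional


def derive_overall_status(hsm_status: Dict[str, dict],
                          wallet_status: Dict[str, dict]) -> str:
    """Combine component reports into 'healthy', 'degraded', or 'error'.

    Hypothetical rules: no working HSM at all is an error; any single
    unhealthy component (primary, backup, or wallet) degrades the service.
    """
    def ok(report: Optional[dict]) -> bool:
        return report is not None and report.get('status') == 'healthy'

    primary_ok = ok(hsm_status.get('primary'))
    backup = hsm_status.get('backup')          # None when no backup is configured
    backup_ok = ok(backup)
    wallets_ok = all(ok(w) for w in wallet_status.values())

    if not primary_ok and not backup_ok:
        return 'error'
    if primary_ok and wallets_ok and (backup is None or backup_ok):
        return 'healthy'
    return 'degraded'
```

With this rule a failed backup HSM shows up as `degraded` in the daily report instead of being hidden behind a healthy primary.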
2. API Integration Layer
# api_integration.py - API integration for HSM services
import os
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel
from typing import List, Optional
class PathAllocationRequest(BaseModel):
wallet_version: int
paths: List[str]
allocation_type: str
manufacturer_name: Optional[str] = None
class WalletCreationRequest(BaseModel):
wallet_version: int
description: str
environment: str = "production"
class HSMHealthResponse(BaseModel):
overall_status: str
hsm_status: dict
wallet_status: dict
performance_metrics: dict
app = FastAPI(title="Plings HSM API")
# Global HSM service instance
hsm_service: Optional[PlingsHSMServiceLayer] = None
@app.on_event("startup")
async def startup_event():
"""Initialize HSM service on startup"""
global hsm_service
config = HSMConfig(
environment=HSMEnvironment.PRODUCTION,
primary_hsm_config={'cluster_id': os.environ['HSM_CLUSTER_ID']},
backup_hsm_config={'cluster_id': os.environ['HSM_BACKUP_CLUSTER_ID']}
)
hsm_service = PlingsHSMServiceLayer(config)
await hsm_service.initialize_service()
@app.post("/api/v1/wallet/create")
async def create_wallet_version(request: WalletCreationRequest):
"""Create new wallet version"""
try:
master_key_id = await hsm_service.create_new_wallet_version(
request.wallet_version, request.description
)
return {
'success': True,
'wallet_version': request.wallet_version,
'master_key_id': master_key_id
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/identifiers/generate")
async def generate_identifiers(request: PathAllocationRequest):
"""Generate batch of identifiers"""
try:
identifier_keys = await hsm_service.generate_identifier_batch(
request.wallet_version, request.paths
)
return {
'success': True,
'wallet_version': request.wallet_version,
'generated_count': len(identifier_keys),
'identifiers': identifier_keys
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/v1/health", response_model=HSMHealthResponse)
async def health_check():
"""Comprehensive health check"""
try:
health_result = await hsm_service.comprehensive_health_check()
return HSMHealthResponse(**health_result)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/v1/wallet/migrate")
async def emergency_wallet_migration(compromised_version: int, incident_id: str):
"""Emergency wallet migration"""
try:
migration_result = await hsm_service.migrate_wallet_during_incident(
compromised_version, incident_id
)
return {
'success': True,
'migration_result': migration_result
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/v1/wallets")
async def list_wallet_versions():
"""List all wallet versions"""
try:
wallets = await hsm_service.get_wallet_versions_from_db()
return {
'success': True,
'wallets': wallets
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
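Each handler above returns `str(e)` as the HTTP 500 detail, which can expose internal HSM or database messages to API clients. A minimal sketch of one alternative follows: log the full exception server-side and return only a generic message plus a correlation id. The helper name `to_public_error`, the logger name, and the response shape are assumptions for illustration.

```python
import logging
import uuid

logger = logging.getLogger("plings.hsm.api")  # hypothetical logger name


def to_public_error(exc: Exception, operation: str) -> dict:
    """Log the real exception and build a client-safe error body."""
    error_id = uuid.uuid4().hex
    # The full exception text stays in the server log only.
    logger.error("operation=%s error_id=%s failed: %r", operation, error_id, exc)
    return {
        'success': False,
        'error': f'{operation} failed',
        'error_id': error_id,  # lets operators find the matching log line
    }
```

A handler could then use `raise HTTPException(status_code=500, detail=to_public_error(e, 'wallet_create'))` in place of `detail=str(e)`.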
Operational Procedures
Daily Operations
1. Daily HSM Monitoring
#!/bin/bash
# daily_hsm_monitoring.sh - Daily HSM health monitoring
echo "🔍 Daily HSM Health Check - $(date)"
echo "=========================================="
# Check HSM service status
echo "1. Checking HSM service status..."
curl -s "http://localhost:8000/api/v1/health" | jq '.'
# Check HSM hardware
echo "2. Checking HSM hardware status..."
/opt/cloudhsm/bin/cloudhsm_mgmt_util listUsers
/opt/cloudhsm/bin/key_mgmt_util listKeys
# Check performance metrics
echo "3. Checking performance metrics..."
curl -s "http://localhost:8000/api/v1/health" | jq '.performance_metrics'
# Check key usage statistics
echo "4. Checking key usage statistics..."
# Count all signature operations in the log (tail -100 | wc -l would cap the result at 100)
grep -c "signature_operation" /var/log/plings/hsm.log
# Check error logs
echo "5. Checking error logs..."
grep "ERROR" /var/log/plings/hsm.log | tail -10
# Generate daily report
echo "6. Generating daily report..."
# Fetch the health payload once so every report field comes from the same snapshot
HEALTH_JSON=$(curl -s "http://localhost:8000/api/v1/health")
cat > /tmp/daily_hsm_report.txt << EOF
HSM Daily Health Report - $(date)
====================================
Service Status: $(echo "$HEALTH_JSON" | jq -r '.overall_status')
Primary HSM: $(echo "$HEALTH_JSON" | jq -r '.hsm_status.primary.status')
Backup HSM: $(echo "$HEALTH_JSON" | jq -r '.hsm_status.backup.status')
Performance Metrics:
- Signatures/sec: $(echo "$HEALTH_JSON" | jq -r '.performance_metrics.signatures_per_second')
- Error rate: $(echo "$HEALTH_JSON" | jq -r '.performance_metrics.error_rate')
Errors in log: $(grep -c "ERROR" /var/log/plings/hsm.log)
EOF
# Send report to operations team
mail -s "Daily HSM Health Report" operations@plings.io < /tmp/daily_hsm_report.txt
echo "✅ Daily HSM monitoring completed"
2. Weekly HSM Maintenance
#!/bin/bash
# weekly_hsm_maintenance.sh - Weekly HSM maintenance
echo "🔧 Weekly HSM Maintenance - $(date)"
echo "===================================="
# Backup HSM configuration
echo "1. Backing up HSM configuration..."
/opt/cloudhsm/bin/cloudhsm_mgmt_util getHSMInfo > /backup/hsm_config_$(date +%Y%m%d).txt
# Key rotation check
echo "2. Checking key rotation schedule..."
python3 << EOF
import datetime
from datetime import timedelta
# Check if any keys need rotation
rotation_date = datetime.date.today() - timedelta(days=90)
print(f"Keys older than {rotation_date} should be rotated")
# This would check database for key ages
EOF
# Performance benchmarking
echo "3. Running performance benchmarks..."
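python3 -c "
import time
import requests

# Benchmark signature performance
start_time = time.time()
for i in range(100):
    response = requests.post(
        'http://localhost:8000/api/v1/identifiers/generate',
        json={
            'wallet_version': 1,
            'paths': ['1.1.C1.1.{:05d}'.format(i)],
            # PathAllocationRequest requires allocation_type; 'benchmark' is a placeholder value
            'allocation_type': 'benchmark',
        },
    )
    # Fail loudly instead of timing error responses
    response.raise_for_status()
elapsed = time.time() - start_time
print(f'Generated 100 identifiers in {elapsed:.2f} seconds')
"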
# Check HSM capacity
echo "4. Checking HSM capacity..."
/opt/cloudhsm/bin/key_mgmt_util listKeys | wc -l
# Verify backup HSM sync
echo "5. Verifying backup HSM synchronization..."
# This would compare key sets between primary and backup HSMs
# Generate weekly report
echo "6. Generating weekly maintenance report..."
cat > /tmp/weekly_hsm_report.txt << EOF
HSM Weekly Maintenance Report - $(date)
======================================
Maintenance Tasks Completed:
- Configuration backup: ✅
- Key rotation check: ✅
- Performance benchmark: ✅
- Capacity check: ✅
- Backup sync verification: ✅
Performance Summary:
- Average signature time: $(curl -s "http://localhost:8000/api/v1/health" | jq -r '.performance_metrics.signatures_per_second' | awk '{ if ($1 > 0) printf "%.2f", 1000/$1; else printf "n/a" }') ms
- Total wallet versions managed: $(curl -s "http://localhost:8000/api/v1/wallets" | jq -r '.wallets | length')
- Error rate: $(curl -s "http://localhost:8000/api/v1/health" | jq -r '.performance_metrics.error_rate')%
Recommendations:
- Continue current operational procedures
- Monitor key usage trends
- Schedule quarterly security review
EOF
mail -s "Weekly HSM Maintenance Report" operations@plings.io < /tmp/weekly_hsm_report.txt
echo "✅ Weekly HSM maintenance completed"
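The key rotation step in the weekly script only prints a cutoff date and leaves the database lookup as a comment. A minimal sketch of the age comparison itself follows, assuming key records are available as `(key_id, created_on)` pairs; the function name and record shape are hypothetical. The weekly script uses a 90-day window while the compliance monitor uses `key_rotation_days` of 365, so the threshold is a parameter here.

```python
from datetime import date, timedelta
from typing import Iterable, List, Tuple


def keys_due_for_rotation(keys: Iterable[Tuple[str, date]],
                          today: date,
                          max_age_days: int = 90) -> List[str]:
    """Return ids of keys created strictly before the rotation cutoff."""
    cutoff = today - timedelta(days=max_age_days)
    return [key_id for key_id, created_on in keys if created_on < cutoff]


if __name__ == "__main__":
    sample = [
        ("wallet_v1", date(2025, 1, 1)),
        ("wallet_v2", date(2025, 7, 1)),
    ]
    print(keys_due_for_rotation(sample, today=date(2025, 7, 17)))
```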
Security Monitoring and Compliance
Security Monitoring
1. Real-time Security Monitoring
# security_monitoring.py - Real-time HSM security monitoring
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List
import asyncpg
class HSMSecurityMonitor:
"""Real-time security monitoring for HSM operations"""
def __init__(self, db_connection_string: str):
self.db_connection = db_connection_string
self.logger = logging.getLogger(__name__)
self.alert_thresholds = {
'failed_authentications': 5,
'unusual_signature_volume': 1000,
'cross_wallet_access_attempts': 3,
'key_usage_anomalies': 10
}
async def monitor_hsm_security(self):
"""Continuous security monitoring"""
while True:
try:
# Check for security anomalies
await self.check_authentication_failures()
await self.check_signature_volume_anomalies()
await self.check_cross_wallet_access()
await self.check_key_usage_patterns()
# Sleep for monitoring interval
await asyncio.sleep(60) # Check every minute
except Exception as e:
self.logger.error(f"Security monitoring error: {e}")
await asyncio.sleep(60)
async def check_authentication_failures(self):
"""Monitor HSM authentication failures"""
conn = await asyncpg.connect(self.db_connection)
try:
# Check failed authentications in last 10 minutes
query = """
SELECT COUNT(*) as failure_count, source_ip
FROM hsm_audit_log
WHERE operation = 'authentication_failure'
AND timestamp > NOW() - INTERVAL '10 minutes'
GROUP BY source_ip
HAVING COUNT(*) >= $1
"""
failures = await conn.fetch(query, self.alert_thresholds['failed_authentications'])
for failure in failures:
await self.send_security_alert(
severity='high',
alert_type='authentication_failure',
message=f"Multiple authentication failures from {failure['source_ip']}: {failure['failure_count']} attempts",
details={'source_ip': failure['source_ip'], 'failure_count': failure['failure_count']}
)
finally:
await conn.close()
async def check_signature_volume_anomalies(self):
"""Monitor unusual signature volume"""
conn = await asyncpg.connect(self.db_connection)
try:
# Check signature volume in last hour
query = """
SELECT COUNT(*) as signature_count, user_id
FROM hsm_audit_log
WHERE operation = 'signature_operation'
AND timestamp > NOW() - INTERVAL '1 hour'
GROUP BY user_id
HAVING COUNT(*) >= $1
"""
anomalies = await conn.fetch(query, self.alert_thresholds['unusual_signature_volume'])
for anomaly in anomalies:
await self.send_security_alert(
severity='medium',
alert_type='signature_volume_anomaly',
message=f"Unusual signature volume from user {anomaly['user_id']}: {anomaly['signature_count']} signatures",
details={'user_id': anomaly['user_id'], 'signature_count': anomaly['signature_count']}
)
finally:
await conn.close()
async def check_cross_wallet_access(self):
"""Monitor cross-wallet access attempts"""
conn = await asyncpg.connect(self.db_connection)
try:
# Check cross-wallet access in last 15 minutes
query = """
SELECT user_id, COUNT(DISTINCT wallet_version) as wallet_count
FROM hsm_audit_log
WHERE timestamp > NOW() - INTERVAL '15 minutes'
GROUP BY user_id
HAVING COUNT(DISTINCT wallet_version) >= $1
"""
cross_access = await conn.fetch(query, self.alert_thresholds['cross_wallet_access_attempts'])
for access in cross_access:
await self.send_security_alert(
severity='high',
alert_type='cross_wallet_access',
message=f"Cross-wallet access attempts by user {access['user_id']}: {access['wallet_count']} wallets",
details={'user_id': access['user_id'], 'wallet_count': access['wallet_count']}
)
finally:
await conn.close()
async def send_security_alert(self, severity: str, alert_type: str,
message: str, details: Dict):
"""Send security alert to operations team"""
alert = {
'timestamp': datetime.utcnow().isoformat(),
'severity': severity,
'alert_type': alert_type,
'message': message,
'details': details
}
# Log alert
self.logger.warning(f"SECURITY ALERT [{severity.upper()}]: {message}")
# Send to monitoring system
await self.send_to_monitoring_system(alert)
# Send email for high severity
if severity == 'high':
await self.send_email_alert(alert)
async def send_to_monitoring_system(self, alert: Dict):
"""Send alert to monitoring system"""
# Integration with monitoring system (e.g., Prometheus, Grafana)
pass
async def send_email_alert(self, alert: Dict):
"""Send email alert for high severity issues"""
# Email integration
pass
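The three checks above share one pattern: count audit events per key inside a recent time window and alert when the count reaches a threshold (`GROUP BY ... HAVING COUNT(*) >= $1`). A minimal in-memory sketch of that pattern follows, which can help when unit-testing thresholds without a database. The function name and the `(timestamp, key)` event shape are assumptions.

```python
from collections import Counter
from datetime import datetime, timedelta
from typing import Dict, Iterable, Tuple


def find_threshold_breaches(events: Iterable[Tuple[datetime, str]],
                            now: datetime,
                            window: timedelta,
                            threshold: int) -> Dict[str, int]:
    """Count events per key newer than now - window; keep keys at or above threshold."""
    since = now - window
    counts = Counter(key for ts, key in events if ts > since)
    return {key: n for key, n in counts.items() if n >= threshold}
```

For the authentication check, `key` would be the source IP, `window` ten minutes, and `threshold` the `failed_authentications` value of 5.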
2. Compliance Monitoring
# compliance_monitoring.py - Compliance monitoring for HSM operations
from datetime import datetime, timedelta
import json
import asyncpg
class HSMComplianceMonitor:
"""Compliance monitoring for regulatory requirements"""
def __init__(self, db_connection_string: str):
self.db_connection = db_connection_string
self.compliance_requirements = {
'key_rotation_days': 365,
'backup_verification_days': 7,
'audit_log_retention_days': 2555, # 7 years
'access_review_days': 90
}
async def generate_compliance_report(self, report_type: str = 'monthly') -> dict:
"""Generate compliance report"""
report = {
'report_type': report_type,
'generated_at': datetime.utcnow().isoformat(),
'compliance_status': 'compliant',
'violations': [],
'recommendations': []
}
# Check key rotation compliance
await self.check_key_rotation_compliance(report)
# Check backup verification compliance
await self.check_backup_compliance(report)
# Check audit log retention
await self.check_audit_log_retention(report)
# Check access reviews
await self.check_access_review_compliance(report)
# Determine overall compliance status
if report['violations']:
report['compliance_status'] = 'non_compliant'
return report
async def check_key_rotation_compliance(self, report: dict):
"""Check key rotation compliance"""
conn = await asyncpg.connect(self.db_connection)
try:
# Check keys older than rotation requirement (interval passed as a bound parameter)
query = """
SELECT version_id, version_name, created_at
FROM wallet_versions
WHERE created_at < NOW() - $1::interval
AND status = 'active'
"""
max_key_age = timedelta(days=self.compliance_requirements['key_rotation_days'])
overdue_keys = await conn.fetch(query, max_key_age)
if overdue_keys:
violation = {
'type': 'key_rotation_overdue',
'severity': 'high',
'count': len(overdue_keys),
'details': [dict(key) for key in overdue_keys]
}
report['violations'].append(violation)
recommendation = {
'type': 'key_rotation',
'priority': 'high',
'action': 'Schedule key rotation for overdue wallets',
'affected_wallets': len(overdue_keys)
}
report['recommendations'].append(recommendation)
finally:
await conn.close()
async def check_backup_compliance(self, report: dict):
"""Check backup verification compliance"""
conn = await asyncpg.connect(self.db_connection)
try:
# Check backup verifications (interval passed as a bound parameter)
query = """
SELECT wallet_version, last_backup_verification
FROM wallet_backup_status
WHERE last_backup_verification < NOW() - $1::interval
"""
max_backup_age = timedelta(days=self.compliance_requirements['backup_verification_days'])
overdue_backups = await conn.fetch(query, max_backup_age)
if overdue_backups:
violation = {
'type': 'backup_verification_overdue',
'severity': 'medium',
'count': len(overdue_backups),
'details': [dict(backup) for backup in overdue_backups]
}
report['violations'].append(violation)
finally:
await conn.close()
async def generate_audit_report(self, start_date: datetime, end_date: datetime) -> dict:
"""Generate detailed audit report"""
conn = await asyncpg.connect(self.db_connection)
try:
# HSM operations summary
operations_query = """
SELECT operation, COUNT(*) as count
FROM hsm_audit_log
WHERE timestamp BETWEEN $1 AND $2
GROUP BY operation
"""
operations = await conn.fetch(operations_query, start_date, end_date)
# User activity summary
user_query = """
SELECT user_id, COUNT(*) as operation_count
FROM hsm_audit_log
WHERE timestamp BETWEEN $1 AND $2
GROUP BY user_id
ORDER BY operation_count DESC
"""
user_activity = await conn.fetch(user_query, start_date, end_date)
# Security events
security_query = """
SELECT alert_type, COUNT(*) as count
FROM security_alerts
WHERE timestamp BETWEEN $1 AND $2
GROUP BY alert_type
"""
security_events = await conn.fetch(security_query, start_date, end_date)
audit_report = {
'report_period': {
'start_date': start_date.isoformat(),
'end_date': end_date.isoformat()
},
'hsm_operations': [dict(op) for op in operations],
'user_activity': [dict(user) for user in user_activity],
'security_events': [dict(event) for event in security_events],
'generated_at': datetime.utcnow().isoformat()
}
return audit_report
finally:
await conn.close()
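The `audit_log_retention_days` value of 2555 equals 7 × 365 and therefore ignores leap days, so a day-based cutoff falls slightly later than a calendar-based one. A short worked example of the difference follows; both helper names are hypothetical, and which definition applies is a policy decision this guide does not state.

```python
from datetime import date, timedelta


def retention_cutoff_by_days(today: date, retention_days: int = 2555) -> date:
    """Entries older than this date fall outside a day-count retention window."""
    return today - timedelta(days=retention_days)


def retention_cutoff_by_years(today: date, years: int = 7) -> date:
    """Entries older than this date fall outside a calendar-year retention window."""
    try:
        return today.replace(year=today.year - years)
    except ValueError:
        # today is Feb 29 and the target year is not a leap year
        return today.replace(year=today.year - years, day=28)


if __name__ == "__main__":
    today = date(2025, 7, 17)
    print(retention_cutoff_by_days(today))   # day-count cutoff
    print(retention_cutoff_by_years(today))  # calendar cutoff
```

For 17 July 2025 the two cutoffs differ by two days because the window contains 29 February 2020 and 29 February 2024.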
Disaster Recovery and Business Continuity
Disaster Recovery Procedures
1. HSM Disaster Recovery Plan
# disaster_recovery.py - HSM disaster recovery procedures
import asyncio
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional
class HSMDisasterRecovery:
"""HSM disaster recovery and business continuity procedures"""
def __init__(self, primary_hsm_config: dict, backup_hsm_config: dict):
self.primary_hsm_config = primary_hsm_config
self.backup_hsm_config = backup_hsm_config
self.logger = logging.getLogger(__name__)
async def assess_disaster_scenario(self) -> dict:
"""Assess disaster scenario and determine recovery approach"""
assessment = {
'disaster_type': None,
'impact_level': None,
'recovery_approach': None,
'estimated_rto': None, # Recovery Time Objective
'estimated_rpo': None, # Recovery Point Objective
'required_actions': []
}
try:
# Test primary HSM connectivity
primary_status = await self.test_hsm_connectivity(self.primary_hsm_config)
# Test backup HSM connectivity
backup_status = await self.test_hsm_connectivity(self.backup_hsm_config)
# Determine disaster scenario
if not primary_status['available'] and not backup_status['available']:
assessment['disaster_type'] = 'total_hsm_failure'
assessment['impact_level'] = 'critical'
assessment['recovery_approach'] = 'rebuild_from_backup_shares'
assessment['estimated_rto'] = '4-8 hours'
assessment['estimated_rpo'] = '0 minutes'
assessment['required_actions'] = [
'Activate emergency response team',
'Retrieve backup key shares',
'Initialize new HSM infrastructure',
'Restore master keys from shares'
]
elif not primary_status['available'] and backup_status['available']:
assessment['disaster_type'] = 'primary_hsm_failure'
assessment['impact_level'] = 'high'
assessment['recovery_approach'] = 'failover_to_backup'
assessment['estimated_rto'] = '30-60 minutes'
assessment['estimated_rpo'] = '0 minutes'
assessment['required_actions'] = [
'Activate backup HSM',
'Update DNS/routing to backup',
'Verify backup HSM functionality',
'Schedule primary HSM replacement'
]
elif primary_status['available'] and not backup_status['available']:
assessment['disaster_type'] = 'backup_hsm_failure'
assessment['impact_level'] = 'medium'
assessment['recovery_approach'] = 'restore_backup_hsm'
assessment['estimated_rto'] = '2-4 hours'
assessment['estimated_rpo'] = '0 minutes'
assessment['required_actions'] = [
'Continue on primary HSM',
'Restore backup HSM',
'Verify backup synchronization',
'Update monitoring alerts'
]
else:
assessment['disaster_type'] = 'no_disaster'
assessment['impact_level'] = 'none'
assessment['recovery_approach'] = 'continue_normal_operations'
except Exception as e:
assessment['disaster_type'] = 'assessment_failure'
assessment['impact_level'] = 'unknown'
assessment['error'] = str(e)
return assessment
async def execute_failover_to_backup(self) -> dict:
"""Execute failover to backup HSM"""
failover_result = {
'success': False,
'start_time': datetime.utcnow(),
'steps_completed': [],
'error': None
}
try:
# Step 1: Verify backup HSM status
self.logger.info("Step 1: Verifying backup HSM status")
backup_status = await self.test_hsm_connectivity(self.backup_hsm_config)
if not backup_status['available']:
raise Exception("Backup HSM not available")
failover_result['steps_completed'].append('backup_hsm_verified')
# Step 2: Initialize backup HSM client
self.logger.info("Step 2: Initializing backup HSM client")
backup_hsm = PlingsHSMClient(self.backup_hsm_config['cluster_id'])
await backup_hsm.initialize_session(
username=os.environ['HSM_BACKUP_USER'],
password=os.environ['HSM_BACKUP_PASSWORD']
)
failover_result['steps_completed'].append('backup_hsm_initialized')
# Step 3: Verify key availability
self.logger.info("Step 3: Verifying key availability in backup HSM")
available_keys = await backup_hsm.list_keys()
if not available_keys:
raise Exception("No keys available in backup HSM")
failover_result['steps_completed'].append('keys_verified')
# Step 4: Update application configuration
self.logger.info("Step 4: Updating application configuration")
await self.update_hsm_configuration('backup')
failover_result['steps_completed'].append('configuration_updated')
# Step 5: Test signature operations
self.logger.info("Step 5: Testing signature operations")
test_result = await backup_hsm.health_check()
if test_result['status'] != 'healthy':
raise Exception("Backup HSM health check failed")
failover_result['steps_completed'].append('signature_test_passed')
# Step 6: Update monitoring
self.logger.info("Step 6: Updating monitoring configuration")
await self.update_monitoring_configuration('backup')
failover_result['steps_completed'].append('monitoring_updated')
failover_result['success'] = True
failover_result['end_time'] = datetime.utcnow()
self.logger.info("✅ Failover to backup HSM completed successfully")
except Exception as e:
failover_result['error'] = str(e)
failover_result['end_time'] = datetime.utcnow()
self.logger.error(f"❌ Failover to backup HSM failed: {e}")
return failover_result
async def rebuild_from_backup_shares(self, backup_shares: List[str]) -> dict:
"""Rebuild HSM from backup shares (Shamir's Secret Sharing)"""
rebuild_result = {
'success': False,
'start_time': datetime.utcnow(),
'steps_completed': [],
'error': None
}
try:
# Step 1: Validate backup shares
self.logger.info("Step 1: Validating backup shares")
if len(backup_shares) < 3:
raise Exception("Insufficient backup shares for recovery")
rebuild_result['steps_completed'].append('shares_validated')
# Step 2: Reconstruct master key
self.logger.info("Step 2: Reconstructing master key from shares")
mnemonic_manager = PlingsMnemonicManager()
recovered_mnemonic = mnemonic_manager.recover_from_shares(backup_shares, threshold=3)
rebuild_result['steps_completed'].append('master_key_reconstructed')
# Step 3: Initialize new HSM infrastructure
self.logger.info("Step 3: Initializing new HSM infrastructure")
new_hsm = await self.initialize_new_hsm_infrastructure()
rebuild_result['steps_completed'].append('hsm_infrastructure_initialized')
# Step 4: Restore master keys
self.logger.info("Step 4: Restoring master keys to new HSM")
seed = mnemonic_manager.mnemonic_to_seed(recovered_mnemonic)
await self.restore_master_keys_to_hsm(new_hsm, seed)
rebuild_result['steps_completed'].append('master_keys_restored')
# Step 5: Verify key restoration
self.logger.info("Step 5: Verifying key restoration")
verification_result = await self.verify_key_restoration(new_hsm)
if not verification_result['success']:
raise Exception("Key restoration verification failed")
rebuild_result['steps_completed'].append('key_restoration_verified')
# Step 6: Update application configuration
self.logger.info("Step 6: Updating application configuration")
await self.update_hsm_configuration('rebuilt')
rebuild_result['steps_completed'].append('configuration_updated')
rebuild_result['success'] = True
rebuild_result['end_time'] = datetime.utcnow()
self.logger.info("✅ HSM rebuild from backup shares completed successfully")
except Exception as e:
rebuild_result['error'] = str(e)
rebuild_result['end_time'] = datetime.utcnow()
self.logger.error(f"❌ HSM rebuild from backup shares failed: {e}")
return rebuild_result
async def create_disaster_recovery_report(self, recovery_action: str,
result: dict) -> dict:
"""Create disaster recovery report"""
report = {
'recovery_action': recovery_action,
'execution_result': result,
'business_impact': await self.assess_business_impact(result),
'lessons_learned': await self.generate_lessons_learned(result),
'recommendations': await self.generate_recommendations(result),
'report_generated_at': datetime.utcnow().isoformat()
}
return report
async def test_hsm_connectivity(self, hsm_config: dict) -> dict:
"""Test HSM connectivity"""
try:
# Placeholder: always reports the HSM as available, so
# assess_disaster_scenario() returns 'no_disaster' until this
# is replaced with a real connectivity test
return {
'available': True,
'response_time_ms': 50,
'error': None
}
except Exception as e:
return {
'available': False,
'response_time_ms': None,
'error': str(e)
}
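`test_hsm_connectivity` above returns a fixed mock result. A minimal sketch of a real reachability probe follows, using only the standard library and returning the same dict shape. It checks TCP reachability only, not HSM authentication or signing; the function name, and the idea that the HSM endpoint is reachable as a plain host and port, are assumptions for illustration.

```python
import asyncio
import time


async def probe_hsm_endpoint(host: str, port: int, timeout_s: float = 3.0) -> dict:
    """Open and close a TCP connection; report availability and latency."""
    start = time.perf_counter()
    try:
        _reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout=timeout_s
        )
    except (OSError, asyncio.TimeoutError) as exc:
        # Refused, unreachable, DNS failure, or timeout
        return {
            'available': False,
            'response_time_ms': None,
            'error': str(exc) or type(exc).__name__,
        }
    elapsed_ms = (time.perf_counter() - start) * 1000
    writer.close()
    await writer.wait_closed()
    return {'available': True, 'response_time_ms': round(elapsed_ms, 2), 'error': None}
```

A successful TCP connection is a necessary condition only; a fuller check would also run the `health_check()` signing test used in the failover procedure.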
2. Business Continuity Testing
#!/bin/bash
# business_continuity_test.sh - Business continuity testing
echo "🧪 Business Continuity Testing - $(date)"
echo "========================================"
# Test 1: HSM failover simulation
echo "1. Testing HSM failover simulation..."
python3 -c "
import asyncio
from disaster_recovery import HSMDisasterRecovery

async def test_failover():
    dr = HSMDisasterRecovery(
        primary_hsm_config={'cluster_id': 'test-primary'},
        backup_hsm_config={'cluster_id': 'test-backup'}
    )
    # Simulate primary HSM failure
    print('Simulating primary HSM failure...')
    assessment = await dr.assess_disaster_scenario()
    print(f'Disaster assessment: {assessment[\"disaster_type\"]}')
    # Test failover
    if assessment['disaster_type'] == 'primary_hsm_failure':
        failover_result = await dr.execute_failover_to_backup()
        print(f'Failover result: {failover_result[\"success\"]}')

asyncio.run(test_failover())
"
# Test 2: Key recovery simulation
echo "2. Testing key recovery simulation..."
python3 -c "
from mnemonic_manager import PlingsMnemonicManager

# Test backup share recovery
mnemonic_manager = PlingsMnemonicManager()
test_shares = [
    'share_1:abcd1234',
    'share_2:efgh5678',
    'share_3:ijkl9012'
]
try:
    recovered = mnemonic_manager.recover_from_shares(test_shares, threshold=3)
    print('✅ Key recovery simulation successful')
except Exception as e:
    print(f'❌ Key recovery simulation failed: {e}')
"
# Test 3: Performance under stress
echo "3. Testing performance under stress..."
python3 -c "
import time
from concurrent.futures import ThreadPoolExecutor

def simulate_signature_operation():
    # Simulate signature operation (does not contact the HSM)
    time.sleep(0.01)  # 10ms per operation
    return True

def stress_test():
    # Simulate high load
    start_time = time.time()
    # Run 1000 operations across 50 worker threads
    with ThreadPoolExecutor(max_workers=50) as executor:
        futures = [executor.submit(simulate_signature_operation) for _ in range(1000)]
        # Wait for completion
        for future in futures:
            future.result()
    elapsed = time.time() - start_time
    print(f'Stress test completed: 1000 operations in {elapsed:.2f} seconds')

stress_test()
"
# Test 4: Data backup integrity
echo "4. Testing data backup integrity..."
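# Record checksums of the configuration backups; comparing them against
# previously stored values is what detects corruption
sha256sum /backup/hsm_config_*.txt > /tmp/backup_checksums.txt
echo "Backup checksums recorded in /tmp/backup_checksums.txt"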
# Test 5: Network connectivity failover
echo "5. Testing network connectivity failover..."
ping -c 3 backup-hsm.plings.io
if [ $? -eq 0 ]; then
echo "✅ Backup HSM network connectivity verified"
else
echo "❌ Backup HSM network connectivity failed"
fi
# Generate test report
echo "6. Generating business continuity test report..."
cat > /tmp/bc_test_report.txt << EOF
Business Continuity Test Report - $(date)
========================================
Test Results:
- HSM Failover Simulation: ✅ PASSED
- Key Recovery Simulation: ✅ PASSED
- Performance Under Stress: ✅ PASSED
- Data Backup Integrity: ✅ PASSED
- Network Connectivity: ✅ PASSED
Recovery Time Objectives:
- Primary HSM Failure: < 1 hour
- Total HSM Failure: < 8 hours
- Network Failure: < 30 minutes
Recommendations:
- Continue quarterly testing
- Review backup procedures
- Update disaster recovery documentation
EOF
mail -s "Business Continuity Test Report" operations@plings.io < /tmp/bc_test_report.txt
echo "✅ Business continuity testing completed"
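The report in the script above prints `✅ PASSED` for every test regardless of what the individual steps returned. A minimal sketch of collecting real outcomes and rendering the results section from them follows; `run_checks`, `render_results`, and the check names are hypothetical.

```python
from typing import Callable, Dict


def run_checks(checks: Dict[str, Callable[[], bool]]) -> Dict[str, bool]:
    """Run each check; an exception counts as a failure rather than aborting."""
    results = {}
    for name, check in checks.items():
        try:
            results[name] = bool(check())
        except Exception:
            results[name] = False
    return results


def render_results(results: Dict[str, bool]) -> str:
    """Render the 'Test Results' section from recorded outcomes."""
    lines = ['Test Results:']
    for name, passed in results.items():
        lines.append(f"- {name}: {'PASSED' if passed else 'FAILED'}")
    return '\n'.join(lines)


if __name__ == "__main__":
    outcome = run_checks({
        'HSM Failover Simulation': lambda: True,
        'Network Connectivity': lambda: False,
    })
    print(render_results(outcome))
```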
Summary and Implementation Roadmap
Implementation Phases
Phase 1: Foundation (Months 1-2)
- SoftHSM Development Environment: Complete development setup with testing framework
- Basic HD Wallet Implementation: Core BIP32 derivation and Ed25519 signing
- Mnemonic Management: BIP39 seed phrase generation and validation
- Initial Testing: Comprehensive unit tests and integration tests
Phase 2: Production HSM (Months 3-4)
- AWS CloudHSM Integration: Production HSM setup with proper security controls
- Key Generation Ceremony: Formal master key generation with multiple witnesses
- Multi-Wallet Support: Complete wallet version management system
- API Integration: REST API for HSM operations with authentication
Phase 3: Security & Monitoring (Months 5-6)
- Security Monitoring: Real-time threat detection and anomaly monitoring
- Compliance Framework: GDPR, SOC 2, and audit trail implementation
- Performance Optimization: Benchmark tuning and scalability improvements
- Disaster Recovery: Complete DR procedures and business continuity testing
Phase 4: Advanced Features (Months 7-8)
- Thales Luna Backup HSM: Secondary HSM integration for disaster recovery
- Automated Incident Response: Integration with security incident procedures
- Post-Quantum Preparation: Hybrid cryptography implementation planning
- Advanced Monitoring: ML-based anomaly detection and predictive analytics
Key Technical Specifications
Production HSM Requirements:
- Primary: AWS CloudHSM (FIPS 140-2 Level 3)
- Backup: Thales Luna Network HSM (Geographic redundancy)
- Performance: >1,000 Ed25519 signatures/second (1000x safety buffer)
- Availability: 99.9% uptime with automated failover
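To make the availability target concrete, the short calculation below converts an uptime percentage into a yearly downtime budget. It is a worked arithmetic sketch that assumes a 365-day year; the function name is illustrative.

```python
HOURS_PER_YEAR = 365 * 24  # 8,760 hours, ignoring leap years


def downtime_budget_hours(availability: float) -> float:
    """Hours of downtime per year permitted by an availability target (0-1)."""
    if not 0.0 <= availability <= 1.0:
        raise ValueError("availability must be between 0 and 1")
    return HOURS_PER_YEAR * (1.0 - availability)


budget = downtime_budget_hours(0.999)
print(f"{budget:.2f} hours/year")  # 8.76 hours/year
```

Under these assumptions, a single total HSM failure that takes the full 8-hour recovery objective from the test report would consume most of the 8.76-hour annual budget.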
Security Standards:
- Key Generation: Multi-witness ceremony with entropy verification
- Seed Management: BIP39 with Shamir’s Secret Sharing (3-of-5 shares)
- Access Control: Multi-factor authentication with role-based permissions
- Monitoring: Real-time anomaly detection with automated alerting
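The 3-of-5 seed split can be illustrated with a textbook Shamir scheme over a prime field. This is a minimal sketch for understanding the threshold property only; the field prime, function names, and integer encoding of the secret are assumptions, and a production ceremony should rely on a vetted implementation rather than this code.

```python
import secrets
from typing import List, Tuple

PRIME = 2**521 - 1  # Mersenne prime; must be larger than any secret being split

Share = Tuple[int, int]  # (x, y) point on the polynomial


def split_secret(secret: int, threshold: int = 3, shares: int = 5) -> List[Share]:
    """Split `secret` into `shares` points; any `threshold` of them recover it."""
    if not 0 <= secret < PRIME:
        raise ValueError("secret out of range for the field")
    if not 1 <= threshold <= shares:
        raise ValueError("threshold must be between 1 and the number of shares")
    # Random polynomial of degree threshold-1 whose constant term is the secret.
    coeffs = [secret] + [secrets.randbelow(PRIME) for _ in range(threshold - 1)]

    def evaluate(x: int) -> int:
        result = 0
        for coeff in reversed(coeffs):  # Horner's rule
            result = (result * x + coeff) % PRIME
        return result

    return [(x, evaluate(x)) for x in range(1, shares + 1)]


def recover_secret(points: List[Share]) -> int:
    """Lagrange-interpolate the polynomial at x = 0 to recover the secret."""
    secret = 0
    for i, (xi, yi) in enumerate(points):
        numerator, denominator = 1, 1
        for j, (xj, _) in enumerate(points):
            if i != j:
                numerator = (numerator * -xj) % PRIME
                denominator = (denominator * (xi - xj)) % PRIME
        # pow(..., -1, PRIME) is the modular inverse (Python 3.8+).
        secret = (secret + yi * numerator * pow(denominator, -1, PRIME)) % PRIME
    return secret
```

For example, a 32-byte seed converted with `int.from_bytes(seed, "big")` can be split into five shares, and any three of them passed to `recover_secret` return the original integer, while two shares alone do not determine it.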
Compliance Framework:
- GDPR: Complete data protection and privacy controls
- SOC 2 Type II: Security, availability, and processing integrity
- Audit Trail: 7-year retention with complete forensic capabilities
- Regulatory Reporting: Automated compliance reporting and documentation
Next Steps
- Begin Phase 1: Set up SoftHSM development environment and basic HD wallet implementation
- Security Review: Conduct comprehensive security review of architecture and implementation
- Vendor Selection: Finalize HSM vendor contracts and procurement
- Team Training: Train operations team on HSM management and incident response
- Compliance Preparation: Begin SOC 2 and regulatory compliance documentation
This HSM integration guide provides the complete foundation for secure private key management in the Plings wallet-first architecture, ensuring both security and operational excellence.
Implementation Timeline
Complete Three-Tier Implementation Schedule
Phase 1: Initial Deployment (Week 1)
Initial Tier (Vercel Environment Variables)
- Day 1: Generate master key and configure Vercel environment
- Day 2: Implement HD wallet derivation logic
- Day 3: Create API endpoints for identifier generation
- Day 4: Test with sample manufacturer batches
- Day 5: Production deployment and monitoring setup
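The Day 1 and Day 2 steps can be sketched as follows. Because BIP32 itself is specified for secp256k1, this sketch uses SLIP-0010, the commonly used adaptation for Ed25519, which supports hardened child keys only; whether Plings uses exactly this scheme is an assumption. The environment variable name `PLINGS_MASTER_SEED` and the example path are hypothetical.

```python
import hashlib
import hmac
import os
import secrets
from typing import Iterable, Tuple

HARDENED = 0x80000000
ENV_VAR = "PLINGS_MASTER_SEED"  # hypothetical variable name


def generate_master_seed() -> str:
    """Return a fresh 256-bit seed as hex, ready to paste into the environment."""
    return secrets.token_hex(32)


def load_master_seed() -> bytes:
    """Read the hex-encoded seed from the environment; fail loudly if absent."""
    value = os.environ.get(ENV_VAR)
    if not value:
        raise RuntimeError(f"{ENV_VAR} is not set")
    return bytes.fromhex(value)


def master_node(seed: bytes) -> Tuple[bytes, bytes]:
    """SLIP-0010 master node for Ed25519: returns (private key, chain code)."""
    digest = hmac.new(b"ed25519 seed", seed, hashlib.sha512).digest()
    return digest[:32], digest[32:]


def derive_hardened(key: bytes, chain_code: bytes, index: int) -> Tuple[bytes, bytes]:
    """Derive hardened child `index` (0 <= index < 2**31) from a parent node."""
    if not 0 <= index < HARDENED:
        raise ValueError("index must be in [0, 2**31)")
    data = b"\x00" + key + (index | HARDENED).to_bytes(4, "big")
    digest = hmac.new(chain_code, data, hashlib.sha512).digest()
    return digest[:32], digest[32:]


def derive_path(seed: bytes, path: Iterable[int]) -> bytes:
    """Walk a path of hardened indices and return the 32-byte private key."""
    key, chain_code = master_node(seed)
    for index in path:
        key, chain_code = derive_hardened(key, chain_code, index)
    return key
```

A call such as `derive_path(load_master_seed(), [1, 7, 42])` deterministically returns the same 32-byte key for the same seed and path, so only the seed needs to be stored in the environment. The resulting bytes would then be handed to an Ed25519 library to obtain the public key.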
Deliverables:
- ✅ Production-ready key management system
- ✅ Identifier generation API
- ✅ Database schema for public keys
- ✅ Basic monitoring and logging
Phase 2: Enhanced Security (Weeks 2-5)
Next Level (SoftHSM Implementation)
- Week 2: Infrastructure setup and SoftHSM installation
- Week 3: PKCS#11 integration and HSM service development
- Week 4: API integration and testing
- Week 5: Migration from Vercel to SoftHSM
Deliverables:
- ✅ SoftHSM service with PKCS#11 interface
- ✅ Enhanced audit trail and key operations logging
- ✅ Improved security posture
- ✅ Backup and recovery procedures
Phase 3: Enterprise Preparation (Months 2-9)
Final Level (Hardware HSM)
- Months 2-3: Enterprise requirements gathering and vendor selection
- Months 4-5: Hardware HSM procurement and setup
- Months 6-7: Integration development and testing
- Months 8-9: Migration and compliance certification
Deliverables:
- ✅ Enterprise-grade hardware HSM
- ✅ FIPS 140-2 Level 3 compliance
- ✅ Geographic redundancy and disaster recovery
- ✅ Enterprise monitoring and support
Key Milestones
| Milestone | Timeline | Description |
|---|---|---|
| MVP Launch | Week 1 | Production-ready with Vercel environment variables |
| Security Upgrade | Week 5 | SoftHSM implementation with enhanced security |
| Enterprise Ready | Month 9 | Hardware HSM with full compliance |
Risk Mitigation Timeline
- Weeks 1-2: Identify potential security vulnerabilities in Vercel approach
- Weeks 3-4: Implement additional security controls and monitoring
- Months 2-3: Begin enterprise security assessment and planning
- Months 6-7: Conduct security audit and penetration testing
- Months 8-9: Complete compliance certification and documentation
Cost Analysis
Total Cost of Ownership (TCO) by Tier
Initial Tier: Vercel Environment Variables
Year 1 Costs:
- Infrastructure: $0 (included in Vercel plan)
- Development: $5,000 (1 week developer time)
- Operation: $0 (no additional operational costs)
- Total Year 1: $5,000
Ongoing Annual Costs:
- Infrastructure: $0
- Maintenance: $1,000 (quarterly key rotation)
- Monitoring: $0
- Total Annual: $1,000
Next Level: SoftHSM Implementation
Year 1 Costs:
- Infrastructure: $600 (VPS hosting)
- Development: $15,000 (3 weeks developer time)
- Operation: $2,000 (setup and monitoring)
- Total Year 1: $17,600
Ongoing Annual Costs:
- Infrastructure: $600
- Maintenance: $3,000 (quarterly maintenance)
- Monitoring: $1,200 (additional monitoring tools)
- Total Annual: $4,800
Final Level: Hardware HSM
Year 1 Costs:
- Infrastructure: $18,000 (AWS CloudHSM)
- Development: $40,000 (8 weeks developer time)
- Operation: $10,000 (setup and training)
- Compliance: $15,000 (audit and certification)
- Total Year 1: $83,000
Ongoing Annual Costs:
- Infrastructure: $18,000
- Maintenance: $8,000 (quarterly maintenance)
- Monitoring: $3,000 (enterprise monitoring)
- Compliance: $5,000 (annual audit)
- Total Annual: $34,000
Cost Comparison Over 3 Years
| Tier | Year 1 | Year 2 | Year 3 | Total 3-Year |
|---|---|---|---|---|
| Initial | $5,000 | $1,000 | $1,000 | $7,000 |
| Next Level | $17,600 | $4,800 | $4,800 | $27,200 |
| Final Level | $83,000 | $34,000 | $34,000 | $151,000 |
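The table follows directly from the per-tier line items: the three-year total is the Year 1 cost plus two years of ongoing cost. A short sketch that reproduces the arithmetic from the figures listed above (the dictionary layout and function name are illustrative):

```python
from typing import Dict

# Year 1 line items per tier, in USD, taken from the lists above.
YEAR_ONE: Dict[str, Dict[str, int]] = {
    "Initial": {"infrastructure": 0, "development": 5_000, "operation": 0},
    "Next Level": {"infrastructure": 600, "development": 15_000, "operation": 2_000},
    "Final Level": {
        "infrastructure": 18_000,
        "development": 40_000,
        "operation": 10_000,
        "compliance": 15_000,
    },
}

# Ongoing annual line items per tier, in USD.
ONGOING: Dict[str, Dict[str, int]] = {
    "Initial": {"infrastructure": 0, "maintenance": 1_000, "monitoring": 0},
    "Next Level": {"infrastructure": 600, "maintenance": 3_000, "monitoring": 1_200},
    "Final Level": {
        "infrastructure": 18_000,
        "maintenance": 8_000,
        "monitoring": 3_000,
        "compliance": 5_000,
    },
}


def total_cost(tier: str, years: int = 3) -> int:
    """Year 1 cost plus (years - 1) years of ongoing cost for one tier."""
    if years < 1:
        raise ValueError("years must be at least 1")
    return sum(YEAR_ONE[tier].values()) + (years - 1) * sum(ONGOING[tier].values())


for tier in YEAR_ONE:
    print(f"{tier}: ${total_cost(tier):,}")
```

Changing `years` or a single line item shows how sensitive each tier's total is to, for example, HSM infrastructure pricing.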
ROI Analysis
Business Impact by Tier
Initial Tier Benefits:
- Enables immediate market entry
- Reduces time-to-market by 6-9 months
- Estimated revenue opportunity: $100,000 in Year 1
Next Level Benefits:
- Enables enterprise customer acquisition
- Improved security posture attracts larger customers
- Estimated additional revenue: $250,000 in Year 2
Final Level Benefits:
- Enables enterprise and government contracts
- Meets regulatory compliance requirements
- Estimated additional revenue: $500,000+ in Year 3
Break-Even Analysis
- Initial Tier: Immediate ROI (revenue > costs from Day 1)
- Next Level: Break-even at ~$27,000 additional revenue (typically Month 2-3)
- Final Level: Break-even at ~$151,000 additional revenue (typically Month 6-9)
Cost Optimization Strategies
- Hybrid Approach: Use Initial Tier for small customers, Next Level for medium customers, Final Level for enterprise
- Phased Migration: Implement tiers as business grows to optimize cash flow
- Vendor Negotiation: Leverage growth trajectory for better HSM pricing
- Operational Efficiency: Automate operations to reduce maintenance costs
Glossary
A-D
API (Application Programming Interface): A set of protocols and tools for building software applications, allowing different software components to communicate.
Asymmetric Cryptography: A cryptographic system that uses a pair of keys - a public key and a private key - for encryption and digital signatures.
Audit Trail: A chronological record of all system activities, providing evidence of what happened, when, and by whom.
AWS CloudHSM: Amazon Web Services’ managed hardware security module service providing FIPS 140-2 Level 3 certified cryptographic processing.
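BIP32: Bitcoin Improvement Proposal 32 - standard for Hierarchical Deterministic (HD) wallets that can generate a tree of key pairs from a single seed. BIP32 is defined for the secp256k1 curve; Ed25519 keys are usually derived with the SLIP-0010 adaptation, which supports hardened child keys only.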
BIP39: Bitcoin Improvement Proposal 39 - standard for mnemonic phrases used to generate cryptocurrency wallet seeds.
CRYSTALS-Dilithium: A post-quantum cryptographic signature algorithm designed to be resistant to quantum computer attacks.
E-H
Ed25519: An EdDSA digital signature scheme over the edwards25519 curve (specified in RFC 8032), providing fast, secure signatures with 32-byte public keys and 64-byte signatures.
FIPS 140-2: Federal Information Processing Standard 140-2 - U.S. government standard for cryptographic modules with 4 security levels.
Hardware Security Module (HSM): A dedicated cryptographic device designed to securely generate, store, and manage digital keys and perform cryptographic operations.
HD Wallet: Hierarchical Deterministic wallet - a wallet that can generate multiple key pairs from a single master seed following the BIP32 standard.
M-P
Mnemonic Phrase: A human-readable representation of a cryptographic seed, typically consisting of 12-24 words from a standardized dictionary.
PBKDF2: Password-Based Key Derivation Function 2 - a cryptographic function used to derive encryption keys from passwords.
PKCS#11: Public Key Cryptography Standards #11 - a standard interface for communicating with cryptographic tokens and HSMs.
Private Key: The secret key in asymmetric cryptography that must be kept confidential and is used for decryption and digital signing.
Public Key: The openly shared key in asymmetric cryptography used for encryption and signature verification.
Q-S
Quantum Resistance: The property of a cryptographic algorithm to remain secure against attacks by quantum computers.
RSA: Rivest-Shamir-Adleman - a widely used public-key cryptographic algorithm, now being superseded by elliptic curve cryptography.
Shamir’s Secret Sharing: A cryptographic technique that splits a secret into multiple shares where a threshold number of shares is required to reconstruct the secret.
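SoftHSM: A software implementation of a cryptographic store accessed through the PKCS#11 interface, developed as part of the OpenDNSSEC project. It is widely used for development and testing; in this guide it also serves as the Next Level tier.
SOC 2: System and Organization Controls 2 - an AICPA attestation framework for service organizations, covering security, availability, processing integrity, confidentiality, and privacy controls.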
T-W
Tamper Detection: The ability of a security device to detect physical intrusion attempts and respond appropriately (often by destroying sensitive data).
Tamper Resistance: The property of a security device to resist physical attacks and intrusion attempts.
Thales Luna: A family of hardware security modules manufactured by Thales Group, providing high-assurance cryptographic processing.
Wallet Version: In Plings’ wallet-first architecture, a specific version of the wallet infrastructure with its own master key and security controls.
References and Related Documentation
For complete understanding of HSM integration within the Plings ecosystem:
- Wallet Management Architecture - Overall wallet-first system architecture
- Wallet Security Model - Comprehensive security framework and threat model
- Key Compromise Procedures - Incident response procedures for key compromise
- Wallet Lifecycle Management - Operational procedures for wallet lifecycle
- HD Wallet Database Schema - Database design for multi-wallet support
External Resources:
- BIP32: Hierarchical Deterministic Wallets specification
- BIP39: Mnemonic code for generating deterministic keys
- Ed25519: Edwards-curve Digital Signature Algorithm
- FIPS 140-2: Federal Information Processing Standard for cryptographic modules
- AWS CloudHSM: Amazon Web Services Hardware Security Module documentation
- Thales Luna: Thales Hardware Security Module documentation
Last Updated: Sun 13 Jul 2025 12:53:21 CEST - Complete HSM integration guide with vendor selection, implementation details, and operational procedures