
Plings Key Management Guide - Three-Tier Implementation Strategy

Created: Sun 13 Jul 2025 12:53:21 CEST
Updated: Mon 17 Jul 2025 - Added 3-tier implementation strategy
Document Version: 2.0 - Three-Tier Key Management Architecture
Security Classification: Internal Technical Documentation
Target Audience: CTOs, Security Teams, Infrastructure Engineers, DevOps Engineers, Backend Developers
Author: Paul Wisén

Executive Summary

This guide presents Plings’ three-tier key management strategy that enables immediate deployment while providing a clear path to enterprise-grade security. The strategy balances rapid time-to-market with long-term security requirements through progressive implementation phases.

Three-Tier Implementation Strategy

| Tier | Solution | Timeline | Cost | Use Case |
|------|----------|----------|------|----------|
| Initial | Vercel Environment Variables | Immediate | $0 | MVP, early customers, rapid deployment |
| Next Level | SoftHSM | 2-4 weeks | $20-50/mo | Growing business, enhanced security |
| Final Level | Hardware HSM | 6-9 months | $1,500-5,000/mo | Enterprise, compliance requirements |

What You’ll Learn

  • For Decision Makers: Cost-effective scaling strategy from $0 to enterprise
  • For Technical Teams: Implementation guides for each tier with migration paths
  • For Security Teams: Progressive security enhancement maintaining business velocity
  • For Developers: Immediate deployment with Vercel, future-proof architecture

Why This Strategy Matters

Problem: Traditional HSM implementations require months of setup and significant upfront investment.

Solution: Three-tier approach enables immediate deployment with progressive security enhancement.

Business Impact: Launch in days instead of months, scale security with business growth.

Key Implementation Decisions

  1. Initial Tier: Vercel Environment Variables for immediate production deployment
  2. Next Level: SoftHSM on dedicated VPS for enhanced security (2-4 weeks)
  3. Final Level: AWS CloudHSM or Thales Luna for enterprise compliance (6-9 months)
  4. Cryptographic Standard: Ed25519 with planned CRYSTALS-Dilithium post-quantum migration
  5. Key Derivation: BIP32-compatible hierarchical deterministic wallet system
  6. Performance: 1,000 signatures/second (1000x safety buffer for <1 ops/sec actual usage)

HSM Performance Reality Check

Actual Plings HSM Usage:

  • Verification: Happens client-side using public keys (no HSM involvement)
  • Key Generation: Only for new organizations/paths (rare administrative tasks)
  • Real Performance Need: <1 operation per second
  • Specified Requirement: 1,000 operations/second (1000x safety buffer)

Why Performance Isn’t Critical:

# HSM operations (rare)
master_key = hsm.generate_master_key()           # 1-4 times per year
anchor_key = hsm.derive_key(master_key, path)    # 10-100 times per month
path_signature = hsm.sign(path_data)             # 1,000-10,000 times per month

# Client operations (frequent, no HSM)
is_valid = verify_offline(instance_key, public_key)  # Millions per day

Cost Impact: Standard HSM configurations are sufficient, avoiding premium high-performance pricing.
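
The usage figures above can be turned into an average rate with a few lines of arithmetic. The sketch below takes the upper bounds from the comments (100 derivations and 10,000 signatures per month) and assumes a 30-day month; the dictionary keys are illustrative names only. Averages hide bursts, so this is a sanity check on the order of magnitude rather than a capacity plan.

```python
SECONDS_PER_MONTH = 30 * 24 * 60 * 60  # 2,592,000 seconds in a 30-day month

# Upper bounds taken from the usage comments above (illustrative key names)
monthly_operations = {
    "derive_key": 100,
    "sign_path": 10_000,
}

def average_ops_per_second(ops_per_month: int) -> float:
    """Average rate if the operations were spread evenly over the month."""
    return ops_per_month / SECONDS_PER_MONTH

total = sum(monthly_operations.values())
rate = average_ops_per_second(total)
headroom = 1_000 / rate  # specified capacity divided by average demand

print(f"{total} ops/month = {rate:.4f} ops/sec, headroom ~{headroom:,.0f}x")
```

Even at the upper bounds the average stays far below one operation per second, which is consistent with the claim that a standard HSM configuration is sufficient.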

Table of Contents

  1. Three-Tier Implementation Overview
  2. Choosing the Right Tier
  3. Initial Tier: Vercel Environment Variables
  4. Next Level: SoftHSM Implementation
  5. Final Level: Hardware HSM
  6. Migration Paths Between Tiers
  7. What is an HSM?
  8. Key Concepts and Terminology
  9. BIP32 HD Wallet Implementation
  10. Security Monitoring and Compliance
  11. Disaster Recovery and Business Continuity
  12. Implementation Timeline
  13. Cost Analysis
  14. Glossary

Three-Tier Implementation Overview

Plings uses a progressive three-tier key management strategy that enables immediate deployment while maintaining a clear upgrade path to enterprise-grade security. Each tier is designed to serve specific business phases and can be seamlessly migrated to the next level as requirements evolve.

Architecture Overview

┌─────────────────────────────────────────────────────────────────┐
│                    Three-Tier Key Management                     │
├─────────────────────────────────────────────────────────────────┤
│                                                                 │
│  Initial Tier          Next Level           Final Level         │
│  ┌─────────────┐      ┌─────────────┐     ┌─────────────┐     │
│  │   Vercel    │      │  SoftHSM    │     │ Hardware HSM│     │
│  │ Environment │ ───► │ VPS Server  │ ──► │   AWS/Luna  │     │
│  │  Variables  │      │   PKCS#11   │     │  FIPS 140-2 │     │
│  └─────────────┘      └─────────────┘     └─────────────┘     │
│                                                                 │
│  Timeline: Now        2-4 weeks           6-9 months           │
│  Cost: $0            $20-50/mo           $1,500-5,000/mo       │
│                                                                 │
└─────────────────────────────────────────────────────────────────┘

Tier Comparison Matrix

| Feature | Initial (Vercel) | Next Level (SoftHSM) | Final Level (Hardware) |
|---------|------------------|----------------------|------------------------|
| Setup Time | < 1 day | 2-4 weeks | 6-9 months |
| Monthly Cost | $0 | $20-50 | $1,500-5,000 |
| Security Level | Software | HSM-equivalent | Hardware-certified |
| Key Storage | Encrypted env vars | PKCS#11 token | Tamper-proof hardware |
| Audit Trail | Application logs | PKCS#11 logs | Enterprise audit |
| Backup Method | Env var copy | Token backup | Hardware replication |
| Compliance | Basic | Standard | FIPS 140-2 L3 |
| Performance | 1000+ ops/sec | 1000+ ops/sec | 1000+ ops/sec |
| Migration Effort | N/A | 1 week | 2-4 weeks |

HD Wallet Consistency Across Tiers

All three tiers implement the same HD wallet structure, ensuring seamless migration:

Master Key: m/44'/501'/[wallet_version]'/
├── Manufacturer: m/44'/501'/[wallet]'/[manufacturer]'/
│   ├── Category: m/44'/501'/[wallet]'/[manufacturer]'/[category]'/
│   │   ├── Class: m/44'/501'/[wallet]'/[manufacturer]'/[category]'/[class]'/
│   │   │   ├── Batch: m/44'/501'/[wallet]'/[manufacturer]'/[category]'/[class]'/[batch]'/
│   │   │   │   └── Instance: .../[instance_number]

Key Points:

  • Wallet version is stored in the path (supports wallet rotation)
  • Same derivation logic across all tiers
  • Only the master key storage location changes
  • Public keys remain consistent during migration
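
As a minimal sketch of how the path layout above could be assembled in code, the helper below builds the derivation path string from its components. The function name and argument names are hypothetical; only the path layout itself comes from the tree above.

```python
PURPOSE = 44      # first path component in the tree above
COIN_TYPE = 501   # second path component in the tree above

def build_instance_path(wallet_version, manufacturer, category,
                        class_id, batch, instance):
    """Return the derivation path string for one identifier instance."""
    hardened = [PURPOSE, COIN_TYPE, wallet_version,
                manufacturer, category, class_id, batch]
    for value in hardened + [instance]:
        # Each BIP32 index must fit in 31 bits (the top bit marks hardening)
        if not isinstance(value, int) or not 0 <= value < 2**31:
            raise ValueError(f"path component out of range: {value!r}")
    # Every level down to the batch is hardened ('); the instance is not
    return "m/" + "/".join(f"{v}'" for v in hardened) + f"/{instance}"

print(build_instance_path(1, 3, 2, 5, 1, 42))
# prints: m/44'/501'/1'/3'/2'/5'/1'/42
```

Keeping path construction in one place means a wallet rotation only changes the `wallet_version` argument, not the code that derives keys.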

Choosing the Right Tier

Decision Framework

Start with Initial Tier (Vercel) When:

  • ✅ Launching MVP or proof of concept
  • ✅ Need immediate production deployment
  • ✅ Budget constraints (< $100/month)
  • ✅ Team size < 10 developers
  • ✅ Customer base < 1,000 organizations

Upgrade to Next Level (SoftHSM) When:

  • ✅ Customer base > 1,000 organizations
  • ✅ Security audit requirements
  • ✅ Need key operation audit trails
  • ✅ Budget allows $50-100/month
  • ✅ Team has dedicated DevOps resources

Move to Final Level (Hardware HSM) When:

  • ✅ Enterprise customer requirements
  • ✅ Regulatory compliance (FIPS 140-2)
  • ✅ Revenue > $1M annually
  • ✅ Need geographic redundancy
  • ✅ Handling high-value identifiers
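
The decision framework above can be expressed as a small function. This is a sketch only: the `Profile` fields and the `recommend_tier` name are hypothetical, and it encodes just three of the listed criteria (customer count, audit-trail need, FIPS or enterprise mandate).

```python
from dataclasses import dataclass

@dataclass
class Profile:
    organizations: int               # number of customer organizations
    needs_audit_trail: bool = False  # security audit or key-operation audit trail
    needs_fips: bool = False         # FIPS 140-2 or an enterprise customer mandate

def recommend_tier(profile: Profile) -> str:
    """Map a business profile to 'initial', 'next' or 'final'."""
    if profile.needs_fips:
        return "final"
    if profile.organizations > 1_000 or profile.needs_audit_trail:
        return "next"
    return "initial"

print(recommend_tier(Profile(organizations=200)))                      # initial
print(recommend_tier(Profile(organizations=200, needs_audit_trail=True)))  # next
print(recommend_tier(Profile(organizations=5_000, needs_fips=True)))   # final
```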

Risk Assessment by Tier

| Risk Type | Initial | Next Level | Final Level |
|-----------|---------|------------|-------------|
| Key Compromise | Medium | Low | Very Low |
| Operational Complexity | Low | Medium | High |
| Vendor Lock-in | Medium (Vercel) | Low | Medium (AWS/Thales) |
| Scalability Limits | None | None | None |
| Recovery Time | Minutes | Hours | Hours-Days |

Initial Tier: Vercel Environment Variables

Quick Start (< 1 Hour)

The Vercel environment variable approach enables production deployment in under an hour with zero additional infrastructure costs.

Step 1: Generate Master Key

# Generate a new master key using Node.js
node -e "
const crypto = require('crypto');
const bs58 = require('bs58');
const masterKey = crypto.randomBytes(32);
console.log('PLINGS_MASTER_KEY=' + bs58.encode(masterKey));
"

# Output: PLINGS_MASTER_KEY=5KYZdUEo39z3FPLjCKpxKkGXstPbqGiELQgSXzFm9ysh

Step 2: Configure Vercel Environment

# Add to Vercel project settings
vercel env add PLINGS_MASTER_KEY production
vercel env add DATABASE_URL production
vercel env add NEO4J_URI production

Step 3: Implement Key Derivation

// api/generate-identifiers.js
import { deriveHDKey } from '@/lib/hd-wallet';

export default async function handler(req, res) {
  const { manufacturer, category, classId, batch, quantity } = req.body;
  
  // Load master key from environment
  const masterKey = process.env.PLINGS_MASTER_KEY;
  
  // Derive keys for the batch
  const identifiers = [];
  for (let i = 1; i <= quantity; i++) {
    const path = `m/44'/501'/1'/${manufacturer}'/${category}'/${classId}'/${batch}'/${i}`;
    const keyPair = await deriveHDKey(masterKey, path);
    
    identifiers.push({
      path,
      publicKey: keyPair.publicKey,
      // Private key is discarded - never stored
    });
  }
  
  // Store only public keys in database
  await storeIdentifiers(identifiers);
  
  return res.json({ success: true, count: identifiers.length });
}

Security Model

What’s Protected:

  • Master key encrypted by Vercel’s infrastructure
  • Private keys never persisted (derived on-demand)
  • Database contains only public keys

Security Boundaries:

  • Vercel team members with env access
  • API functions have key access during execution
  • No key material in application logs

Best Practices:

  1. Rotate master key quarterly
  2. Limit Vercel team access
  3. Enable audit logging
  4. Use separate keys for dev/staging/prod
  5. Implement rate limiting on key generation APIs
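
Rate limiting on key generation APIs (practice 5) can be approached with a token bucket. The sketch below is one possible shape, not the Plings implementation; the class name and parameters are hypothetical. It keeps state in memory, so on a serverless platform the counters would have to live in a shared store instead.

```python
import time

class TokenBucket:
    """Allow bursts up to `capacity`, refilled at `rate_per_sec` tokens/second."""

    def __init__(self, rate_per_sec, capacity, clock=time.monotonic):
        self.rate = rate_per_sec
        self.capacity = capacity
        self.clock = clock              # injectable so tests can control time
        self.tokens = float(capacity)
        self.updated = clock()

    def allow(self, cost=1):
        """Return True and consume `cost` tokens if the request may proceed."""
        now = self.clock()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False

# Example: at most 5 key-generation calls in a burst, refilled at 1 per second
limiter = TokenBucket(rate_per_sec=1, capacity=5)
results = [limiter.allow() for _ in range(6)]
print(results)
```

Passing the batch `quantity` as `cost` would make large batches consume proportionally more of the budget.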

For Detailed Implementation

See Vercel Key Management Guide for:

  • Complete code examples
  • Database schema
  • API endpoint specifications
  • Security hardening
  • Monitoring setup

Next Level: SoftHSM Implementation

Overview

SoftHSM provides HSM-equivalent security using PKCS#11 standard interfaces, enabling audit trails, key ceremonies, and compliance features without hardware costs.

Architecture

┌─────────────────────────────────────────────────────┐
│              SoftHSM Architecture                    │
├─────────────────────────────────────────────────────┤
│                                                     │
│  Vercel Functions          VPS/Cloud Server        │
│  ┌─────────────┐          ┌───────────────────┐   │
│  │   API       │  HTTPS   │   HSM Service     │   │
│  │  Handlers   │ ───────► │  ┌─────────────┐  │   │
│  │             │          │  │  SoftHSM2   │  │   │
│  └─────────────┘          │  │  PKCS#11    │  │   │
│                           │  └─────────────┘  │   │
│                           │                    │   │
│                           │  Key Storage       │   │
│                           │  (Encrypted)       │   │
│                           └───────────────────┘   │
│                                                     │
└─────────────────────────────────────────────────────┘

Implementation Timeline (2-4 Weeks)

Week 1: Infrastructure Setup

  • Provision VPS ($20-50/month)
  • Install SoftHSM2 and dependencies
  • Configure network security

Week 2: HSM Service Development

  • PKCS#11 integration
  • REST API wrapper
  • TLS certificate setup

Week 3: Integration & Testing

  • Update Vercel functions
  • Test key operations
  • Performance validation

Week 4: Migration & Go-Live

  • Migrate from env variables
  • Monitor operations
  • Document procedures

Key Features

Enhanced Security:

  • Keys never leave HSM boundary
  • PKCS#11 audit trail
  • Multi-person key ceremonies
  • Hardware-equivalent protections

Operational Benefits:

  • Standard HSM interfaces
  • Backup/restore procedures
  • Key rotation capabilities
  • Performance monitoring

For Detailed Implementation

See SoftHSM Migration Guide for:

  • Step-by-step setup
  • PKCS#11 integration
  • API implementation
  • Migration procedures

Final Level: Hardware HSM

Enterprise-Grade Security

Hardware HSMs provide the highest level of security with physical tamper protection, compliance certifications, and enterprise support.

Primary Options

AWS CloudHSM

  • Certification: FIPS 140-2 Level 3
  • Deployment: Managed service in AWS
  • Cost: ~$1,500/month per HSM
  • Best For: Cloud-native architectures

Thales Luna Network HSM

  • Certification: FIPS 140-2 L3, Common Criteria EAL4+
  • Deployment: On-premises or cloud
  • Cost: $15,000-50,000 purchase + support
  • Best For: Multi-cloud, hybrid deployments

Implementation Considerations

Timeline: 6-9 months including:

  • Vendor selection (1-2 months)
  • Procurement (1-2 months)
  • Implementation (2-3 months)
  • Testing & certification (2 months)

Team Requirements:

  • Security architect
  • HSM administrators
  • DevOps engineers
  • Compliance officer

When Hardware HSM is Required

Regulatory Requirements:

  • Financial services compliance
  • Government contracts
  • Healthcare data protection
  • High-value asset protection

Business Triggers:

  • Revenue > $10M annually
  • Enterprise customer mandates
  • International expansion
  • IPO preparation

Migration Paths Between Tiers

Initial → Next Level (Vercel to SoftHSM)

Duration: 1-2 weeks with zero downtime

Phase 1: Parallel Operation (3 days)

// Dual-mode key derivation
async function deriveKey(path) {
  if (process.env.USE_SOFTHSM === 'true') {
    return await hsmClient.deriveKey(path);
  } else {
    return await deriveFromEnvVar(process.env.PLINGS_MASTER_KEY, path);
  }
}

Phase 2: Migration (2 days)

  1. Deploy SoftHSM service
  2. Import master key to HSM
  3. Test with subset of traffic
  4. Monitor performance

Phase 3: Cutover (1 day)

  1. Switch all traffic to SoftHSM
  2. Remove env variable access
  3. Update documentation
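
Before the cutover it helps to confirm that both backends derive identical public keys for the same paths. The following sketch compares two derivation callables; the backends shown are hash-based stand-ins for illustration only, and `compare_backends` is a hypothetical helper name.

```python
import hashlib

def compare_backends(paths, derive_old, derive_new):
    """Return the paths for which the two backends disagree."""
    return [p for p in paths if derive_old(p) != derive_new(p)]

# Stand-in backends for illustration only: real ones would call the
# env-var derivation and the SoftHSM service and return public keys.
def fake_env_backend(path):
    return hashlib.sha256(b"master" + path.encode()).hexdigest()

def fake_hsm_backend(path):
    return hashlib.sha256(b"master" + path.encode()).hexdigest()

sample_paths = [
    "m/44'/501'/1'/3'/2'/5'/1'/1",
    "m/44'/501'/1'/3'/2'/5'/1'/2",
]
mismatches = compare_backends(sample_paths, fake_env_backend, fake_hsm_backend)
print("safe to cut over" if not mismatches else f"mismatch on {mismatches}")
```

Running such a comparison during the parallel-operation phase gives evidence that the imported master key matches before env variable access is removed.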

Next Level → Final Level (SoftHSM to Hardware)

Duration: 2-4 weeks with planned maintenance windows

Key Migration Strategy:

  1. Generate new master key in hardware HSM
  2. Create new wallet version (v2)
  3. Issue new identifiers with v2
  4. Maintain v1 for existing identifiers
  5. Gradual migration over time

Technical Approach:

# Multi-wallet support during migration
class WalletManager:
    def __init__(self):
        self.wallets = {
            1: SoftHSMWallet(),    # Existing
            2: HardwareHSMWallet() # New
        }
    
    def get_wallet(self, version):
        return self.wallets[version]
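
Because the wallet version is stored in the derivation path, the version passed to `get_wallet` can be read from the path itself. The sketch below shows one way to do that; the regular expression, helper names, and the string placeholders standing in for wallet objects are assumptions for illustration.

```python
import re

# Matches the m/44'/501'/[wallet_version]'/... prefix used throughout this guide
PATH_PREFIX = re.compile(r"^m/44'/501'/(\d+)'(?:/|$)")

def wallet_version_from_path(path: str) -> int:
    """Extract the wallet version (third path component) from a path."""
    match = PATH_PREFIX.match(path)
    if match is None:
        raise ValueError(f"not a Plings derivation path: {path!r}")
    return int(match.group(1))

# Strings stand in for the wallet objects held by WalletManager
backends = {1: "softhsm", 2: "hardware-hsm"}

def backend_for(path: str) -> str:
    return backends[wallet_version_from_path(path)]

print(backend_for("m/44'/501'/2'/3'/2'/5'/1'/42"))
# prints: hardware-hsm
```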

What is an HSM?

A Hardware Security Module (HSM) is a dedicated cryptographic device designed to securely generate, store, and manage digital keys and perform cryptographic operations.

Key Characteristics:

Physical Security:

  • Tamper-resistant/tamper-evident hardware
  • If someone tries to physically break into it, it destroys the keys
  • Certified to standards like FIPS 140-2 Level 3 or Common Criteria

Cryptographic Operations:

  • Key generation with true random number generators
  • Digital signing and verification
  • Encryption/decryption operations
  • Key derivation (like BIP32 HD wallet operations)

Key Storage:

  • Keys are generated and stored inside the HSM
  • Keys never leave the HSM in plaintext
  • Even administrators cannot extract the actual key material

Why HSMs for Plings?

In the context of our wallet-first architecture:

Without HSM (Risky):

# DANGEROUS - Private key in software
master_private_key = "abc123..."  # Stored in database or file
# Anyone with access to this can generate fake Plings identifiers

With HSM (Secure):

# SECURE - Private key never leaves HSM hardware
hsm.generate_signature(message)  # Happens inside tamper-proof hardware
# Even if someone hacks our servers, they can't get the master key

Real-World Analogy

Think of an HSM like a high-security bank vault:

Bank Vault: Physically secure, tamper-evident, requires multiple people to open
HSM: Physically secure, tamper-evident, requires authentication to use

Bank Vault: Stores valuable physical assets (gold, cash)
HSM: Stores valuable digital assets (cryptographic keys)

Bank Vault: If someone breaks in, alarms go off
HSM: If someone tampers with it, it destroys the keys

HSM vs Regular Computer Security

| Feature | Regular Server | HSM |
|---------|----------------|-----|
| Security Model | Software-based security | Hardware-based security |
| Key Storage | Keys stored in files/database | Keys never leave secure hardware |
| Vulnerability | Vulnerable to malware | Isolated from operating system |
| Remote Access | Can be remotely compromised | Requires physical presence |
| Key Protection | Keys can be copied | Keys cannot be extracted |

Why This Matters for Plings

Since every Plings identifier must cryptographically derive from our master key:

If master key is compromised → Anyone can create fake Plings identifiers
If master key is in HSM → Only authorized operations through secure hardware

The HSM is essentially the Fort Knox for Plings’ cryptographic security! 🏛️


Key Concepts and Terminology

Before diving into implementation, let’s clarify the essential technical concepts:

FIPS 140-2 Certification

FIPS 140-2 is a U.S. government standard for cryptographic modules. It has 4 security levels:

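  • Level 1: Basic security requirements; no physical security mechanisms required
  • Level 2: Tamper-evident seals or coatings, plus role-based authentication
  • Level 3: Physical tamper detection and response, plus identity-based authentication ⭐ (Plings uses this)
  • Level 4: Protection against environmental attacks (voltage, temperature) and a complete tamper-response envelope

Why Level 3 matters for Plings: A Level 3 module is designed to zeroize its keys when it detects physical tampering, so an attacker who opens or steals the device gets no key material.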

PKCS#11

PKCS#11 is the standard interface for communicating with cryptographic tokens and HSMs. Think of it as the “USB driver” for HSMs - it provides a consistent API regardless of the HSM vendor.

# All HSMs use the same PKCS#11 interface
hsm.login(username, password)
private_key = hsm.generateKeyPair(algorithm="Ed25519")
signature = hsm.sign(private_key, message)

Ed25519

Ed25519 is a modern elliptic curve cryptography algorithm. We chose it because:

  • Performance: Very fast signing and verification
  • Security: Resistant to side-channel attacks
  • Simplicity: No parameter choices that could weaken security
  • Solana Standard: Native support in Solana blockchain

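Ed25519 vs RSA:

| Feature | Ed25519 | RSA-2048 |
|---------|---------|----------|
| Key Size | 32 bytes | 256 bytes |
| Signature Size | 64 bytes | 256 bytes |
| Performance | 10x faster | Slower |
| Quantum Resistance | None (breakable by Shor's algorithm) | None (breakable by Shor's algorithm) |

Neither algorithm is quantum-resistant, which is why a post-quantum migration to CRYSTALS-Dilithium is planned.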

BIP32/BIP39 Standards

BIP32 (Hierarchical Deterministic Wallets) and BIP39 (Mnemonic Codes) are Bitcoin standards we use:

BIP32: Allows generating millions of keys from one master key

Master Key → Wallet v1 → Manufacturer → Category → Class → Instance
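
To make hierarchical derivation concrete, here is a minimal sketch using only the Python standard library. BIP32 itself is defined for secp256k1; for Ed25519 keys a common adaptation is SLIP-0010, which is what the sketch follows. Whether Plings uses SLIP-0010 is an assumption, since this guide only says "BIP32-compatible". Note that SLIP-0010 supports hardened components only for Ed25519, so the unhardened instance component in the Plings path would need separate handling under this scheme.

```python
import hashlib
import hmac

HARDENED = 0x80000000  # top bit marks a hardened index

def _hmac_sha512(key: bytes, data: bytes) -> bytes:
    return hmac.new(key, data, hashlib.sha512).digest()

def master_from_seed(seed: bytes):
    """SLIP-0010 master key and chain code for the ed25519 curve."""
    digest = _hmac_sha512(b"ed25519 seed", seed)
    return digest[:32], digest[32:]

def derive_hardened_child(key: bytes, chain_code: bytes, index: int):
    """Derive hardened child `index` from a parent key and chain code."""
    data = b"\x00" + key + (index | HARDENED).to_bytes(4, "big")
    digest = _hmac_sha512(chain_code, data)
    return digest[:32], digest[32:]

def derive_path(seed: bytes, path: str) -> bytes:
    """Walk a path such as m/44'/501'/1' and return the 32-byte private key."""
    key, chain_code = master_from_seed(seed)
    for part in path.split("/")[1:]:
        if not part.endswith("'"):
            raise ValueError("SLIP-0010 Ed25519 supports hardened components only")
        key, chain_code = derive_hardened_child(key, chain_code, int(part[:-1]))
    return key

seed = bytes(range(16))  # demo seed only; never use a predictable seed
manufacturer_key = derive_path(seed, "m/44'/501'/1'/3'")
print(manufacturer_key.hex())
```

The same seed and path always produce the same key, which is what lets every tier reproduce identical keys from one master secret.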

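BIP39: Encodes the entropy behind the master key as a human-readable mnemonic phrase

Master Key = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about"

(This phrase is a widely published test vector and must never protect real keys.)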

Why use Bitcoin standards? They’re battle-tested, widely supported, and have excellent tooling.

Shamir’s Secret Sharing

Shamir’s Secret Sharing splits a secret into multiple shares where you need a threshold to reconstruct it.

Example: Split master key into 5 shares, need any 3 to recover:

  • Share 1: Security Officer (offline storage)
  • Share 2: CTO (safety deposit box)
  • Share 3: External trustee (bank vault)
  • Share 4: Compliance officer (secure facility)
  • Share 5: Emergency contact (geographic separation)

Business Benefit: No single person can compromise the master key, but legitimate recovery is still possible.
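
The 3-of-5 example can be sketched with polynomial interpolation over a prime field. This is an illustration of the technique under stated assumptions (the prime, function names, and share format are choices made here); a real key ceremony should rely on a vetted, audited implementation rather than this code.

```python
import secrets

PRIME = 2**521 - 1  # Mersenne prime, larger than any 256-bit secret

def split_secret(secret: int, threshold: int, shares: int):
    """Split `secret` into `shares` points; any `threshold` of them recover it."""
    if not 0 <= secret < PRIME:
        raise ValueError("secret out of range")
    # Random polynomial of degree threshold-1 whose constant term is the secret
    coeffs = [secret] + [secrets.randbelow(PRIME) for _ in range(threshold - 1)]

    def evaluate(x):
        y = 0
        for c in reversed(coeffs):  # Horner's rule
            y = (y * x + c) % PRIME
        return y

    return [(x, evaluate(x)) for x in range(1, shares + 1)]

def recover_secret(points):
    """Lagrange interpolation at x = 0 over the prime field."""
    secret = 0
    for i, (xi, yi) in enumerate(points):
        num, den = 1, 1
        for j, (xj, _) in enumerate(points):
            if i != j:
                num = (num * -xj) % PRIME
                den = (den * (xi - xj)) % PRIME
        secret = (secret + yi * num * pow(den, -1, PRIME)) % PRIME
    return secret

master = int.from_bytes(secrets.token_bytes(32), "big")
shares = split_secret(master, threshold=3, shares=5)
print(recover_secret(shares[:3]) == master)
print(recover_secret([shares[0], shares[2], shares[4]]) == master)
```

Any three of the five shares reconstruct the secret, while two shares alone reveal nothing about it.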


Prerequisites and Assumptions

Required Knowledge

Before implementing HSM integration, your team should understand:

Infrastructure Team:

  • AWS services (VPC, EC2, IAM, CloudFormation)
  • Network security (firewalls, VPNs, SSL/TLS)
  • Linux system administration
  • Database administration (PostgreSQL)

Development Team:

  • Python programming (asyncio, cryptography libraries)
  • REST API development (FastAPI, authentication)
  • Database operations (SQL, transactions)
  • Git version control and CI/CD

Security Team:

  • Cryptographic concepts (public/private keys, signatures)
  • Security incident response procedures
  • Compliance frameworks (SOC 2, GDPR)
  • Risk assessment and threat modeling

Infrastructure Requirements

AWS Account Setup:

  • Production AWS account with appropriate IAM roles
  • VPC with private subnets for HSM deployment
  • Network connectivity between HSM and application servers
  • Monitoring and logging infrastructure (CloudWatch, CloudTrail)

Security Requirements:

  • Multi-factor authentication for all administrative access
  • Network segmentation and firewall rules
  • Secure backup and disaster recovery procedures
  • Incident response team and procedures

Team Roles and Responsibilities

Security Officer:

  • Overall security strategy and governance
  • HSM key ceremony oversight
  • Incident response coordination
  • Compliance and audit management

Infrastructure Engineer:

  • HSM deployment and configuration
  • Network setup and security
  • Monitoring and alerting setup
  • Disaster recovery testing

Backend Developer:

  • HSM client integration
  • API development and testing
  • Database schema implementation
  • Performance optimization

DevOps Engineer:

  • CI/CD pipeline integration
  • Automated testing and deployment
  • Infrastructure as code
  • Operational monitoring

Security Clearance and Access Control

HSM Administrator Access:

  • Requires security background check
  • Multi-person authorization for key operations
  • Regular access reviews and rotation
  • Audit logging of all activities

Development Access:

  • Separate development environment with SoftHSM
  • No access to production HSM or keys
  • Code review requirements for HSM-related changes
  • Security training and awareness

Quick Start for Different Audiences

For CTO/Decision Makers

Business Problem: Plings identifiers must be cryptographically secure to prevent counterfeiting and maintain trust.

Solution: Hardware Security Modules provide tamper-proof key storage and cryptographic operations.

Investment Summary:

  • Annual Cost: $20,000-$35,000 (standard tier sufficient)
  • Implementation Time: 3-6 months
  • Risk Reduction: Prevents identifier counterfeiting (potentially millions in losses)
  • Compliance: Meets SOC 2, GDPR, and financial industry standards

Decision Points:

  1. AWS CloudHSM for primary production (recommended)
  2. Thales Luna for disaster recovery backup
  3. SoftHSM for development environments

For DevOps Engineers

Implementation Timeline:

  • Month 1: Development environment setup with SoftHSM
  • Month 2: AWS CloudHSM infrastructure deployment
  • Month 3: Production integration and testing
  • Month 4: Disaster recovery setup with Thales Luna
  • Month 5: Security monitoring and compliance setup
  • Month 6: Full production deployment and team training

Infrastructure Needs:

  • AWS CloudHSM cluster with multi-AZ deployment
  • VPC with private subnets and security groups
  • Application servers with HSM client libraries
  • Monitoring and logging infrastructure
  • Backup and disaster recovery procedures

For Security Teams

Threat Model:

  • Primary Threat: Master key compromise leading to identifier counterfeiting
  • Secondary Threats: Insider threats, physical attacks, software vulnerabilities
  • Mitigation: HSM provides hardware-level protection with tamper detection

Compliance Requirements:

  • FIPS 140-2 Level 3 certification for cryptographic modules
  • SOC 2 Type II controls for security and availability
  • GDPR compliance for data protection and privacy
  • Audit Trail: Complete logging of all HSM operations

Security Controls:

  • Multi-factor authentication for HSM access
  • Role-based access control with principle of least privilege
  • Network segmentation and encryption
  • Regular security assessments and penetration testing
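
One way to make the audit trail of HSM operations tamper-evident is to chain log entries by hash, so that altering any past entry invalidates everything after it. The sketch below illustrates the idea with the standard library; the entry fields and function names are hypothetical, and it complements rather than replaces the HSM's own logging.

```python
import hashlib
import json

GENESIS = "0" * 64  # placeholder "previous hash" for the first entry

def _digest(body: dict) -> str:
    return hashlib.sha256(json.dumps(body, sort_keys=True).encode()).hexdigest()

def append_entry(log: list, operation: str, actor: str) -> None:
    """Append an entry that commits to the hash of the previous entry."""
    prev = log[-1]["hash"] if log else GENESIS
    body = {"operation": operation, "actor": actor, "prev": prev}
    log.append({**body, "hash": _digest(body)})

def verify_log(log: list) -> bool:
    """Recompute every hash and check that the chain is unbroken."""
    prev = GENESIS
    for entry in log:
        body = {k: entry[k] for k in ("operation", "actor", "prev")}
        if entry["prev"] != prev or entry["hash"] != _digest(body):
            return False
        prev = entry["hash"]
    return True

audit_log = []
append_entry(audit_log, "generate_master_key", "security-officer")
append_entry(audit_log, "sign_path", "hsm-service")
print(verify_log(audit_log))          # True

audit_log[0]["actor"] = "intruder"    # simulate tampering with history
print(verify_log(audit_log))          # False
```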

For Developers

API Integration: The HSM service provides simple REST APIs for common operations:

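import requests

# Hypothetical base URL; replace with the address of your HSM service
BASE_URL = 'https://hsm.example.com'

# Generate identifier batch
response = requests.post(f'{BASE_URL}/api/v1/identifiers/generate', json={
    'wallet_version': 1,
    'paths': ['1.1.C1.1.1', '1.1.C1.1.2'],
    'allocation_type': 'generic'
}, timeout=10)
response.raise_for_status()  # fail loudly on 4xx/5xx responses

# Check HSM health
health = requests.get(f'{BASE_URL}/api/v1/health', timeout=10)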

Development Environment:

  • Use SoftHSM for local development
  • Full API compatibility with production HSMs
  • Automated testing with CI/CD integration
  • Performance benchmarking and optimization

HSM Vendor Selection and Comparison

1. AWS CloudHSM (Primary Production)

Why CloudHSM for Plings:

  • FIPS 140-2 Level 3 certification for regulatory compliance
  • Adequate Performance: >1,000 Ed25519 signatures/second (exceeds Plings needs)
  • Seamless AWS Integration: Natural fit with existing cloud infrastructure
  • High Availability: Multi-AZ deployment with automatic failover
  • Managed Service: AWS handles hardware maintenance and security updates

Technical Specifications:

hsm_specification:
  vendor: "AWS CloudHSM"
  model: "AWS CloudHSM (Cavium SafeNet Luna)"
  certification: "FIPS 140-2 Level 3"
  performance:
    ed25519_signatures_per_second: 1000+  # Exceeds Plings requirements
    concurrent_sessions: 100
    actual_plings_usage: "<1 operation/second"
  availability:
    sla: "99.9%"
    multi_az: true
    automatic_failover: true
  integration:
    api: "PKCS#11, OpenSSL, JCE"
    sdk: "AWS SDK with HSM extensions"
    languages: ["Python", "Java", "C++", "JavaScript"]

Cost Structure:

  • Initialization: $3,000 one-time setup fee
  • Monthly: $1,500 per HSM instance (standard configuration sufficient)
  • Usage: $0.01 per 1,000 operations (~$1/month for Plings usage)
  • Estimated Annual Cost: ~$20,000 for production setup (standard tier)
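
The annual estimate can be reproduced from the line items above. The sketch below is plain arithmetic on those figures; the function name is hypothetical, and it treats the usage charge as a flat $1 per month.

```python
SETUP_FEE = 3_000        # one-time initialization fee (USD)
MONTHLY_PER_HSM = 1_500  # per HSM instance (USD)
USAGE_PER_MONTH = 1      # approximate usage charge at Plings volumes (USD)

def first_year_cost(hsm_count: int) -> int:
    """First-year cost in USD for a cluster with `hsm_count` HSM instances."""
    monthly = MONTHLY_PER_HSM * hsm_count + USAGE_PER_MONTH
    return SETUP_FEE + 12 * monthly

print(first_year_cost(1))  # 21012
print(first_year_cost(2))  # 39012
```

With these inputs the ~$20,000 estimate corresponds to a single HSM instance; a two-instance multi-AZ cluster would come to roughly twice the monthly portion.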

2. Thales Luna Network HSM (Backup/DR)

Why Luna for Disaster Recovery:

  • Geographic Independence: Non-AWS vendor for risk diversification
  • Proven Track Record: Widely used in financial services
  • High Availability: Network-attached HSMs with failover clustering
  • Compliance: FIPS 140-2 Level 3 and Common Criteria EAL4+

Technical Specifications:

hsm_specification:
  vendor: "Thales"
  model: "Luna Network HSM 7"
  certification: "FIPS 140-2 Level 3, Common Criteria EAL4+"
  performance:
    ed25519_signatures_per_second: 1000+  # Standard model sufficient
    concurrent_sessions: 50
    actual_plings_usage: "<1 operation/second"
  availability:
    clustering: "High Availability with automatic failover"
    load_balancing: true
  integration:
    api: "PKCS#11, Microsoft CNG, OpenSSL"
    management: "Luna HSM Client and utilities"

3. SoftHSM (Development/Testing)

Why SoftHSM for Development:

  • Cost Effective: Free open-source software-based HSM
  • Easy Setup: Simple installation and configuration
  • Full API Compatibility: PKCS#11 interface identical to hardware HSMs
  • Development Speed: Rapid iteration without hardware constraints

Technical Specifications:

hsm_specification:
  vendor: "OpenDNSSEC"
  model: "SoftHSM 2.x"
  certification: "Software-based (no hardware certification)"
  performance:
    ed25519_signatures_per_second: 1000+  # More than adequate for development
    memory_based: true
    thread_safe: true
    actual_plings_usage: "<1 operation/second"
  integration:
    api: "PKCS#11"
    platforms: ["Linux", "macOS", "Windows"]
    languages: ["Python", "C++", "Java"]

HSM Comparison Matrix

| Feature | AWS CloudHSM | Thales Luna | SoftHSM |
|---------|--------------|-------------|---------|
| Certification | FIPS 140-2 L3 | FIPS 140-2 L3 | None |
| Performance | 1,000+ sig/sec | 1,000+ sig/sec | 1,000+ sig/sec |
| Plings Usage | <1 sig/sec | <1 sig/sec | <1 sig/sec |
| Cost | $20K/year | $35K/year | Free |
| Availability | 99.9% | 99.95% | Software-dependent |
| Use Case | Production | DR/Backup | Development |
| Geographic | AWS regions | On-premises | Any |
| Compliance | SOC 2, FedRAMP | SOC 2, CC EAL4+ | None |

Production HSM Implementation

AWS CloudHSM Setup

1. Infrastructure Setup

#!/bin/bash
# AWS CloudHSM cluster setup for Plings production environment
# Subnet IDs and the cluster ID below are placeholders.

# Create HSM cluster (create-cluster takes --tag-list, not --tag-specifications)
aws cloudhsmv2 create-cluster \
  --hsm-type hsm1.medium \
  --subnet-ids subnet-12345678 subnet-87654321 \
  --tag-list Key=Project,Value=Plings Key=Environment,Value=Production

# Create HSM instances in multiple AZs
aws cloudhsmv2 create-hsm \
  --cluster-id cluster-1234567890abcdef0 \
  --availability-zone us-west-2a

aws cloudhsmv2 create-hsm \
  --cluster-id cluster-1234567890abcdef0 \
  --availability-zone us-west-2b

# Initialize cluster:
#   --signed-cert  is the cluster's CSR signed by your own CA
#   --trust-anchor is that CA's certificate
aws cloudhsmv2 initialize-cluster \
  --cluster-id cluster-1234567890abcdef0 \
  --signed-cert file://cluster-1234567890abcdef0_CustomerHsmCertificate.crt \
  --trust-anchor file://customerCA.crt

2. HSM Client Configuration

# hsm_client.py - Production HSM client implementation
import time

import boto3
import PyKCS11  # PKCS#11 bindings used for all session and key operations
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519

class PlingsHSMClient:
    """Production HSM client for Plings wallet operations"""
    
    def __init__(self, cluster_id: str):
        self.cluster_id = cluster_id
        self.hsm_client = boto3.client('cloudhsmv2')
        self.pkcs11_lib = '/opt/cloudhsm/lib/libcloudhsm_pkcs11.so'
        self.session = None
        
    def initialize_session(self, username: str, password: str):
        """Initialize PKCS#11 session with HSM"""
        import PyKCS11
        
        self.pkcs11 = PyKCS11.PyKCS11Lib()
        self.pkcs11.load(self.pkcs11_lib)
        
        # Get first available slot
        slots = self.pkcs11.getSlotList(tokenPresent=True)
        if not slots:
            raise Exception("No HSM slots available")
        
        # Open session
        self.session = self.pkcs11.openSession(slots[0])
        
        # Login with CU (Crypto User) credentials
        self.session.login(username, password)
        
        print(f"✅ HSM session initialized for slot {slots[0]}")
    
    def generate_master_key(self, key_label: str, wallet_version: int) -> str:
        """Generate master key for specific wallet version"""
        import PyKCS11
        
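        # Use one identifier for both CKA_ID and the returned key_id so that
        # sign_with_master_key() can find the key again
        key_id = f"plings_master_v{wallet_version}"

        # Attributes shared by both halves of the key pair
        common = [
            (PyKCS11.CKA_TOKEN, True),
            (PyKCS11.CKA_LABEL, f"{key_label}_v{wallet_version}"),
            (PyKCS11.CKA_ID, key_id.encode()),
        ]
        public_template = common + [
            (PyKCS11.CKA_VERIFY, True),
            # DER PrintableString "edwards25519" (PKCS#11 v3.0 curve name)
            (PyKCS11.CKA_EC_PARAMS, bytes.fromhex("130c656477617264733235353139")),
        ]
        private_template = common + [
            (PyKCS11.CKA_PRIVATE, True),
            (PyKCS11.CKA_SENSITIVE, True),
            (PyKCS11.CKA_EXTRACTABLE, False),
            (PyKCS11.CKA_SIGN, True),
        ]
        
        # Generate Ed25519 key pair. PyKCS11 takes the two templates first and
        # the mechanism as a keyword argument. This assumes the HSM's PKCS#11
        # library supports the v3.0 EdDSA mechanisms.
        public_key, private_key = self.session.generateKeyPair(
            public_template,
            private_template,
            mecha=PyKCS11.Mechanism(PyKCS11.CKM_EC_EDWARDS_KEY_PAIR_GEN, None),
        )
        
        # Read back the public key point for verification
        public_key_point = self.session.getAttributeValue(
            public_key, [PyKCS11.CKA_EC_POINT]
        )[0]
        
        print(f"✅ Generated master key: {key_id}")
        return key_id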
    
    def sign_with_master_key(self, key_id: str, message: bytes) -> bytes:
        """Sign message with master key"""
        import PyKCS11
        
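        # Find private key by ID
        matches = self.session.findObjects([
            (PyKCS11.CKA_ID, key_id.encode()),
            (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)
        ])
        if not matches:
            raise KeyError(f"No private key with ID {key_id!r} in the HSM")
        
        # Sign with EdDSA explicitly; PyKCS11's default mechanism is RSA-based
        signature = self.session.sign(
            matches[0], message, PyKCS11.Mechanism(PyKCS11.CKM_EDDSA, None)
        )
        
        return bytes(signature)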
    
    def derive_wallet_key(self, master_key_id: str, derivation_path: str) -> bytes:
        """Derive wallet-specific key from master key"""
        # This would implement BIP32 derivation within HSM
        # For security, derivation happens inside HSM hardware
        pass
    
    def backup_key_material(self, key_id: str) -> dict:
        """Create encrypted backup of key material"""
        # Implementation depends on HSM vendor APIs
        # Returns encrypted key material for disaster recovery
        pass
    
    def health_check(self) -> dict:
        """Check HSM health and performance"""
        import time
        import PyKCS11
        
        try:
            # Test signature operation
            test_message = b"health_check_test"
            start_time = time.time()
            
            # Find any available private key for testing
            test_keys = self.session.findObjects([
                (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY),
                (PyKCS11.CKA_SIGN, True)
            ])
            
            if test_keys:
                # Ed25519 keys need the EdDSA mechanism (the default is RSA PKCS#1)
                self.session.sign(
                    test_keys[0], test_message, PyKCS11.Mechanism(PyKCS11.CKM_EDDSA)
                )
                response_time = time.time() - start_time
                
                return {
                    'status': 'healthy',
                    'response_time_ms': response_time * 1000,
                    'available_keys': len(test_keys),
                    'cluster_id': self.cluster_id
                }
            else:
                return {
                    'status': 'warning',
                    'message': 'No test keys available'
                }
                
        except Exception as e:
            return {
                'status': 'error',
                'message': str(e)
            }

3. HSM Service Integration

# hsm_service.py - Service layer for HSM operations
import asyncio
import logging
import os
import time
from typing import Any, Dict, List, Optional
from dataclasses import dataclass

@dataclass
class HSMConfig:
    primary_cluster_id: str
    backup_cluster_id: str
    region: str
    key_rotation_days: int = 365
    backup_schedule: str = "daily"

class PlingsHSMService:
    """High-level HSM service for Plings wallet operations"""
    
    def __init__(self, config: HSMConfig):
        self.config = config
        self.primary_hsm = PlingsHSMClient(config.primary_cluster_id)
        self.backup_hsm = PlingsHSMClient(config.backup_cluster_id)
        self.logger = logging.getLogger(__name__)
    
    async def initialize_production_environment(self):
        """Initialize production HSM environment"""
        try:
            # Initialize primary HSM
            await self.primary_hsm.initialize_session(
                username=os.environ['HSM_PRIMARY_USER'],
                password=os.environ['HSM_PRIMARY_PASSWORD']
            )
            
            # Initialize backup HSM
            await self.backup_hsm.initialize_session(
                username=os.environ['HSM_BACKUP_USER'],
                password=os.environ['HSM_BACKUP_PASSWORD']
            )
            
            self.logger.info("✅ Production HSM environment initialized")
            
        except Exception as e:
            self.logger.error(f"❌ HSM initialization failed: {e}")
            raise
    
    async def create_new_wallet_version(self, wallet_version: int, 
                                      description: str) -> str:
        """Create new wallet version with master key generation"""
        try:
            # Generate master key in primary HSM
            master_key_id = await self.primary_hsm.generate_master_key(
                key_label="plings_master",
                wallet_version=wallet_version
            )
            
            # Backup key material to secondary HSM
            await self.backup_key_to_secondary_hsm(master_key_id)
            
            # Update database with new wallet version
            await self.register_wallet_version(wallet_version, master_key_id, description)
            
            self.logger.info(f"✅ Created wallet version {wallet_version} with key {master_key_id}")
            return master_key_id
            
        except Exception as e:
            self.logger.error(f"❌ Wallet creation failed: {e}")
            raise
    
    async def sign_identifier_batch(self, wallet_version: int, 
                                  paths: List[str]) -> List[str]:
        """Sign batch of identifier paths"""
        try:
            master_key_id = await self.get_master_key_id(wallet_version)
            signatures = []
            
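            for path in paths:
                # Create message to sign (path + timestamp).
                # NOTE: the timestamp is not returned, so callers must record it
                # separately to be able to verify the signature later.
                message = f"{path}:{int(time.time())}".encode()
                
                # sign_with_master_key is a synchronous, blocking PKCS#11 call,
                # so run it in a worker thread instead of awaiting it directly
                signature = await asyncio.to_thread(
                    self.primary_hsm.sign_with_master_key, master_key_id, message
                )
                
                signatures.append(signature.hex())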
            
            return signatures
            
        except Exception as e:
            self.logger.error(f"❌ Batch signing failed: {e}")
            raise
    
    async def emergency_key_rotation(self, compromised_wallet_version: int,
                                   incident_id: str) -> int:
        """Emergency key rotation during security incident"""
        try:
            # Generate new wallet version
            new_version = await self.get_next_wallet_version()
            
            # Create emergency master key
            emergency_key_id = await self.create_new_wallet_version(
                new_version, f"Emergency rotation - Incident {incident_id}"
            )
            
            # Mark old wallet as compromised
            await self.mark_wallet_compromised(compromised_wallet_version, incident_id)
            
            self.logger.info(f"✅ Emergency rotation complete: v{compromised_wallet_version} → v{new_version}")
            return new_version
            
        except Exception as e:
            self.logger.error(f"❌ Emergency rotation failed: {e}")
            raise
    
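    async def health_monitoring(self) -> Dict[str, object]:
        """Comprehensive HSM health monitoring"""
        try:
            # health_check is a synchronous, blocking call, so run it in a
            # worker thread rather than awaiting it directly
            primary_health = await asyncio.to_thread(self.primary_hsm.health_check)
            
            # Check backup HSM
            backup_health = await asyncio.to_thread(self.backup_hsm.health_check)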
            
            # Performance metrics
            performance_metrics = await self.collect_performance_metrics()
            
            return {
                'primary_hsm': primary_health,
                'backup_hsm': backup_health,
                'performance': performance_metrics,
                'overall_status': 'healthy' if primary_health['status'] == 'healthy' else 'degraded'
            }
            
        except Exception as e:
            self.logger.error(f"❌ Health monitoring failed: {e}")
            return {'overall_status': 'error', 'message': str(e)}
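
The service above is asynchronous while the PKCS#11 client methods are ordinary blocking functions. The following is a minimal sketch of one way to bridge the two and fall back to the backup cluster when the primary fails. FakeHSMClient, call_blocking and health_with_failover are hypothetical names used only for illustration; they are not part of the Plings codebase.

```python
import asyncio
from typing import Callable, Dict


class FakeHSMClient:
    """Stand-in for a synchronous PKCS#11 client (hypothetical, for illustration)."""

    def __init__(self, name: str, healthy: bool = True):
        self.name = name
        self.healthy = healthy

    def health_check(self) -> Dict[str, str]:
        # A real client would perform a blocking test signature here.
        if not self.healthy:
            raise RuntimeError(f"{self.name} unreachable")
        return {"status": "healthy", "cluster_id": self.name}


async def call_blocking(func: Callable, *args):
    """Run a blocking HSM call in a worker thread so the event loop stays free."""
    return await asyncio.to_thread(func, *args)


async def health_with_failover(primary, backup) -> Dict[str, str]:
    """Try the primary HSM first and fall back to the backup on any error."""
    try:
        result = await call_blocking(primary.health_check)
        return {**result, "served_by": "primary"}
    except Exception:
        result = await call_blocking(backup.health_check)
        return {**result, "served_by": "backup"}


if __name__ == "__main__":
    primary = FakeHSMClient("cluster-a", healthy=False)
    backup = FakeHSMClient("cluster-b")
    print(asyncio.run(health_with_failover(primary, backup)))
```

asyncio.to_thread requires Python 3.9 or later. On older versions, loop.run_in_executor serves the same purpose.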

Key Generation and Ceremony

Master Key Generation Ceremony

1. Pre-Ceremony Setup

#!/bin/bash
# Key generation ceremony setup script

# Create secure ceremony environment
mkdir -p /secure/ceremony
chmod 700 /secure/ceremony
cd /secure/ceremony

# Verify air-gapped environment
if ping -c 1 8.8.8.8 &> /dev/null; then
    echo "❌ ERROR: Network connectivity detected. Ensure air-gapped environment."
    exit 1
fi

# Hardware verification
echo "📋 Hardware Security Verification Checklist:"
echo "1. Air-gapped environment: ✓"
echo "2. HSM hardware present: $(lsusb | grep -i 'hsm\|safenet' | wc -l) devices"
echo "3. Video recording: [ ] Started"
echo "4. Witnesses present: [ ] Confirmed"
echo "5. Entropy sources: [ ] Hardware RNG, [ ] Atmospheric noise"

2. Ceremony Procedure

# key_ceremony.py - Formal key generation ceremony
import secrets
import hashlib
import json
import time
from datetime import datetime
from typing import List
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed25519

class KeyGenerationCeremony:
    """Formal key generation ceremony with multiple witnesses"""
    
    def __init__(self, ceremony_id: str, witnesses: List[str]):
        self.ceremony_id = ceremony_id
        self.witnesses = witnesses
        self.ceremony_log = []
        self.start_time = datetime.utcnow()
        
    def log_ceremony_event(self, event: str, witness: str = None):
        """Log ceremony events with timestamps"""
        timestamp = datetime.utcnow()
        log_entry = {
            'timestamp': timestamp.isoformat(),
            'event': event,
            'witness': witness,
            'ceremony_id': self.ceremony_id
        }
        self.ceremony_log.append(log_entry)
        print(f"[{timestamp}] {event}" + (f" (Witness: {witness})" if witness else ""))
    
    def collect_entropy_sources(self) -> bytes:
        """Collect entropy from multiple sources"""
        self.log_ceremony_event("Starting entropy collection")
        
        # NOTE: all three sources are simulated here with the OS CSPRNG via
        # `secrets`; a real ceremony would read from the actual devices.
        
        # Source 1: Hardware RNG (simulated)
        hardware_entropy = secrets.token_bytes(32)
        self.log_ceremony_event("Hardware RNG entropy collected", self.witnesses[0])
        
        # Source 2: Atmospheric noise (simulated)
        atmospheric_entropy = secrets.token_bytes(32)
        self.log_ceremony_event("Atmospheric noise entropy collected", self.witnesses[1])
        
        # Source 3: HSM internal entropy (simulated)
        hsm_entropy = secrets.token_bytes(32)
        self.log_ceremony_event("HSM internal entropy collected", self.witnesses[2])
        
        # Combine entropy sources
        combined_entropy = hashlib.sha256(
            hardware_entropy + atmospheric_entropy + hsm_entropy
        ).digest()
        
        self.log_ceremony_event("Entropy sources combined and hashed")
        return combined_entropy
    
    def generate_master_key(self, wallet_version: int) -> dict:
        """Generate master key with ceremony validation"""
        self.log_ceremony_event(f"Starting master key generation for wallet v{wallet_version}")
        
        # Collect entropy
        entropy = self.collect_entropy_sources()
        
        # Key ID under which the HSM stores the generated key
        # (the HSM generation call itself is omitted in this example)
        key_id = f"plings_master_key_v{wallet_version}"
        
        # Witness verification
        for witness in self.witnesses:
            self.log_ceremony_event(f"Witness verification of key generation", witness)
            # In real implementation, witnesses would verify the process
        
        # Create key backup shares (Shamir's Secret Sharing)
        backup_shares = self.create_backup_shares(entropy, threshold=3, total_shares=5)
        
        self.log_ceremony_event("Master key generation completed")
        
        return {
            'key_id': key_id,
            'wallet_version': wallet_version,
            'ceremony_id': self.ceremony_id,
            'witnesses': self.witnesses,
            'backup_shares': backup_shares,
            'created_at': datetime.utcnow().isoformat()
        }
    
    def create_backup_shares(self, entropy: bytes, threshold: int, total_shares: int) -> List[str]:
        """Create backup shares (placeholder, not real Shamir's Secret Sharing)"""
        # WARNING: these hash-derived values are placeholders. They are not
        # Shamir shares, ignore `threshold`, and cannot be combined to
        # reconstruct the secret. Use a proven library in production.
        shares = []
        for i in range(total_shares):
            share = hashlib.sha256(entropy + i.to_bytes(1, 'big')).hexdigest()
            shares.append(share)
        return shares
    
    def finalize_ceremony(self, key_result: dict) -> dict:
        """Finalize ceremony and generate reports"""
        self.log_ceremony_event("Finalizing key generation ceremony")
        
        ceremony_duration = datetime.utcnow() - self.start_time
        
        # Snapshot the log so the hash covers exactly the entries stored in the
        # report; later log calls would otherwise change the shared list
        log_snapshot = list(self.ceremony_log)
        
        ceremony_report = {
            'ceremony_id': self.ceremony_id,
            'duration_seconds': ceremony_duration.total_seconds(),
            'witnesses': self.witnesses,
            'key_id': key_result['key_id'],
            'wallet_version': key_result['wallet_version'],
            'ceremony_log': log_snapshot,
            'verification_hash': hashlib.sha256(
                str(log_snapshot).encode()
            ).hexdigest()
        }
        
        self.log_ceremony_event("Ceremony report generated")
        return ceremony_report

# Example ceremony execution
def execute_key_ceremony():
    """Execute complete key generation ceremony"""
    witnesses = [
        "alice@plings.io",
        "bob@plings.io", 
        "charlie@plings.io"
    ]
    
    ceremony = KeyGenerationCeremony(
        ceremony_id="CEREMONY-2025-07-13-001",
        witnesses=witnesses
    )
    
    # Generate master key for wallet version 2
    key_result = ceremony.generate_master_key(wallet_version=2)
    
    # Finalize ceremony
    ceremony_report = ceremony.finalize_ceremony(key_result)
    
    # Save ceremony report
    with open(f"/secure/ceremony/{ceremony.ceremony_id}-report.json", "w") as f:
        json.dump(ceremony_report, f, indent=2)
    
    print("✅ Key generation ceremony completed successfully")
    return ceremony_report
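
The create_backup_shares method in the ceremony class is only a placeholder. For reference, here is a minimal sketch of what a real 3-of-5 Shamir split and recovery could look like using only the standard library. The function names, the (x, y) share format and the choice of the Mersenne prime 2**521 - 1 as field modulus are assumptions made for this illustration. A production ceremony should rely on an audited library rather than this sketch.

```python
import secrets
from typing import List, Tuple

# 2**521 - 1 is a Mersenne prime, large enough for secrets up to 64 bytes
PRIME = 2**521 - 1


def split_secret(secret: bytes, threshold: int, total_shares: int) -> List[Tuple[int, int]]:
    """Split secret into (x, y) shares; any `threshold` of them recover it."""
    if not 1 < threshold <= total_shares:
        raise ValueError("need 1 < threshold <= total_shares")
    secret_int = int.from_bytes(secret, "big")
    if secret_int >= PRIME:
        raise ValueError("secret too large for the field")
    # Random polynomial of degree threshold-1 with the secret as constant term
    coeffs = [secret_int] + [secrets.randbelow(PRIME) for _ in range(threshold - 1)]
    shares = []
    for x in range(1, total_shares + 1):
        y = 0
        for c in reversed(coeffs):  # Horner evaluation mod PRIME
            y = (y * x + c) % PRIME
        shares.append((x, y))
    return shares


def recover_secret(shares: List[Tuple[int, int]], length: int) -> bytes:
    """Lagrange-interpolate the polynomial at x=0 to recover the secret."""
    secret_int = 0
    for i, (xi, yi) in enumerate(shares):
        num, den = 1, 1
        for j, (xj, _) in enumerate(shares):
            if i != j:
                num = (num * -xj) % PRIME
                den = (den * (xi - xj)) % PRIME
        # pow(den, -1, PRIME) is the modular inverse (Python 3.8+)
        secret_int = (secret_int + yi * num * pow(den, -1, PRIME)) % PRIME
    return secret_int.to_bytes(length, "big")


if __name__ == "__main__":
    entropy = secrets.token_bytes(32)
    shares = split_secret(entropy, threshold=3, total_shares=5)
    assert recover_secret(shares[:3], 32) == entropy
```

Unlike the hash-based placeholder, each share here is a point on a random polynomial, so fewer than threshold shares reveal nothing about the secret, while any threshold shares reconstruct it exactly.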

3. Post-Ceremony Procedures

#!/bin/bash
# Post-ceremony security procedures

# Secure ceremony artifacts
tar -czf ceremony-artifacts.tar.gz *.json *.log
gpg --encrypt --recipient security@plings.io ceremony-artifacts.tar.gz

# Distribute backup shares to trustees
echo "📋 Backup Share Distribution:"
echo "Share 1: Security Officer (offline storage)"
echo "Share 2: CTO (safety deposit box)"
echo "Share 3: External trustee (bank vault)"
echo "Share 4: Compliance officer (secure facility)"
echo "Share 5: Emergency contact (geographic separation)"

# Verify HSM key installation
echo "🔍 Verifying HSM key installation..."
/opt/cloudhsm/bin/key_mgmt_util listKeys

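# Clean up ceremony environment (-u removes files after overwriting;
# the unencrypted tarball is shredded too, leaving only the .gpg copy)
shred -vfzu -n 3 *.tmp *.log ceremony-artifacts.tar.gz
rm -rf /tmp/ceremony-*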

echo "✅ Post-ceremony procedures completed"
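
After the ceremony, auditors may want to confirm that the saved report was not altered. The sketch below shows one possible check. It assumes the verification hash is computed over a canonical JSON encoding of the log, which differs from the str()-based hash used in the ceremony class; log_digest and verify_report are hypothetical helper names.

```python
import hashlib
import json
from typing import Dict, List


def log_digest(ceremony_log: List[Dict]) -> str:
    """Hash a canonical JSON encoding so the digest survives a save/load cycle."""
    canonical = json.dumps(ceremony_log, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


def verify_report(report: Dict) -> bool:
    """Recompute the digest from the stored log and compare with the stored value."""
    return log_digest(report["ceremony_log"]) == report["verification_hash"]


if __name__ == "__main__":
    log = [{"timestamp": "2025-07-13T10:00:00", "event": "start",
            "witness": None, "ceremony_id": "CEREMONY-EXAMPLE"}]
    report = {"ceremony_log": log, "verification_hash": log_digest(log)}
    print(verify_report(json.loads(json.dumps(report))))
```

Sorting keys and fixing separators makes the encoding independent of dictionary ordering and whitespace, so the digest depends only on the logged content.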

Seed Phrase and Mnemonic Management

BIP39 Mnemonic Implementation

1. Mnemonic Generation

# mnemonic_manager.py - BIP39 mnemonic and seed phrase management
import secrets
import hashlib
import hmac
from datetime import datetime
from mnemonic import Mnemonic
from typing import List, Optional

class PlingsMnemonicManager:
    """Secure mnemonic and seed phrase management for Plings wallets"""
    
    def __init__(self, language: str = "english"):
        self.mnemonic_generator = Mnemonic(language)
        self.language = language
    
    def generate_mnemonic(self, strength: int = 256) -> str:
        """Generate BIP39 mnemonic phrase"""
        # Generate cryptographically secure entropy
        entropy = secrets.token_bytes(strength // 8)
        
        # Generate mnemonic from entropy
        mnemonic = self.mnemonic_generator.to_mnemonic(entropy)
        
        # Validate mnemonic
        if not self.mnemonic_generator.check(mnemonic):
            raise Exception("Generated mnemonic failed validation")
        
        return mnemonic
    
    def mnemonic_to_seed(self, mnemonic: str, passphrase: str = "") -> bytes:
        """Convert mnemonic to seed for HD wallet derivation"""
        # Validate mnemonic
        if not self.mnemonic_generator.check(mnemonic):
            raise Exception("Invalid mnemonic phrase")
        
        # Generate seed using PBKDF2
        seed = self.mnemonic_generator.to_seed(mnemonic, passphrase)
        
        return seed
    
    def create_wallet_seed_phrase(self, wallet_version: int) -> dict:
        """Create wallet-specific seed phrase; returns metadata and backup shares only (the mnemonic itself is not returned)"""
        # Generate mnemonic
        mnemonic = self.generate_mnemonic(strength=256)
        
        # Create seed
        seed = self.mnemonic_to_seed(mnemonic)
        
        # Create backup shares using Shamir's Secret Sharing
        shares = self.create_mnemonic_shares(mnemonic, threshold=3, total_shares=5)
        
        return {
            'wallet_version': wallet_version,
            'seed_id': f"plings_seed_v{wallet_version}",
            'mnemonic_words': len(mnemonic.split()),
            'seed_length': len(seed),
            'backup_shares': shares,
            'created_at': datetime.utcnow().isoformat()
        }
    
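    def create_mnemonic_shares(self, mnemonic: str, threshold: int, total_shares: int) -> List[str]:
        """Create backup shares of mnemonic (placeholder, not real Shamir's Secret Sharing)"""
        # WARNING: placeholder only. These hashes are not Shamir shares,
        # ignore `threshold`, and cannot be combined to recover the mnemonic.
        # In production, use a proven library like `secretsharing`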
        shares = []
        mnemonic_bytes = mnemonic.encode('utf-8')
        
        for i in range(total_shares):
            share_seed = hashlib.sha256(mnemonic_bytes + i.to_bytes(1, 'big')).digest()
            share = share_seed.hex()
            shares.append(f"share_{i+1}:{share}")
        
        return shares
    
    def recover_from_shares(self, shares: List[str], threshold: int) -> str:
        """Recover mnemonic from Shamir's Secret Sharing shares"""
        if len(shares) < threshold:
            raise Exception(f"Insufficient shares: need {threshold}, have {len(shares)}")
        
        # Simplified recovery (use proper SSS library in production)
        # This is just for demonstration
        return "recovered_mnemonic_placeholder"
    
    def validate_seed_phrase(self, mnemonic: str) -> dict:
        """Comprehensive seed phrase validation"""
        validation_result = {
            'valid': False,
            'word_count': 0,
            'checksum_valid': False,
            'entropy_bits': 0,
            'errors': []
        }
        
        try:
            # Split mnemonic into words
            words = mnemonic.strip().split()
            validation_result['word_count'] = len(words)
            
            # Check word count
            if len(words) not in [12, 15, 18, 21, 24]:
                validation_result['errors'].append("Invalid word count")
                return validation_result
            
            # Validate checksum
            if not self.mnemonic_generator.check(mnemonic):
                validation_result['errors'].append("Invalid checksum")
                return validation_result
            
            validation_result['checksum_valid'] = True
            
            # Calculate entropy bits
            validation_result['entropy_bits'] = (len(words) * 11) - (len(words) // 3)
            
            # Check entropy strength
            if validation_result['entropy_bits'] < 128:
                validation_result['errors'].append("Insufficient entropy")
            
            validation_result['valid'] = len(validation_result['errors']) == 0
            
        except Exception as e:
            validation_result['errors'].append(f"Validation error: {str(e)}")
        
        return validation_result

# Example usage
def setup_wallet_mnemonic():
    """Setup mnemonic for new wallet version"""
    mnemonic_manager = PlingsMnemonicManager()
    
    # Create seed phrase for wallet version 2
    seed_result = mnemonic_manager.create_wallet_seed_phrase(wallet_version=2)
    
    print(f"✅ Created seed phrase for wallet v{seed_result['wallet_version']}")
    print(f"📝 Mnemonic words: {seed_result['mnemonic_words']}")
    print(f"🔑 Seed length: {seed_result['seed_length']} bytes")
    print(f"📋 Backup shares: {len(seed_result['backup_shares'])}")
    
    return seed_result
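
The entropy calculation in validate_seed_phrase follows from the BIP39 layout: the checksum is one bit per 32 bits of entropy, and each word encodes 11 bits. The short sketch below works through that arithmetic in both directions. The function names are illustrative only.

```python
def bip39_layout(entropy_bits: int) -> dict:
    """Return checksum length and word count for a BIP39 entropy size."""
    if entropy_bits not in (128, 160, 192, 224, 256):
        raise ValueError("BIP39 entropy must be 128-256 bits in 32-bit steps")
    checksum_bits = entropy_bits // 32          # CS = ENT / 32
    words = (entropy_bits + checksum_bits) // 11  # MS = (ENT + CS) / 11
    return {
        "entropy_bits": entropy_bits,
        "checksum_bits": checksum_bits,
        "words": words,
    }


def entropy_bits_from_words(word_count: int) -> int:
    """Inverse mapping: total bits minus the checksum bits (one per three words)."""
    if word_count not in (12, 15, 18, 21, 24):
        raise ValueError("BIP39 mnemonics have 12, 15, 18, 21 or 24 words")
    return word_count * 11 - word_count // 3


if __name__ == "__main__":
    for bits in (128, 160, 192, 224, 256):
        print(bip39_layout(bits))
```

For example, 256 bits of entropy carry an 8-bit checksum, giving 264 bits and therefore 24 words; going the other way, 24 * 11 - 24 // 3 = 256.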

2. Secure Mnemonic Storage

# secure_mnemonic_storage.py - Encrypted mnemonic storage
import os
import base64
import json
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

class SecureMnemonicStorage:
    """Secure encrypted storage for mnemonic phrases"""
    
    def __init__(self, storage_path: str = "/secure/mnemonics"):
        self.storage_path = storage_path
        os.makedirs(storage_path, mode=0o700, exist_ok=True)
    
    def derive_key(self, password: str, salt: bytes) -> bytes:
        """Derive encryption key from password"""
        kdf = PBKDF2HMAC(
            algorithm=hashes.SHA256(),
            length=32,
            salt=salt,
            iterations=100000,
        )
        key = base64.urlsafe_b64encode(kdf.derive(password.encode()))
        return key
    
    def encrypt_mnemonic(self, mnemonic: str, password: str) -> dict:
        """Encrypt mnemonic with password"""
        # Generate random salt
        salt = os.urandom(16)
        
        # Derive key
        key = self.derive_key(password, salt)
        
        # Encrypt mnemonic
        fernet = Fernet(key)
        encrypted_mnemonic = fernet.encrypt(mnemonic.encode())
        
        return {
            'encrypted_mnemonic': encrypted_mnemonic.hex(),
            'salt': salt.hex(),
            'iterations': 100000,
            'algorithm': 'PBKDF2-HMAC-SHA256'
        }
    
    def decrypt_mnemonic(self, encrypted_data: dict, password: str) -> str:
        """Decrypt mnemonic with password"""
        # Reconstruct salt
        salt = bytes.fromhex(encrypted_data['salt'])
        
        # Derive key
        key = self.derive_key(password, salt)
        
        # Decrypt mnemonic
        fernet = Fernet(key)
        encrypted_mnemonic = bytes.fromhex(encrypted_data['encrypted_mnemonic'])
        mnemonic = fernet.decrypt(encrypted_mnemonic)
        
        return mnemonic.decode()
    
    def store_encrypted_mnemonic(self, wallet_version: int, 
                                encrypted_data: dict) -> str:
        """Store encrypted mnemonic to disk"""
        filename = f"wallet_v{wallet_version}_mnemonic.enc"
        filepath = os.path.join(self.storage_path, filename)
        
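        # Create the file with restrictive permissions from the start, so it
        # is never briefly readable under the default umask
        fd = os.open(filepath, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, 'w') as f:
            json.dump(encrypted_data, f, indent=2)
        
        # Enforce restrictive permissions even if the file already existed
        os.chmod(filepath, 0o600)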
        
        return filepath
    
    def load_encrypted_mnemonic(self, wallet_version: int) -> dict:
        """Load encrypted mnemonic from disk"""
        filename = f"wallet_v{wallet_version}_mnemonic.enc"
        filepath = os.path.join(self.storage_path, filename)
        
        with open(filepath, 'r') as f:
            encrypted_data = json.load(f)
        
        return encrypted_data
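
The storage class records the salt, iteration count and algorithm next to the ciphertext, but decryption re-derives the key with a hard-coded iteration count. A minimal standard-library sketch of reading the parameters back from the stored record is shown below, so the iteration count can be raised later without breaking older files. The record layout mirrors the dictionary returned by encrypt_mnemonic; the function names are assumptions for this example.

```python
import hashlib
import os
from typing import Dict, Tuple


def derive_key(password: str, salt: bytes, iterations: int) -> bytes:
    """PBKDF2-HMAC-SHA256 producing a 32-byte key."""
    return hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, iterations, dklen=32
    )


def new_kdf_record(password: str, iterations: int) -> Tuple[Dict, bytes]:
    """Create fresh KDF parameters and the key they produce."""
    salt = os.urandom(16)
    record = {
        "salt": salt.hex(),
        "iterations": iterations,
        "algorithm": "PBKDF2-HMAC-SHA256",
    }
    return record, derive_key(password, salt, iterations)


def key_from_record(record: Dict, password: str) -> bytes:
    """Re-derive the key using the parameters stored in the record."""
    if record["algorithm"] != "PBKDF2-HMAC-SHA256":
        raise ValueError(f"Unsupported KDF: {record['algorithm']}")
    return derive_key(password, bytes.fromhex(record["salt"]), record["iterations"])


if __name__ == "__main__":
    record, key = new_kdf_record("correct horse", iterations=100_000)
    assert key_from_record(record, "correct horse") == key
```

Because every file carries its own parameters, files written with different iteration counts can coexist in the same storage directory.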

BIP32 HD Wallet Implementation

Hierarchical Deterministic Wallet System

1. HD Wallet Core Implementation

# hd_wallet.py - BIP32 HD wallet implementation for Plings
import hashlib
import hmac
import secrets
import struct
from typing import Tuple, List
from cryptography.hazmat.primitives.asymmetric import ed25519
from cryptography.hazmat.primitives import serialization

class PlingsHDWallet:
    """BIP32-style HD wallet for Plings (SLIP-0010 hardened derivation for Ed25519)"""
    
    def __init__(self, seed: bytes):
        self.seed = seed
        self.master_key = self.generate_master_key(seed)
    
    def generate_master_key(self, seed: bytes) -> dict:
        """Generate master key from seed"""
        # HMAC-SHA512 with "ed25519 seed" as key
        h = hmac.new(b"ed25519 seed", seed, hashlib.sha512).digest()
        
        # Split into private key and chain code
        private_key = h[:32]
        chain_code = h[32:]
        
        return {
            'private_key': private_key,
            'chain_code': chain_code,
            'depth': 0,
            'parent_fingerprint': b'\x00' * 4,
            'child_number': 0
        }
    
    def derive_child_key(self, parent_key: dict, child_number: int, hardened: bool = True) -> dict:
        """Derive child key from parent key"""
        if not hardened:
            # Ed25519 (SLIP-0010) supports hardened derivation only; silently
            # reusing the hardened formula would yield a key for the wrong index
            raise ValueError("Non-hardened derivation is not supported for Ed25519 keys")
        
        child_number |= 0x80000000
        
        # Prepare data for HMAC: 0x00 || parent private key || index (big-endian)
        data = b'\x00' + parent_key['private_key'] + struct.pack('>I', child_number)
        
        # HMAC-SHA512
        h = hmac.new(parent_key['chain_code'], data, hashlib.sha512).digest()
        
        # Split result
        child_private_key = h[:32]
        child_chain_code = h[32:]
        
        return {
            'private_key': child_private_key,
            'chain_code': child_chain_code,
            'depth': parent_key['depth'] + 1,
            'parent_fingerprint': self.get_fingerprint(parent_key),
            'child_number': child_number
        }
    
    def get_fingerprint(self, key: dict) -> bytes:
        """Get key fingerprint for identification"""
        # For simplicity, using hash of private key
        return hashlib.sha256(key['private_key']).digest()[:4]
    
    def derive_from_path(self, path: str, wallet_version: int = 1) -> dict:
        """Derive key from BIP32 path"""
        # Parse path: m/44'/501'/1'/1'/1'/1'/1'/1'
        if not path.startswith("m/"):
            raise ValueError("Path must start with 'm/'")
        
        path_parts = path[2:].split('/')
        current_key = self.master_key
        
        for part in path_parts:
            if part.endswith("'"):
                # Hardened derivation
                child_number = int(part[:-1])
                hardened = True
            else:
                # Non-hardened derivation
                child_number = int(part)
                hardened = False
            
            current_key = self.derive_child_key(current_key, child_number, hardened)
        
        return current_key
    
    def path_to_hd_derivation(self, path: str, wallet_version: int = 1) -> str:
        """Convert Plings path to HD derivation path"""
        # Parse path: "1.1.C1.1.1" -> manufacturer.category.class.batch.instance
        parts = path.split('.')
        
        if len(parts) != 5:
            raise ValueError("Path must have 5 parts: manufacturer.category.class.batch.instance")
        
        manufacturer = int(parts[0])
        category = int(parts[1])
        class_str = parts[2]  # "C1", "C2", etc.
        batch = int(parts[3])
        instance = int(parts[4])
        
        # Extract class number from "C1", "C2", etc.
        if not class_str.startswith('C'):
            raise ValueError("Class must start with 'C'")
        class_num = int(class_str[1:])
        
        # Build HD derivation path: m/44'/501'/wallet'/manufacturer'/category'/class'/batch'/instance'
        hd_path = f"m/44'/501'/{wallet_version}'/{manufacturer}'/{category}'/{class_num}'/{batch}'/{instance}'"
        
        return hd_path
    
    def generate_identifier_key(self, path: str, wallet_version: int = 1) -> dict:
        """Generate identifier key for specific path"""
        # Convert path to HD derivation
        hd_path = self.path_to_hd_derivation(path, wallet_version)
        
        # Derive key
        derived_key = self.derive_from_path(hd_path, wallet_version)
        
        # Generate Ed25519 key pair
        private_key = ed25519.Ed25519PrivateKey.from_private_bytes(derived_key['private_key'])
        public_key = private_key.public_key()
        
        # Serialize keys
        private_key_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PrivateFormat.Raw,
            encryption_algorithm=serialization.NoEncryption()
        )
        
        public_key_bytes = public_key.public_bytes(
            encoding=serialization.Encoding.Raw,
            format=serialization.PublicFormat.Raw
        )
        
        return {
            'path': path,
            'hd_derivation': hd_path,
            'wallet_version': wallet_version,
            'private_key': private_key_bytes,
            'public_key': public_key_bytes,
            'depth': derived_key['depth'],
            'fingerprint': self.get_fingerprint(derived_key)
        }
    
    def sign_message(self, message: bytes, private_key_bytes: bytes) -> bytes:
        """Sign message with Ed25519 private key"""
        private_key = ed25519.Ed25519PrivateKey.from_private_bytes(private_key_bytes)
        signature = private_key.sign(message)
        return signature
    
    def verify_signature(self, message: bytes, signature: bytes, public_key_bytes: bytes) -> bool:
        """Verify Ed25519 signature"""
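        from cryptography.exceptions import InvalidSignature
        
        try:
            public_key = ed25519.Ed25519PublicKey.from_public_bytes(public_key_bytes)
            public_key.verify(signature, message)
            return True
        except (InvalidSignature, ValueError):
            # InvalidSignature: signature mismatch; ValueError: malformed public key
            return False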

# Example usage
def demonstrate_hd_wallet():
    """Demonstrate HD wallet functionality"""
    # Create HD wallet from seed
    seed = secrets.token_bytes(32)  # In production, use mnemonic-derived seed
    wallet = PlingsHDWallet(seed)
    
    # Generate identifier key
    identifier_key = wallet.generate_identifier_key("1.1.C1.1.1", wallet_version=1)
    
    print(f"✅ Generated identifier key:")
    print(f"   Path: {identifier_key['path']}")
    print(f"   HD Derivation: {identifier_key['hd_derivation']}")
    print(f"   Public Key: {identifier_key['public_key'].hex()}")
    print(f"   Wallet Version: {identifier_key['wallet_version']}")
    
    # Test signing
    message = b"Hello, Plings!"
    signature = wallet.sign_message(message, identifier_key['private_key'])
    
    # Verify signature
    is_valid = wallet.verify_signature(message, signature, identifier_key['public_key'])
    print(f"   Signature Valid: {is_valid}")
    
    return identifier_key
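
The derivation steps used by PlingsHDWallet can be isolated into a few standard-library functions. The sketch below assumes SLIP-0010 style derivation for Ed25519, where every path component must be hardened. The function names and the tuple-based node representation are illustrative and not part of the wallet class.

```python
import hashlib
import hmac
import struct
from typing import Tuple

HARDENED = 0x80000000
Node = Tuple[bytes, bytes]  # (private key, chain code)


def master_node(seed: bytes) -> Node:
    """Master key and chain code: HMAC-SHA512 keyed with b'ed25519 seed'."""
    digest = hmac.new(b"ed25519 seed", seed, hashlib.sha512).digest()
    return digest[:32], digest[32:]


def child_node(node: Node, index: int) -> Node:
    """Hardened child: HMAC-SHA512(chain_code, 0x00 || key || index')."""
    key, chain_code = node
    data = b"\x00" + key + struct.pack(">I", index | HARDENED)
    digest = hmac.new(chain_code, data, hashlib.sha512).digest()
    return digest[:32], digest[32:]


def derive(seed: bytes, path: str) -> Node:
    """Walk a path such as m/44'/501'/1' from the master node."""
    if not path.startswith("m/"):
        raise ValueError("Path must start with 'm/'")
    node = master_node(seed)
    for part in path[2:].split("/"):
        if not part.endswith("'"):
            raise ValueError("Ed25519 derivation supports hardened indices only")
        node = child_node(node, int(part[:-1]))
    return node


if __name__ == "__main__":
    seed = bytes(range(16))
    key, chain_code = derive(seed, "m/44'/501'/1'/1'/1'/1'/1'/1'")
    print(key.hex())
```

Derivation is deterministic, so the same seed and path always yield the same key, while changing any single path component produces an unrelated key.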

2. Multi-Wallet HD Management

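# multi_wallet_hd.py - Multi-wallet HD management
from typing import List

class PlingsMultiWalletHD:
    """Multi-wallet HD management for Plings"""
    
    def __init__(self, hsm_service: "PlingsHSMService"):
        self.wallets = {}  # wallet_version -> PlingsHDWallet
        # PlingsHSMService requires an HSMConfig, so take a configured instance
        self.hsm_service = hsm_service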
    
    async def initialize_wallet_version(self, wallet_version: int, seed: bytes):
        """Initialize HD wallet for specific version"""
        self.wallets[wallet_version] = PlingsHDWallet(seed)
        print(f"✅ Initialized HD wallet v{wallet_version}")
    
    async def generate_identifier_batch(self, wallet_version: int, 
                                      paths: List[str]) -> List[dict]:
        """Generate batch of identifier keys"""
        if wallet_version not in self.wallets:
            raise ValueError(f"Wallet v{wallet_version} not initialized")
        
        wallet = self.wallets[wallet_version]
        identifier_keys = []
        
        for path in paths:
            key = wallet.generate_identifier_key(path, wallet_version)
            identifier_keys.append(key)
        
        return identifier_keys
    
    async def migrate_wallet_identifiers(self, from_version: int, to_version: int,
                                       paths: List[str]) -> dict:
        """Migrate identifiers from old wallet to new wallet"""
        if from_version not in self.wallets or to_version not in self.wallets:
            raise ValueError("Source and destination wallets must be initialized")
        
        old_wallet = self.wallets[from_version]
        new_wallet = self.wallets[to_version]
        
        migration_result = {
            'from_version': from_version,
            'to_version': to_version,
            'migrated_paths': [],
            'migration_mapping': {}
        }
        
        for path in paths:
            # Generate keys in both wallets
            old_key = old_wallet.generate_identifier_key(path, from_version)
            new_key = new_wallet.generate_identifier_key(path, to_version)
            
            # Store migration mapping
            migration_result['migration_mapping'][path] = {
                'old_public_key': old_key['public_key'].hex(),
                'new_public_key': new_key['public_key'].hex(),
                'old_hd_derivation': old_key['hd_derivation'],
                'new_hd_derivation': new_key['hd_derivation']
            }
            
            migration_result['migrated_paths'].append(path)
        
        return migration_result
    
    async def cross_wallet_verification(self, path: str, 
                                      wallet_versions: List[int]) -> dict:
        """Verify same path across multiple wallets"""
        verification_result = {
            'path': path,
            'wallet_keys': {},
            'hd_derivations': {},
            'consistent_derivation': True
        }
        
        for version in wallet_versions:
            if version in self.wallets:
                wallet = self.wallets[version]
                key = wallet.generate_identifier_key(path, version)
                
                verification_result['wallet_keys'][version] = key['public_key'].hex()
                verification_result['hd_derivations'][version] = key['hd_derivation']
        
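        # Check derivation consistency (same path structure). The wallet
        # component (m/44'/501'/wallet'/...) differs per version by design,
        # so it is dropped before comparing.
        structures = set()
        for derivation in verification_result['hd_derivations'].values():
            parts = derivation.split('/')
            structures.add(tuple(parts[:3] + parts[4:]))
        if len(structures) > 1:
            verification_result['consistent_derivation'] = False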
        
        return verification_result
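What the consistency check above compares can be seen in a small standalone sketch. `StubWallet` is a hypothetical stand-in, not the real Plings HD wallet: it derives fake key bytes with HMAC-SHA256 and uses an assumed `m/...` derivation format. It exists only to show that two wallet versions should agree on the derivation string for a path while producing different keys.

```python
import hashlib
import hmac


class StubWallet:
    """Hypothetical stand-in for a Plings HD wallet (not the real derivation)."""

    def __init__(self, seed: bytes):
        self.seed = seed

    def generate_identifier_key(self, path: str, wallet_version: int) -> dict:
        # The derivation string depends only on the path, never on the seed.
        hd_derivation = "m/" + "/".join(path.split("."))
        # The (fake) key bytes depend on both the seed and the path.
        public_key = hmac.new(self.seed, path.encode(), hashlib.sha256).digest()
        return {"public_key": public_key, "hd_derivation": hd_derivation}


def cross_wallet_summary(wallets: dict, path: str) -> dict:
    """Collect per-version keys and report whether the derivations agree."""
    keys = {v: w.generate_identifier_key(path, v) for v, w in wallets.items()}
    derivations = {k["hd_derivation"] for k in keys.values()}
    return {
        "path": path,
        "wallet_keys": {v: k["public_key"].hex() for v, k in keys.items()},
        "consistent_derivation": len(derivations) <= 1,
    }


summary = cross_wallet_summary(
    {1: StubWallet(b"seed-v1"), 2: StubWallet(b"seed-v2")}, "1.1.C1.1.1"
)
print(summary["consistent_derivation"], len(set(summary["wallet_keys"].values())))
```

A `consistent_derivation` of `False` would therefore point at a path-mapping difference between wallet versions, not at a key mismatch, since the keys are expected to differ.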

Development and Testing Setup

SoftHSM Development Environment

1. SoftHSM Installation and Configuration

#!/bin/bash
# softhsm_setup.sh - SoftHSM installation for development

# Install SoftHSM
if [[ "$OSTYPE" == "darwin"* ]]; then
    # macOS
    brew install softhsm
elif [[ "$OSTYPE" == "linux-gnu"* ]]; then
    # Ubuntu/Debian
    sudo apt-get update
    sudo apt-get install softhsm2
    # CentOS/RHEL
    # sudo yum install softhsm
fi

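# Create SoftHSM configuration
# SoftHSM does not expand "~" in its config file, so write an absolute path
# and create the token directory before initializing a token
mkdir -p ~/.config/softhsm2 ~/.softhsm2/tokens
cat > ~/.config/softhsm2/softhsm2.conf << EOF
# SoftHSM configuration for Plings development
directories.tokendir = $HOME/.softhsm2/tokens
objectstore.backend = file
log.level = INFO
EOF
export SOFTHSM2_CONF="$HOME/.config/softhsm2/softhsm2.conf"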

# Initialize token
softhsm2-util --init-token --slot 0 --label "Plings-Dev" --pin 1234 --so-pin 1234

# Verify installation
softhsm2-util --show-slots

echo "✅ SoftHSM setup complete"
echo "Token: Plings-Dev"
echo "User PIN: 1234"
echo "SO PIN: 1234"

2. Development HSM Client

# dev_hsm_client.py - SoftHSM client for development
import PyKCS11
from typing import Dict, List, Optional

class PlingsDevHSMClient:
    """Development HSM client using SoftHSM"""
    
    def __init__(self, pkcs11_lib: str = None):
        # Default SoftHSM library paths
        if pkcs11_lib is None:
            import platform
            if platform.system() == "Darwin":
                # Homebrew on Intel Macs; Apple Silicon installs under /opt/homebrew/lib/softhsm/
                pkcs11_lib = "/usr/local/lib/softhsm/libsofthsm2.so"
            else:
                pkcs11_lib = "/usr/lib/softhsm/libsofthsm2.so"
        
        self.pkcs11_lib = pkcs11_lib
        self.pkcs11 = None
        self.session = None
    
    def initialize_session(self, pin: str = "1234"):
        """Initialize SoftHSM session"""
        self.pkcs11 = PyKCS11.PyKCS11Lib()
        self.pkcs11.load(self.pkcs11_lib)
        
        # Find token
        slots = self.pkcs11.getSlotList(tokenPresent=True)
        if not slots:
            raise Exception("No SoftHSM tokens found")
        
        slot = slots[0]
        
        # Open session
        self.session = self.pkcs11.openSession(slot)
        
        # Login
        self.session.login(pin)
        
        print(f"✅ Connected to SoftHSM slot {slot}")
    
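    def generate_test_key(self, key_label: str) -> str:
        """Generate a test Ed25519 key pair in SoftHSM"""
        key_id = key_label.encode()
        
        # Public and private keys need separate templates: attributes such as
        # CKA_SENSITIVE and CKA_SIGN are only valid on the private key
        public_template = [
            (PyKCS11.CKA_TOKEN, True),
            (PyKCS11.CKA_VERIFY, True),
            # DER-encoded OID 1.3.101.112 (Ed25519)
            (PyKCS11.CKA_EC_PARAMS, bytes.fromhex("06032b6570")),
            (PyKCS11.CKA_LABEL, key_label),
            (PyKCS11.CKA_ID, key_id),
        ]
        private_template = [
            (PyKCS11.CKA_TOKEN, True),
            (PyKCS11.CKA_PRIVATE, True),
            (PyKCS11.CKA_SENSITIVE, True),
            (PyKCS11.CKA_EXTRACTABLE, True),  # Allow extraction in dev environment
            (PyKCS11.CKA_SIGN, True),
            (PyKCS11.CKA_LABEL, key_label),
            (PyKCS11.CKA_ID, key_id),
        ]
        
        # Generate Ed25519 key pair (requires a SoftHSM build with EdDSA support)
        try:
            public_key, private_key = self.session.generateKeyPair(
                public_template,
                private_template,
                mecha=PyKCS11.Mechanism(PyKCS11.CKM_EC_EDWARDS_KEY_PAIR_GEN, None)
            )
            
            print(f"✅ Generated test key: {key_label}")
            return key_label
            
        except Exception as e:
            print(f"❌ Key generation failed: {e}")
            raise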
    
    def list_keys(self) -> List[str]:
        """List all keys in SoftHSM"""
        keys = self.session.findObjects([
            (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)
        ])
        
        key_labels = []
        for key in keys:
            label = self.session.getAttributeValue(key, [PyKCS11.CKA_LABEL])[0]
            key_labels.append(label)
        
        return key_labels
    
    def sign_test_message(self, key_label: str, message: bytes) -> bytes:
        """Sign test message with SoftHSM key"""
        # Find key
        keys = self.session.findObjects([
            (PyKCS11.CKA_LABEL, key_label),
            (PyKCS11.CKA_CLASS, PyKCS11.CKO_PRIVATE_KEY)
        ])
        
        if not keys:
            raise Exception(f"Key not found: {key_label}")
        
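        # Sign message (PyKCS11 defaults to RSA PKCS#1, so request EdDSA explicitly)
        signature = self.session.sign(
            keys[0], message, PyKCS11.Mechanism(PyKCS11.CKM_EDDSA, None)
        )
        return bytes(signature)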
    
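    def cleanup_test_keys(self, prefix: str = "test_"):
        """Destroy test key objects (private and public) whose label starts with prefix"""
        for key_class in (PyKCS11.CKO_PRIVATE_KEY, PyKCS11.CKO_PUBLIC_KEY):
            for key in self.session.findObjects([(PyKCS11.CKA_CLASS, key_class)]):
                label = self.session.getAttributeValue(key, [PyKCS11.CKA_LABEL])[0]
                if str(label).startswith(prefix):
                    self.session.destroyObject(key)
        
        print("✅ Test keys cleaned up")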

# Development testing utilities
def run_development_tests():
    """Run development HSM tests"""
    print("🧪 Starting SoftHSM development tests...")
    
    # Initialize client
    client = PlingsDevHSMClient()
    client.initialize_session()
    
    # Generate test keys
    test_keys = [
        "test_master_key_v1",
        "test_master_key_v2",
        "test_manufacturer_key"
    ]
    
    for key_label in test_keys:
        client.generate_test_key(key_label)
    
    # List keys
    keys = client.list_keys()
    print(f"📋 Generated keys: {keys}")
    
    # Test signing
    test_message = b"Plings development test message"
    signature = client.sign_test_message("test_master_key_v1", test_message)
    print(f"✅ Signature generated: {len(signature)} bytes")
    
    # Cleanup
    client.cleanup_test_keys()
    
    print("✅ Development tests completed")

if __name__ == "__main__":
    run_development_tests()
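The client above hard-codes one library path per platform, which tends to break across distributions and Homebrew prefixes. A minimal sketch of a more forgiving lookup follows. The environment variable name `PLINGS_PKCS11_LIB` and the helper `find_softhsm_library` are hypothetical, and the candidate list covers only a few common install locations.

```python
import os
from pathlib import Path
from typing import Iterable, Optional

# Common install locations; extend this list for your distribution.
DEFAULT_CANDIDATES = (
    "/usr/lib/softhsm/libsofthsm2.so",                   # Debian/Ubuntu
    "/usr/lib/x86_64-linux-gnu/softhsm/libsofthsm2.so",  # Debian/Ubuntu multiarch
    "/usr/local/lib/softhsm/libsofthsm2.so",             # Homebrew on Intel macOS
    "/opt/homebrew/lib/softhsm/libsofthsm2.so",          # Homebrew on Apple Silicon
)


def find_softhsm_library(
    candidates: Iterable[str] = DEFAULT_CANDIDATES,
    env_var: str = "PLINGS_PKCS11_LIB",  # hypothetical override variable
) -> Optional[str]:
    """Return the first usable PKCS#11 library path, or None if none is found."""
    override = os.environ.get(env_var)
    if override:
        # An explicit override wins, even if the file is checked only at load time.
        return override
    for candidate in candidates:
        if Path(candidate).is_file():
            return candidate
    return None


if __name__ == "__main__":
    print(find_softhsm_library() or "SoftHSM library not found")
```

The result can be passed as `pkcs11_lib` to `PlingsDevHSMClient`, so a missing library is reported before PyKCS11 tries to load it.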

3. Development Testing Scripts

#!/bin/bash
# test_hd_wallet_dev.sh - Development testing for HD wallet

echo "🧪 Starting HD wallet development tests..."

# Test 1: SoftHSM functionality
echo "1. Testing SoftHSM basic operations..."
python3 dev_hsm_client.py

# Test 2: HD wallet key derivation
echo "2. Testing HD wallet key derivation..."
python3 -c "
from hd_wallet import PlingsHDWallet
import secrets

# Test HD wallet
seed = secrets.token_bytes(32)
wallet = PlingsHDWallet(seed)

# Test paths
test_paths = [
    '1.1.C1.1.1',
    '1.1.C1.1.2',
    '2.1.C3.2024.158'
]

for path in test_paths:
    key = wallet.generate_identifier_key(path)
    print(f'✅ {path} -> {key[\"hd_derivation\"]}')
"

# Test 3: Multi-wallet operations
echo "3. Testing multi-wallet operations..."
python3 -c "
from multi_wallet_hd import PlingsMultiWalletHD
import secrets
import asyncio

async def test_multi_wallet():
    multi_wallet = PlingsMultiWalletHD()
    
    # Initialize two wallet versions
    await multi_wallet.initialize_wallet_version(1, secrets.token_bytes(32))
    await multi_wallet.initialize_wallet_version(2, secrets.token_bytes(32))
    
    # Test cross-wallet verification
    result = await multi_wallet.cross_wallet_verification('1.1.C1.1.1', [1, 2])
    print(f'✅ Cross-wallet verification: {result[\"consistent_derivation\"]}')

asyncio.run(test_multi_wallet())
"

# Test 4: Performance benchmarking
echo "4. Running performance benchmarks..."
python3 -c "
import time
from hd_wallet import PlingsHDWallet
import secrets

# Performance test
wallet = PlingsHDWallet(secrets.token_bytes(32))
start_time = time.time()

# Generate 1000 keys
for i in range(1000):
    path = f'1.1.C1.1.{i:05d}'
    key = wallet.generate_identifier_key(path)

elapsed = time.time() - start_time
print(f'✅ Generated 1000 keys in {elapsed:.2f} seconds ({1000/elapsed:.0f} keys/sec)')
"

echo "✅ Development tests completed"

Integration Architecture

HSM Service Architecture

1. HSM Service Layer

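# hsm_service_layer.py - Complete HSM service integration
import asyncio
import logging
import os
import time
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional

from dev_hsm_client import PlingsDevHSMClient
from multi_wallet_hd import PlingsMultiWalletHD
# PlingsHSMClient and PlingsMnemonicManager must also be imported from
# wherever those classes live in your project.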

class HSMEnvironment(Enum):
    PRODUCTION = "production"
    STAGING = "staging"
    DEVELOPMENT = "development"

@dataclass
class HSMConfig:
    environment: HSMEnvironment
    primary_hsm_config: Dict
    backup_hsm_config: Optional[Dict] = None
    performance_requirements: Optional[Dict] = None

class PlingsHSMServiceLayer:
    """Complete HSM service layer for Plings"""
    
    def __init__(self, config: HSMConfig):
        self.config = config
        self.environment = config.environment
        self.logger = logging.getLogger(__name__)
        
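        # Initialize HSM clients based on environment
        if self.environment == HSMEnvironment.PRODUCTION:
            self.primary_hsm = PlingsHSMClient(config.primary_hsm_config['cluster_id'])
            # backup_hsm_config is optional, so only create a backup client when it is set
            self.backup_hsm = (
                PlingsHSMClient(config.backup_hsm_config['cluster_id'])
                if config.backup_hsm_config else None
            )
        elif self.environment == HSMEnvironment.DEVELOPMENT:
            self.primary_hsm = PlingsDevHSMClient()
            self.backup_hsm = None
        else:
            # No client mapping is defined for STAGING here; fail fast instead of
            # leaving primary_hsm undefined
            raise ValueError(f"Unsupported HSM environment: {self.environment.value}")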
        
        # Initialize multi-wallet HD management
        self.multi_wallet_hd = PlingsMultiWalletHD()
        
        # Performance monitoring
        self.performance_metrics = {
            'signatures_per_second': 0,
            'key_generation_time': 0,
            'error_rate': 0
        }
    
    async def initialize_service(self):
        """Initialize complete HSM service"""
        try:
            self.logger.info(f"Initializing HSM service for {self.environment.value}")
            
            # Initialize HSM clients
            if self.environment == HSMEnvironment.PRODUCTION:
                await self.primary_hsm.initialize_session(
                    username=os.environ['HSM_PRIMARY_USER'],
                    password=os.environ['HSM_PRIMARY_PASSWORD']
                )
                
                if self.backup_hsm:
                    await self.backup_hsm.initialize_session(
                        username=os.environ['HSM_BACKUP_USER'],
                        password=os.environ['HSM_BACKUP_PASSWORD']
                    )
            else:
                self.primary_hsm.initialize_session()
            
            # Load existing wallet versions
            await self.load_existing_wallets()
            
            # Start performance monitoring
            await self.start_performance_monitoring()
            
            self.logger.info("✅ HSM service initialized successfully")
            
        except Exception as e:
            self.logger.error(f"❌ HSM service initialization failed: {e}")
            raise
    
    async def load_existing_wallets(self):
        """Load existing wallet versions from database"""
        # Get wallet versions from database
        wallet_versions = await self.get_wallet_versions_from_db()
        
        for wallet_version in wallet_versions:
            # Load wallet seed (this would be retrieved securely)
            seed = await self.get_wallet_seed(wallet_version['version_id'])
            
            # Initialize HD wallet
            await self.multi_wallet_hd.initialize_wallet_version(
                wallet_version['version_id'], seed
            )
            
            self.logger.info(f"✅ Loaded wallet v{wallet_version['version_id']}")
    
    async def create_new_wallet_version(self, wallet_version: int, 
                                      description: str) -> str:
        """Create new wallet version with complete ceremony"""
        try:
            self.logger.info(f"Creating wallet version {wallet_version}")
            
            # Generate master key in HSM
            master_key_id = await self.primary_hsm.generate_master_key(
                key_label="plings_master",
                wallet_version=wallet_version
            )
            
            # Create HD wallet seed
            mnemonic_manager = PlingsMnemonicManager()
            seed_result = mnemonic_manager.create_wallet_seed_phrase(wallet_version)
            
            # Initialize HD wallet
            seed = mnemonic_manager.mnemonic_to_seed(seed_result['mnemonic'])
            await self.multi_wallet_hd.initialize_wallet_version(wallet_version, seed)
            
            # Backup to secondary HSM if available
            if self.backup_hsm:
                await self.backup_key_to_secondary_hsm(master_key_id)
            
            # Register in database
            await self.register_wallet_in_database(wallet_version, master_key_id, description)
            
            self.logger.info(f"✅ Created wallet version {wallet_version}")
            return master_key_id
            
        except Exception as e:
            self.logger.error(f"❌ Wallet creation failed: {e}")
            raise
    
    async def generate_identifier_batch(self, wallet_version: int, 
                                      paths: List[str]) -> List[dict]:
        """Generate batch of identifiers with HSM signing"""
        try:
            # Generate HD keys
            identifier_keys = await self.multi_wallet_hd.generate_identifier_batch(
                wallet_version, paths
            )
            
            # Sign with HSM master key
            master_key_id = await self.get_master_key_id(wallet_version)
            
            for key_data in identifier_keys:
                # Create message to sign
                message = f"{key_data['path']}:{key_data['public_key'].hex()}".encode()
                
                # Sign with HSM
                signature = await self.primary_hsm.sign_with_master_key(
                    master_key_id, message
                )
                
                key_data['hsm_signature'] = signature.hex()
            
            return identifier_keys
            
        except Exception as e:
            self.logger.error(f"❌ Batch generation failed: {e}")
            raise
    
    async def migrate_wallet_during_incident(self, compromised_version: int,
                                           incident_id: str) -> dict:
        """Complete wallet migration during security incident"""
        try:
            self.logger.info(f"Starting emergency migration for wallet v{compromised_version}")
            
            # Create new wallet version
            new_version = await self.get_next_wallet_version()
            new_master_key = await self.create_new_wallet_version(
                new_version, f"Emergency migration - Incident {incident_id}"
            )
            
            # Get all paths from compromised wallet
            compromised_paths = await self.get_wallet_paths(compromised_version)
            
            # Migrate paths to new wallet
            migration_result = await self.multi_wallet_hd.migrate_wallet_identifiers(
                compromised_version, new_version, compromised_paths
            )
            
            # Update database
            await self.update_wallet_migration_status(
                compromised_version, new_version, incident_id
            )
            
            # Mark old wallet as compromised
            await self.mark_wallet_compromised(compromised_version, incident_id)
            
            result = {
                'old_version': compromised_version,
                'new_version': new_version,
                'new_master_key': new_master_key,
                'migrated_paths': len(compromised_paths),
                'migration_mapping': migration_result['migration_mapping'],
                'incident_id': incident_id
            }
            
            self.logger.info(f"✅ Emergency migration completed: v{compromised_version} → v{new_version}")
            return result
            
        except Exception as e:
            self.logger.error(f"❌ Emergency migration failed: {e}")
            raise
    
    async def comprehensive_health_check(self) -> dict:
        """Comprehensive HSM and wallet health check"""
        health_result = {
            'overall_status': 'healthy',
            'hsm_status': {},
            'wallet_status': {},
            'performance_metrics': self.performance_metrics,
            'timestamp': datetime.utcnow().isoformat()
        }
        
        try:
            # Check primary HSM
            primary_health = await self.primary_hsm.health_check()
            health_result['hsm_status']['primary'] = primary_health
            
            # Check backup HSM if available
            if self.backup_hsm:
                backup_health = await self.backup_hsm.health_check()
                health_result['hsm_status']['backup'] = backup_health
            
            # Check wallet versions
            wallet_versions = await self.get_wallet_versions_from_db()
            for wallet in wallet_versions:
                version_id = wallet['version_id']
                wallet_health = await self.check_wallet_health(version_id)
                health_result['wallet_status'][f'v{version_id}'] = wallet_health
            
            # Determine overall status
            if primary_health['status'] != 'healthy':
                health_result['overall_status'] = 'degraded'
            
            return health_result
            
        except Exception as e:
            health_result['overall_status'] = 'error'
            health_result['error'] = str(e)
            return health_result
    
    async def start_performance_monitoring(self):
        """Start background performance monitoring"""
        async def monitor_performance():
            while True:
                try:
                    # Test signature performance (perf_counter is monotonic and high-resolution)
                    start_time = time.perf_counter()
                    test_message = b"performance_test"
                    
                    # Test with available key
                    await self.primary_hsm.sign_with_master_key(
                        "test_key", test_message
                    )
                    
                    signature_time = time.perf_counter() - start_time
                    if signature_time > 0:  # avoid division by zero
                        self.performance_metrics['signatures_per_second'] = 1.0 / signature_time
                    
                    # Sleep for monitoring interval
                    await asyncio.sleep(60)  # Monitor every minute
                    
                except Exception as e:
                    self.logger.error(f"Performance monitoring error: {e}")
                    await asyncio.sleep(60)
        
        # Start monitoring task; keep a reference so the task is not garbage-collected
        self._monitor_task = asyncio.create_task(monitor_performance())
        self.logger.info("✅ Performance monitoring started")
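The monitoring loop derives `signatures_per_second` from a single timed signature, so one slow call can swing the metric sharply, and `error_rate` is never updated. One possible refinement is a small rolling window over recent samples. The class name `RollingSignatureMetrics` and its methods are illustrative assumptions, not part of the service layer.

```python
from collections import deque
from typing import Deque


class RollingSignatureMetrics:
    """Track recent signing latencies and derive smoothed metrics."""

    def __init__(self, window: int = 20):
        # Only the most recent `window` successful durations are kept.
        self.durations: Deque[float] = deque(maxlen=window)
        self.failures = 0
        self.attempts = 0

    def record(self, duration_seconds: float, ok: bool = True) -> None:
        """Record one signing attempt; failed attempts count toward the error rate only."""
        self.attempts += 1
        if ok:
            self.durations.append(duration_seconds)
        else:
            self.failures += 1

    def signatures_per_second(self) -> float:
        total = sum(self.durations)
        # Guard against an empty window or zero elapsed time.
        return len(self.durations) / total if total > 0 else 0.0

    def error_rate(self) -> float:
        return self.failures / self.attempts if self.attempts else 0.0


metrics = RollingSignatureMetrics(window=2)
for duration in (1.0, 0.25, 0.25):
    metrics.record(duration)
print(metrics.signatures_per_second())  # 4.0: only the last two samples count
```

Because the samples are taken one at a time, the figure reflects sequential latency for a single caller, not the HSM's maximum concurrent throughput.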

2. API Integration Layer

# api_integration.py - API integration for HSM services
import os
from typing import List, Optional

from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel

from hsm_service_layer import HSMConfig, HSMEnvironment, PlingsHSMServiceLayer

class PathAllocationRequest(BaseModel):
    wallet_version: int
    paths: List[str]
    allocation_type: str
    manufacturer_name: Optional[str] = None

class WalletCreationRequest(BaseModel):
    wallet_version: int
    description: str
    environment: str = "production"

class HSMHealthResponse(BaseModel):
    overall_status: str
    hsm_status: dict
    wallet_status: dict
    performance_metrics: dict

app = FastAPI(title="Plings HSM API")

# Global HSM service instance (set during startup)
hsm_service: Optional[PlingsHSMServiceLayer] = None

@app.on_event("startup")
async def startup_event():
    """Initialize HSM service on startup"""
    global hsm_service
    
    config = HSMConfig(
        environment=HSMEnvironment.PRODUCTION,
        primary_hsm_config={'cluster_id': os.environ['HSM_CLUSTER_ID']},
        backup_hsm_config={'cluster_id': os.environ['HSM_BACKUP_CLUSTER_ID']}
    )
    
    hsm_service = PlingsHSMServiceLayer(config)
    await hsm_service.initialize_service()

@app.post("/api/v1/wallet/create")
async def create_wallet_version(request: WalletCreationRequest):
    """Create new wallet version"""
    try:
        master_key_id = await hsm_service.create_new_wallet_version(
            request.wallet_version, request.description
        )
        
        return {
            'success': True,
            'wallet_version': request.wallet_version,
            'master_key_id': master_key_id
        }
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/v1/identifiers/generate")
async def generate_identifiers(request: PathAllocationRequest):
    """Generate batch of identifiers"""
    try:
        identifier_keys = await hsm_service.generate_identifier_batch(
            request.wallet_version, request.paths
        )
        
        return {
            'success': True,
            'wallet_version': request.wallet_version,
            'generated_count': len(identifier_keys),
            'identifiers': identifier_keys
        }
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/v1/health", response_model=HSMHealthResponse)
async def health_check():
    """Comprehensive health check"""
    try:
        health_result = await hsm_service.comprehensive_health_check()
        return HSMHealthResponse(**health_result)
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/api/v1/wallet/migrate")
async def emergency_wallet_migration(compromised_version: int, incident_id: str):
    """Emergency wallet migration"""
    try:
        migration_result = await hsm_service.migrate_wallet_during_incident(
            compromised_version, incident_id
        )
        
        return {
            'success': True,
            'migration_result': migration_result
        }
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/api/v1/wallets")
async def list_wallet_versions():
    """List all wallet versions"""
    try:
        wallets = await hsm_service.get_wallet_versions_from_db()
        return {
            'success': True,
            'wallets': wallets
        }
        
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
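The `/identifiers/generate` endpoint returns the key dictionaries as produced by the service layer. Those dictionaries carry `public_key` as raw bytes, and raw key bytes generally do not serialize cleanly to JSON. A minimal sketch of a response filter follows, assuming hypothetical secret field names (`private_key`, `seed`, `chain_code`) that may or may not match the real key dictionaries.

```python
import json
from typing import Any, Dict, Iterable, List

# Hypothetical names of fields that must never leave the service.
SECRET_FIELDS = frozenset({"private_key", "seed", "chain_code"})


def to_public_json(key_data: Dict[str, Any]) -> Dict[str, Any]:
    """Return a JSON-safe copy: secret fields dropped, bytes values hex-encoded."""
    safe: Dict[str, Any] = {}
    for name, value in key_data.items():
        if name in SECRET_FIELDS:
            continue
        safe[name] = value.hex() if isinstance(value, (bytes, bytearray)) else value
    return safe


def serialize_identifiers(keys: Iterable[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Apply to_public_json to every identifier in a batch."""
    return [to_public_json(k) for k in keys]


sample = [{
    "path": "1.1.C1.1.1",
    "public_key": b"\x01\xff",
    "private_key": b"never-return-this",
    "hsm_signature": "ab12",
}]
print(json.dumps(serialize_identifiers(sample)))
```

In the endpoint, `'identifiers': serialize_identifiers(identifier_keys)` would replace the direct pass-through. An allow-list of public fields would be stricter than the deny-list shown here.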

Operational Procedures

Daily Operations

1. Daily HSM Monitoring

#!/bin/bash
# daily_hsm_monitoring.sh - Daily HSM health monitoring

echo "🔍 Daily HSM Health Check - $(date)"
echo "=========================================="

# Check HSM service status
echo "1. Checking HSM service status..."
curl -s "http://localhost:8000/api/v1/health" | jq '.'

# Check HSM hardware
echo "2. Checking HSM hardware status..."
/opt/cloudhsm/bin/cloudhsm_mgmt_util listUsers
/opt/cloudhsm/bin/key_mgmt_util listKeys

# Check performance metrics
echo "3. Checking performance metrics..."
curl -s "http://localhost:8000/api/v1/health" | jq '.performance_metrics'

# Check key usage statistics
echo "4. Checking key usage statistics..."
# Count every signature operation recorded in the log
grep -c "signature_operation" /var/log/plings/hsm.log

# Check error logs
echo "5. Checking error logs..."
grep "ERROR" /var/log/plings/hsm.log | tail -10

# Generate daily report
echo "6. Generating daily report..."
# Fetch the health endpoint once so every field comes from the same snapshot
HEALTH_JSON=$(curl -s "http://localhost:8000/api/v1/health")
cat > /tmp/daily_hsm_report.txt << EOF
HSM Daily Health Report - $(date)
====================================

Service Status: $(echo "$HEALTH_JSON" | jq -r '.overall_status')
Primary HSM: $(echo "$HEALTH_JSON" | jq -r '.hsm_status.primary.status')
Backup HSM: $(echo "$HEALTH_JSON" | jq -r '.hsm_status.backup.status')

Performance Metrics:
- Signatures/sec: $(echo "$HEALTH_JSON" | jq -r '.performance_metrics.signatures_per_second')
- Error rate: $(echo "$HEALTH_JSON" | jq -r '.performance_metrics.error_rate')

Errors in current log: $(grep -c "ERROR" /var/log/plings/hsm.log)
EOF

# Send report to operations team
mail -s "Daily HSM Health Report" operations@plings.io < /tmp/daily_hsm_report.txt

echo "✅ Daily HSM monitoring completed"
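If the daily check is later moved from shell into Python, the report can be rendered from a single health snapshot and the script's exit code tied to the overall status. The sketch below assumes the JSON shape returned by `comprehensive_health_check`. The function names `render_daily_report` and `exit_code_for` are hypothetical.

```python
from typing import Any, Dict


def render_daily_report(health: Dict[str, Any], recent_errors: int) -> str:
    """Build the plain-text report from one health snapshot."""
    hsm = health.get("hsm_status", {})
    metrics = health.get("performance_metrics", {})
    lines = [
        "HSM Daily Health Report",
        f"Service Status: {health.get('overall_status', 'unknown')}",
        f"Primary HSM: {hsm.get('primary', {}).get('status', 'unknown')}",
        # The backup entry is absent when no backup HSM is configured.
        f"Backup HSM: {hsm.get('backup', {}).get('status', 'not configured')}",
        f"Signatures/sec: {metrics.get('signatures_per_second', 'n/a')}",
        f"Error rate: {metrics.get('error_rate', 'n/a')}",
        f"Recent Errors: {recent_errors}",
    ]
    return "\n".join(lines)


def exit_code_for(health: Dict[str, Any]) -> int:
    """Return 0 only when the service reports itself healthy."""
    return 0 if health.get("overall_status") == "healthy" else 1


snapshot = {
    "overall_status": "degraded",
    "hsm_status": {"primary": {"status": "unreachable"}},
    "performance_metrics": {"signatures_per_second": 12.5, "error_rate": 0},
}
print(render_daily_report(snapshot, recent_errors=3))
```

A non-zero exit code lets cron or a scheduler flag a degraded HSM without anyone having to read the e-mailed report.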

2. Weekly HSM Maintenance

#!/bin/bash
# weekly_hsm_maintenance.sh - Weekly HSM maintenance

echo "🔧 Weekly HSM Maintenance - $(date)"
echo "===================================="

# Backup HSM configuration
echo "1. Backing up HSM configuration..."
/opt/cloudhsm/bin/cloudhsm_mgmt_util getHSMInfo > /backup/hsm_config_$(date +%Y%m%d).txt

# Key rotation check
echo "2. Checking key rotation schedule..."
python3 << EOF
import datetime
from datetime import timedelta

# Check if any keys need rotation
rotation_date = datetime.date.today() - timedelta(days=90)
print(f"Keys older than {rotation_date} should be rotated")

# This would check database for key ages
EOF

# Performance benchmarking
echo "3. Running performance benchmarks..."
python3 -c "
import time
import requests

# Benchmark signature performance
start_time = time.time()
for i in range(100):
    response = requests.post('http://localhost:8000/api/v1/identifiers/generate', 
                           json={'wallet_version': 1, 'paths': ['1.1.C1.1.{:05d}'.format(i)]})
elapsed = time.time() - start_time
print(f'Generated 100 identifiers in {elapsed:.2f} seconds')
"

# Check HSM capacity
echo "4. Checking HSM capacity..."
/opt/cloudhsm/bin/key_mgmt_util listKeys | wc -l

# Verify backup HSM sync
echo "5. Verifying backup HSM synchronization..."
# This would compare key sets between primary and backup HSMs

# Generate weekly report
echo "6. Generating weekly maintenance report..."
cat > /tmp/weekly_hsm_report.txt << EOF
HSM Weekly Maintenance Report - $(date)
======================================

Maintenance Tasks Completed:
- Configuration backup: ✅
- Key rotation check: ✅
- Performance benchmark: ✅
- Capacity check: ✅
- Backup sync verification: not automated in this script (manual check required)

Performance Summary:
- Average signature time: $(curl -s "http://localhost:8000/api/v1/health" | jq -r '.performance_metrics.signatures_per_second' | awk '{ if ($1 > 0) printf "%.2f", 1000 / $1; else printf "n/a" }') ms
- Wallet versions managed: $(curl -s "http://localhost:8000/api/v1/wallets" | jq -r '.wallets | length')
- Error rate: $(curl -s "http://localhost:8000/api/v1/health" | jq -r '.performance_metrics.error_rate')%

Recommendations:
- Continue current operational procedures
- Monitor key usage trends
- Schedule quarterly security review
EOF

mail -s "Weekly HSM Maintenance Report" operations@plings.io < /tmp/weekly_hsm_report.txt

echo "✅ Weekly HSM maintenance completed"
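The weekly script leaves two steps as comments: checking key ages against the rotation window and comparing key sets between the primary and backup HSMs. Both reduce to small pure functions once the inputs have been fetched. The sketch below assumes key labels with creation dates and plain sets of labels as inputs. The function names are hypothetical, and how the data is read from the database or the HSMs is left open.

```python
from datetime import date, timedelta
from typing import Dict, List, Set


def keys_due_for_rotation(
    created: Dict[str, date], max_age_days: int, today: date
) -> List[str]:
    """Return labels of keys created before the rotation cutoff."""
    cutoff = today - timedelta(days=max_age_days)
    return sorted(label for label, day in created.items() if day < cutoff)


def backup_sync_diff(primary: Set[str], backup: Set[str]) -> Dict[str, List[str]]:
    """Report key labels present on only one of the two HSMs."""
    return {
        "missing_on_backup": sorted(primary - backup),
        "unexpected_on_backup": sorted(backup - primary),
    }


due = keys_due_for_rotation(
    {"plings_master_v1": date(2025, 1, 1), "plings_master_v2": date(2025, 6, 1)},
    max_age_days=90,
    today=date(2025, 7, 17),
)
diff = backup_sync_diff({"plings_master_v1", "plings_master_v2"}, {"plings_master_v1"})
print(due, diff)
```

Only when `due` is empty and both lists in `diff` are empty would the corresponding lines in the weekly report be justified in showing a check mark.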

Security Monitoring and Compliance

Security Monitoring

1. Real-time Security Monitoring

# security_monitoring.py - Real-time HSM security monitoring
import asyncio
import logging
from datetime import datetime, timedelta
from typing import Dict, List
import asyncpg

class HSMSecurityMonitor:
    """Real-time security monitoring for HSM operations"""
    
    def __init__(self, db_connection_string: str):
        self.db_connection = db_connection_string
        self.logger = logging.getLogger(__name__)
        self.alert_thresholds = {
            'failed_authentications': 5,
            'unusual_signature_volume': 1000,
            'cross_wallet_access_attempts': 3,
            'key_usage_anomalies': 10
        }
    
    async def monitor_hsm_security(self):
        """Continuous security monitoring"""
        while True:
            try:
                # Check for security anomalies
                await self.check_authentication_failures()
                await self.check_signature_volume_anomalies()
                await self.check_cross_wallet_access()
                await self.check_key_usage_patterns()
                
                # Sleep for monitoring interval
                await asyncio.sleep(60)  # Check every minute
                
            except Exception as e:
                self.logger.error(f"Security monitoring error: {e}")
                await asyncio.sleep(60)
    
    async def check_authentication_failures(self):
        """Monitor HSM authentication failures"""
        conn = await asyncpg.connect(self.db_connection)
        try:
            # Check failed authentications in last 10 minutes
            query = """
                SELECT COUNT(*) as failure_count, source_ip
                FROM hsm_audit_log 
                WHERE operation = 'authentication_failure'
                AND timestamp > NOW() - INTERVAL '10 minutes'
                GROUP BY source_ip
                HAVING COUNT(*) >= $1
            """
            
            failures = await conn.fetch(query, self.alert_thresholds['failed_authentications'])
            
            for failure in failures:
                await self.send_security_alert(
                    severity='high',
                    alert_type='authentication_failure',
                    message=f"Multiple authentication failures from {failure['source_ip']}: {failure['failure_count']} attempts",
                    details={'source_ip': failure['source_ip'], 'failure_count': failure['failure_count']}
                )
                
        finally:
            await conn.close()
    
    async def check_signature_volume_anomalies(self):
        """Monitor unusual signature volume"""
        conn = await asyncpg.connect(self.db_connection)
        try:
            # Check signature volume in last hour
            query = """
                SELECT COUNT(*) as signature_count, user_id
                FROM hsm_audit_log 
                WHERE operation = 'signature_operation'
                AND timestamp > NOW() - INTERVAL '1 hour'
                GROUP BY user_id
                HAVING COUNT(*) >= $1
            """
            
            anomalies = await conn.fetch(query, self.alert_thresholds['unusual_signature_volume'])
            
            for anomaly in anomalies:
                await self.send_security_alert(
                    severity='medium',
                    alert_type='signature_volume_anomaly',
                    message=f"Unusual signature volume from user {anomaly['user_id']}: {anomaly['signature_count']} signatures",
                    details={'user_id': anomaly['user_id'], 'signature_count': anomaly['signature_count']}
                )
                
        finally:
            await conn.close()
    
    async def check_cross_wallet_access(self):
        """Monitor cross-wallet access attempts"""
        conn = await asyncpg.connect(self.db_connection)
        try:
            # Check cross-wallet access in last 15 minutes
            query = """
                SELECT user_id, COUNT(DISTINCT wallet_version) as wallet_count
                FROM hsm_audit_log 
                WHERE timestamp > NOW() - INTERVAL '15 minutes'
                GROUP BY user_id
                HAVING COUNT(DISTINCT wallet_version) >= $1
            """
            
            cross_access = await conn.fetch(query, self.alert_thresholds['cross_wallet_access_attempts'])
            
            for access in cross_access:
                await self.send_security_alert(
                    severity='high',
                    alert_type='cross_wallet_access',
                    message=f"Cross-wallet access attempts by user {access['user_id']}: {access['wallet_count']} wallets",
                    details={'user_id': access['user_id'], 'wallet_count': access['wallet_count']}
                )
                
        finally:
            await conn.close()
    
    async def send_security_alert(self, severity: str, alert_type: str, 
                                message: str, details: Dict):
        """Send security alert to operations team"""
        alert = {
            'timestamp': datetime.utcnow().isoformat(),
            'severity': severity,
            'alert_type': alert_type,
            'message': message,
            'details': details
        }
        
        # Log alert
        self.logger.warning(f"SECURITY ALERT [{severity.upper()}]: {message}")
        
        # Send to monitoring system
        await self.send_to_monitoring_system(alert)
        
        # Send email for high severity
        if severity == 'high':
            await self.send_email_alert(alert)
    
    async def send_to_monitoring_system(self, alert: Dict):
        """Send alert to monitoring system"""
        # Integration with monitoring system (e.g., Prometheus, Grafana)
        pass
    
    async def send_email_alert(self, alert: Dict):
        """Send email alert for high severity issues"""
        # Email integration
        pass
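The monitoring loop calls `check_key_usage_patterns`, which is not shown in this listing. One way to implement its core logic is a pure function that compares per-key usage in the current window with a historical baseline. The sketch below treats the `key_usage_anomalies` threshold of 10 as a multiplier over the baseline. That interpretation is an assumption, as are the function name and the result format.

```python
from typing import Dict, List


def find_key_usage_anomalies(
    current: Dict[str, int],
    baseline: Dict[str, float],
    factor: float = 10.0,
) -> List[dict]:
    """Flag keys used far more than their baseline, or used with no baseline at all."""
    anomalies: List[dict] = []
    for key_id, count in sorted(current.items()):
        expected = baseline.get(key_id, 0.0)
        if expected == 0.0:
            # A key with no usage history that suddenly signs is worth a look.
            if count > 0:
                anomalies.append(
                    {"key_id": key_id, "count": count, "reason": "no_baseline"}
                )
        elif count > factor * expected:
            anomalies.append(
                {"key_id": key_id, "count": count, "reason": "volume_spike"}
            )
    return anomalies


found = find_key_usage_anomalies(
    current={"k1": 50, "k2": 500, "k3": 3},
    baseline={"k1": 40.0, "k2": 20.0},
)
print(found)
```

Inside the monitor, each returned entry could be passed to `send_security_alert` with `alert_type='key_usage_anomaly'`. The per-key counts would come from an aggregate over `hsm_audit_log`, similar to the other checks.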

2. Compliance Monitoring

# compliance_monitoring.py - Compliance monitoring for HSM operations
from datetime import datetime, timedelta
import json

import asyncpg

class HSMComplianceMonitor:
    """Compliance monitoring for regulatory requirements"""
    
    def __init__(self, db_connection_string: str):
        self.db_connection = db_connection_string
        self.compliance_requirements = {
            'key_rotation_days': 365,
            'backup_verification_days': 7,
            'audit_log_retention_days': 2555,  # 7 years
            'access_review_days': 90
        }
    
    async def generate_compliance_report(self, report_type: str = 'monthly') -> dict:
        """Generate compliance report"""
        report = {
            'report_type': report_type,
            'generated_at': datetime.utcnow().isoformat(),
            'compliance_status': 'compliant',
            'violations': [],
            'recommendations': []
        }
        
        # Check key rotation compliance
        await self.check_key_rotation_compliance(report)
        
        # Check backup verification compliance
        await self.check_backup_compliance(report)
        
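        # Check audit log retention
        # (check_audit_log_retention is not defined in this listing and must be implemented)
        await self.check_audit_log_retention(report)
        
        # Check access reviews
        # (check_access_review_compliance is not defined in this listing and must be implemented)
        await self.check_access_review_compliance(report)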
        
        # Determine overall compliance status
        if report['violations']:
            report['compliance_status'] = 'non_compliant'
        
        return report
    
    async def check_key_rotation_compliance(self, report: dict):
        """Check key rotation compliance"""
        conn = await asyncpg.connect(self.db_connection)
        try:
            # Check keys older than rotation requirement.
            # The day count is passed as a bound parameter instead of being
            # formatted into the SQL string.
            query = """
                SELECT version_id, version_name, created_at
                FROM wallet_versions 
                WHERE created_at < NOW() - make_interval(days => $1)
                AND status = 'active'
            """
            
            overdue_keys = await conn.fetch(
                query, self.compliance_requirements['key_rotation_days']
            )
            
            if overdue_keys:
                violation = {
                    'type': 'key_rotation_overdue',
                    'severity': 'high',
                    'count': len(overdue_keys),
                    'details': [dict(key) for key in overdue_keys]
                }
                report['violations'].append(violation)
                
                recommendation = {
                    'type': 'key_rotation',
                    'priority': 'high',
                    'action': 'Schedule key rotation for overdue wallets',
                    'affected_wallets': len(overdue_keys)
                }
                report['recommendations'].append(recommendation)
                
        finally:
            await conn.close()
    
    async def check_backup_compliance(self, report: dict):
        """Check backup verification compliance"""
        conn = await asyncpg.connect(self.db_connection)
        try:
            # Check backup verifications.
            # The day count is passed as a bound parameter instead of being
            # formatted into the SQL string.
            query = """
                SELECT wallet_version, last_backup_verification
                FROM wallet_backup_status 
                WHERE last_backup_verification < NOW() - make_interval(days => $1)
            """
            
            overdue_backups = await conn.fetch(
                query, self.compliance_requirements['backup_verification_days']
            )
            
            if overdue_backups:
                violation = {
                    'type': 'backup_verification_overdue',
                    'severity': 'medium',
                    'count': len(overdue_backups),
                    'details': [dict(backup) for backup in overdue_backups]
                }
                report['violations'].append(violation)
                
        finally:
            await conn.close()
    
    async def generate_audit_report(self, start_date: datetime, end_date: datetime) -> dict:
        """Generate detailed audit report"""
        conn = await asyncpg.connect(self.db_connection)
        try:
            # HSM operations summary
            operations_query = """
                SELECT operation, COUNT(*) as count
                FROM hsm_audit_log 
                WHERE timestamp BETWEEN $1 AND $2
                GROUP BY operation
            """
            
            operations = await conn.fetch(operations_query, start_date, end_date)
            
            # User activity summary
            user_query = """
                SELECT user_id, COUNT(*) as operation_count
                FROM hsm_audit_log 
                WHERE timestamp BETWEEN $1 AND $2
                GROUP BY user_id
                ORDER BY operation_count DESC
            """
            
            user_activity = await conn.fetch(user_query, start_date, end_date)
            
            # Security events
            security_query = """
                SELECT alert_type, COUNT(*) as count
                FROM security_alerts 
                WHERE timestamp BETWEEN $1 AND $2
                GROUP BY alert_type
            """
            
            security_events = await conn.fetch(security_query, start_date, end_date)
            
            audit_report = {
                'report_period': {
                    'start_date': start_date.isoformat(),
                    'end_date': end_date.isoformat()
                },
                'hsm_operations': [dict(op) for op in operations],
                'user_activity': [dict(user) for user in user_activity],
                'security_events': [dict(event) for event in security_events],
                'generated_at': datetime.utcnow().isoformat()
            }
            
            return audit_report
            
        finally:
            await conn.close()
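
The compliance thresholds in `HSMComplianceMonitor` are all age limits measured in days. The sketch below shows that comparison as a pure function, which could back the retention and access-review checks or be used to unit-test the rotation and backup rules without a database. The names `is_overdue` and `violations_for`, the `limit_days` field, and the `access_review_overdue` type string are hypothetical; the threshold values are copied from the class above.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

REQUIREMENTS = {  # values copied from HSMComplianceMonitor
    'key_rotation_days': 365,
    'backup_verification_days': 7,
    'access_review_days': 90,
}

def is_overdue(last_done: Optional[datetime], max_age_days: int, now: datetime) -> bool:
    """True when the activity never happened or is older than the limit."""
    if last_done is None:
        return True
    return now - last_done > timedelta(days=max_age_days)

def violations_for(last_done_by_check: Dict[str, Optional[datetime]],
                   now: datetime) -> List[dict]:
    """Build violation entries keyed by 'type', as the report does."""
    found = []
    for check, limit in REQUIREMENTS.items():
        if is_overdue(last_done_by_check.get(check), limit, now):
            found.append({
                'type': check.replace('_days', '_overdue'),
                'limit_days': limit,  # hypothetical extra field
            })
    return found

now = datetime(2025, 7, 17, tzinfo=timezone.utc)
status = {
    'key_rotation_days': now - timedelta(days=400),       # past the 365-day limit
    'backup_verification_days': now - timedelta(days=3),  # within the 7-day limit
    'access_review_days': None,                           # never performed
}
result = violations_for(status, now)
print([v['type'] for v in result])
```

Treating a missing timestamp as overdue is a design choice made here so that an unrecorded activity fails closed.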

Disaster Recovery and Business Continuity

Disaster Recovery Procedures

1. HSM Disaster Recovery Plan

# disaster_recovery.py - HSM disaster recovery procedures
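import asyncio
import logging
import os
from datetime import datetime
from typing import Dict, List, Optional

from mnemonic_manager import PlingsMnemonicManager
# PlingsHSMClient must also be imported here from the module that defines it
# in your codebase; its module name is not shown in this section.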

class HSMDisasterRecovery:
    """HSM disaster recovery and business continuity procedures"""
    
    def __init__(self, primary_hsm_config: dict, backup_hsm_config: dict):
        self.primary_hsm_config = primary_hsm_config
        self.backup_hsm_config = backup_hsm_config
        self.logger = logging.getLogger(__name__)
    
    async def assess_disaster_scenario(self) -> dict:
        """Assess disaster scenario and determine recovery approach"""
        assessment = {
            'disaster_type': None,
            'impact_level': None,
            'recovery_approach': None,
            'estimated_rto': None,  # Recovery Time Objective
            'estimated_rpo': None,  # Recovery Point Objective
            'required_actions': []
        }
        
        try:
            # Test primary HSM connectivity
            primary_status = await self.test_hsm_connectivity(self.primary_hsm_config)
            
            # Test backup HSM connectivity
            backup_status = await self.test_hsm_connectivity(self.backup_hsm_config)
            
            # Determine disaster scenario
            if not primary_status['available'] and not backup_status['available']:
                assessment['disaster_type'] = 'total_hsm_failure'
                assessment['impact_level'] = 'critical'
                assessment['recovery_approach'] = 'rebuild_from_backup_shares'
                assessment['estimated_rto'] = '4-8 hours'
                assessment['estimated_rpo'] = '0 minutes'
                assessment['required_actions'] = [
                    'Activate emergency response team',
                    'Retrieve backup key shares',
                    'Initialize new HSM infrastructure',
                    'Restore master keys from shares'
                ]
                
            elif not primary_status['available'] and backup_status['available']:
                assessment['disaster_type'] = 'primary_hsm_failure'
                assessment['impact_level'] = 'high'
                assessment['recovery_approach'] = 'failover_to_backup'
                assessment['estimated_rto'] = '30-60 minutes'
                assessment['estimated_rpo'] = '0 minutes'
                assessment['required_actions'] = [
                    'Activate backup HSM',
                    'Update DNS/routing to backup',
                    'Verify backup HSM functionality',
                    'Schedule primary HSM replacement'
                ]
                
            elif primary_status['available'] and not backup_status['available']:
                assessment['disaster_type'] = 'backup_hsm_failure'
                assessment['impact_level'] = 'medium'
                assessment['recovery_approach'] = 'restore_backup_hsm'
                assessment['estimated_rto'] = '2-4 hours'
                assessment['estimated_rpo'] = '0 minutes'
                assessment['required_actions'] = [
                    'Continue on primary HSM',
                    'Restore backup HSM',
                    'Verify backup synchronization',
                    'Update monitoring alerts'
                ]
                
            else:
                assessment['disaster_type'] = 'no_disaster'
                assessment['impact_level'] = 'none'
                assessment['recovery_approach'] = 'continue_normal_operations'
                
        except Exception as e:
            assessment['disaster_type'] = 'assessment_failure'
            assessment['impact_level'] = 'unknown'
            assessment['error'] = str(e)
        
        return assessment
    
    async def execute_failover_to_backup(self) -> dict:
        """Execute failover to backup HSM"""
        failover_result = {
            'success': False,
            'start_time': datetime.utcnow(),
            'steps_completed': [],
            'error': None
        }
        
        try:
            # Step 1: Verify backup HSM status
            self.logger.info("Step 1: Verifying backup HSM status")
            backup_status = await self.test_hsm_connectivity(self.backup_hsm_config)
            if not backup_status['available']:
                raise Exception("Backup HSM not available")
            failover_result['steps_completed'].append('backup_hsm_verified')
            
            # Step 2: Initialize backup HSM client
            self.logger.info("Step 2: Initializing backup HSM client")
            backup_hsm = PlingsHSMClient(self.backup_hsm_config['cluster_id'])
            await backup_hsm.initialize_session(
                username=os.environ['HSM_BACKUP_USER'],
                password=os.environ['HSM_BACKUP_PASSWORD']
            )
            failover_result['steps_completed'].append('backup_hsm_initialized')
            
            # Step 3: Verify key availability
            self.logger.info("Step 3: Verifying key availability in backup HSM")
            available_keys = await backup_hsm.list_keys()
            if not available_keys:
                raise Exception("No keys available in backup HSM")
            failover_result['steps_completed'].append('keys_verified')
            
            # Step 4: Update application configuration
            self.logger.info("Step 4: Updating application configuration")
            await self.update_hsm_configuration('backup')
            failover_result['steps_completed'].append('configuration_updated')
            
            # Step 5: Test signature operations
            self.logger.info("Step 5: Testing signature operations")
            test_result = await backup_hsm.health_check()
            if test_result['status'] != 'healthy':
                raise Exception("Backup HSM health check failed")
            failover_result['steps_completed'].append('signature_test_passed')
            
            # Step 6: Update monitoring
            self.logger.info("Step 6: Updating monitoring configuration")
            await self.update_monitoring_configuration('backup')
            failover_result['steps_completed'].append('monitoring_updated')
            
            failover_result['success'] = True
            failover_result['end_time'] = datetime.utcnow()
            
            self.logger.info("✅ Failover to backup HSM completed successfully")
            
        except Exception as e:
            failover_result['error'] = str(e)
            failover_result['end_time'] = datetime.utcnow()
            self.logger.error(f"❌ Failover to backup HSM failed: {e}")
        
        return failover_result
    
    async def rebuild_from_backup_shares(self, backup_shares: List[str]) -> dict:
        """Rebuild HSM from backup shares (Shamir's Secret Sharing)"""
        rebuild_result = {
            'success': False,
            'start_time': datetime.utcnow(),
            'steps_completed': [],
            'error': None
        }
        
        try:
            # Step 1: Validate backup shares
            self.logger.info("Step 1: Validating backup shares")
            if len(backup_shares) < 3:
                raise Exception("Insufficient backup shares for recovery")
            rebuild_result['steps_completed'].append('shares_validated')
            
            # Step 2: Reconstruct master key
            self.logger.info("Step 2: Reconstructing master key from shares")
            mnemonic_manager = PlingsMnemonicManager()
            recovered_mnemonic = mnemonic_manager.recover_from_shares(backup_shares, threshold=3)
            rebuild_result['steps_completed'].append('master_key_reconstructed')
            
            # Step 3: Initialize new HSM infrastructure
            self.logger.info("Step 3: Initializing new HSM infrastructure")
            new_hsm = await self.initialize_new_hsm_infrastructure()
            rebuild_result['steps_completed'].append('hsm_infrastructure_initialized')
            
            # Step 4: Restore master keys
            self.logger.info("Step 4: Restoring master keys to new HSM")
            seed = mnemonic_manager.mnemonic_to_seed(recovered_mnemonic)
            await self.restore_master_keys_to_hsm(new_hsm, seed)
            rebuild_result['steps_completed'].append('master_keys_restored')
            
            # Step 5: Verify key restoration
            self.logger.info("Step 5: Verifying key restoration")
            verification_result = await self.verify_key_restoration(new_hsm)
            if not verification_result['success']:
                raise Exception("Key restoration verification failed")
            rebuild_result['steps_completed'].append('key_restoration_verified')
            
            # Step 6: Update application configuration
            self.logger.info("Step 6: Updating application configuration")
            await self.update_hsm_configuration('rebuilt')
            rebuild_result['steps_completed'].append('configuration_updated')
            
            rebuild_result['success'] = True
            rebuild_result['end_time'] = datetime.utcnow()
            
            self.logger.info("✅ HSM rebuild from backup shares completed successfully")
            
        except Exception as e:
            rebuild_result['error'] = str(e)
            rebuild_result['end_time'] = datetime.utcnow()
            self.logger.error(f"❌ HSM rebuild from backup shares failed: {e}")
        
        return rebuild_result
    
    async def create_disaster_recovery_report(self, recovery_action: str, 
                                           result: dict) -> dict:
        """Create disaster recovery report"""
        report = {
            'recovery_action': recovery_action,
            'execution_result': result,
            'business_impact': await self.assess_business_impact(result),
            'lessons_learned': await self.generate_lessons_learned(result),
            'recommendations': await self.generate_recommendations(result),
            'report_generated_at': datetime.utcnow().isoformat()
        }
        
        return report
    
    async def test_hsm_connectivity(self, hsm_config: dict) -> dict:
        """Test HSM connectivity"""
        try:
            # This would test actual HSM connectivity
            # For now, return mock result
            return {
                'available': True,
                'response_time_ms': 50,
                'error': None
            }
        except Exception as e:
            return {
                'available': False,
                'response_time_ms': None,
                'error': str(e)
            }
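
Because `test_hsm_connectivity` returns a mock "available" result, `assess_disaster_scenario` cannot yet be driven into its failure branches. One way to exercise the decision table on its own is to factor it into a pure function. The sketch below mirrors the four branches of the method above. `Assessment` and `classify` are hypothetical names, not part of the class.

```python
from typing import NamedTuple

class Assessment(NamedTuple):
    disaster_type: str
    impact_level: str
    recovery_approach: str

def classify(primary_available: bool, backup_available: bool) -> Assessment:
    """Map HSM availability to the scenario used by assess_disaster_scenario."""
    if not primary_available and not backup_available:
        return Assessment('total_hsm_failure', 'critical', 'rebuild_from_backup_shares')
    if not primary_available:
        return Assessment('primary_hsm_failure', 'high', 'failover_to_backup')
    if not backup_available:
        return Assessment('backup_hsm_failure', 'medium', 'restore_backup_hsm')
    return Assessment('no_disaster', 'none', 'continue_normal_operations')

# Walk through all four availability combinations
for primary in (True, False):
    for backup in (True, False):
        result = classify(primary, backup)
        print(f"primary={primary!s:5} backup={backup!s:5} -> {result.disaster_type}")
```

With the decision isolated like this, a failover drill can inject `primary_available=False` directly instead of depending on a real outage.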

2. Business Continuity Testing

#!/bin/bash
# business_continuity_test.sh - Business continuity testing

echo "🧪 Business Continuity Testing - $(date)"
echo "========================================"

# Test 1: HSM failover simulation
echo "1. Testing HSM failover simulation..."
python3 -c "
import asyncio
from disaster_recovery import HSMDisasterRecovery

async def test_failover():
    dr = HSMDisasterRecovery(
        primary_hsm_config={'cluster_id': 'test-primary'},
        backup_hsm_config={'cluster_id': 'test-backup'}
    )
    
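    # Simulate primary HSM failure
    # Note: test_hsm_connectivity() returns a mock result with available=True,
    # so this reports no_disaster and the failover branch below is skipped
    # until a real or fault-injecting probe is wired in.
    print('Simulating primary HSM failure...')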
    assessment = await dr.assess_disaster_scenario()
    print(f'Disaster assessment: {assessment[\"disaster_type\"]}')
    
    # Test failover
    if assessment['disaster_type'] == 'primary_hsm_failure':
        failover_result = await dr.execute_failover_to_backup()
        print(f'Failover result: {failover_result[\"success\"]}')

asyncio.run(test_failover())
"

# Test 2: Key recovery simulation
echo "2. Testing key recovery simulation..."
python3 -c "
from mnemonic_manager import PlingsMnemonicManager

# Test backup share recovery
mnemonic_manager = PlingsMnemonicManager()
test_shares = [
    'share_1:abcd1234',
    'share_2:efgh5678',
    'share_3:ijkl9012'
]

try:
    recovered = mnemonic_manager.recover_from_shares(test_shares, threshold=3)
    print('✅ Key recovery simulation successful')
except Exception as e:
    print(f'❌ Key recovery simulation failed: {e}')
"

# Test 3: Performance under stress
echo "3. Testing performance under stress..."
python3 -c "
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor

async def stress_test():
    # Simulate high load
    start_time = time.time()
    
    # Run 1000 concurrent operations
    with ThreadPoolExecutor(max_workers=50) as executor:
        futures = []
        for i in range(1000):
            future = executor.submit(simulate_signature_operation)
            futures.append(future)
        
        # Wait for completion
        for future in futures:
            future.result()
    
    elapsed = time.time() - start_time
    print(f'Stress test completed: 1000 operations in {elapsed:.2f} seconds')

def simulate_signature_operation():
    # Simulate signature operation
    time.sleep(0.01)  # 10ms per operation
    return True

asyncio.run(stress_test())
"

# Test 4: Data backup integrity
echo "4. Testing data backup integrity..."
# This records the current checksums; compare them against a stored baseline
# with `sha256sum -c` to detect changes.
if sha256sum /backup/hsm_config_*.txt > /tmp/backup_checksums.txt; then
    BACKUP_RESULT="✅ PASSED"
    echo "Backup integrity check completed"
else
    BACKUP_RESULT="❌ FAILED"
    echo "❌ Backup integrity check failed"
fi

# Test 5: Network connectivity failover
echo "5. Testing network connectivity failover..."
if ping -c 3 backup-hsm.plings.io; then
    NETWORK_RESULT="✅ PASSED"
    echo "✅ Backup HSM network connectivity verified"
else
    NETWORK_RESULT="❌ FAILED"
    echo "❌ Backup HSM network connectivity failed"
fi

# Generate test report
echo "6. Generating business continuity test report..."
cat > /tmp/bc_test_report.txt << EOF
Business Continuity Test Report - $(date)
========================================

Test Results:
- HSM Failover Simulation: see console output
- Key Recovery Simulation: see console output
- Performance Under Stress: see console output
- Data Backup Integrity: ${BACKUP_RESULT}
- Network Connectivity: ${NETWORK_RESULT}

Recovery Time Objectives:
- Primary HSM Failure: < 1 hour
- Total HSM Failure: < 8 hours
- Network Failure: < 30 minutes

Recommendations:
- Continue quarterly testing
- Review backup procedures
- Update disaster recovery documentation
EOF

mail -s "Business Continuity Test Report" operations@plings.io < /tmp/bc_test_report.txt

echo "✅ Business continuity testing completed"

Summary and Implementation Roadmap

Implementation Phases

Phase 1: Foundation (Months 1-2)

  • SoftHSM Development Environment: Complete development setup with testing framework
  • Basic HD Wallet Implementation: Core BIP32 derivation and Ed25519 signing
  • Mnemonic Management: BIP39 seed phrase generation and validation
  • Initial Testing: Comprehensive unit tests and integration tests
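
The mnemonic-management item above rests on the BIP39 seed derivation, which is PBKDF2-HMAC-SHA512 with 2048 iterations and the salt `"mnemonic"` followed by the optional passphrase. The sketch below shows only that step using the Python standard library. The function name `bip39_mnemonic_to_seed` is hypothetical, it is not the `PlingsMnemonicManager` implementation, and it does not validate the word list or checksum.

```python
import hashlib
import unicodedata

def bip39_mnemonic_to_seed(mnemonic: str, passphrase: str = "") -> bytes:
    """Derive the 64-byte BIP39 seed from a mnemonic sentence."""
    # BIP39 requires NFKD normalization of both inputs before hashing
    password = unicodedata.normalize("NFKD", mnemonic).encode("utf-8")
    salt = unicodedata.normalize("NFKD", "mnemonic" + passphrase).encode("utf-8")
    return hashlib.pbkdf2_hmac("sha512", password, salt, 2048, dklen=64)

# Widely published test mnemonic; never use it for real keys
mnemonic = ("abandon abandon abandon abandon abandon abandon "
            "abandon abandon abandon abandon abandon about")
seed = bip39_mnemonic_to_seed(mnemonic)
seed_with_passphrase = bip39_mnemonic_to_seed(mnemonic, passphrase="example")

print(len(seed))                     # seed length in bytes
print(seed != seed_with_passphrase)  # the passphrase changes the seed
```

The resulting seed is the input to the HD wallet master key derivation. A production implementation would also verify the mnemonic checksum before deriving anything from it.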

Phase 2: Production HSM (Months 3-4)

  • AWS CloudHSM Integration: Production HSM setup with proper security controls
  • Key Generation Ceremony: Formal master key generation with multiple witnesses
  • Multi-Wallet Support: Complete wallet version management system
  • API Integration: REST API for HSM operations with authentication

Phase 3: Security & Monitoring (Months 5-6)

  • Security Monitoring: Real-time threat detection and anomaly monitoring
  • Compliance Framework: GDPR, SOC 2, and audit trail implementation
  • Performance Optimization: Benchmark tuning and scalability improvements
  • Disaster Recovery: Complete DR procedures and business continuity testing

Phase 4: Advanced Features (Months 7-8)

  • Thales Luna Backup HSM: Secondary HSM integration for disaster recovery
  • Automated Incident Response: Integration with security incident procedures
  • Post-Quantum Preparation: Hybrid cryptography implementation planning
  • Advanced Monitoring: ML-based anomaly detection and predictive analytics

Key Technical Specifications

Production HSM Requirements:

  • Primary: AWS CloudHSM (FIPS 140-2 Level 3)
  • Backup: Thales Luna Network HSM (Geographic redundancy)
  • Performance: >1,000 Ed25519 signatures/second (1000x safety buffer)
  • Availability: 99.9% uptime with automated failover
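
The 99.9% availability target implies a concrete downtime budget, which can be compared with the recovery time estimates given in the disaster recovery assessment. The sketch below works through that arithmetic. The function name is hypothetical, and the RTO figures are the upper bounds quoted in this guide (under 1 hour for a primary HSM failure, under 8 hours for a total HSM failure).

```python
HOURS_PER_YEAR = 365 * 24  # 8,760 hours in a non-leap year

def downtime_budget_hours(availability: float,
                          period_hours: float = HOURS_PER_YEAR) -> float:
    """Hours of allowed downtime for a given availability fraction."""
    if not 0.0 <= availability <= 1.0:
        raise ValueError("availability must be between 0 and 1")
    return (1.0 - availability) * period_hours

budget = downtime_budget_hours(0.999)
print(f"Annual downtime budget at 99.9%: {budget:.2f} hours")

# Upper-bound RTO estimates quoted in the disaster recovery assessment
rto_hours = {"primary_hsm_failure": 1.0, "total_hsm_failure": 8.0}
for scenario, hours in rto_hours.items():
    print(f"{scenario}: uses {hours / budget:.0%} of the annual budget")
```

The budget works out to roughly 8.76 hours per year, so a single worst-case total HSM failure would consume most of it. That is one argument for rehearsing the share-based rebuild procedure regularly.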

Security Standards:

  • Key Generation: Multi-witness ceremony with entropy verification
  • Seed Management: BIP39 with Shamir’s Secret Sharing (3-of-5 shares)
  • Access Control: Multi-factor authentication with role-based permissions
  • Monitoring: Real-time anomaly detection with automated alerting
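
The 3-of-5 share scheme listed above can be illustrated with a textbook Shamir construction over a prime field: the secret is the constant term of a random degree-2 polynomial, each share is a point on it, and any three points recover the constant term by Lagrange interpolation. This is a teaching sketch only, with hypothetical function names. It is not the `PlingsMnemonicManager` implementation, and a production system would normally use a vetted library or a standard such as SLIP-0039.

```python
import secrets
from typing import List, Tuple

PRIME = 2**521 - 1  # Mersenne prime, larger than any 256-bit secret

def split_secret(secret: int, threshold: int, num_shares: int) -> List[Tuple[int, int]]:
    """Split secret into num_shares points; any `threshold` of them recover it."""
    if not 0 <= secret < PRIME:
        raise ValueError("secret out of range for the field")
    if not 1 <= threshold <= num_shares:
        raise ValueError("threshold must be between 1 and num_shares")
    # Random polynomial of degree threshold-1 whose constant term is the secret
    coeffs = [secret] + [secrets.randbelow(PRIME) for _ in range(threshold - 1)]
    shares = []
    for x in range(1, num_shares + 1):
        y = 0
        for c in reversed(coeffs):  # Horner evaluation modulo PRIME
            y = (y * x + c) % PRIME
        shares.append((x, y))
    return shares

def recover_secret(shares: List[Tuple[int, int]]) -> int:
    """Lagrange-interpolate the polynomial at x = 0."""
    secret = 0
    for i, (xi, yi) in enumerate(shares):
        num, den = 1, 1
        for j, (xj, _) in enumerate(shares):
            if i != j:
                num = (num * -xj) % PRIME
                den = (den * (xi - xj)) % PRIME
        # pow(den, -1, PRIME) is the modular inverse (Python 3.8+)
        secret = (secret + yi * num * pow(den, -1, PRIME)) % PRIME
    return secret

secret = secrets.randbits(256)
shares = split_secret(secret, threshold=3, num_shares=5)
print(recover_secret(shares[:3]) == secret)                         # any three shares
print(recover_secret([shares[0], shares[2], shares[4]]) == secret)  # a different three
```

Fewer than three shares interpolate a different polynomial and reveal nothing about the secret, which is why `rebuild_from_backup_shares` rejects inputs with fewer than three shares.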

Compliance Framework:

  • GDPR: Complete data protection and privacy controls
  • SOC 2 Type II: Security, availability, and processing integrity
  • Audit Trail: 7-year retention with complete forensic capabilities
  • Regulatory Reporting: Automated compliance reporting and documentation

Next Steps

  1. Begin Phase 1: Set up SoftHSM development environment and basic HD wallet implementation
  2. Security Review: Conduct comprehensive security review of architecture and implementation
  3. Vendor Selection: Finalize HSM vendor contracts and procurement
  4. Team Training: Train operations team on HSM management and incident response
  5. Compliance Preparation: Begin SOC 2 and regulatory compliance documentation

This HSM integration guide provides the complete foundation for secure private key management in the Plings wallet-first architecture, ensuring both security and operational excellence.


Implementation Timeline

Complete Three-Tier Implementation Schedule

Phase 1: Initial Deployment (Week 1)

Initial Tier (Vercel Environment Variables)

  • Day 1: Generate master key and configure Vercel environment
  • Day 2: Implement HD wallet derivation logic
  • Day 3: Create API endpoints for identifier generation
  • Day 4: Test with sample manufacturer batches
  • Day 5: Production deployment and monitoring setup

Deliverables:

  • ✅ Production-ready key management system
  • ✅ Identifier generation API
  • ✅ Database schema for public keys
  • ✅ Basic monitoring and logging

Phase 2: Enhanced Security (Weeks 2-5)

Next Level (SoftHSM Implementation)

  • Week 2: Infrastructure setup and SoftHSM installation
  • Week 3: PKCS#11 integration and HSM service development
  • Week 4: API integration and testing
  • Week 5: Migration from Vercel to SoftHSM

Deliverables:

  • ✅ SoftHSM service with PKCS#11 interface
  • ✅ Enhanced audit trail and key operations logging
  • ✅ Improved security posture
  • ✅ Backup and recovery procedures

Phase 3: Enterprise Preparation (Months 2-9)

Final Level (Hardware HSM)

  • Months 2-3: Enterprise requirements gathering and vendor selection
  • Months 4-5: Hardware HSM procurement and setup
  • Months 6-7: Integration development and testing
  • Months 8-9: Migration and compliance certification

Deliverables:

  • ✅ Enterprise-grade hardware HSM
  • ✅ FIPS 140-2 Level 3 compliance
  • ✅ Geographic redundancy and disaster recovery
  • ✅ Enterprise monitoring and support

Key Milestones

Milestone        | Timeline | Description
MVP Launch       | Week 1   | Production-ready with Vercel environment variables
Security Upgrade | Week 5   | SoftHSM implementation with enhanced security
Enterprise Ready | Month 9  | Hardware HSM with full compliance

Risk Mitigation Timeline

  • Week 1-2: Identify potential security vulnerabilities in Vercel approach
  • Week 3-4: Implement additional security controls and monitoring
  • Month 2-3: Begin enterprise security assessment and planning
  • Month 6-7: Conduct security audit and penetration testing
  • Month 8-9: Complete compliance certification and documentation


Cost Analysis

Total Cost of Ownership (TCO) by Tier

Initial Tier: Vercel Environment Variables

Year 1 Costs:

  • Infrastructure: $0 (included in Vercel plan)
  • Development: $5,000 (1 week developer time)
  • Operation: $0 (no additional operational costs)
  • Total Year 1: $5,000

Ongoing Annual Costs:

  • Infrastructure: $0
  • Maintenance: $1,000 (quarterly key rotation)
  • Monitoring: $0
  • Total Annual: $1,000

Next Level: SoftHSM Implementation

Year 1 Costs:

  • Infrastructure: $600 (VPS hosting)
  • Development: $15,000 (3 weeks developer time)
  • Operation: $2,000 (setup and monitoring)
  • Total Year 1: $17,600

Ongoing Annual Costs:

  • Infrastructure: $600
  • Maintenance: $3,000 (quarterly maintenance)
  • Monitoring: $1,200 (additional monitoring tools)
  • Total Annual: $4,800

Final Level: Hardware HSM

Year 1 Costs:

  • Infrastructure: $18,000 (AWS CloudHSM)
  • Development: $40,000 (8 weeks developer time)
  • Operation: $10,000 (setup and training)
  • Compliance: $15,000 (audit and certification)
  • Total Year 1: $83,000

Ongoing Annual Costs:

  • Infrastructure: $18,000
  • Maintenance: $8,000 (quarterly maintenance)
  • Monitoring: $3,000 (enterprise monitoring)
  • Compliance: $5,000 (annual audit)
  • Total Annual: $34,000

Cost Comparison Over 3 Years

Tier        | Year 1  | Year 2  | Year 3  | Total 3-Year
Initial     | $5,000  | $1,000  | $1,000  | $7,000
Next Level  | $17,600 | $4,800  | $4,800  | $27,200
Final Level | $83,000 | $34,000 | $34,000 | $151,000
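
The three-year totals in the table follow from each tier's Year 1 cost plus two years of ongoing cost. The sketch below reproduces that arithmetic so the figures can be rechecked when any input changes. The dictionary layout and function name are illustrative; the dollar amounts are the ones listed per tier above.

```python
TIERS = {
    "Initial":     {"year1": 5_000,  "annual": 1_000},
    "Next Level":  {"year1": 17_600, "annual": 4_800},
    "Final Level": {"year1": 83_000, "annual": 34_000},
}

def total_cost(year1: int, annual: int, years: int) -> int:
    """Year 1 cost plus the ongoing annual cost for each later year."""
    if years < 1:
        raise ValueError("years must be at least 1")
    return year1 + annual * (years - 1)

totals = {name: total_cost(c["year1"], c["annual"], years=3)
          for name, c in TIERS.items()}

for name, total in totals.items():
    print(f"{name:<12} 3-year TCO: ${total:,}")
```

Changing `years` gives the same comparison over a longer horizon, where the ongoing annual cost dominates the one-time setup cost.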

ROI Analysis

Business Impact by Tier

Initial Tier Benefits:

  • Enables immediate market entry
  • Reduces time-to-market by 6-9 months
  • Estimated revenue opportunity: $100,000 in Year 1

Next Level Benefits:

  • Enables enterprise customer acquisition
  • Improved security posture attracts larger customers
  • Estimated additional revenue: $250,000 in Year 2

Final Level Benefits:

  • Enables enterprise and government contracts
  • Meets regulatory compliance requirements
  • Estimated additional revenue: $500,000+ in Year 3

Break-Even Analysis

  • Initial Tier: Immediate ROI (revenue exceeds costs from Day 1)
  • Next Level: Break-even at ~$27,000 additional revenue, roughly the three-year TCO of $27,200 (typically Month 2-3)
  • Final Level: Break-even at ~$151,000 additional revenue, the three-year TCO of $151,000 (typically Month 6-9)

Cost Optimization Strategies

  1. Hybrid Approach: Use Initial Tier for small customers, Next Level for medium customers, Final Level for enterprise
  2. Phased Migration: Implement tiers as business grows to optimize cash flow
  3. Vendor Negotiation: Leverage growth trajectory for better HSM pricing
  4. Operational Efficiency: Automate operations to reduce maintenance costs

Glossary

A-D

API (Application Programming Interface): A set of protocols and tools for building software applications, allowing different software components to communicate.

Asymmetric Cryptography: A cryptographic system that uses a pair of keys - a public key and a private key - for encryption and digital signatures.

Audit Trail: A chronological record of all system activities, providing evidence of what happened, when, and by whom.

AWS CloudHSM: Amazon Web Services’ managed hardware security module service providing FIPS 140-2 Level 3 certified cryptographic processing.

BIP32: Bitcoin Improvement Proposal 32 - standard for Hierarchical Deterministic (HD) wallets that can generate a tree of key pairs from a single seed.

BIP39: Bitcoin Improvement Proposal 39 - standard for mnemonic phrases used to generate cryptocurrency wallet seeds.

CRYSTALS-Dilithium: A post-quantum cryptographic signature algorithm designed to be resistant to quantum computer attacks.

E-H

Ed25519: A modern elliptic curve cryptography algorithm providing fast, secure digital signatures with small key and signature sizes.

FIPS 140-2: Federal Information Processing Standard 140-2 - U.S. government standard for cryptographic modules with 4 security levels.

Hardware Security Module (HSM): A dedicated cryptographic device designed to securely generate, store, and manage digital keys and perform cryptographic operations.

HD Wallet: Hierarchical Deterministic wallet - a wallet that can generate multiple key pairs from a single master seed following the BIP32 standard.

M-P

Mnemonic Phrase: A human-readable representation of a cryptographic seed, typically consisting of 12-24 words from a standardized dictionary.

PBKDF2: Password-Based Key Derivation Function 2 - a cryptographic function used to derive encryption keys from passwords.

PKCS#11: Public Key Cryptography Standards #11 - a standard interface for communicating with cryptographic tokens and HSMs.

Private Key: The secret key in asymmetric cryptography that must be kept confidential and is used for decryption and digital signing.

Public Key: The openly shared key in asymmetric cryptography used for encryption and signature verification.

Q-S

Quantum Resistance: The property of a cryptographic algorithm to remain secure against attacks by quantum computers.

RSA: Rivest-Shamir-Adleman - a widely used public-key cryptographic algorithm, increasingly replaced in new systems by elliptic curve schemes such as Ed25519, which offer smaller keys and signatures.

Shamir’s Secret Sharing: A cryptographic technique that splits a secret into multiple shares where a threshold number of shares is required to reconstruct the secret.

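SoftHSM: A software implementation of a cryptographic store accessed through the PKCS#11 interface, developed by the OpenDNSSEC project. It is commonly used for development and testing; in this guide it also serves as the Next Level tier.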

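SOC 2: System and Organization Controls 2 - an AICPA reporting framework for service organizations, assessed against the Trust Services Criteria (security, availability, processing integrity, confidentiality, and privacy).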

T-W

Tamper Detection: The ability of a security device to detect physical intrusion attempts and respond appropriately (often by destroying sensitive data).

Tamper Resistance: The property of a security device to resist physical attacks and intrusion attempts.

Thales Luna: A family of hardware security modules manufactured by Thales Group, providing high-assurance cryptographic processing.

Wallet Version: In Plings’ wallet-first architecture, a specific version of the wallet infrastructure with its own master key and security controls.


For complete understanding of HSM integration within the Plings ecosystem:

External Resources:

  • BIP32: Hierarchical Deterministic Wallets specification
  • BIP39: Mnemonic code for generating deterministic keys
  • Ed25519: Edwards-curve Digital Signature Algorithm
  • FIPS 140-2: Federal Information Processing Standard for cryptographic modules
  • AWS CloudHSM: Amazon Web Services Hardware Security Module documentation
  • Thales Luna: Thales Hardware Security Module documentation

Last Updated: Sun 13 Jul 2025 12:53:21 CEST - Complete HSM integration guide with vendor selection, implementation details, and operational procedures