
Plings Wallet Lifecycle Management - Operational Procedures

Created: Sun 13 Jul 2025 11:17:59 CEST
Document Version: 2.0 - Wallet-First Operational Framework
Review Schedule: Monthly (Next: August 2025)
Target Audience: Operations Teams, DevOps Engineers, System Administrators

Overview

The Plings Wallet Lifecycle Management system defines the operational procedures for every stage of a wallet version's life in the wallet-first architecture: creation, deployment, monitoring, maintenance, migration, and retirement. The procedures are designed to keep the service available and secure throughout.

Lifecycle Stages

1. Planning & Approval → 2. Creation & Testing → 3. Deployment & Activation
        ↓                           ↓                        ↓
8. Retirement & Cleanup ← 7. Migration & Transition ← 6. Monitoring & Maintenance
        ↑                           ↑                        ↑
        └── 5. Incident Response ← 4. Production Operations ←┘
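
The eight stages can be modeled as an ordered enumeration so that tooling can refer to them by name. The sketch below is illustrative only: the names `LifecycleStage` and `next_stage` are hypothetical, and it assumes the stages are visited in numeric order, which simplifies the loops shown in the diagram.

```python
from enum import IntEnum
from typing import Optional


class LifecycleStage(IntEnum):
    """The eight lifecycle stages, numbered as in the diagram."""
    PLANNING_AND_APPROVAL = 1
    CREATION_AND_TESTING = 2
    DEPLOYMENT_AND_ACTIVATION = 3
    PRODUCTION_OPERATIONS = 4
    INCIDENT_RESPONSE = 5
    MONITORING_AND_MAINTENANCE = 6
    MIGRATION_AND_TRANSITION = 7
    RETIREMENT_AND_CLEANUP = 8


def next_stage(stage: LifecycleStage) -> Optional[LifecycleStage]:
    """Return the following stage in numeric order, or None after retirement."""
    if stage is LifecycleStage.RETIREMENT_AND_CLEANUP:
        return None
    return LifecycleStage(stage + 1)
```

A real implementation would likely allow repeated passes through operations, incident response, and monitoring rather than a strictly linear walk.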

Key Operational Principles

  1. Zero-Downtime Operations: All wallet lifecycle operations maintain service availability
  2. Automated Validation: Comprehensive testing at every lifecycle stage
  3. Audit Transparency: Complete operational audit trails for compliance
  4. Risk Mitigation: Proactive risk assessment and mitigation procedures
  5. Scalability Planning: Capacity planning and performance optimization

Wallet Creation and Deployment

1. Wallet Planning and Approval Process

1.1 Wallet Creation Request Process

# Wallet Creation Request Template
wallet_creation_request:
  metadata:
    request_id: "WCR-2025-07-13-001"
    requested_by: "operations@plings.io"
    request_date: "2025-07-13T11:17:59Z"
    priority: "standard" # standard, high, emergency
    
  wallet_specification:
    environment: "production" # production, testing, development
    purpose: "planned_rotation" # planned_rotation, capacity_expansion, security_upgrade, incident_response
    security_level: "standard" # standard, high, experimental
    estimated_lifespan: "12_months"
    
  business_justification:
    reason: "Annual planned key rotation for production environment"
    business_impact: "Maintains security best practices and regulatory compliance"
    risk_if_delayed: "Medium - delayed rotation increases security exposure"
    
  technical_requirements:
    hsm_requirements: "FIPS_140_2_Level_3"
    performance_requirements: "10000_signatures_per_second"
    storage_requirements: "encrypted_at_rest"
    backup_requirements: "geographic_distribution"
    
  approval_workflow:
    security_team_approval: "required"
    operations_team_approval: "required" 
    executive_approval: "required_for_production"
    compliance_review: "required_for_production"
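
The inline comments in the template list the allowed values for several fields. A minimal sketch of checking a parsed request against those lists is shown here. It assumes the YAML has already been loaded into a dictionary shaped like `wallet_creation_request`; the names `ALLOWED_VALUES` and `validate_request` are hypothetical and not part of the Plings codebase.

```python
# Allowed values copied from the comments in the request template.
ALLOWED_VALUES = {
    ("metadata", "priority"): {"standard", "high", "emergency"},
    ("wallet_specification", "environment"): {"production", "testing", "development"},
    ("wallet_specification", "purpose"): {
        "planned_rotation", "capacity_expansion", "security_upgrade", "incident_response",
    },
    ("wallet_specification", "security_level"): {"standard", "high", "experimental"},
}


def validate_request(request: dict) -> list:
    """Return a list of problems; an empty list means every checked field is valid."""
    errors = []
    for (section, field), allowed in ALLOWED_VALUES.items():
        value = request.get(section, {}).get(field)
        if value not in allowed:
            errors.append(f"{section}.{field}: {value!r} is not one of {sorted(allowed)}")
    return errors


if __name__ == "__main__":
    example = {
        "metadata": {"priority": "standard"},
        "wallet_specification": {
            "environment": "production",
            "purpose": "planned_rotation",
            "security_level": "standard",
        },
    }
    print(validate_request(example))
```

Returning all problems at once, rather than failing on the first, lets the requester fix a rejected request in a single pass.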

1.2 Automated Approval Workflow

import asyncio
from datetime import datetime, timedelta


class WalletCreationWorkflow:
    """Automated workflow for wallet creation approval"""
    
    async def process_wallet_creation_request(self, request: WalletCreationRequest) -> ApprovalResult:
        """Process wallet creation request through approval workflow"""
        
        # Step 1: Automated validation
        validation_result = await self.validate_request(request)
        if not validation_result.success:
            return ApprovalResult(
                status="rejected",
                reason="validation_failed",
                details=validation_result.errors
            )
        
        # Step 2: Risk assessment
        risk_assessment = await self.assess_risk(request)
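        # Avoid requesting the same approval twice if it is already required
        if (risk_assessment.risk_level == "high"
                and "executive_approval" not in request.approval_requirements):
            request.approval_requirements.append("executive_approval")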
        
        # Step 3: Resource availability check
        resource_check = await self.check_resource_availability(request)
        if not resource_check.sufficient:
            return ApprovalResult(
                status="pending_resources",
                estimated_availability=resource_check.next_available_slot
            )
        
        # Step 4: Approval routing
        approval_tasks = []
        for approver in request.approval_requirements:
            task = self.request_approval(approver, request)
            approval_tasks.append(task)
        
        # Step 5: Collect approvals
        approvals = await asyncio.gather(*approval_tasks)
        
        if all(approval.approved for approval in approvals):
            return ApprovalResult(
                status="approved",
                approval_id=f"WAP-{datetime.utcnow().strftime('%Y%m%d%H%M%S')}",
                approvers=[a.approver for a in approvals],
                valid_until=datetime.utcnow() + timedelta(days=7)
            )
        else:
            return ApprovalResult(
                status="rejected",
                reason="insufficient_approvals",
                pending_approvers=[a.approver for a in approvals if not a.approved]
            )
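
The approval routing and collection steps can be tried in isolation. The following is a minimal, self-contained sketch of the same fan-out pattern with `asyncio.gather`. The `Approval` dataclass, the stubbed `request_approval`, and the `decisions` mapping are all hypothetical stand-ins for the real approval backend.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Approval:
    approver: str
    approved: bool


async def request_approval(approver: str, decisions: dict) -> Approval:
    """Hypothetical stand-in for a real approval call (ticket, chat prompt, etc.)."""
    await asyncio.sleep(0)  # yield to the event loop, as a real I/O call would
    return Approval(approver=approver, approved=decisions.get(approver, False))


async def collect_approvals(approvers: list, decisions: dict) -> tuple:
    """Request all approvals concurrently; return (granted, not_granted) approver names."""
    unique = list(dict.fromkeys(approvers))  # drop duplicates, keep first-seen order
    results = await asyncio.gather(*(request_approval(a, decisions) for a in unique))
    granted = [r.approver for r in results if r.approved]
    not_granted = [r.approver for r in results if not r.approved]
    return granted, not_granted


if __name__ == "__main__":
    granted, not_granted = asyncio.run(
        collect_approvals(
            ["security_team", "operations_team", "security_team"],
            {"security_team": True},
        )
    )
    print(granted, not_granted)
```

`asyncio.gather` returns results in the order the awaitables were passed, so the approver lists keep the order of the requirements.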

2. Wallet Creation Procedure

2.1 HSM Key Generation

#!/bin/bash
# HSM Master Key Generation Script
# Executed in secure air-gapped environment

set -euo pipefail

WALLET_VERSION="${1:-}"
APPROVAL_ID="${2:-}"
WITNESS_COUNT="${3:-3}"

if [[ -z "$WALLET_VERSION" || -z "$APPROVAL_ID" ]]; then
    echo "Usage: $0 <wallet_version> <approval_id> [witness_count]"
    exit 1
fi

echo "=== Plings Wallet Master Key Generation ==="
echo "Wallet Version: $WALLET_VERSION"
echo "Approval ID: $APPROVAL_ID"
echo "Witnesses Required: $WITNESS_COUNT"
echo "Generation Time: $(date -u)"
echo

# Verify air-gapped environment
if ping -c 1 8.8.8.8 &> /dev/null; then
    echo "ERROR: Network connectivity detected. Key generation requires air-gapped environment."
    exit 1
fi

# Verify HSM availability
if ! ./hsm-status.sh --verify-ready; then
    echo "ERROR: HSM not ready for key generation"
    exit 1
fi

# Collect witness signatures
echo "Collecting witness signatures..."
for i in $(seq 1 "$WITNESS_COUNT"); do
    echo "Witness $i: Please provide your signature for approval $APPROVAL_ID"
    # -r keeps backslashes in the input literal
    read -r -p "Witness $i Name: " witness_name
    read -r -s -p "Witness $i Signature: " witness_signature
    echo
    
    # Verify witness signature
    if ! ./verify-witness-signature.sh "$witness_name" "$witness_signature" "$APPROVAL_ID"; then
        echo "ERROR: Invalid witness signature for $witness_name"
        exit 1
    fi
    
    echo "Witness $i verified: $witness_name"
done

# Generate entropy from multiple sources
echo "Generating cryptographic entropy..."
HARDWARE_ENTROPY=$(./hardware-rng.sh --bytes=32)
QUANTUM_ENTROPY=$(./quantum-rng.sh --bytes=32)
ATMOSPHERIC_ENTROPY=$(./atmospheric-rng.sh --bytes=32)

# Combine entropy sources
COMBINED_ENTROPY=$(./combine-entropy.sh "$HARDWARE_ENTROPY" "$QUANTUM_ENTROPY" "$ATMOSPHERIC_ENTROPY")

# Generate master key in HSM
echo "Generating master key in HSM..."
HSM_KEY_ID="plings_master_key_v${WALLET_VERSION}"

./hsm-generate-key.sh \
    --key-id="$HSM_KEY_ID" \
    --algorithm="Ed25519" \
    --entropy="$COMBINED_ENTROPY" \
    --witnesses="$WITNESS_COUNT" \
    --backup-required

# Verify key generation
if ./hsm-verify-key.sh --key-id="$HSM_KEY_ID"; then
    echo "✅ Master key generated successfully: $HSM_KEY_ID"
else
    echo "❌ Master key generation verification failed"
    exit 1
fi

# Generate backup
echo "Creating secure backup..."
./hsm-backup-key.sh --key-id="$HSM_KEY_ID" --geographic-distribution

echo "=== Key Generation Complete ==="
echo "Key ID: $HSM_KEY_ID"
echo "Backup Locations: $(./hsm-list-backups.sh --key-id="$HSM_KEY_ID")"
echo "Next Step: Execute wallet deployment procedure"
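
The script delegates the mixing of entropy sources to `combine-entropy.sh`, whose internals are not shown in this document. One common approach is to hash the concatenated inputs; the sketch below illustrates that idea with SHA-256 and is an assumption, not a description of the actual helper. The function name `combine_entropy` is hypothetical.

```python
import hashlib
import secrets


def combine_entropy(*sources: bytes) -> bytes:
    """Mix several entropy inputs into one 32-byte value using SHA-256.

    Each input is length-prefixed so that different ways of splitting the
    same bytes across sources cannot produce the same hash input.
    """
    if not sources:
        raise ValueError("at least one entropy source is required")
    digest = hashlib.sha256()
    for source in sources:
        digest.update(len(source).to_bytes(4, "big"))
        digest.update(source)
    return digest.digest()


if __name__ == "__main__":
    # Stand-ins for the hardware, quantum, and atmospheric sources in the script.
    seed = combine_entropy(
        secrets.token_bytes(32),
        secrets.token_bytes(32),
        secrets.token_bytes(32),
    )
    print(len(seed))
```

The motivation for mixing is that a weakness in any single source should not by itself make the combined value predictable, provided at least one input remains unpredictable.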

2.2 Wallet Database Creation

-- Wallet creation procedure
-- Execute after successful HSM key generation

BEGIN;

-- Create new wallet version
INSERT INTO wallet_versions (
    version_id,
    version_name, 
    master_key_id,
    description,
    environment,
    security_level,
    status,
    created_at,
    is_default
) VALUES (
    :wallet_version_id,
    :wallet_version_name,
    :hsm_key_id,
    :description,
    :environment,
    :security_level,
    'created', -- Initial status
    NOW(),
    false -- Will be set to true during activation
);

-- Create operational audit record
INSERT INTO wallet_operations_log (
    operation_type,
    wallet_version,
    operation_status,
    initiated_by,
    approval_id,
    operation_details,
    timestamp
) VALUES (
    'wallet_creation',
    :wallet_version_id,
    'completed',
    :operator_id,
    :approval_id,
    jsonb_build_object(
        'hsm_key_id', :hsm_key_id,
        'environment', :environment,
        'security_level', :security_level,
        'witness_count', :witness_count
    ),
    NOW()
);

COMMIT;

3. Wallet Testing and Validation

3.1 Comprehensive Wallet Testing Suite

from datetime import datetime


class WalletValidationSuite:
    """Comprehensive testing suite for new wallet versions"""
    
    async def run_full_validation(self, wallet_version: int) -> ValidationResult:
        """Execute complete wallet validation test suite"""
        
        test_results = {}
        
        # Test 1: HSM connectivity and key access
        test_results['hsm_connectivity'] = await self.test_hsm_connectivity(wallet_version)
        
        # Test 2: Cryptographic operations
        test_results['crypto_operations'] = await self.test_cryptographic_operations(wallet_version)
        
        # Test 3: Path derivation accuracy
        test_results['path_derivation'] = await self.test_path_derivation(wallet_version)
        
        # Test 4: Database integration
        test_results['database_integration'] = await self.test_database_integration(wallet_version)
        
        # Test 5: Performance baseline
        test_results['performance'] = await self.test_performance_baseline(wallet_version)
        
        # Test 6: Security controls
        test_results['security_controls'] = await self.test_security_controls(wallet_version)
        
        # Test 7: Backup and recovery
        test_results['backup_recovery'] = await self.test_backup_recovery(wallet_version)
        
        # Analyze results
        all_passed = all(result.success for result in test_results.values())
        critical_failures = [name for name, result in test_results.items() 
                           if not result.success and result.severity == 'critical']
        
        return ValidationResult(
            wallet_version=wallet_version,
            all_tests_passed=all_passed,
            test_results=test_results,
            critical_failures=critical_failures,
            validation_timestamp=datetime.utcnow(),
            ready_for_deployment=all_passed and len(critical_failures) == 0
        )
    
    async def test_cryptographic_operations(self, wallet_version: int) -> TestResult:
        """Test cryptographic operations for wallet version"""
        
        try:
            # Test 1: Basic signature generation
            test_signature = await self.wallet_service.generate_test_signature(wallet_version)
            
            # Test 2: Signature verification
            verification_result = await self.wallet_service.verify_test_signature(
                wallet_version, test_signature
            )
            
            # Test 3: Path key derivation
            test_paths = ["2.2.2.2.2", "2.3.2.7P.3Lz", "2.G.5j.2qQ.3L8z"]  # Class pointers separate
            derivation_results = []
            
            for path in test_paths:
                derived_key = await self.wallet_service.derive_path_key(wallet_version, path)
                derivation_results.append({
                    'path': path,
                    'derived_successfully': derived_key is not None,
                    # Only validate the format when derivation actually returned a key
                    'key_format_valid': derived_key is not None and self.validate_key_format(derived_key)
                })
            
            # Test 4: Performance under load
            performance_test = await self.test_signature_performance(wallet_version, count=1000)
            
            all_tests_passed = (
                verification_result.valid and
                all(r['derived_successfully'] and r['key_format_valid'] for r in derivation_results) and
                performance_test.average_time_ms < 10  # Sub-10ms signature generation
            )
            
            return TestResult(
                test_name='cryptographic_operations',
                success=all_tests_passed,
                severity='critical',
                details={
                    'signature_verification': verification_result,
                    'path_derivations': derivation_results,
                    'performance': performance_test
                }
            )
            
        except Exception as e:
            return TestResult(
                test_name='cryptographic_operations',
                success=False,
                severity='critical',
                error=str(e)
            )

3.2 Performance Baseline Establishment

#!/bin/bash
# Wallet Performance Baseline Testing
# Establish performance baselines for new wallet version

# Stop on the first failing test so a partial baseline is never stored
set -euo pipefail

WALLET_VERSION="${1:-}"
TEST_DURATION="${2:-300}" # 5 minutes default
CONCURRENCY="${3:-10}"

if [[ -z "$WALLET_VERSION" ]]; then
    echo "Usage: $0 <wallet_version> [test_duration_seconds] [concurrency_level]"
    exit 1
fi

echo "=== Wallet Performance Baseline Testing ==="
echo "Wallet Version: $WALLET_VERSION"
echo "Test Duration: ${TEST_DURATION}s"
echo "Concurrency: $CONCURRENCY"
echo "Start Time: $(date)"
echo

# Create test results directory
RESULTS_DIR="./test-results/wallet-v${WALLET_VERSION}-$(date +%Y%m%d-%H%M%S)"
mkdir -p "$RESULTS_DIR"

# Test 1: Signature Generation Performance
echo "Testing signature generation performance..."
./performance-test-signatures.sh \
    --wallet="$WALLET_VERSION" \
    --duration="$TEST_DURATION" \
    --concurrency="$CONCURRENCY" \
    --output="$RESULTS_DIR/signature_performance.json"

# Test 2: Path Derivation Performance  
echo "Testing path derivation performance..."
./performance-test-derivation.sh \
    --wallet="$WALLET_VERSION" \
    --duration="$TEST_DURATION" \
    --concurrency="$CONCURRENCY" \
    --output="$RESULTS_DIR/derivation_performance.json"

# Test 3: Database Query Performance
echo "Testing database query performance..."
./performance-test-database.sh \
    --wallet="$WALLET_VERSION" \
    --duration="$TEST_DURATION" \
    --output="$RESULTS_DIR/database_performance.json"

# Test 4: End-to-End Allocation Performance
echo "Testing end-to-end allocation performance..."
./performance-test-allocation.sh \
    --wallet="$WALLET_VERSION" \
    --duration="$TEST_DURATION" \
    --concurrency="$CONCURRENCY" \
    --output="$RESULTS_DIR/allocation_performance.json"

# Generate performance report
echo "Generating performance baseline report..."
./generate-performance-report.sh \
    --results-dir="$RESULTS_DIR" \
    --wallet="$WALLET_VERSION" \
    --output="$RESULTS_DIR/baseline_report.html"

# Store baseline in database
echo "Storing performance baseline..."
./store-performance-baseline.sh \
    --wallet="$WALLET_VERSION" \
    --results-dir="$RESULTS_DIR"

echo "=== Performance Baseline Complete ==="
echo "Results stored in: $RESULTS_DIR"
echo "Baseline report: $RESULTS_DIR/baseline_report.html"

4. Wallet Deployment and Activation

4.1 Blue-Green Deployment Strategy

from datetime import datetime


class WalletDeploymentManager:
    """Blue-green deployment strategy for wallet versions"""
    
    async def deploy_wallet_blue_green(self, new_wallet_version: int) -> DeploymentResult:
        """Deploy new wallet version using blue-green strategy"""
        
        # Phase 1: Prepare blue environment (new wallet)
        blue_preparation = await self.prepare_blue_environment(new_wallet_version)
        if not blue_preparation.success:
            return DeploymentResult(
                success=False,
                phase="blue_preparation_failed",
                error=blue_preparation.error
            )
        
        # Phase 2: Run comprehensive testing on blue
        blue_testing = await self.test_blue_environment(new_wallet_version)
        if not blue_testing.all_tests_passed:
            return DeploymentResult(
                success=False,
                phase="blue_testing_failed", 
                test_results=blue_testing
            )
        
        # Phase 3: Begin gradual traffic migration
        migration_result = await self.gradual_traffic_migration(new_wallet_version)
        if not migration_result.success:
            # Automatic rollback on failure
            await self.rollback_to_green(migration_result.rollback_point)
            return DeploymentResult(
                success=False,
                phase="traffic_migration_failed",
                rollback_completed=True
            )
        
        # Phase 4: Monitor and validate production traffic
        production_validation = await self.validate_production_traffic(
            new_wallet_version, duration_minutes=30
        )
        
        if not production_validation.success:
            # Automatic rollback on production issues
            await self.rollback_to_green(production_validation.rollback_point)
            return DeploymentResult(
                success=False,
                phase="production_validation_failed",
                rollback_completed=True
            )
        
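        # Phase 5: Complete deployment
        completion_result = await self.complete_deployment(new_wallet_version)
        if not completion_result.success:
            return DeploymentResult(
                success=False,
                phase="completion_failed",
                error=completion_result.error
            )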
        
        return DeploymentResult(
            success=True,
            deployed_wallet_version=new_wallet_version,
            deployment_completed_at=datetime.utcnow(),
            performance_metrics=production_validation.metrics
        )
    
    async def gradual_traffic_migration(self, new_wallet_version: int) -> MigrationResult:
        """Gradually migrate traffic to new wallet version"""
        
        migration_steps = [
            {'percentage': 1, 'duration_minutes': 10},   # 1% for 10 minutes
            {'percentage': 5, 'duration_minutes': 15},   # 5% for 15 minutes  
            {'percentage': 25, 'duration_minutes': 20},  # 25% for 20 minutes
            {'percentage': 50, 'duration_minutes': 30},  # 50% for 30 minutes
            {'percentage': 100, 'duration_minutes': 0}   # 100% (complete)
        ]
        
        for step in migration_steps:
            # Update traffic routing
            await self.update_traffic_routing(
                new_wallet_version, 
                percentage=step['percentage']
            )
            
            # Monitor during step
            if step['duration_minutes'] > 0:
                monitoring_result = await self.monitor_migration_step(
                    new_wallet_version,
                    duration_minutes=step['duration_minutes'],
                    traffic_percentage=step['percentage']
                )
                
                if not monitoring_result.success:
                    return MigrationResult(
                        success=False,
                        failed_at_percentage=step['percentage'],
                        rollback_point=monitoring_result.rollback_point,
                        error=monitoring_result.error
                    )
        
        return MigrationResult(
            success=True,
            completed_at=datetime.utcnow(),
            final_traffic_percentage=100
        )
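
The migration schedule above leaves open how `update_traffic_routing` decides which requests go to the new wallet. One possible approach is deterministic hash bucketing, sketched below. The function names and the idea of keying on a request identifier are assumptions for illustration; the step values are copied from `migration_steps`.

```python
import hashlib

# (traffic percentage, monitoring minutes), copied from migration_steps
MIGRATION_STEPS = [(1, 10), (5, 15), (25, 20), (50, 30), (100, 0)]


def routes_to_new_wallet(request_key: str, percentage: int) -> bool:
    """Place the key in one of 100 stable buckets and compare with the rollout percentage."""
    digest = hashlib.sha256(request_key.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100
    return bucket < percentage


def total_monitoring_minutes(steps=MIGRATION_STEPS) -> int:
    """Minimum time spent monitoring before traffic reaches 100%."""
    return sum(minutes for _, minutes in steps)


if __name__ == "__main__":
    print(total_monitoring_minutes())  # 10 + 15 + 20 + 30 + 0 = 75
    print(routes_to_new_wallet("request-42", 100))
```

Because the bucket depends only on the key, a request key that is routed to the new wallet at one step stays on the new wallet at every later, higher-percentage step.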

4.2 Wallet Activation Procedure

-- Wallet activation procedure
-- Execute after successful deployment and validation

BEGIN;

-- Step 1: Activate the new wallet version
UPDATE wallet_versions 
SET status = 'active',
    activated_at = NOW()
WHERE version_id = :new_wallet_version
AND status = 'deployed';

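-- Step 2: Set as default if requested
-- Only applies when Step 1 actually activated the wallet, so a wallet that
-- was not in 'deployed' status can never become the default
UPDATE wallet_versions 
SET is_default = CASE 
    WHEN version_id = :new_wallet_version THEN true 
    ELSE false 
END
WHERE :set_as_default = true
AND EXISTS (
    SELECT 1 FROM wallet_versions
    WHERE version_id = :new_wallet_version
    AND status = 'active'
);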

-- Step 3: Create initial manufacturer registration (Plings)
INSERT INTO manufacturer_registry (
    wallet_version,
    manufacturer_index,
    manufacturer_name,
    status,
    registered_by,
    registered_at
) VALUES (
    :new_wallet_version,
    1,
    'Plings',
    'active',
    :operator_id,
    NOW()
) ON CONFLICT (wallet_version, manufacturer_index) DO NOTHING;

-- Step 4: Create manufacturer lineage if migration
INSERT INTO manufacturer_lineage (
    canonical_name,
    current_wallet_version,
    current_manufacturer_index,
    predecessor_wallet,
    predecessor_index,
    migration_reason,
    migration_date,
    migration_status,
    migration_completion_percentage
) VALUES (
    'Plings',
    :new_wallet_version,
    1,
    :previous_wallet_version,
    1,
    :migration_reason,
    NOW(),
    'completed',
    100
) ON CONFLICT (canonical_name) DO UPDATE SET
    current_wallet_version = :new_wallet_version,
    predecessor_wallet = :previous_wallet_version,
    migration_reason = :migration_reason,
    migration_date = NOW();

-- Step 5: Log activation
INSERT INTO wallet_operations_log (
    operation_type,
    wallet_version,
    operation_status,
    initiated_by,
    operation_details,
    timestamp
) VALUES (
    'wallet_activation',
    :new_wallet_version,
    'completed',
    :operator_id,
    jsonb_build_object(
        'set_as_default', :set_as_default,
        'previous_default_wallet', :previous_wallet_version,
        'activation_method', 'blue_green_deployment'
    ),
    NOW()
);

COMMIT;

Production Operations and Monitoring

1. Continuous Monitoring Framework

1.1 Wallet Health Monitoring

import time
from datetime import datetime


class WalletHealthMonitor:
    """Continuous wallet health monitoring and alerting"""
    
    def __init__(self):
        self.health_checks = [
            'hsm_connectivity',
            'signature_performance',
            'path_allocation_rate',
            'database_response_time',
            'error_rate',
            'security_alerts'
        ]
        
        self.alert_thresholds = {
            'hsm_response_time_ms': 100,
            'signature_generation_ms': 10,
            'path_allocation_ms': 500,
            'database_query_ms': 50,
            'error_rate_percentage': 0.1,
            'failed_requests_per_minute': 10
        }
    
    async def run_health_checks(self, wallet_version: int) -> HealthCheckResult:
        """Execute all health checks for wallet version"""
        
        check_results = {}
        
        for check_name in self.health_checks:
            try:
                check_method = getattr(self, f'check_{check_name}')
                result = await check_method(wallet_version)
                check_results[check_name] = result
            except Exception as e:
                check_results[check_name] = HealthCheck(
                    name=check_name,
                    status='failed',
                    error=str(e),
                    timestamp=datetime.utcnow()
                )
        
        # Determine overall health
        failed_checks = [name for name, result in check_results.items() 
                        if result.status == 'failed']
        warning_checks = [name for name, result in check_results.items() 
                         if result.status == 'warning']
        
        overall_status = 'healthy'
        if failed_checks:
            overall_status = 'unhealthy'
        elif warning_checks:
            overall_status = 'warning'
        
        return HealthCheckResult(
            wallet_version=wallet_version,
            overall_status=overall_status,
            check_results=check_results,
            failed_checks=failed_checks,
            warning_checks=warning_checks,
            timestamp=datetime.utcnow()
        )
    
    async def check_hsm_connectivity(self, wallet_version: int) -> HealthCheck:
        """Check HSM connectivity and response time"""
        
        start_time = time.time()
        try:
            # Test HSM ping
            hsm_response = await self.hsm_service.ping(wallet_version)
            response_time_ms = (time.time() - start_time) * 1000
            
            if response_time_ms > self.alert_thresholds['hsm_response_time_ms']:
                return HealthCheck(
                    name='hsm_connectivity',
                    status='warning',
                    message=f'HSM response time {response_time_ms:.2f}ms exceeds threshold',
                    metrics={'response_time_ms': response_time_ms}
                )
            
            return HealthCheck(
                name='hsm_connectivity',
                status='healthy',
                metrics={'response_time_ms': response_time_ms}
            )
            
        except Exception as e:
            return HealthCheck(
                name='hsm_connectivity',
                status='failed',
                error=str(e),
                metrics={'response_time_ms': (time.time() - start_time) * 1000}
            )

1.2 Automated Alerting System

# Wallet monitoring alerts configuration
wallet_monitoring_alerts:
  
  critical_alerts:
    - name: "hsm_connectivity_failed"
      condition: "hsm_connectivity.status == 'failed'"
      notification_channels: ["pagerduty", "slack_critical", "email_oncall"]
      auto_escalation: true
      escalation_time_minutes: 5
      
    - name: "wallet_error_rate_high"  
      condition: "error_rate_percentage > 1.0"
      notification_channels: ["pagerduty", "slack_critical"]
      auto_escalation: true
      escalation_time_minutes: 10
      
    - name: "signature_generation_failed"
      condition: "signature_performance.status == 'failed'"
      notification_channels: ["pagerduty", "slack_critical", "email_oncall"]
      auto_escalation: true
      escalation_time_minutes: 5
  
  warning_alerts:
    - name: "hsm_response_time_high"
      condition: "hsm_response_time_ms > 100"
      notification_channels: ["slack_warnings", "email_operations"]
      auto_escalation: false
      
    - name: "path_allocation_slow"
      condition: "path_allocation_ms > 500"
      notification_channels: ["slack_warnings"]
      auto_escalation: false
      
    - name: "database_response_slow"
      condition: "database_query_ms > 50"
      notification_channels: ["slack_warnings", "email_database_team"]
      auto_escalation: false

  notification_channels:
    pagerduty:
      integration_key: "${PAGERDUTY_INTEGRATION_KEY}"
      severity: "critical"
      
    slack_critical:
      webhook_url: "${SLACK_CRITICAL_WEBHOOK}"
      channel: "#wallet-alerts-critical"
      
    slack_warnings:
      webhook_url: "${SLACK_WARNINGS_WEBHOOK}" 
      channel: "#wallet-alerts-warnings"
      
    email_oncall:
      smtp_server: "${SMTP_SERVER}"
      recipients: ["oncall-engineer@plings.io"]
      
    email_operations:
      smtp_server: "${SMTP_SERVER}"
      recipients: ["operations-team@plings.io"]
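
Alert definitions refer to notification channels by name, so a typo or a missing channel definition would only surface when an alert fails to deliver. A minimal consistency check is sketched here. It assumes the configuration under `wallet_monitoring_alerts` has been parsed into a dictionary, and `undefined_channels` is a hypothetical helper name.

```python
def undefined_channels(config: dict) -> set:
    """Return channel names referenced by alerts but missing from notification_channels."""
    defined = set(config.get("notification_channels", {}))
    referenced = set()
    for group in ("critical_alerts", "warning_alerts"):
        for alert in config.get(group, []):
            referenced.update(alert.get("notification_channels", []))
    return referenced - defined


if __name__ == "__main__":
    # Trimmed-down version of the configuration above
    config = {
        "warning_alerts": [
            {
                "name": "database_response_slow",
                "notification_channels": ["slack_warnings", "email_database_team"],
            },
        ],
        "notification_channels": {
            "slack_warnings": {"channel": "#wallet-alerts-warnings"},
        },
    }
    print(undefined_channels(config))
```

Applied to the full configuration above, such a check would report `email_database_team`, which `database_response_slow` references but which has no entry under `notification_channels`.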

2. Capacity Planning and Scaling

2.1 Wallet Capacity Analysis

from datetime import datetime


class WalletCapacityPlanner:
    """Analyze wallet capacity and plan scaling requirements"""
    
    async def analyze_wallet_capacity(self, wallet_version: int) -> CapacityAnalysis:
        """Comprehensive capacity analysis for wallet version"""
        
        # Current usage metrics
        current_metrics = await self.get_current_usage_metrics(wallet_version)
        
        # Path allocation trends
        allocation_trends = await self.analyze_allocation_trends(wallet_version)
        
        # Performance under load
        performance_metrics = await self.analyze_performance_metrics(wallet_version)
        
        # Manufacturer growth
        manufacturer_growth = await self.analyze_manufacturer_growth(wallet_version)
        
        # Capacity projections
        projections = await self.project_capacity_requirements(
            current_metrics, allocation_trends, manufacturer_growth
        )
        
        return CapacityAnalysis(
            wallet_version=wallet_version,
            current_usage=current_metrics,
            allocation_trends=allocation_trends,
            performance_metrics=performance_metrics,
            manufacturer_growth=manufacturer_growth,
            capacity_projections=projections,
            recommendations=self.generate_capacity_recommendations(projections),
            analysis_timestamp=datetime.utcnow()
        )
    
    async def analyze_allocation_trends(self, wallet_version: int) -> AllocationTrends:
        """Analyze path allocation trends and growth patterns"""
        
        # Daily allocation rates over last 90 days
        daily_allocations = await self.db.query("""
            SELECT 
                DATE(allocated_at) as allocation_date,
                COUNT(*) as allocation_count,
                SUM(quantity) as total_quantity,
                COUNT(DISTINCT allocated_by) as unique_users,
                COUNT(DISTINCT manufacturer_name) as active_manufacturers
            FROM path_registry
            WHERE wallet_version = %s
            AND allocated_at >= NOW() - INTERVAL '90 days'
            GROUP BY DATE(allocated_at)
            ORDER BY allocation_date
        """, [wallet_version])
        
        # Calculate growth rate (needs two full 30-day windows to compare)
        if len(daily_allocations) >= 60:
            recent_avg = sum(day.allocation_count for day in daily_allocations[-30:]) / 30
            older_avg = sum(day.allocation_count for day in daily_allocations[-60:-30]) / 30
            growth_rate = (recent_avg - older_avg) / older_avg if older_avg > 0 else 0
        else:
            growth_rate = 0
        
        # Peak usage analysis (guard against wallets with no allocations yet)
        peak_allocation_day = max(
            daily_allocations, key=lambda x: x.allocation_count, default=None
        )
        total_allocations = sum(day.allocation_count for day in daily_allocations)
        average_daily = total_allocations / len(daily_allocations) if daily_allocations else 0
        
        return AllocationTrends(
            daily_allocations=daily_allocations,
            average_daily_allocations=average_daily,
            growth_rate_percentage=growth_rate * 100,
            peak_allocation_day=peak_allocation_day,
            trend_direction='increasing' if growth_rate > 0.05 else 'stable' if growth_rate > -0.05 else 'decreasing'
        )
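
The growth-rate arithmetic is easier to follow with concrete numbers. The sketch below isolates the calculation as plain functions over a list of daily counts. The function names are hypothetical, and this version deliberately returns zero growth unless two full windows of history are available.

```python
def growth_rate(daily_counts: list, window: int = 30) -> float:
    """Relative change between the latest window and the window before it."""
    if len(daily_counts) < 2 * window:
        return 0.0  # not enough history for two full windows
    recent_avg = sum(daily_counts[-window:]) / window
    older_avg = sum(daily_counts[-2 * window:-window]) / window
    return (recent_avg - older_avg) / older_avg if older_avg > 0 else 0.0


def trend_direction(rate: float) -> str:
    """Apply the same +/-5% bands used for trend_direction above."""
    if rate > 0.05:
        return "increasing"
    if rate > -0.05:
        return "stable"
    return "decreasing"


if __name__ == "__main__":
    # 30 days at 100 allocations/day followed by 30 days at 120/day
    history = [100] * 30 + [120] * 30
    rate = growth_rate(history)
    print(f"{rate * 100:.1f}% -> {trend_direction(rate)}")
```

With 30 days at 100 allocations followed by 30 days at 120, the averages are 100 and 120, giving (120 - 100) / 100 = 20% growth, which falls in the "increasing" band.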

2.2 Automated Scaling Recommendations

#!/bin/bash
# Automated wallet capacity scaling recommendations
# Run weekly as part of capacity planning process

# Abort if the analysis or any metric extraction fails
set -euo pipefail

WALLET_VERSION="${1:-1}"
ANALYSIS_PERIOD="${2:-90}" # days

echo "=== Wallet Capacity Scaling Analysis ==="
echo "Wallet Version: $WALLET_VERSION"
echo "Analysis Period: ${ANALYSIS_PERIOD} days"
echo "Analysis Date: $(date)"
echo

# Generate capacity analysis report
./capacity-analysis.py \
    --wallet-version="$WALLET_VERSION" \
    --period-days="$ANALYSIS_PERIOD" \
    --output-format="json" \
    --output-file="/tmp/capacity_analysis.json"

# Extract key metrics
CURRENT_ALLOCATION_RATE=$(jq -r '.current_usage.daily_allocation_rate' /tmp/capacity_analysis.json)
GROWTH_RATE=$(jq -r '.allocation_trends.growth_rate_percentage' /tmp/capacity_analysis.json)
PROJECTED_PEAK=$(jq -r '.capacity_projections.projected_peak_daily_allocations' /tmp/capacity_analysis.json)
CURRENT_CAPACITY=$(jq -r '.performance_metrics.max_daily_capacity' /tmp/capacity_analysis.json)

echo "Current Metrics:"
echo "  Daily Allocation Rate: $CURRENT_ALLOCATION_RATE"
echo "  Growth Rate: ${GROWTH_RATE}%"
echo "  Current Max Capacity: $CURRENT_CAPACITY"
echo "  Projected Peak: $PROJECTED_PEAK"
echo

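# Calculate capacity utilization
# Guard against a missing or zero capacity, then multiply before dividing so
# that bc's scale=2 truncation does not discard precision in the ratio
if [[ -z "$CURRENT_CAPACITY" || "$CURRENT_CAPACITY" == "null" ]] \
    || (( $(echo "$CURRENT_CAPACITY <= 0" | bc -l) )); then
    echo "ERROR: max_daily_capacity is missing or not positive in /tmp/capacity_analysis.json"
    exit 1
fi
UTILIZATION=$(echo "scale=2; $CURRENT_ALLOCATION_RATE * 100 / $CURRENT_CAPACITY" | bc)
PROJECTED_UTILIZATION=$(echo "scale=2; $PROJECTED_PEAK * 100 / $CURRENT_CAPACITY" | bc)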

echo "Capacity Analysis:"
echo "  Current Utilization: ${UTILIZATION}%"
echo "  Projected Peak Utilization: ${PROJECTED_UTILIZATION}%"
echo

# Generate recommendations
if (( $(echo "$PROJECTED_UTILIZATION > 80" | bc -l) )); then
    echo "🚨 RECOMMENDATION: Scale up required"
    echo "   Projected utilization exceeds 80% threshold"
    echo "   Consider creating additional wallet version or upgrading HSM capacity"
elif (( $(echo "$PROJECTED_UTILIZATION > 60" | bc -l) )); then
    echo "⚠️  RECOMMENDATION: Monitor closely"
    echo "   Projected utilization approaching capacity limits"
    echo "   Plan scaling activities for next quarter"
else
    echo "✅ RECOMMENDATION: Current capacity sufficient"
    echo "   Continue monitoring with current configuration"
fi

echo
echo "Detailed recommendations available in: /tmp/capacity_analysis.json"
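
The recommendation thresholds at the end of the script can also be written as a small pure function, which is easier to unit test than shell arithmetic. The following is a minimal Python sketch that assumes the same 80% and 60% thresholds; the function name and the returned labels are illustrative and are not part of the Plings tooling.

```python
def scaling_recommendation(projected_peak: float, max_daily_capacity: float) -> str:
    """Classify projected utilization with the script's 80% / 60% thresholds.

    Hypothetical helper: the labels "scale_up", "monitor" and "sufficient"
    are placeholders chosen for this sketch.
    """
    if max_daily_capacity <= 0:
        raise ValueError("max_daily_capacity must be positive")

    # Multiply before dividing, mirroring the order used in the shell script.
    utilization_pct = projected_peak * 100 / max_daily_capacity

    if utilization_pct > 80:
        return "scale_up"      # projected utilization exceeds the 80% threshold
    if utilization_pct > 60:
        return "monitor"       # approaching capacity limits
    return "sufficient"        # current capacity is enough
```

Both comparisons are strict, as in the script, so a projected utilization of exactly 80% yields "monitor" rather than "scale_up".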

Wallet Migration and Transition

1. Planned Migration Procedures

1.1 Manufacturer Migration Workflow

class ManufacturerMigrationOrchestrator:
    """Orchestrate manufacturer migration between wallet versions"""
    
    async def plan_manufacturer_migration(self, 
                                        from_wallet: int,
                                        to_wallet: int,
                                        migration_reason: str) -> MigrationPlan:
        """Create comprehensive migration plan for all manufacturers"""
        
        # Get all manufacturers in source wallet
        manufacturers = await self.get_wallet_manufacturers(from_wallet)
        
        # Assess migration complexity for each manufacturer
        migration_assessments = []
        for manufacturer in manufacturers:
            assessment = await self.assess_manufacturer_migration_complexity(
                manufacturer, from_wallet, to_wallet
            )
            migration_assessments.append(assessment)
        
        # Sort by business priority first, then by descending complexity score
        migration_order = sorted(
            migration_assessments,
            key=lambda x: (x.business_priority, -x.complexity_score)
        )
        
        # Create phased migration plan
        migration_phases = self.create_migration_phases(migration_order)
        
        return MigrationPlan(
            from_wallet=from_wallet,
            to_wallet=to_wallet,
            migration_reason=migration_reason,
            total_manufacturers=len(manufacturers),
            migration_phases=migration_phases,
            estimated_total_duration=sum(phase.estimated_duration for phase in migration_phases),
            risk_assessment=self.assess_migration_risk(migration_phases),
            rollback_plan=self.create_rollback_plan(migration_phases)
        )
    
    async def execute_manufacturer_migration(self, migration_plan: MigrationPlan) -> MigrationResult:
        """Execute manufacturer migration according to plan"""
        
        migration_results = []
        
        for phase in migration_plan.migration_phases:
            phase_result = await self.execute_migration_phase(phase)
            migration_results.append(phase_result)
            
            # Check for phase failure
            if not phase_result.success:
                # Execute rollback for failed phase
                rollback_result = await self.rollback_failed_phase(phase, phase_result)
                
                return MigrationResult(
                    success=False,
                    completed_phases=len([r for r in migration_results if r.success]),
                    failed_phase=phase.phase_name,
                    rollback_executed=rollback_result.success,
                    error=phase_result.error
                )
            
            # Monitor post-phase stability
            stability_check = await self.verify_phase_stability(phase, duration_minutes=30)
            if not stability_check.stable:
                # Execute rollback due to instability
                rollback_result = await self.rollback_unstable_phase(phase, stability_check)
                
                return MigrationResult(
                    success=False,
                    # The current phase reported success but was rolled back, so exclude it
                    completed_phases=len([r for r in migration_results if r.success]) - 1,
                    failed_phase=phase.phase_name,
                    failure_reason='post_migration_instability',
                    rollback_executed=rollback_result.success
                )
        
        # All phases completed successfully
        return MigrationResult(
            success=True,
            completed_phases=len(migration_results),
            total_manufacturers_migrated=sum(phase.manufacturer_count for phase in migration_plan.migration_phases),
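            # datetime.utcnow() is deprecated since Python 3.12; requires `from datetime import datetime, timezone`
            migration_completed_at=datetime.now(timezone.utc),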
            final_verification=await self.verify_complete_migration(migration_plan)
        )
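
The ordering and phasing step in plan_manufacturer_migration can be illustrated in isolation. This is a sketch under stated assumptions: the Assessment dataclass, the convention that a lower business_priority value means more urgent, and fixed-size phases are all hypothetical, since the source does not show how create_migration_phases groups manufacturers.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Assessment:
    """Hypothetical stand-in for a per-manufacturer migration assessment."""
    manufacturer: str
    business_priority: int    # assumption: lower value = migrate earlier
    complexity_score: float   # assumption: higher value = harder migration


def order_for_migration(assessments: List[Assessment]) -> List[Assessment]:
    # Same key as the orchestrator: priority ascending, complexity descending.
    return sorted(assessments, key=lambda a: (a.business_priority, -a.complexity_score))


def chunk_into_phases(ordered: List[Assessment], phase_size: int) -> List[List[Assessment]]:
    # Assumed phasing rule: consecutive fixed-size groups of the ordered list.
    if phase_size < 1:
        raise ValueError("phase_size must be at least 1")
    return [ordered[i:i + phase_size] for i in range(0, len(ordered), phase_size)]
```

With this key, manufacturers sharing a business priority are migrated hardest-first, which may or may not be the intended policy; if simpler migrations should go first, the sign on complexity_score would flip.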

1.2 Zero-Downtime Migration Strategy

-- Zero-downtime manufacturer migration procedure
-- Maintains service availability during migration

BEGIN;

-- Step 1: Create manufacturer in target wallet (parallel registration)
INSERT INTO manufacturer_registry (
    wallet_version,
    manufacturer_index,
    manufacturer_name,
    organization_id,
    status,
    registered_by,
    registered_at,
    migrated_from_wallet
) VALUES (
    :target_wallet_version,
    :manufacturer_index,
    :manufacturer_name,
    :organization_id,
    'migrating', -- Status during migration
    :operator_id,
    NOW(),
    :source_wallet_version
);

-- Step 2: Update manufacturer lineage
INSERT INTO manufacturer_lineage (
    canonical_name,
    current_wallet_version,
    current_manufacturer_index,
    predecessor_wallet,
    predecessor_index,
    migration_reason,
    migration_date,
    migration_status,
    migration_completion_percentage
) VALUES (
    :canonical_name,
    :target_wallet_version,
    :manufacturer_index,
    :source_wallet_version,
    :manufacturer_index,
    :migration_reason,
    NOW(),
    'in_progress',
    0
) ON CONFLICT (canonical_name) DO UPDATE SET
    current_wallet_version = :target_wallet_version,
    current_manufacturer_index = :manufacturer_index,
    predecessor_wallet = manufacturer_lineage.current_wallet_version,
    predecessor_index = manufacturer_lineage.current_manufacturer_index,
    migration_reason = :migration_reason,
    migration_date = NOW(),
    migration_status = 'in_progress',
    migration_completion_percentage = 0;

-- Step 3: Begin path migration (critical paths first)
-- This is done in batches to maintain system responsiveness
INSERT INTO path_migration_queue (
    source_wallet_version,
    target_wallet_version,
    manufacturer_name,
    path,
    priority,
    migration_status,
    queued_at
)
SELECT 
    wallet_version as source_wallet_version,
    :target_wallet_version as target_wallet_version,
    manufacturer_name,
    path,
    CASE 
        WHEN allocated_at > NOW() - INTERVAL '30 days' THEN 'high'
        WHEN allocated_at > NOW() - INTERVAL '90 days' THEN 'medium'
        ELSE 'low'
    END as priority,
    'queued' as migration_status,
    NOW() as queued_at
FROM path_registry
WHERE wallet_version = :source_wallet_version
AND manufacturer_name = :manufacturer_name
AND status = 'active'
ORDER BY allocated_at DESC;

-- Step 4: Log migration initiation
INSERT INTO wallet_operations_log (
    operation_type,
    wallet_version,
    operation_status,
    initiated_by,
    operation_details,
    timestamp
) VALUES (
    'manufacturer_migration_started',
    :target_wallet_version,
    'in_progress',
    :operator_id,
    jsonb_build_object(
        'source_wallet', :source_wallet_version,
        'manufacturer_name', :manufacturer_name,
        'migration_reason', :migration_reason,
        'total_paths_queued', (
            SELECT COUNT(*) FROM path_migration_queue 
            WHERE source_wallet_version = :source_wallet_version 
            AND manufacturer_name = :manufacturer_name
        )
    ),
    NOW()
);

COMMIT;
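
Step 3 assigns a priority from the age of each allocation and notes that paths are migrated in batches. The sketch below mirrors the CASE expression in Python and shows one possible way to drain the queue in priority order. The function names, the tuple layout of queue entries, and the batching rule are assumptions made for illustration; the source does not describe the worker that consumes path_migration_queue.

```python
from datetime import datetime, timedelta
from typing import Iterable, Iterator, List, Tuple

PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}


def path_priority(allocated_at: datetime, now: datetime) -> str:
    """Mirror of the SQL CASE: under 30 days is high, under 90 days is medium."""
    age = now - allocated_at
    if age < timedelta(days=30):
        return "high"
    if age < timedelta(days=90):
        return "medium"
    return "low"


def iter_batches(queue: Iterable[Tuple[str, str]], batch_size: int) -> Iterator[List[Tuple[str, str]]]:
    """Yield (path, priority) entries in priority order, batch_size at a time."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    # sorted() is stable, so entries keep their queue order within a priority.
    ordered = sorted(queue, key=lambda entry: PRIORITY_RANK[entry[1]])
    for start in range(0, len(ordered), batch_size):
        yield ordered[start:start + batch_size]
```

Keeping batches small bounds how long each unit of work holds locks, which is the usual reason for batching a migration like this.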

Wallet Retirement and Cleanup

1. Wallet Deprecation Process

1.1 Controlled Wallet Deprecation

class WalletRetirementManager:
    """Manage controlled retirement of wallet versions"""
    
    async def initiate_wallet_deprecation(self, wallet_version: int, 
                                        deprecation_reason: str,
                                        target_retirement_date: datetime) -> DeprecationPlan:
        """Create and execute wallet deprecation plan"""
        
        # Validate deprecation eligibility
        eligibility_check = await self.check_deprecation_eligibility(wallet_version)
        if not eligibility_check.eligible:
            raise DeprecationException(f"Wallet v{wallet_version} not eligible for deprecation: {eligibility_check.reason}")
        
        # Assess current usage
        usage_assessment = await self.assess_wallet_usage(wallet_version)
        
        # Create migration timeline
        migration_timeline = await self.create_migration_timeline(
            wallet_version, target_retirement_date, usage_assessment
        )
        
        # Mark wallet as deprecated
        await self.mark_wallet_deprecated(wallet_version, deprecation_reason)
        
        # Schedule migration notifications
        await self.schedule_deprecation_notifications(wallet_version, migration_timeline)
        
        return DeprecationPlan(
            wallet_version=wallet_version,
            deprecation_reason=deprecation_reason,
            current_usage=usage_assessment,
            migration_timeline=migration_timeline,
            target_retirement_date=target_retirement_date,
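            # datetime.utcnow() is deprecated since Python 3.12; requires `from datetime import datetime, timezone`
            deprecation_initiated_at=datetime.now(timezone.utc)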
        )
    
    async def check_deprecation_eligibility(self, wallet_version: int) -> EligibilityCheck:
        """Check if wallet version is eligible for deprecation"""
        
        # Check 1: Not the only active wallet
        active_wallets = await self.db.fetch("""
            SELECT COUNT(*) as active_count
            FROM wallet_versions 
            WHERE status = 'active' AND version_id != %s
        """, [wallet_version])
        
        if active_wallets[0]['active_count'] == 0:
            return EligibilityCheck(
                eligible=False,
                reason="Cannot deprecate the only active wallet version"
            )
        
        # Check 2: No recent critical allocations
        recent_critical_allocations = await self.db.fetch("""
            SELECT COUNT(*) as critical_count
            FROM path_registry pr
            JOIN organizations o ON pr.organization_id = o.id
            WHERE pr.wallet_version = %s
            AND pr.allocated_at > NOW() - INTERVAL '7 days'
            AND o.type IN ('Company', 'Institution')
        """, [wallet_version])
        
        if recent_critical_allocations[0]['critical_count'] > 0:
            return EligibilityCheck(
                eligible=False,
                reason="Recent critical organizational allocations detected"
            )
        
        # Check 3: No active incident response
        active_incidents = await self.check_active_security_incidents(wallet_version)
        if active_incidents:
            return EligibilityCheck(
                eligible=False,
                reason="Active security incidents involving this wallet"
            )
        
        return EligibilityCheck(eligible=True)
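
The three eligibility checks can be separated from their database queries so the decision logic is testable without a live database. The following is a sketch only: the Eligibility tuple and the evaluate_deprecation_eligibility function are hypothetical names, and the inputs are assumed to be the counts and flags the queries above would produce.

```python
from typing import NamedTuple, Optional


class Eligibility(NamedTuple):
    """Hypothetical stand-in for the EligibilityCheck result type."""
    eligible: bool
    reason: Optional[str] = None


def evaluate_deprecation_eligibility(other_active_wallets: int,
                                     recent_critical_allocations: int,
                                     has_active_incidents: bool) -> Eligibility:
    # Check 1: at least one other wallet version must remain active.
    if other_active_wallets == 0:
        return Eligibility(False, "Cannot deprecate the only active wallet version")
    # Check 2: no Company/Institution allocations in the last 7 days.
    if recent_critical_allocations > 0:
        return Eligibility(False, "Recent critical organizational allocations detected")
    # Check 3: no open security incident involving this wallet.
    if has_active_incidents:
        return Eligibility(False, "Active security incidents involving this wallet")
    return Eligibility(True)
```

The checks run in the same order as in check_deprecation_eligibility, so the first failing condition determines the reported reason.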

1.2 Final Cleanup and Archival

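#!/bin/bash
# Wallet retirement and archival procedure
# Execute after all manufacturers have been migrated

set -euo pipefail  # abort on the first failed step so HSM keys are never wiped after a failed archive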

WALLET_VERSION="${1:-}"
RETENTION_PERIOD="${2:-2555}" # 7 x 365 days (regulatory requirement); does not count leap days

if [[ -z "$WALLET_VERSION" ]]; then
    echo "Usage: $0 <wallet_version> [retention_period_days]"
    exit 1
fi

echo "=== Wallet Retirement and Archival ==="
echo "Wallet Version: $WALLET_VERSION"
echo "Retention Period: $RETENTION_PERIOD days"
echo "Retirement Date: $(date)"
echo

# Verify wallet is ready for retirement
echo "Verifying wallet retirement eligibility..."
if ! ./verify-retirement-eligibility.sh "$WALLET_VERSION"; then
    echo "❌ Wallet not ready for retirement"
    exit 1
fi

# Create archival directory
ARCHIVE_DIR="/secure/wallet-archives/wallet-v${WALLET_VERSION}-$(date +%Y%m%d)"
mkdir -p "$ARCHIVE_DIR"
chmod 700 "$ARCHIVE_DIR"

# Archive operational data
echo "Archiving operational data..."
./archive-wallet-data.sh \
    --wallet-version="$WALLET_VERSION" \
    --output-dir="$ARCHIVE_DIR" \
    --include-logs \
    --include-metrics \
    --include-audit-trail

# Archive HSM key material
echo "Archiving HSM key material..."
./archive-hsm-keys.sh \
    --wallet-version="$WALLET_VERSION" \
    --archive-dir="$ARCHIVE_DIR" \
    --secure-wipe-hsm

# Update database records
echo "Updating database retirement records..."
./update-retirement-records.sql \
    --wallet-version="$WALLET_VERSION" \
    --retirement-date="$(date -u +%Y-%m-%d)" \
    --retention-until="$(date -u -d "+${RETENTION_PERIOD} days" +%Y-%m-%d)" \
    --archive-location="$ARCHIVE_DIR"

# Generate retirement certificate
echo "Generating retirement certificate..."
./generate-retirement-certificate.sh \
    --wallet-version="$WALLET_VERSION" \
    --archive-location="$ARCHIVE_DIR" \
    --output="$ARCHIVE_DIR/retirement_certificate.pdf"

# Final cleanup
echo "Performing final cleanup..."
./cleanup-retired-wallet.sh \
    --wallet-version="$WALLET_VERSION" \
    --preserve-audit-trail \
    --secure-delete-temp-files

echo "=== Wallet Retirement Complete ==="
echo "Archive Location: $ARCHIVE_DIR"
echo "Retention Until: $(date -d "+${RETENTION_PERIOD} days")"
echo "Retirement Certificate: $ARCHIVE_DIR/retirement_certificate.pdf"
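
The script computes the retention end date with GNU date (-d "+N days"), which is not available on every platform. A portable way to check the same arithmetic is sketched below in Python; the function name is illustrative. It also shows that the default of 2555 days is 7 x 365 and therefore ends slightly before seven calendar years have passed whenever leap days fall inside the window.

```python
from datetime import date, timedelta


def retention_until(retirement_date: date, retention_days: int = 2555) -> date:
    """Return the last day of the retention window (hypothetical helper)."""
    if retention_days < 0:
        raise ValueError("retention_days must not be negative")
    return retirement_date + timedelta(days=retention_days)


if __name__ == "__main__":
    retired = date(2025, 7, 13)
    print("Retention until:", retention_until(retired).isoformat())
    # Days in seven full calendar years from the same start date, for comparison.
    print("Seven calendar years:", (date(2032, 7, 13) - retired).days, "days")
```

If the regulatory requirement is expressed in calendar years rather than days, passing an explicit day count computed from the target date may be the safer choice.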

Operational Procedures Summary

Daily Operations Checklist

#!/bin/bash
# Daily wallet operations checklist
# Run every morning by the operations team

echo "=== Daily Wallet Operations Checklist ==="
echo "Date: $(date)"
echo

# 1. Check wallet health status
echo "1. Checking wallet health status..."
./check-all-wallet-health.sh --summary

# 2. Review overnight alerts
echo "2. Reviewing overnight alerts..."
./review-overnight-alerts.sh --since="yesterday"

# 3. Verify HSM status
echo "3. Verifying HSM status..."
./check-hsm-status.sh --all-wallets

# 4. Check capacity utilization
echo "4. Checking capacity utilization..."
./check-capacity-utilization.sh --alert-threshold=70

# 5. Verify backup integrity
echo "5. Verifying backup integrity..."
./verify-backup-integrity.sh --random-sample=10

# 6. Review performance metrics
echo "6. Reviewing performance metrics..."
./review-performance-metrics.sh --since="yesterday"

# 7. Check security alerts
echo "7. Checking security alerts..."
./check-security-alerts.sh --since="yesterday"

echo "=== Daily Checklist Complete ==="

Monthly Operations Review

# Monthly wallet operations review template
monthly_review:
  date: "2025-07-13"
  period: "June 2025"
  
  wallet_inventory:
    - version: "v1"
      status: "active"
      environment: "production"
      manufacturer_count: 15
      daily_allocation_avg: 50000
      
    - version: "v2" 
      status: "deprecated"
      environment: "production"
      manufacturer_count: 3
      migration_completion: "85%"
      
  performance_summary:
    signature_generation_avg_ms: 8.2
    path_allocation_avg_ms: 245
    hsm_response_avg_ms: 12
    uptime_percentage: 99.97
    
  security_events:
    total_alerts: 5
    critical_alerts: 0
    false_positives: 2
    incidents_resolved: 1
    
  capacity_planning:
    current_utilization: "68%"
    projected_growth: "12% monthly"
    scaling_recommendations: "Add HSM capacity Q4 2025"
    
  operational_improvements:
    - "Automated manufacturer migration tooling"
    - "Enhanced monitoring dashboards"
    - "Improved backup verification procedures"
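
The capacity_planning figures in the template can be turned into a rough lead-time estimate. The sketch below assumes that "12% monthly" means compound growth applied to utilization itself and uses the 80% scale-up threshold from the scaling script; both are interpretations rather than statements from the source, and the function name is illustrative.

```python
from typing import Optional


def months_until_threshold(current_pct: float,
                           monthly_growth_pct: float,
                           threshold_pct: float = 80.0) -> Optional[int]:
    """Whole months until utilization reaches threshold_pct, or None if it never does."""
    if current_pct <= 0:
        raise ValueError("current_pct must be positive")
    if current_pct >= threshold_pct:
        return 0
    if monthly_growth_pct <= 0:
        return None  # flat or shrinking utilization never reaches the threshold

    months = 0
    utilization = current_pct
    # Apply compound growth one month at a time until the threshold is reached.
    while utilization < threshold_pct:
        utilization *= 1 + monthly_growth_pct / 100
        months += 1
    return months
```

For the template's example values (68% utilization, 12% monthly growth), utilization would be about 76% after one month and about 85% after two, so the 80% threshold is crossed in the second month under these assumptions.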

Operations Contact Information:

  • Operations Team: operations@plings.io
  • On-Call Engineer: oncall@plings.io
  • Capacity Planning: capacity-planning@plings.io

Last Updated: Sun 13 Jul 2025 11:17:59 CEST - Comprehensive wallet lifecycle management procedures with operational frameworks, monitoring, migration, and retirement processes