Plings Wallet Lifecycle Management - Operational Procedures
Created: Sun 13 Jul 2025 11:17:59 CEST
Document Version: 2.0 - Wallet-First Operational Framework
Review Schedule: Monthly (Next: August 2025)
Target Audience: Operations Teams, DevOps Engineers, System Administrators
Overview
The Plings Wallet Lifecycle Management system provides comprehensive operational procedures for managing the complete lifecycle of wallet versions in the wallet-first architecture. This includes wallet creation, deployment, monitoring, maintenance, migration, and retirement procedures that ensure continuous service availability and security.
Lifecycle Stages
1. Planning & Approval → 2. Creation & Testing → 3. Deployment & Activation
↓ ↓ ↓
8. Retirement & Cleanup ← 7. Migration & Transition ← 6. Monitoring & Maintenance
↑ ↑ ↑
└── 5. Incident Response ← 4. Production Operations ←┘
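The stage flow above can be enforced in code as a small status transition table. The sketch below is illustrative only: the status names `created`, `deployed`, and `active` appear in the SQL procedures of this document, while `deprecated`, `retired`, and the exact set of allowed transitions are assumptions.

```python
from enum import Enum


class WalletStatus(Enum):
    CREATED = "created"        # set by the wallet creation procedure
    DEPLOYED = "deployed"      # required before activation
    ACTIVE = "active"          # set by the activation procedure
    DEPRECATED = "deprecated"  # assumed name for the deprecation stage
    RETIRED = "retired"        # assumed name for the final stage


# Assumed transition table; adjust it to the real operational policy.
ALLOWED_TRANSITIONS = {
    WalletStatus.CREATED: {WalletStatus.DEPLOYED},
    WalletStatus.DEPLOYED: {WalletStatus.ACTIVE},
    WalletStatus.ACTIVE: {WalletStatus.DEPRECATED},
    WalletStatus.DEPRECATED: {WalletStatus.RETIRED},
    WalletStatus.RETIRED: set(),
}


def can_transition(current: WalletStatus, target: WalletStatus) -> bool:
    """Return True if moving from `current` to `target` is allowed."""
    return target in ALLOWED_TRANSITIONS[current]
```

A guard like `can_transition` could run before any status update so that a wallet never skips a stage.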
Key Operational Principles
- Zero-Downtime Operations: All wallet lifecycle operations maintain service availability
- Automated Validation: Comprehensive testing at every lifecycle stage
- Audit Transparency: Complete operational audit trails for compliance
- Risk Mitigation: Proactive risk assessment and mitigation procedures
- Scalability Planning: Capacity planning and performance optimization
Wallet Creation and Deployment
1. Wallet Planning and Approval Process
1.1 Wallet Creation Request Process
# Wallet Creation Request Template
wallet_creation_request:
  metadata:
    request_id: "WCR-2025-07-13-001"
    requested_by: "operations@plings.io"
    request_date: "2025-07-13T11:17:59Z"
    priority: "standard"  # standard, high, emergency
  wallet_specification:
    environment: "production"  # production, testing, development
    purpose: "planned_rotation"  # planned_rotation, capacity_expansion, security_upgrade, incident_response
    security_level: "standard"  # standard, high, experimental
    estimated_lifespan: "12_months"
  business_justification:
    reason: "Annual planned key rotation for production environment"
    business_impact: "Maintains security best practices and regulatory compliance"
    risk_if_delayed: "Medium - delayed rotation increases security exposure"
  technical_requirements:
    hsm_requirements: "FIPS_140_2_Level_3"
    performance_requirements: "10000_signatures_per_second"
    storage_requirements: "encrypted_at_rest"
    backup_requirements: "geographic_distribution"
  approval_workflow:
    security_team_approval: "required"
    operations_team_approval: "required"
    executive_approval: "required_for_production"
    compliance_review: "required_for_production"
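The enumerated values in the template comments lend themselves to automated checking before a request enters the approval workflow. The following is a minimal sketch, assuming the allowed values are exactly those listed in the comments; `WalletRequestSpec` and `validate_spec` are hypothetical names, not part of the Plings codebase.

```python
from dataclasses import dataclass

# Allowed values copied from the comments in the request template.
ALLOWED = {
    "priority": {"standard", "high", "emergency"},
    "environment": {"production", "testing", "development"},
    "purpose": {"planned_rotation", "capacity_expansion",
                "security_upgrade", "incident_response"},
    "security_level": {"standard", "high", "experimental"},
}


@dataclass
class WalletRequestSpec:
    priority: str
    environment: str
    purpose: str
    security_level: str


def validate_spec(spec: WalletRequestSpec) -> list[str]:
    """Return one error message per field whose value is not allowed."""
    errors = []
    for field_name, allowed in ALLOWED.items():
        value = getattr(spec, field_name)
        if value not in allowed:
            errors.append(f"{field_name}: {value!r} is not one of {sorted(allowed)}")
    return errors
```

An empty list means the request passes this check; any other result can be returned to the requester as the rejection details.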
1.2 Automated Approval Workflow
import asyncio
from datetime import datetime, timedelta, timezone


class WalletCreationWorkflow:
    """Automated workflow for wallet creation approval"""

    async def process_wallet_creation_request(self, request: WalletCreationRequest) -> ApprovalResult:
        """Process wallet creation request through approval workflow"""
        # Step 1: Automated validation
        validation_result = await self.validate_request(request)
        if not validation_result.success:
            return ApprovalResult(
                status="rejected",
                reason="validation_failed",
                details=validation_result.errors
            )
        # Step 2: Risk assessment (high-risk requests always need executive approval)
        risk_assessment = await self.assess_risk(request)
        if (risk_assessment.risk_level == "high"
                and "executive_approval" not in request.approval_requirements):
            request.approval_requirements.append("executive_approval")
        # Step 3: Resource availability check
        resource_check = await self.check_resource_availability(request)
        if not resource_check.sufficient:
            return ApprovalResult(
                status="pending_resources",
                estimated_availability=resource_check.next_available_slot
            )
        # Step 4: Approval routing (one pending coroutine per approver)
        approval_tasks = [
            self.request_approval(approver, request)
            for approver in request.approval_requirements
        ]
        # Step 5: Collect approvals concurrently
        approvals = await asyncio.gather(*approval_tasks)
        if all(approval.approved for approval in approvals):
            # Use a single timestamp so the ID and the expiry stay consistent
            now = datetime.now(timezone.utc)
            return ApprovalResult(
                status="approved",
                approval_id=f"WAP-{now.strftime('%Y%m%d%H%M%S')}",
                approvers=[a.approver for a in approvals],
                valid_until=now + timedelta(days=7)
            )
        return ApprovalResult(
            status="rejected",
            reason="insufficient_approvals",
            pending_approvers=[a.approver for a in approvals if not a.approved]
        )
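Because an approval is valid for seven days, any later step that accepts an approval ID should also confirm that the approval has not expired. The sketch below shows the ID format and the validity window from the workflow above; the function names are hypothetical and the check assumes timezone-aware UTC timestamps.

```python
from datetime import datetime, timedelta, timezone

APPROVAL_VALIDITY = timedelta(days=7)  # same window as valid_until in the workflow


def make_approval_id(issued_at: datetime) -> str:
    """Build an approval ID in the WAP-YYYYMMDDHHMMSS format."""
    return f"WAP-{issued_at.strftime('%Y%m%d%H%M%S')}"


def is_approval_valid(issued_at: datetime, now: datetime) -> bool:
    """Return True while `now` falls inside the seven-day validity window."""
    return issued_at <= now <= issued_at + APPROVAL_VALIDITY
```

Note that the ID has one-second resolution, so two approvals issued within the same second would receive the same ID; a production scheme would likely need an additional sequence number or random suffix.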
2. Wallet Creation Procedure
2.1 HSM Key Generation
#!/bin/bash
# HSM Master Key Generation Script
# Executed in secure air-gapped environment
set -euo pipefail
WALLET_VERSION="${1:-}"
APPROVAL_ID="${2:-}"
WITNESS_COUNT="${3:-3}"
if [[ -z "$WALLET_VERSION" || -z "$APPROVAL_ID" ]]; then
echo "Usage: $0 <wallet_version> <approval_id> [witness_count]"
exit 1
fi
echo "=== Plings Wallet Master Key Generation ==="
echo "Wallet Version: $WALLET_VERSION"
echo "Approval ID: $APPROVAL_ID"
echo "Witnesses Required: $WITNESS_COUNT"
echo "Generation Time: $(date -u)"
echo
# Verify air-gapped environment
if ping -c 1 8.8.8.8 &> /dev/null; then
echo "ERROR: Network connectivity detected. Key generation requires air-gapped environment."
exit 1
fi
# Verify HSM availability
if ! ./hsm-status.sh --verify-ready; then
echo "ERROR: HSM not ready for key generation"
exit 1
fi
# Collect witness signatures
echo "Collecting witness signatures..."
for i in $(seq 1 "$WITNESS_COUNT"); do
echo "Witness $i: Please provide your signature for approval $APPROVAL_ID"
read -r -p "Witness $i Name: " witness_name
read -r -s -p "Witness $i Signature: " witness_signature
echo
# Verify witness signature
if ! ./verify-witness-signature.sh "$witness_name" "$witness_signature" "$APPROVAL_ID"; then
echo "ERROR: Invalid witness signature for $witness_name"
exit 1
fi
echo "Witness $i verified: $witness_name"
done
# Generate entropy from multiple sources
echo "Generating cryptographic entropy..."
HARDWARE_ENTROPY=$(./hardware-rng.sh --bytes=32)
QUANTUM_ENTROPY=$(./quantum-rng.sh --bytes=32)
ATMOSPHERIC_ENTROPY=$(./atmospheric-rng.sh --bytes=32)
# Combine entropy sources
COMBINED_ENTROPY=$(./combine-entropy.sh "$HARDWARE_ENTROPY" "$QUANTUM_ENTROPY" "$ATMOSPHERIC_ENTROPY")
# Generate master key in HSM
echo "Generating master key in HSM..."
HSM_KEY_ID="plings_master_key_v${WALLET_VERSION}"
./hsm-generate-key.sh \
--key-id="$HSM_KEY_ID" \
--algorithm="Ed25519" \
--entropy="$COMBINED_ENTROPY" \
--witnesses="$WITNESS_COUNT" \
--backup-required
# Verify key generation
if ./hsm-verify-key.sh --key-id="$HSM_KEY_ID"; then
echo "✅ Master key generated successfully: $HSM_KEY_ID"
else
echo "❌ Master key generation verification failed"
exit 1
fi
# Generate backup
echo "Creating secure backup..."
./hsm-backup-key.sh --key-id="$HSM_KEY_ID" --geographic-distribution
echo "=== Key Generation Complete ==="
echo "Key ID: $HSM_KEY_ID"
echo "Backup Locations: $(./hsm-list-backups.sh --key-id="$HSM_KEY_ID")"
echo "Next Step: Execute wallet deployment procedure"
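The script delegates entropy mixing to `combine-entropy.sh`, whose implementation is not shown here. One common way to combine independent sources is to hash their length-prefixed concatenation, so the result is unpredictable as long as at least one source is. The sketch below illustrates that idea under this assumption; it is not a description of what the Plings script actually does.

```python
import hashlib


def combine_entropy(*sources: bytes) -> bytes:
    """Mix several entropy inputs into one 32-byte value using SHA-256."""
    if len(sources) < 2:
        raise ValueError("at least two entropy sources are required")
    hasher = hashlib.sha256()
    for source in sources:
        if not source:
            raise ValueError("entropy sources must not be empty")
        # Length-prefix each input so (b"ab", b"c") and (b"a", b"bc") hash differently.
        hasher.update(len(source).to_bytes(4, "big"))
        hasher.update(source)
    return hasher.digest()
```

The output length matches the 32 bytes requested from each source in the script.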
2.2 Wallet Database Creation
-- Wallet creation procedure
-- Execute after successful HSM key generation
BEGIN;
-- Create new wallet version
INSERT INTO wallet_versions (
version_id,
version_name,
master_key_id,
description,
environment,
security_level,
status,
created_at,
is_default
) VALUES (
:wallet_version_id,
:wallet_version_name,
:hsm_key_id,
:description,
:environment,
:security_level,
'created', -- Initial status
NOW(),
false -- Will be set to true during activation
);
-- Create operational audit record
INSERT INTO wallet_operations_log (
operation_type,
wallet_version,
operation_status,
initiated_by,
approval_id,
operation_details,
timestamp
) VALUES (
'wallet_creation',
:wallet_version_id,
'completed',
:operator_id,
:approval_id,
jsonb_build_object(
'hsm_key_id', :hsm_key_id,
'environment', :environment,
'security_level', :security_level,
'witness_count', :witness_count
),
NOW()
);
COMMIT;
3. Wallet Testing and Validation
3.1 Comprehensive Wallet Testing Suite
from datetime import datetime, timezone


class WalletValidationSuite:
    """Comprehensive testing suite for new wallet versions"""

    async def run_full_validation(self, wallet_version: int) -> ValidationResult:
        """Execute complete wallet validation test suite"""
        test_results = {}
        # Test 1: HSM connectivity and key access
        test_results['hsm_connectivity'] = await self.test_hsm_connectivity(wallet_version)
        # Test 2: Cryptographic operations
        test_results['crypto_operations'] = await self.test_cryptographic_operations(wallet_version)
        # Test 3: Path derivation accuracy
        test_results['path_derivation'] = await self.test_path_derivation(wallet_version)
        # Test 4: Database integration
        test_results['database_integration'] = await self.test_database_integration(wallet_version)
        # Test 5: Performance baseline
        test_results['performance'] = await self.test_performance_baseline(wallet_version)
        # Test 6: Security controls
        test_results['security_controls'] = await self.test_security_controls(wallet_version)
        # Test 7: Backup and recovery
        test_results['backup_recovery'] = await self.test_backup_recovery(wallet_version)
        # Analyze results
        all_passed = all(result.success for result in test_results.values())
        critical_failures = [name for name, result in test_results.items()
                             if not result.success and result.severity == 'critical']
        return ValidationResult(
            wallet_version=wallet_version,
            all_tests_passed=all_passed,
            test_results=test_results,
            critical_failures=critical_failures,
            validation_timestamp=datetime.now(timezone.utc),
            ready_for_deployment=all_passed and len(critical_failures) == 0
        )

    async def test_cryptographic_operations(self, wallet_version: int) -> TestResult:
        """Test cryptographic operations for wallet version"""
        try:
            # Test 1: Basic signature generation
            test_signature = await self.wallet_service.generate_test_signature(wallet_version)
            # Test 2: Signature verification
            verification_result = await self.wallet_service.verify_test_signature(
                wallet_version, test_signature
            )
            # Test 3: Path key derivation
            test_paths = ["2.2.2.2.2", "2.3.2.7P.3Lz", "2.G.5j.2qQ.3L8z"]  # Class pointers separate
            derivation_results = []
            for path in test_paths:
                derived_key = await self.wallet_service.derive_path_key(wallet_version, path)
                derived = derived_key is not None
                derivation_results.append({
                    'path': path,
                    'derived_successfully': derived,
                    # Only validate the format when a key was actually derived
                    'key_format_valid': derived and self.validate_key_format(derived_key)
                })
            # Test 4: Performance under load
            performance_test = await self.test_signature_performance(wallet_version, count=1000)
            all_tests_passed = (
                verification_result.valid and
                all(r['derived_successfully'] and r['key_format_valid'] for r in derivation_results) and
                performance_test.average_time_ms < 10  # Sub-10ms signature generation
            )
            return TestResult(
                test_name='cryptographic_operations',
                success=all_tests_passed,
                severity='critical',
                details={
                    'signature_verification': verification_result,
                    'path_derivations': derivation_results,
                    'performance': performance_test
                }
            )
        except Exception as e:
            return TestResult(
                test_name='cryptographic_operations',
                success=False,
                severity='critical',
                error=str(e)
            )
3.2 Performance Baseline Establishment
#!/bin/bash
# Wallet Performance Baseline Testing
# Establish performance baselines for new wallet version
set -euo pipefail
WALLET_VERSION="${1:-}"
TEST_DURATION="${2:-300}" # 5 minutes default
CONCURRENCY="${3:-10}"
if [[ -z "$WALLET_VERSION" ]]; then
echo "Usage: $0 <wallet_version> [test_duration_seconds] [concurrency_level]"
exit 1
fi
echo "=== Wallet Performance Baseline Testing ==="
echo "Wallet Version: $WALLET_VERSION"
echo "Test Duration: ${TEST_DURATION}s"
echo "Concurrency: $CONCURRENCY"
echo "Start Time: $(date)"
echo
# Create test results directory
RESULTS_DIR="./test-results/wallet-v${WALLET_VERSION}-$(date +%Y%m%d-%H%M%S)"
mkdir -p "$RESULTS_DIR"
# Test 1: Signature Generation Performance
echo "Testing signature generation performance..."
./performance-test-signatures.sh \
--wallet="$WALLET_VERSION" \
--duration="$TEST_DURATION" \
--concurrency="$CONCURRENCY" \
--output="$RESULTS_DIR/signature_performance.json"
# Test 2: Path Derivation Performance
echo "Testing path derivation performance..."
./performance-test-derivation.sh \
--wallet="$WALLET_VERSION" \
--duration="$TEST_DURATION" \
--concurrency="$CONCURRENCY" \
--output="$RESULTS_DIR/derivation_performance.json"
# Test 3: Database Query Performance
echo "Testing database query performance..."
./performance-test-database.sh \
--wallet="$WALLET_VERSION" \
--duration="$TEST_DURATION" \
--output="$RESULTS_DIR/database_performance.json"
# Test 4: End-to-End Allocation Performance
echo "Testing end-to-end allocation performance..."
./performance-test-allocation.sh \
--wallet="$WALLET_VERSION" \
--duration="$TEST_DURATION" \
--concurrency="$CONCURRENCY" \
--output="$RESULTS_DIR/allocation_performance.json"
# Generate performance report
echo "Generating performance baseline report..."
./generate-performance-report.sh \
--results-dir="$RESULTS_DIR" \
--wallet="$WALLET_VERSION" \
--output="$RESULTS_DIR/baseline_report.html"
# Store baseline in database
echo "Storing performance baseline..."
./store-performance-baseline.sh \
--wallet="$WALLET_VERSION" \
--results-dir="$RESULTS_DIR"
echo "=== Performance Baseline Complete ==="
echo "Results stored in: $RESULTS_DIR"
echo "Baseline report: $RESULTS_DIR/baseline_report.html"
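A baseline is only useful if it is reduced to a few comparable numbers. As a minimal sketch, the function below summarizes a list of latency samples into mean, median, 95th percentile, and maximum. The format of the JSON result files written by the test scripts is not specified in this document, so the input here is simply assumed to be a list of millisecond values.

```python
import statistics


def summarize_latencies(samples_ms: list[float]) -> dict[str, float]:
    """Summarize latency samples (in milliseconds) for a performance baseline."""
    if len(samples_ms) < 2:
        raise ValueError("at least two samples are required")
    ordered = sorted(samples_ms)
    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile.
    cut_points = statistics.quantiles(ordered, n=100, method="inclusive")
    return {
        "mean_ms": statistics.fmean(ordered),
        "p50_ms": statistics.median(ordered),
        "p95_ms": cut_points[94],
        "max_ms": ordered[-1],
    }
```

Comparing `p95_ms` rather than only the mean against a threshold such as the 10 ms signature target makes the baseline less sensitive to a fast majority hiding a slow tail.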
4. Wallet Deployment and Activation
4.1 Blue-Green Deployment Strategy
from datetime import datetime, timezone


class WalletDeploymentManager:
    """Blue-green deployment strategy for wallet versions"""

    async def deploy_wallet_blue_green(self, new_wallet_version: int) -> DeploymentResult:
        """Deploy new wallet version using blue-green strategy"""
        # Phase 1: Prepare blue environment (new wallet)
        blue_preparation = await self.prepare_blue_environment(new_wallet_version)
        if not blue_preparation.success:
            return DeploymentResult(
                success=False,
                phase="blue_preparation_failed",
                error=blue_preparation.error
            )
        # Phase 2: Run comprehensive testing on blue
        blue_testing = await self.test_blue_environment(new_wallet_version)
        if not blue_testing.all_tests_passed:
            return DeploymentResult(
                success=False,
                phase="blue_testing_failed",
                test_results=blue_testing
            )
        # Phase 3: Begin gradual traffic migration
        migration_result = await self.gradual_traffic_migration(new_wallet_version)
        if not migration_result.success:
            # Automatic rollback on failure
            await self.rollback_to_green(migration_result.rollback_point)
            return DeploymentResult(
                success=False,
                phase="traffic_migration_failed",
                rollback_completed=True
            )
        # Phase 4: Monitor and validate production traffic
        production_validation = await self.validate_production_traffic(
            new_wallet_version, duration_minutes=30
        )
        if not production_validation.success:
            # Automatic rollback on production issues
            await self.rollback_to_green(production_validation.rollback_point)
            return DeploymentResult(
                success=False,
                phase="production_validation_failed",
                rollback_completed=True
            )
        # Phase 5: Complete deployment
        await self.complete_deployment(new_wallet_version)
        return DeploymentResult(
            success=True,
            deployed_wallet_version=new_wallet_version,
            deployment_completed_at=datetime.now(timezone.utc),
            performance_metrics=production_validation.metrics
        )

    async def gradual_traffic_migration(self, new_wallet_version: int) -> MigrationResult:
        """Gradually migrate traffic to new wallet version"""
        migration_steps = [
            {'percentage': 1, 'duration_minutes': 10},    # 1% for 10 minutes
            {'percentage': 5, 'duration_minutes': 15},    # 5% for 15 minutes
            {'percentage': 25, 'duration_minutes': 20},   # 25% for 20 minutes
            {'percentage': 50, 'duration_minutes': 30},   # 50% for 30 minutes
            {'percentage': 100, 'duration_minutes': 0}    # 100% (complete)
        ]
        for step in migration_steps:
            # Update traffic routing
            await self.update_traffic_routing(
                new_wallet_version,
                percentage=step['percentage']
            )
            # Monitor during step (the final 100% step is monitored in Phase 4)
            if step['duration_minutes'] > 0:
                monitoring_result = await self.monitor_migration_step(
                    new_wallet_version,
                    duration_minutes=step['duration_minutes'],
                    traffic_percentage=step['percentage']
                )
                if not monitoring_result.success:
                    return MigrationResult(
                        success=False,
                        failed_at_percentage=step['percentage'],
                        rollback_point=monitoring_result.rollback_point,
                        error=monitoring_result.error
                    )
        return MigrationResult(
            success=True,
            completed_at=datetime.now(timezone.utc),
            final_traffic_percentage=100
        )
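How `update_traffic_routing` splits traffic is not described here. One simple approach, shown as a sketch under that caveat, is to hash a stable request key into 100 buckets and send a request to the new wallet when its bucket falls below the current percentage. This keeps routing deterministic, and a request that reached the new wallet at 5% still reaches it at 25%. The function names and the tuple form of the schedule are assumptions for illustration.

```python
import hashlib

# (percentage, monitoring minutes) pairs mirroring the migration steps above.
MIGRATION_STEPS = [(1, 10), (5, 15), (25, 20), (50, 30), (100, 0)]


def routes_to_new_wallet(request_id: str, percentage: int) -> bool:
    """Deterministically decide whether a request goes to the new wallet."""
    if not 0 <= percentage <= 100:
        raise ValueError("percentage must be between 0 and 100")
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:8], "big") % 100  # stable bucket in 0..99
    return bucket < percentage


def total_monitoring_minutes(steps: list[tuple[int, int]]) -> int:
    """Total time spent monitoring the intermediate migration steps."""
    return sum(minutes for _, minutes in steps)
```

With the schedule above, the intermediate steps add up to 75 minutes of monitoring before the separate 30-minute production validation.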
4.2 Wallet Activation Procedure
-- Wallet activation procedure
-- Execute after successful deployment and validation
BEGIN;
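-- Step 1: Activate the new wallet version
-- (if this UPDATE affects zero rows, the wallet was not in 'deployed' status: ROLLBACK instead of continuing)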
UPDATE wallet_versions
SET status = 'active',
activated_at = NOW()
WHERE version_id = :new_wallet_version
AND status = 'deployed';
-- Step 2: Set as default if requested
UPDATE wallet_versions
SET is_default = CASE
WHEN version_id = :new_wallet_version THEN true
ELSE false
END
WHERE :set_as_default = true;
-- Step 3: Create initial manufacturer registration (Plings)
INSERT INTO manufacturer_registry (
wallet_version,
manufacturer_index,
manufacturer_name,
status,
registered_by,
registered_at
) VALUES (
:new_wallet_version,
1,
'Plings',
'active',
:operator_id,
NOW()
) ON CONFLICT (wallet_version, manufacturer_index) DO NOTHING;
-- Step 4: Create manufacturer lineage if migration
INSERT INTO manufacturer_lineage (
canonical_name,
current_wallet_version,
current_manufacturer_index,
predecessor_wallet,
predecessor_index,
migration_reason,
migration_date,
migration_status,
migration_completion_percentage
) VALUES (
'Plings',
:new_wallet_version,
1,
:previous_wallet_version,
1,
:migration_reason,
NOW(),
'completed',
100
) ON CONFLICT (canonical_name) DO UPDATE SET
current_wallet_version = :new_wallet_version,
predecessor_wallet = :previous_wallet_version,
migration_reason = :migration_reason,
migration_date = NOW();
-- Step 5: Log activation
INSERT INTO wallet_operations_log (
operation_type,
wallet_version,
operation_status,
initiated_by,
operation_details,
timestamp
) VALUES (
'wallet_activation',
:new_wallet_version,
'completed',
:operator_id,
jsonb_build_object(
'set_as_default', :set_as_default,
'previous_default_wallet', :previous_wallet_version,
'activation_method', 'blue_green_deployment'
),
NOW()
);
COMMIT;
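The first `UPDATE` in the activation procedure only matches a wallet whose status is `deployed`, so the calling code should confirm that exactly one row changed before running the remaining steps. The sketch below demonstrates that guard. It uses SQLite from the Python standard library purely to stay self-contained, and the two-column table is a simplified stand-in for `wallet_versions`; both are assumptions, not the production setup.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory stand-in for the wallet_versions table."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE wallet_versions (version_id INTEGER PRIMARY KEY, status TEXT)")
    conn.executemany(
        "INSERT INTO wallet_versions VALUES (?, ?)",
        [(1, "deployed"), (2, "created")],
    )
    conn.commit()
    return conn


def activate_wallet(conn: sqlite3.Connection, version_id: int) -> bool:
    """Activate a wallet only if it is currently 'deployed'; roll back otherwise."""
    cursor = conn.execute(
        "UPDATE wallet_versions SET status = 'active' "
        "WHERE version_id = ? AND status = 'deployed'",
        (version_id,),
    )
    if cursor.rowcount != 1:
        conn.rollback()  # nothing matched: leave the database unchanged
        return False
    conn.commit()
    return True
```

In the demo database, wallet 1 can be activated while wallet 2 is rejected because it is still in `created` status.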
Production Operations and Monitoring
1. Continuous Monitoring Framework
1.1 Wallet Health Monitoring
import time
from datetime import datetime, timezone


class WalletHealthMonitor:
    """Continuous wallet health monitoring and alerting"""

    def __init__(self):
        # Each name maps to a check_<name> method on this class
        self.health_checks = [
            'hsm_connectivity',
            'signature_performance',
            'path_allocation_rate',
            'database_response_time',
            'error_rate',
            'security_alerts'
        ]
        self.alert_thresholds = {
            'hsm_response_time_ms': 100,
            'signature_generation_ms': 10,
            'path_allocation_ms': 500,
            'database_query_ms': 50,
            'error_rate_percentage': 0.1,
            'failed_requests_per_minute': 10
        }

    async def run_health_checks(self, wallet_version: int) -> HealthCheckResult:
        """Execute all health checks for wallet version"""
        check_results = {}
        for check_name in self.health_checks:
            try:
                check_method = getattr(self, f'check_{check_name}')
                result = await check_method(wallet_version)
                check_results[check_name] = result
            except Exception as e:
                check_results[check_name] = HealthCheck(
                    name=check_name,
                    status='failed',
                    error=str(e),
                    timestamp=datetime.now(timezone.utc)
                )
        # Determine overall health
        failed_checks = [name for name, result in check_results.items()
                         if result.status == 'failed']
        warning_checks = [name for name, result in check_results.items()
                          if result.status == 'warning']
        overall_status = 'healthy'
        if failed_checks:
            overall_status = 'unhealthy'
        elif warning_checks:
            overall_status = 'warning'
        return HealthCheckResult(
            wallet_version=wallet_version,
            overall_status=overall_status,
            check_results=check_results,
            failed_checks=failed_checks,
            warning_checks=warning_checks,
            timestamp=datetime.now(timezone.utc)
        )

    async def check_hsm_connectivity(self, wallet_version: int) -> HealthCheck:
        """Check HSM connectivity and response time"""
        # perf_counter is monotonic, so clock adjustments cannot skew the measurement
        start_time = time.perf_counter()
        try:
            # Test HSM ping
            await self.hsm_service.ping(wallet_version)
            response_time_ms = (time.perf_counter() - start_time) * 1000
            if response_time_ms > self.alert_thresholds['hsm_response_time_ms']:
                return HealthCheck(
                    name='hsm_connectivity',
                    status='warning',
                    message=f'HSM response time {response_time_ms:.2f}ms exceeds threshold',
                    metrics={'response_time_ms': response_time_ms}
                )
            return HealthCheck(
                name='hsm_connectivity',
                status='healthy',
                metrics={'response_time_ms': response_time_ms}
            )
        except Exception as e:
            return HealthCheck(
                name='hsm_connectivity',
                status='failed',
                error=str(e),
                metrics={'response_time_ms': (time.perf_counter() - start_time) * 1000}
            )
1.2 Automated Alerting System
# Wallet monitoring alerts configuration
wallet_monitoring_alerts:
  critical_alerts:
    - name: "hsm_connectivity_failed"
      condition: "hsm_connectivity.status == 'failed'"
      notification_channels: ["pagerduty", "slack_critical", "email_oncall"]
      auto_escalation: true
      escalation_time_minutes: 5
    - name: "wallet_error_rate_high"
      condition: "error_rate_percentage > 1.0"
      notification_channels: ["pagerduty", "slack_critical"]
      auto_escalation: true
      escalation_time_minutes: 10
    - name: "signature_generation_failed"
      condition: "signature_performance.status == 'failed'"
      notification_channels: ["pagerduty", "slack_critical", "email_oncall"]
      auto_escalation: true
      escalation_time_minutes: 5
  warning_alerts:
    - name: "hsm_response_time_high"
      condition: "hsm_response_time_ms > 100"
      notification_channels: ["slack_warnings", "email_operations"]
      auto_escalation: false
    - name: "path_allocation_slow"
      condition: "path_allocation_ms > 500"
      notification_channels: ["slack_warnings"]
      auto_escalation: false
    - name: "database_response_slow"
      condition: "database_query_ms > 50"
      notification_channels: ["slack_warnings", "email_database_team"]
      auto_escalation: false
  notification_channels:
    pagerduty:
      integration_key: "${PAGERDUTY_INTEGRATION_KEY}"
      severity: "critical"
    slack_critical:
      webhook_url: "${SLACK_CRITICAL_WEBHOOK}"
      channel: "#wallet-alerts-critical"
    slack_warnings:
      webhook_url: "${SLACK_WARNINGS_WEBHOOK}"
      channel: "#wallet-alerts-warnings"
    email_oncall:
      smtp_server: "${SMTP_SERVER}"
      recipients: ["oncall-engineer@plings.io"]
    email_operations:
      smtp_server: "${SMTP_SERVER}"
      recipients: ["operations-team@plings.io"]
    # Referenced by database_response_slow; the recipient variable is a placeholder to be set per environment
    email_database_team:
      smtp_server: "${SMTP_SERVER}"
      recipients: ["${DATABASE_TEAM_EMAIL}"]
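Alert definitions and channel definitions can drift apart, leaving an alert that points at a channel nobody configured. A small consistency check run at load time catches this. The sketch below assumes the configuration has already been parsed into a dictionary with the `critical_alerts`, `warning_alerts`, and `notification_channels` keys used above; `find_undefined_channels` is a hypothetical helper name.

```python
def find_undefined_channels(config: dict) -> dict[str, list[str]]:
    """Map each alert name to the channels it references that are not defined."""
    defined = set(config.get("notification_channels", {}))
    missing: dict[str, list[str]] = {}
    for group in ("critical_alerts", "warning_alerts"):
        for alert in config.get(group, []):
            unknown = [
                channel
                for channel in alert.get("notification_channels", [])
                if channel not in defined
            ]
            if unknown:
                missing[alert["name"]] = unknown
    return missing
```

An empty result means every referenced channel exists; anything else is worth failing the deployment of the monitoring configuration.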
2. Capacity Planning and Scaling
2.1 Wallet Capacity Analysis
from datetime import datetime, timezone


class WalletCapacityPlanner:
    """Analyze wallet capacity and plan scaling requirements"""

    async def analyze_wallet_capacity(self, wallet_version: int) -> CapacityAnalysis:
        """Comprehensive capacity analysis for wallet version"""
        # Current usage metrics
        current_metrics = await self.get_current_usage_metrics(wallet_version)
        # Path allocation trends
        allocation_trends = await self.analyze_allocation_trends(wallet_version)
        # Performance under load
        performance_metrics = await self.analyze_performance_metrics(wallet_version)
        # Manufacturer growth
        manufacturer_growth = await self.analyze_manufacturer_growth(wallet_version)
        # Capacity projections
        projections = await self.project_capacity_requirements(
            current_metrics, allocation_trends, manufacturer_growth
        )
        return CapacityAnalysis(
            wallet_version=wallet_version,
            current_usage=current_metrics,
            allocation_trends=allocation_trends,
            performance_metrics=performance_metrics,
            manufacturer_growth=manufacturer_growth,
            capacity_projections=projections,
            recommendations=self.generate_capacity_recommendations(projections),
            analysis_timestamp=datetime.now(timezone.utc)
        )

    async def analyze_allocation_trends(self, wallet_version: int) -> AllocationTrends:
        """Analyze path allocation trends and growth patterns"""
        # Daily allocation rates over last 90 days
        daily_allocations = await self.db.query("""
            SELECT
                DATE(allocated_at) as allocation_date,
                COUNT(*) as allocation_count,
                SUM(quantity) as total_quantity,
                COUNT(DISTINCT allocated_by) as unique_users,
                COUNT(DISTINCT manufacturer_name) as active_manufacturers
            FROM path_registry
            WHERE wallet_version = %s
              AND allocated_at >= NOW() - INTERVAL '90 days'
            GROUP BY DATE(allocated_at)
            ORDER BY allocation_date
        """, [wallet_version])
        # Calculate growth rate: the last 30 rows compared with the 30 rows before them.
        # Both windows must be full, so at least 60 daily rows are required.
        if len(daily_allocations) >= 60:
            recent_avg = sum(day.allocation_count for day in daily_allocations[-30:]) / 30
            older_avg = sum(day.allocation_count for day in daily_allocations[-60:-30]) / 30
            growth_rate = (recent_avg - older_avg) / older_avg if older_avg > 0 else 0
        else:
            growth_rate = 0
        # Peak usage analysis (guard against an empty result set)
        if daily_allocations:
            peak_allocation_day = max(daily_allocations, key=lambda x: x.allocation_count)
            average_daily = sum(day.allocation_count for day in daily_allocations) / len(daily_allocations)
        else:
            peak_allocation_day = None
            average_daily = 0
        return AllocationTrends(
            daily_allocations=daily_allocations,
            average_daily_allocations=average_daily,
            growth_rate_percentage=growth_rate * 100,
            peak_allocation_day=peak_allocation_day,
            trend_direction='increasing' if growth_rate > 0.05 else 'stable' if growth_rate > -0.05 else 'decreasing'
        )
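The growth-rate calculation is easier to verify in isolation. The sketch below restates it as a pure function over a list of daily counts, using the same 30-day windows and the same 5% bands for the trend direction. The function names are hypothetical, and the input is assumed to contain one entry per day in chronological order.

```python
def allocation_growth_rate(daily_counts: list[int], window: int = 30) -> float:
    """Relative change between the latest window and the window before it."""
    if len(daily_counts) < 2 * window:
        return 0.0  # not enough history for two full windows
    recent_avg = sum(daily_counts[-window:]) / window
    older_avg = sum(daily_counts[-2 * window:-window]) / window
    if older_avg == 0:
        return 0.0  # avoid dividing by zero when there was no earlier activity
    return (recent_avg - older_avg) / older_avg


def trend_direction(growth_rate: float) -> str:
    """Classify a growth rate using the same 5% bands as the planner."""
    if growth_rate > 0.05:
        return "increasing"
    if growth_rate > -0.05:
        return "stable"
    return "decreasing"
```

For example, 30 days at 100 allocations per day followed by 30 days at 120 per day gives a growth rate of 0.2, which is classified as increasing.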
2.2 Automated Scaling Recommendations
#!/bin/bash
# Automated wallet capacity scaling recommendations
# Run weekly as part of capacity planning process
set -euo pipefail
WALLET_VERSION="${1:-1}"
ANALYSIS_PERIOD="${2:-90}" # days
echo "=== Wallet Capacity Scaling Analysis ==="
echo "Wallet Version: $WALLET_VERSION"
echo "Analysis Period: ${ANALYSIS_PERIOD} days"
echo "Analysis Date: $(date)"
echo
# Generate capacity analysis report
./capacity-analysis.py \
--wallet-version="$WALLET_VERSION" \
--period-days="$ANALYSIS_PERIOD" \
--output-format="json" \
--output-file="/tmp/capacity_analysis.json"
# Extract key metrics
CURRENT_ALLOCATION_RATE=$(jq -r '.current_usage.daily_allocation_rate' /tmp/capacity_analysis.json)
GROWTH_RATE=$(jq -r '.allocation_trends.growth_rate_percentage' /tmp/capacity_analysis.json)
PROJECTED_PEAK=$(jq -r '.capacity_projections.projected_peak_daily_allocations' /tmp/capacity_analysis.json)
CURRENT_CAPACITY=$(jq -r '.performance_metrics.max_daily_capacity' /tmp/capacity_analysis.json)
echo "Current Metrics:"
echo " Daily Allocation Rate: $CURRENT_ALLOCATION_RATE"
echo " Growth Rate: ${GROWTH_RATE}%"
echo " Current Max Capacity: $CURRENT_CAPACITY"
echo " Projected Peak: $PROJECTED_PEAK"
echo
# Calculate capacity utilization
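# Multiply before dividing: with scale=2, bc truncates the quotient to two decimals,
# so "rate / capacity * 100" would report 0.0123 as 1.00% instead of 1.23%
UTILIZATION=$(echo "scale=2; $CURRENT_ALLOCATION_RATE * 100 / $CURRENT_CAPACITY" | bc)
PROJECTED_UTILIZATION=$(echo "scale=2; $PROJECTED_PEAK * 100 / $CURRENT_CAPACITY" | bc)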
echo "Capacity Analysis:"
echo " Current Utilization: ${UTILIZATION}%"
echo " Projected Peak Utilization: ${PROJECTED_UTILIZATION}%"
echo
# Generate recommendations
if (( $(echo "$PROJECTED_UTILIZATION > 80" | bc -l) )); then
echo "🚨 RECOMMENDATION: Scale up required"
echo " Projected utilization exceeds 80% threshold"
echo " Consider creating additional wallet version or upgrading HSM capacity"
elif (( $(echo "$PROJECTED_UTILIZATION > 60" | bc -l) )); then
echo "⚠️ RECOMMENDATION: Monitor closely"
echo " Projected utilization approaching capacity limits"
echo " Plan scaling activities for next quarter"
else
echo "✅ RECOMMENDATION: Current capacity sufficient"
echo " Continue monitoring with current configuration"
fi
echo
echo "Detailed recommendations available in: /tmp/capacity_analysis.json"
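The threshold logic of the script can also be expressed as a small function, which makes the 60% and 80% boundaries easy to unit test. This is a sketch of the same decision rule; the return labels are hypothetical names for the three recommendations printed by the script.

```python
def scaling_recommendation(projected_peak: float, max_capacity: float) -> str:
    """Classify projected peak utilization using the 60% and 80% thresholds."""
    if max_capacity <= 0:
        raise ValueError("max_capacity must be positive")
    utilization = projected_peak / max_capacity * 100
    if utilization > 80:
        return "scale_up"      # projected utilization exceeds the 80% threshold
    if utilization > 60:
        return "monitor"       # approaching capacity limits
    return "sufficient"        # current capacity is enough
```

The explicit check on `max_capacity` covers the case where the capacity metric is missing or zero, which would otherwise cause a division error.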
Wallet Migration and Transition
1. Planned Migration Procedures
1.1 Manufacturer Migration Workflow
from datetime import datetime, timezone


class ManufacturerMigrationOrchestrator:
    """Orchestrate manufacturer migration between wallet versions"""

    async def plan_manufacturer_migration(self,
                                          from_wallet: int,
                                          to_wallet: int,
                                          migration_reason: str) -> MigrationPlan:
        """Create comprehensive migration plan for all manufacturers"""
        # Get all manufacturers in source wallet
        manufacturers = await self.get_wallet_manufacturers(from_wallet)
        # Assess migration complexity for each manufacturer
        migration_assessments = []
        for manufacturer in manufacturers:
            assessment = await self.assess_manufacturer_migration_complexity(
                manufacturer, from_wallet, to_wallet
            )
            migration_assessments.append(assessment)
        # Sort by business priority first, then by descending complexity
        migration_order = sorted(
            migration_assessments,
            key=lambda x: (x.business_priority, -x.complexity_score)
        )
        # Create phased migration plan
        migration_phases = self.create_migration_phases(migration_order)
        return MigrationPlan(
            from_wallet=from_wallet,
            to_wallet=to_wallet,
            migration_reason=migration_reason,
            total_manufacturers=len(manufacturers),
            migration_phases=migration_phases,
            estimated_total_duration=sum(phase.estimated_duration for phase in migration_phases),
            risk_assessment=self.assess_migration_risk(migration_phases),
            rollback_plan=self.create_rollback_plan(migration_phases)
        )

    async def execute_manufacturer_migration(self, migration_plan: MigrationPlan) -> MigrationResult:
        """Execute manufacturer migration according to plan"""
        migration_results = []
        for phase in migration_plan.migration_phases:
            phase_result = await self.execute_migration_phase(phase)
            migration_results.append(phase_result)
            # Check for phase failure
            if not phase_result.success:
                # Execute rollback for failed phase
                rollback_result = await self.rollback_failed_phase(phase, phase_result)
                return MigrationResult(
                    success=False,
                    completed_phases=len([r for r in migration_results if r.success]),
                    failed_phase=phase.phase_name,
                    rollback_executed=rollback_result.success,
                    error=phase_result.error
                )
            # Monitor post-phase stability
            stability_check = await self.verify_phase_stability(phase, duration_minutes=30)
            if not stability_check.stable:
                # Execute rollback due to instability
                rollback_result = await self.rollback_unstable_phase(phase, stability_check)
                return MigrationResult(
                    success=False,
                    completed_phases=len([r for r in migration_results if r.success]),
                    failed_phase=phase.phase_name,
                    failure_reason='post_migration_instability',
                    rollback_executed=rollback_result.success
                )
        # All phases completed successfully
        return MigrationResult(
            success=True,
            completed_phases=len(migration_results),
            total_manufacturers_migrated=sum(phase.manufacturer_count for phase in migration_plan.migration_phases),
            migration_completed_at=datetime.now(timezone.utc),
            final_verification=await self.verify_complete_migration(migration_plan)
        )
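The sort key used in the plan, `(business_priority, -complexity_score)`, orders manufacturers by priority first and, within the same priority, puts the most complex migrations first. The sketch below shows the effect on sample data. It assumes that a lower `business_priority` number means "migrate earlier", which the source does not state; the `Assessment` class is a hypothetical stand-in for the real assessment object.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Assessment:
    manufacturer: str
    business_priority: int   # assumed: lower number means migrate earlier
    complexity_score: float  # higher means a harder migration


def migration_order(assessments: list[Assessment]) -> list[Assessment]:
    """Order by business priority, then by descending complexity."""
    return sorted(assessments, key=lambda a: (a.business_priority, -a.complexity_score))
```

If the intent is instead to start with the simplest migrations, the second element of the key would need to be `a.complexity_score` without the negation.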
1.2 Zero-Downtime Migration Strategy
-- Zero-downtime manufacturer migration procedure
-- Maintains service availability during migration
BEGIN;
-- Step 1: Create manufacturer in target wallet (parallel registration)
INSERT INTO manufacturer_registry (
wallet_version,
manufacturer_index,
manufacturer_name,
organization_id,
status,
registered_by,
registered_at,
migrated_from_wallet
) VALUES (
:target_wallet_version,
:manufacturer_index,
:manufacturer_name,
:organization_id,
'migrating', -- Status during migration
:operator_id,
NOW(),
:source_wallet_version
);
-- Step 2: Update manufacturer lineage
INSERT INTO manufacturer_lineage (
canonical_name,
current_wallet_version,
current_manufacturer_index,
predecessor_wallet,
predecessor_index,
migration_reason,
migration_date,
migration_status,
migration_completion_percentage
) VALUES (
:canonical_name,
:target_wallet_version,
:manufacturer_index,
:source_wallet_version,
:manufacturer_index,
:migration_reason,
NOW(),
'in_progress',
0
) ON CONFLICT (canonical_name) DO UPDATE SET
current_wallet_version = :target_wallet_version,
predecessor_wallet = manufacturer_lineage.current_wallet_version,
predecessor_index = manufacturer_lineage.current_manufacturer_index,
migration_reason = :migration_reason,
migration_date = NOW(),
migration_status = 'in_progress',
migration_completion_percentage = 0;
-- Step 3: Begin path migration (critical paths first)
-- This is done in batches to maintain system responsiveness
INSERT INTO path_migration_queue (
source_wallet_version,
target_wallet_version,
manufacturer_name,
path,
priority,
migration_status,
queued_at
)
SELECT
wallet_version as source_wallet_version,
:target_wallet_version as target_wallet_version,
manufacturer_name,
path,
CASE
WHEN allocated_at > NOW() - INTERVAL '30 days' THEN 'high'
WHEN allocated_at > NOW() - INTERVAL '90 days' THEN 'medium'
ELSE 'low'
END as priority,
'queued' as migration_status,
NOW() as queued_at
FROM path_registry
WHERE wallet_version = :source_wallet_version
AND manufacturer_name = :manufacturer_name
AND status = 'active'
ORDER BY allocated_at DESC;
-- Step 4: Log migration initiation
INSERT INTO wallet_operations_log (
operation_type,
wallet_version,
operation_status,
initiated_by,
operation_details,
timestamp
) VALUES (
'manufacturer_migration_started',
:target_wallet_version,
'in_progress',
:operator_id,
jsonb_build_object(
'source_wallet', :source_wallet_version,
'manufacturer_name', :manufacturer_name,
'migration_reason', :migration_reason,
'total_paths_queued', (
SELECT COUNT(*) FROM path_migration_queue
WHERE source_wallet_version = :source_wallet_version
AND manufacturer_name = :manufacturer_name
)
),
NOW()
);
COMMIT;
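The priority tiers from the CASE expression and the batch-wise draining of the queue can be sketched in Python. This is a minimal illustration under stated assumptions, not the migration worker itself: `classify_priority`, `batches`, and the batch size are hypothetical, and the newest-first ordering within a tier is inferred from the ORDER BY clause.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, List, Sequence, Tuple

# Lower rank is drained first (hypothetical ordering of the three tiers).
PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}


def classify_priority(allocated_at: datetime, now: datetime) -> str:
    """Mirror the SQL CASE: under 30 days is high, under 90 days is medium, else low."""
    age = now - allocated_at
    if age < timedelta(days=30):
        return "high"
    if age < timedelta(days=90):
        return "medium"
    return "low"


def batches(paths: Sequence[Tuple[str, datetime]], now: datetime,
            batch_size: int) -> Iterator[List[str]]:
    """Yield path batches: highest priority first, newest first within a tier."""
    ordered = sorted(
        paths,
        key=lambda p: (PRIORITY_RANK[classify_priority(p[1], now)], -p[1].timestamp()),
    )
    for start in range(0, len(ordered), batch_size):
        yield [path for path, _ in ordered[start:start + batch_size]]


# Example data (hypothetical paths and allocation dates).
NOW = datetime(2025, 7, 13, tzinfo=timezone.utc)
EXAMPLE_PATHS = [
    ("old", NOW - timedelta(days=200)),
    ("recent", NOW - timedelta(days=5)),
    ("mid", NOW - timedelta(days=60)),
]
EXAMPLE_BATCHES = list(batches(EXAMPLE_PATHS, NOW, batch_size=2))
```

With a batch size of 2, the example data is drained as `[["recent", "mid"], ["old"]]`. A small batch size keeps each transaction short, which is one way to maintain the responsiveness the migration comments call for.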
Wallet Retirement and Cleanup
1. Wallet Deprecation Process
1.1 Controlled Wallet Deprecation
from datetime import datetime, timezone


class WalletRetirementManager:
    """Manage controlled retirement of wallet versions"""

    async def initiate_wallet_deprecation(self, wallet_version: int,
                                          deprecation_reason: str,
                                          target_retirement_date: datetime) -> DeprecationPlan:
        """Create and execute wallet deprecation plan"""
        # Validate deprecation eligibility
        eligibility_check = await self.check_deprecation_eligibility(wallet_version)
        if not eligibility_check.eligible:
            raise DeprecationException(
                f"Wallet v{wallet_version} not eligible for deprecation: {eligibility_check.reason}"
            )
        # Assess current usage
        usage_assessment = await self.assess_wallet_usage(wallet_version)
        # Create migration timeline
        migration_timeline = await self.create_migration_timeline(
            wallet_version, target_retirement_date, usage_assessment
        )
        # Mark wallet as deprecated
        await self.mark_wallet_deprecated(wallet_version, deprecation_reason)
        # Schedule migration notifications
        await self.schedule_deprecation_notifications(wallet_version, migration_timeline)
        return DeprecationPlan(
            wallet_version=wallet_version,
            deprecation_reason=deprecation_reason,
            current_usage=usage_assessment,
            migration_timeline=migration_timeline,
            target_retirement_date=target_retirement_date,
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated and returns a naive value)
            deprecation_initiated_at=datetime.now(timezone.utc)
        )

    async def check_deprecation_eligibility(self, wallet_version: int) -> EligibilityCheck:
        """Check if wallet version is eligible for deprecation"""
        # Check 1: Not the only active wallet
        active_wallets = await self.db.fetch("""
            SELECT COUNT(*) as active_count
            FROM wallet_versions
            WHERE status = 'active' AND version_id != %s
        """, [wallet_version])
        if active_wallets[0]['active_count'] == 0:
            return EligibilityCheck(
                eligible=False,
                reason="Cannot deprecate the only active wallet version"
            )
        # Check 2: No recent critical allocations
        recent_critical_allocations = await self.db.fetch("""
            SELECT COUNT(*) as critical_count
            FROM path_registry pr
            JOIN organizations o ON pr.organization_id = o.id
            WHERE pr.wallet_version = %s
            AND pr.allocated_at > NOW() - INTERVAL '7 days'
            AND o.type IN ('Company', 'Institution')
        """, [wallet_version])
        if recent_critical_allocations[0]['critical_count'] > 0:
            return EligibilityCheck(
                eligible=False,
                reason="Recent critical organizational allocations detected"
            )
        # Check 3: No active incident response
        active_incidents = await self.check_active_security_incidents(wallet_version)
        if active_incidents:
            return EligibilityCheck(
                eligible=False,
                reason="Active security incidents involving this wallet"
            )
        return EligibilityCheck(eligible=True)
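Once the database results are in hand, the three eligibility checks reduce to a simple decision rule. The following is a minimal sketch, assuming a result type shaped like the `EligibilityCheck` calls above (its actual definition is not shown in this document) and a hypothetical pure helper, `evaluate_eligibility`, that receives the already-fetched counts.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class EligibilityCheck:
    """Hypothetical result type; the real class definition is not shown in the source."""
    eligible: bool
    reason: Optional[str] = None


def evaluate_eligibility(other_active_wallets: int,
                         recent_critical_allocations: int,
                         has_active_incidents: bool) -> EligibilityCheck:
    """Apply the three checks in the same order as check_deprecation_eligibility."""
    # Check 1: at least one other wallet version must remain active.
    if other_active_wallets == 0:
        return EligibilityCheck(False, "Cannot deprecate the only active wallet version")
    # Check 2: no critical organizational allocations in the last 7 days.
    if recent_critical_allocations > 0:
        return EligibilityCheck(False, "Recent critical organizational allocations detected")
    # Check 3: no open security incident involving this wallet.
    if has_active_incidents:
        return EligibilityCheck(False, "Active security incidents involving this wallet")
    return EligibilityCheck(True)
```

Separating the rule from the queries in this way would let the decision logic be unit-tested without a database connection.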
1.2 Final Cleanup and Archival
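#!/bin/bash
# Wallet retirement and archival procedure
# Execute after all manufacturers have been migrated
# Abort on the first failed step so the HSM wipe and cleanup never run after a failed archive
set -euo pipefail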
WALLET_VERSION="${1:-}"
RETENTION_PERIOD="${2:-2555}" # 7 years in days (regulatory requirement)
if [[ -z "$WALLET_VERSION" ]]; then
echo "Usage: $0 <wallet_version> [retention_period_days]"
exit 1
fi
echo "=== Wallet Retirement and Archival ==="
echo "Wallet Version: $WALLET_VERSION"
echo "Retention Period: $RETENTION_PERIOD days"
echo "Retirement Date: $(date)"
echo
# Verify wallet is ready for retirement
echo "Verifying wallet retirement eligibility..."
if ! ./verify-retirement-eligibility.sh "$WALLET_VERSION"; then
echo "❌ Wallet not ready for retirement"
exit 1
fi
# Create archival directory
ARCHIVE_DIR="/secure/wallet-archives/wallet-v${WALLET_VERSION}-$(date +%Y%m%d)"
mkdir -p "$ARCHIVE_DIR"
chmod 700 "$ARCHIVE_DIR"
# Archive operational data
echo "Archiving operational data..."
./archive-wallet-data.sh \
--wallet-version="$WALLET_VERSION" \
--output-dir="$ARCHIVE_DIR" \
--include-logs \
--include-metrics \
--include-audit-trail
# Archive HSM key material
echo "Archiving HSM key material..."
./archive-hsm-keys.sh \
--wallet-version="$WALLET_VERSION" \
--archive-dir="$ARCHIVE_DIR" \
--secure-wipe-hsm
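# Update database records
echo "Updating database retirement records..."
# A .sql file cannot be executed with flags, so it is run through psql here.
# Assumptions: the script reads these psql variables, and connection settings come from the PG* environment variables.
psql -v ON_ERROR_STOP=1 \
-v wallet_version="$WALLET_VERSION" \
-v retirement_date="$(date -u +%Y-%m-%d)" \
-v retention_until="$(date -u -d "+${RETENTION_PERIOD} days" +%Y-%m-%d)" \
-v archive_location="$ARCHIVE_DIR" \
-f ./update-retirement-records.sql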
# Generate retirement certificate
echo "Generating retirement certificate..."
./generate-retirement-certificate.sh \
--wallet-version="$WALLET_VERSION" \
--archive-location="$ARCHIVE_DIR" \
--output="$ARCHIVE_DIR/retirement_certificate.pdf"
# Final cleanup
echo "Performing final cleanup..."
./cleanup-retired-wallet.sh \
--wallet-version="$WALLET_VERSION" \
--preserve-audit-trail \
--secure-delete-temp-files
echo "=== Wallet Retirement Complete ==="
echo "Archive Location: $ARCHIVE_DIR"
echo "Retention Until: $(date -d "+${RETENTION_PERIOD} days")"
echo "Retirement Certificate: $ARCHIVE_DIR/retirement_certificate.pdf"
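The default retention period of 2555 days equals 7 × 365 and therefore ignores leap days. The sketch below, using hypothetical helper names, compares the day-based retention date with a calendar-based one. Whether the regulatory requirement counts days or calendar years is an assumption to confirm with compliance.

```python
from datetime import date, timedelta


def retention_until(retirement_date: date, retention_days: int = 2555) -> date:
    """Day-based retention end, matching `date -d "+N days"` in the script."""
    return retirement_date + timedelta(days=retention_days)


def add_calendar_years(d: date, years: int) -> date:
    """Calendar-based alternative: same month and day, `years` later."""
    try:
        return d.replace(year=d.year + years)
    except ValueError:
        # Feb 29 has no counterpart in a non-leap target year; fall back to Feb 28.
        return d.replace(year=d.year + years, day=28)


RETIRED_ON = date(2025, 7, 13)
DAY_BASED = retention_until(RETIRED_ON)            # date(2032, 7, 11)
CALENDAR_BASED = add_calendar_years(RETIRED_ON, 7)  # date(2032, 7, 13)
```

For a retirement on 2025-07-13, the day-based date falls two days before the seven-year anniversary, because 2028 and 2032 each contribute a leap day. If the requirement is a full seven calendar years, a slightly larger default (for example 2557) or a calendar-based calculation would be the safer choice.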
Operational Procedures Summary
Daily Operations Checklist
#!/bin/bash
# Daily wallet operations checklist
# Run every morning by the operations team
echo "=== Daily Wallet Operations Checklist ==="
echo "Date: $(date)"
echo
# 1. Check wallet health status
echo "1. Checking wallet health status..."
./check-all-wallet-health.sh --summary
# 2. Review overnight alerts
echo "2. Reviewing overnight alerts..."
./review-overnight-alerts.sh --since="yesterday"
# 3. Verify HSM status
echo "3. Verifying HSM status..."
./check-hsm-status.sh --all-wallets
# 4. Check capacity utilization
echo "4. Checking capacity utilization..."
./check-capacity-utilization.sh --alert-threshold=70
# 5. Verify backup integrity
echo "5. Verifying backup integrity..."
./verify-backup-integrity.sh --random-sample=10
# 6. Review performance metrics
echo "6. Reviewing performance metrics..."
./review-performance-metrics.sh --since="yesterday"
# 7. Check security alerts
echo "7. Checking security alerts..."
./check-security-alerts.sh --since="yesterday"
echo "=== Daily Checklist Complete ==="
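The checklist script runs every step regardless of the outcome of earlier ones, so a failed check is easy to overlook in the output. One way to surface failures is a small runner that records every step that fails. This is a sketch only: `run_checklist` is a hypothetical helper, and the step list reuses two script names from the checklist purely as examples.

```python
import subprocess
from typing import List, Sequence, Tuple

Step = Tuple[str, Sequence[str]]  # (label, argv)

# Example step definitions taken from the checklist; not executed on import.
DAILY_STEPS: List[Step] = [
    ("1. Checking wallet health status", ["./check-all-wallet-health.sh", "--summary"]),
    ("3. Verifying HSM status", ["./check-hsm-status.sh", "--all-wallets"]),
]


def run_checklist(steps: Sequence[Step]) -> List[str]:
    """Run every step, continue past failures, and return the labels that failed."""
    failed: List[str] = []
    for label, argv in steps:
        print(f"{label}...")
        try:
            result = subprocess.run(list(argv), check=False)
        except OSError:
            # Script missing or not executable: count it as a failed step.
            failed.append(label)
            continue
        if result.returncode != 0:
            failed.append(label)
    return failed
```

Calling `run_checklist(DAILY_STEPS)` returns an empty list when every check passes. A non-empty list could be forwarded to the on-call channel or used to set the exit code of the morning job.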
Monthly Operations Review
# Monthly wallet operations review template
monthly_review:
  date: "2025-07-13"
  period: "June 2025"
  wallet_inventory:
    - version: "v1"
      status: "active"
      environment: "production"
      manufacturer_count: 15
      daily_allocation_avg: 50000
    - version: "v2"
      status: "deprecated"
      environment: "production"
      manufacturer_count: 3
      migration_completion: "85%"
  performance_summary:
    signature_generation_avg_ms: 8.2
    path_allocation_avg_ms: 245
    hsm_response_avg_ms: 12
    uptime_percentage: 99.97
  security_events:
    total_alerts: 5
    critical_alerts: 0
    false_positives: 2
    incidents_resolved: 1
  capacity_planning:
    current_utilization: "68%"
    projected_growth: "12% monthly"
    scaling_recommendations: "Add HSM capacity Q4 2025"
  operational_improvements:
    - "Automated manufacturer migration tooling"
    - "Enhanced monitoring dashboards"
    - "Improved backup verification procedures"
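The capacity figures in the template can be turned into a rough projection. The sketch below assumes that "12% monthly" means compound growth of the utilization figure itself. Both that reading and the helper name `months_until` are assumptions, not part of the documented tooling.

```python
from typing import Optional


def months_until(threshold_pct: float, current_pct: float,
                 monthly_growth: float, max_months: int = 120) -> Optional[int]:
    """Return the number of whole months until utilization reaches the threshold.

    Assumes compound growth: utilization is multiplied by (1 + monthly_growth)
    each month. Returns None if the threshold is not reached within max_months.
    """
    if current_pct >= threshold_pct:
        return 0
    if monthly_growth <= 0:
        return None
    utilization = current_pct
    for month in range(1, max_months + 1):
        utilization *= 1 + monthly_growth
        if utilization >= threshold_pct:
            return month
    return None


# Figures from the review template: 68% utilization, 12% monthly growth.
MONTHS_TO_ALERT = months_until(70, 68, 0.12)   # 1
MONTHS_TO_FULL = months_until(100, 68, 0.12)   # 4
```

Under these assumptions, the 70% alert threshold used in the daily checklist is crossed within one month and full utilization within four, which is the kind of lead time a scaling recommendation needs to account for.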
References and Related Documentation
For a complete understanding of wallet lifecycle management, see:
- Path Registry System - Business-focused guide to path allocation and management
- Wallet Management Architecture - System architecture and design principles
- Wallet Security Model - Security frameworks and threat mitigation
- Key Compromise Procedures - Incident response procedures
- HD Wallet Database Schema - Database operations and schemas
Operations Contact Information:
- Operations Team: operations@plings.io
- On-Call Engineer: oncall@plings.io
- Capacity Planning: capacity-planning@plings.io
Last Updated: Sun 13 Jul 2025 11:17:59 CEST - Comprehensive wallet lifecycle management procedures with operational frameworks, monitoring, migration, and retirement processes