
Statistics Tracking Implementation

Overview

Implemented a statistics tracking system with real-time MQTT counters, hourly updates, daily snapshots, and monthly aggregations for billing and historical reporting.

Architecture

Data Flow

Real-time:     MQTT messages → IoT Rule → MQTT Counter Lambda → increment project_stats counters
Hourly:        EventBridge → Hourly Stats Lambda → Query AWS APIs → Update project_stats
Daily (00:00): EventBridge → Daily Snapshot Lambda → Create daily_stats record
Monthly (1st): EventBridge → Monthly Reset Lambda → Aggregate daily_stats → Create monthly_stats → Reset counters

Database Schema

project_stats (Real-time Counters)

  • Current month counters (logs, sessions, MQTT messages, API calls, etc.)
  • Storage metrics (total_docker_storage_bytes, total_s3_storage_bytes, etc.)
  • Resource counts (total_devices, active_devices, current_ecr_images)
  • Lifetime aggregates (lifetime_logs, lifetime_sessions, etc.)

daily_stats (Daily Snapshots)

  • Immutable daily records created at 00:00 UTC
  • Captures previous day’s usage and storage
  • Used for daily trend charts
  • Aggregated into monthly stats

monthly_stats (Monthly Historical Records)

  • Created on the 1st of each month by aggregating daily_stats
  • Peak storage values across the month
  • Totals for usage (logs, sessions, MQTT, etc.)
  • Billing metadata (tier, exceeded_limits, overage_charge)
  • Used for invoicing and compliance

Lambda Functions

1. MQTT Counter Lambda (infrastructure/lambda/mqtt-counter/)

  • Trigger: IoT Rule on all roboticks/# topics
  • Frequency: Real-time (every MQTT message)
  • Function: Increment MQTT inbound/outbound counters in the database
  • Timeout: 30 seconds
  • Dependencies: psycopg2-binary, boto3
Topic Parsing:
roboticks/{org_id}/{project_id}/{device_id}/...
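Here is a minimal Python sketch of the topic parsing. It assumes the IoT Rule forwards the topic string to the Lambda, for example with `SELECT *, topic() AS topic FROM 'roboticks/#'`. The `TopicIds` type and the `parse_roboticks_topic` name are hypothetical. The IDs come back as raw strings, and the real handler may convert them before updating project_stats.

```python
from typing import NamedTuple, Optional


class TopicIds(NamedTuple):
    """IDs extracted from a roboticks/{org_id}/{project_id}/{device_id}/... topic."""
    org_id: str
    project_id: str
    device_id: str


def parse_roboticks_topic(topic: str) -> Optional[TopicIds]:
    """Return the IDs encoded in an MQTT topic, or None if it does not match.

    Hypothetical helper: assumes the IoT Rule passes the topic to the Lambda.
    """
    parts = topic.split("/")
    # Need the "roboticks" prefix plus three ID segments; deeper levels are ignored.
    if len(parts) < 4 or parts[0] != "roboticks":
        return None
    org_id, project_id, device_id = parts[1], parts[2], parts[3]
    if not (org_id and project_id and device_id):
        return None  # reject empty segments such as "roboticks//34/robot-7"
    return TopicIds(org_id, project_id, device_id)


print(parse_roboticks_topic("roboticks/12/34/robot-7/telemetry"))
# TopicIds(org_id='12', project_id='34', device_id='robot-7')
```

Because non-matching topics return None, a handler built on this helper could skip malformed messages instead of failing the invocation.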

2. Hourly Stats Updater Lambda (infrastructure/lambda/hourly-stats-updater/)

  • Trigger: EventBridge hourly schedule
  • Frequency: Every hour
  • Function: Query AWS APIs (ECR, S3) and the database, then refresh project_stats
  • Timeout: 5 minutes
  • Memory: 512 MB
  • Dependencies: psycopg2-binary, boto3, sqlalchemy, backend app layer
AWS API Calls:
  • ECR: list_images(), describe_images() for image counts and sizes
  • S3: list_objects_v2() for storage calculation
  • Database: Count devices, sessions, logs
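A storage calculation has to follow pagination. A single `list_objects_v2` call returns at most 1,000 keys, and `describe_images` is paged as well. The sketch below uses boto3's built-in paginators. The helper names are hypothetical, and the clients are passed in so the functions can be tested with fakes. The source does not specify how the real service maps a project to an S3 prefix or to ECR images, so the usage comment assumes a prefix format.

```python
from typing import Any, Tuple


def s3_prefix_bytes(s3_client: Any, bucket: str, prefix: str) -> int:
    """Sum the sizes of all objects under `prefix`, following pagination."""
    total = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # A page with no matching keys has no "Contents" entry.
        for obj in page.get("Contents", []):
            total += obj["Size"]
    return total


def ecr_image_stats(ecr_client: Any, repository_name: str) -> Tuple[int, int]:
    """Return (image_count, summed imageSizeInBytes) for one repository.

    Layers shared between images are counted once per image, so the byte
    total may exceed the storage ECR actually uses.
    """
    count, total = 0, 0
    paginator = ecr_client.get_paginator("describe_images")
    for page in paginator.paginate(repositoryName=repository_name):
        for detail in page.get("imageDetails", []):
            count += 1
            total += detail.get("imageSizeInBytes", 0)
    return count, total


def repository_name_from_uri(uri: str) -> str:
    """'<account>.dkr.ecr.<region>.amazonaws.com/<name>' -> '<name>'."""
    return uri.split("/", 1)[1]


# Hypothetical usage (requires boto3 and AWS credentials; the prefix format is assumed):
#   import boto3, os
#   s3_bytes = s3_prefix_bytes(boto3.client("s3"), os.environ["S3_BUCKET"], "projects/34/")
#   name = repository_name_from_uri(os.environ["ECR_REPOSITORY_URI"])
#   images, image_bytes = ecr_image_stats(boto3.client("ecr"), name)
```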

3. Daily Snapshot Lambda (infrastructure/lambda/daily-snapshot/)

  • Trigger: EventBridge daily at 00:00 UTC
  • Frequency: Daily
  • Function: Create daily_stats snapshot for yesterday
  • Timeout: 5 minutes
  • Memory: 512 MB
  • Dependencies: psycopg2-binary, boto3, sqlalchemy, backend app layer
Process:
  1. For each active project
  2. Read current project_stats
  3. Create daily_stats record with yesterday’s date
  4. Store usage counts, storage metrics, resource counts
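The date handling in steps 2 to 4 is easy to get wrong at month boundaries. This minimal sketch assumes the Lambda fires at 00:00 UTC and snapshots the previous UTC day. The column mapping in `SNAPSHOT_FIELDS` is hypothetical; the real names are in backend/app/models/daily_stats.py. Only storage and resource counts are copied, because the text above does not describe how daily usage is derived from the month-to-date counters.

```python
from datetime import date, datetime, timedelta, timezone
from typing import Any, Dict, Mapping

# Hypothetical daily_stats column -> project_stats field mapping.
SNAPSHOT_FIELDS = {
    "device_count": "total_devices",
    "active_device_count": "active_devices",
    "ecr_image_count": "current_ecr_images",
    "docker_storage_bytes": "total_docker_storage_bytes",
    "s3_storage_bytes": "total_s3_storage_bytes",
}


def snapshot_date(now: datetime) -> date:
    """The day being snapshotted: the UTC calendar day before `now`."""
    if now.tzinfo is None:
        raise ValueError("now must be timezone-aware")
    return now.astimezone(timezone.utc).date() - timedelta(days=1)


def build_daily_snapshot(project_id: int, stats: Mapping[str, Any], now: datetime) -> Dict[str, Any]:
    """Build a daily_stats row (as a dict) from a project's current stats."""
    day = snapshot_date(now)
    row: Dict[str, Any] = {
        "project_id": project_id,
        "date": day,
        "year": day.year,
        "month": day.month,
        "day": day.day,
    }
    for column, source in SNAPSHOT_FIELDS.items():
        row[column] = stats.get(source, 0)  # missing fields default to 0
    return row


# A run at midnight on 1 December snapshots 30 November.
row = build_daily_snapshot(34, {"total_devices": 5}, datetime(2025, 12, 1, tzinfo=timezone.utc))
print(row["date"])  # 2025-11-30
```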

4. Monthly Reset Lambda (infrastructure/lambda/monthly-reset/)

  • Trigger: EventBridge on the 1st of the month at 00:00 UTC
  • Frequency: Monthly
  • Function: Aggregate daily_stats → monthly_stats, reset counters
  • Timeout: 10 minutes
  • Memory: 1024 MB
  • Dependencies: psycopg2-binary, boto3, sqlalchemy, backend app layer
Process:
  1. For each active project
  2. Query all daily_stats for the previous month
  3. Sum usage counts (logs, sessions, MQTT, etc.)
  4. Find peak storage values
  5. Create monthly_stats record
  6. Archive to lifetime totals
  7. Reset monthly counters to 0
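This sketch covers steps 2 to 4. It picks the previous month, handling the January rollover, and folds that month's daily rows into totals and peaks. The field names are hypothetical. The sum assumes each daily_stats row holds that day's usage. If the rows instead store month-to-date running totals copied from project_stats, use the last day's values; summing them would count the month many times over.

```python
from datetime import date
from typing import Any, Dict, Iterable, Mapping, Tuple

# Hypothetical column groups; the real lists come from the daily_stats model.
USAGE_FIELDS = ("logs_count", "sessions_count", "mqtt_inbound_count", "mqtt_outbound_count")
STORAGE_FIELDS = ("storage_bytes", "s3_storage_bytes", "docker_storage_bytes")


def previous_month(today: date) -> Tuple[int, int]:
    """(year, month) of the month before `today`, rolling January back to December."""
    if today.month == 1:
        return today.year - 1, 12
    return today.year, today.month - 1


def aggregate_month(rows: Iterable[Mapping[str, Any]]) -> Dict[str, Any]:
    """Sum usage fields and take the peak of storage fields across daily rows."""
    totals: Dict[str, Any] = {f: 0 for f in USAGE_FIELDS}
    peaks: Dict[str, Any] = {f"peak_{f}": 0 for f in STORAGE_FIELDS}
    days = 0
    for row in rows:
        days += 1
        for f in USAGE_FIELDS:
            totals[f] += row.get(f) or 0  # treat missing/NULL as 0
        for f in STORAGE_FIELDS:
            key = f"peak_{f}"
            peaks[key] = max(peaks[key], row.get(f) or 0)
    return {**totals, **peaks, "days_recorded": days}
```

`days_recorded` can be compared with `calendar.monthrange(year, month)[1]` (the number of days in that month) to detect missing snapshots before a monthly_stats row is written.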

Backend Services

StatsUpdaterService (backend/app/services/stats_updater.py)

Service backed by live AWS API calls and database queries. Key methods:
  • update_all_projects() - Refresh stats for all active projects
  • update_project_stats(project) - Update one project from AWS APIs
  • increment_mqtt_inbound(project_id, count) - Real-time counter
  • increment_mqtt_outbound(project_id, count) - Real-time counter
  • create_daily_snapshot(project) - Create daily stats record
  • create_all_daily_snapshots() - Create for all projects
  • reset_monthly_stats(project) - Aggregate and reset
  • reset_all_monthly_stats() - Reset all projects
AWS Integration:
  • CloudWatch: MQTT metrics validation
  • ECR: Image counts and sizes via ecr.list_images(), ecr.describe_images()
  • S3: Storage usage via s3.list_objects_v2() with project prefix
  • Database: Resource counts via SQLAlchemy queries
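Concurrent MQTT counter invocations can only increment safely if the addition happens inside the database rather than as a read-modify-write in Python. This sketch shows that pattern. It uses the standard-library `sqlite3` module as a stand-in for PostgreSQL so it runs anywhere; with psycopg2 the placeholder is `%s` instead of `?`. The `mqtt_inbound_count` column and the `atomic_increment_inbound` function are hypothetical, not the service's actual names.

```python
import sqlite3


def atomic_increment_inbound(conn: sqlite3.Connection, project_id: int, count: int = 1) -> None:
    """Add `count` to a project's inbound MQTT counter in a single UPDATE.

    The database computes column + count itself, so concurrent callers cannot
    overwrite each other's increments.
    """
    with conn:  # commits on success, rolls back if an exception is raised
        cur = conn.execute(
            "UPDATE project_stats SET mqtt_inbound_count = mqtt_inbound_count + ? "
            "WHERE project_id = ?",
            (count, project_id),
        )
        if cur.rowcount == 0:
            raise LookupError(f"no project_stats row for project {project_id}")


# Demo against an in-memory database.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE project_stats (project_id INTEGER PRIMARY KEY, "
    "mqtt_inbound_count INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO project_stats (project_id) VALUES (1)")
conn.commit()
atomic_increment_inbound(conn, 1)
atomic_increment_inbound(conn, 1, 5)
print(conn.execute("SELECT mqtt_inbound_count FROM project_stats WHERE project_id = 1").fetchone()[0])  # 6
```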

LimitService (backend/app/services/limit_service.py)

Refactored to be read-only: it checks limits and no longer updates stats. Methods:
  • check_project_limit(org) - Verify project count
  • check_user_limit(org, count) - Verify user count
  • check_device_limit(org, project) - Verify device count
  • check_storage_limit(org, project, bytes) - Verify storage
  • check_mqtt_inbound_limit(org, project) - Verify MQTT inbound
  • check_mqtt_outbound_limit(org, project) - Verify MQTT outbound
  • check_ecr_image_limit(org, project) - Verify ECR images
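A small pure function can illustrate the read-only contract. It compares current usage against a limit and returns a result without writing anything. The function is hypothetical. The real methods take org and project objects and resolve tier limits in ways not shown here, and treating `None` as "unlimited" is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LimitCheck:
    allowed: bool
    current: int
    limit: Optional[int]  # assumption: None means no limit for this tier


def check_limit(current: int, limit: Optional[int], requested: int = 1) -> LimitCheck:
    """Would adding `requested` units keep usage within `limit`? Writes nothing."""
    if limit is None:
        return LimitCheck(True, current, None)
    return LimitCheck(current + requested <= limit, current, limit)


print(check_limit(current=9, limit=10).allowed)   # True
print(check_limit(current=10, limit=10).allowed)  # False
```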

Database Migrations

Migration: 20251108_create_daily_stats.py (Revision: j8901234567h)

Creates daily_stats table with:
  • Foreign keys to projects and organizations
  • Time period fields (date, year, month, day)
  • Usage counters (logs, sessions, MQTT, API, deployments)
  • Storage snapshots (storage_bytes, s3_storage_bytes, docker_storage_bytes, logs_storage_bytes)
  • Network metrics (docker_upload_bytes, docker_download_bytes, data_transfer_bytes)
  • Resource counts (device_count, active_device_count, ecr_image_count)
  • Indexes for efficient querying by project, org, date, year/month
  • Unique constraint on (project_id, date)
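The unique constraint on (project_id, date) is what makes a retried daily snapshot harmless. The sketch below shows an idempotent insert, again using `sqlite3` as a stand-in (SQLite 3.24 or later is needed for the clause). `INSERT ... ON CONFLICT (project_id, date) DO NOTHING` is also valid PostgreSQL. The trimmed table and the `insert_snapshot_once` helper are hypothetical.

```python
import sqlite3
from datetime import date


def insert_snapshot_once(conn: sqlite3.Connection, project_id: int, day: date, logs_count: int) -> bool:
    """Insert a daily row; return False if (project_id, date) already exists.

    ON CONFLICT ... DO NOTHING relies on the unique constraint, so a retried run
    neither duplicates the row nor raises an integrity error.
    """
    with conn:  # commit on success, roll back on error
        cur = conn.execute(
            "INSERT INTO daily_stats (project_id, date, logs_count) VALUES (?, ?, ?) "
            "ON CONFLICT (project_id, date) DO NOTHING",
            (project_id, day.isoformat(), logs_count),
        )
    return cur.rowcount == 1


# Demo: a trimmed daily_stats table with the same unique constraint.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE daily_stats (id INTEGER PRIMARY KEY, project_id INTEGER NOT NULL, "
    "date TEXT NOT NULL, logs_count INTEGER NOT NULL DEFAULT 0, "
    "UNIQUE (project_id, date))"
)
print(insert_snapshot_once(conn, 1, date(2025, 11, 7), 120))  # True
print(insert_snapshot_once(conn, 1, date(2025, 11, 7), 999))  # False: the retry is a no-op
```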

Migration: 20251108_create_monthly_stats.py (Revision: i7890123456g)

Creates monthly_stats table with:
  • Foreign keys to projects and organizations
  • Time period fields (year, month)
  • Aggregated usage totals
  • Peak storage values
  • Billing metadata (tier, exceeded_limits, overage_charge)
  • Indexes for efficient querying by project, org, year/month
  • Unique constraint on (project_id, year, month)

CDK Infrastructure

Lambda Layers

psycopg2Layer (existing):
  • Contains psycopg2-binary for PostgreSQL access
  • Used by all Lambdas
backendAppLayer (new):
  • Bundles backend app code and dependencies
  • Used by hourly-stats, daily-snapshot, monthly-reset Lambdas
  • Enables importing app.services.stats_updater

Environment Variables

All stats Lambdas receive:
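{
  DB_HOST: database.dbInstanceEndpointAddress,
  DB_PORT: database.dbInstanceEndpointPort,
  DB_NAME: 'conduit',
  DB_USER: 'conduit',
  // Environment values must be strings; unsafeUnwrap() makes the password visible in the Lambda configuration
  DB_PASSWORD: dbCredentials.secretValueFromJson('password').unsafeUnwrap(),
  // AWS_REGION is a reserved variable set by the Lambda runtime, so it cannot be passed here
  ECR_REPOSITORY_URI: compositionsRepository.repositoryUri,
  S3_BUCKET: storageBucket.bucketName,
}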
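Inside a Python handler, these values arrive as strings. This minimal sketch loads them once at cold start and fails fast when one is missing. The `StatsLambdaConfig` and `load_config` names are hypothetical.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional

REQUIRED = ("DB_HOST", "DB_PORT", "DB_NAME", "DB_USER", "DB_PASSWORD",
            "AWS_REGION", "ECR_REPOSITORY_URI", "S3_BUCKET")


@dataclass(frozen=True)
class StatsLambdaConfig:
    db_host: str
    db_port: int
    db_name: str
    db_user: str
    db_password: str
    region: str
    ecr_repository_uri: str
    s3_bucket: str


def load_config(env: Optional[Mapping[str, str]] = None) -> StatsLambdaConfig:
    """Read the variables listed above, reporting every missing name at once."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError(f"missing environment variables: {', '.join(missing)}")
    return StatsLambdaConfig(
        db_host=env["DB_HOST"],
        db_port=int(env["DB_PORT"]),  # environment values are always strings
        db_name=env["DB_NAME"],
        db_user=env["DB_USER"],
        db_password=env["DB_PASSWORD"],
        region=env["AWS_REGION"],  # provided by the Lambda runtime
        ecr_repository_uri=env["ECR_REPOSITORY_URI"],
        s3_bucket=env["S3_BUCKET"],
    )
```

The resulting fields map directly onto `psycopg2.connect(host=..., port=..., dbname=..., user=..., password=...)`.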

IAM Permissions

  • Database: All Lambdas can connect to RDS PostgreSQL in the VPC
  • Secrets Manager: All Lambdas can read DB credentials
  • S3: Hourly stats Lambda can read the storage bucket
  • ECR: Hourly stats Lambda can pull/describe images
  • IoT: MQTT counter Lambda invoked by IoT Rules

EventBridge Schedules

  • Hourly: Schedule.rate(cdk.Duration.hours(1))
  • Daily: Schedule.cron({ hour: '0', minute: '0' })
  • Monthly: Schedule.cron({ day: '1', hour: '0', minute: '0' })

Testing

Local Development

# Test StatsUpdaterService
cd backend
python -m pytest tests/test_stats_updater.py

# Test LimitService
python -m pytest tests/test_limit_service.py

Lambda Testing

  1. Deploy stack: cd infrastructure && cdk deploy
  2. Monitor CloudWatch Logs for each Lambda
  3. Check database for daily_stats and monthly_stats records
  4. Verify project_stats counters increment correctly

Manual Triggers

# Invoke hourly stats update
aws lambda invoke --function-name RoboticksStack-HourlyStatsLambda-XXXXX response.json

# Invoke daily snapshot
aws lambda invoke --function-name RoboticksStack-DailySnapshotLambda-XXXXX response.json

# Invoke monthly reset (works on any day, but it resets the current monthly counters, so prefer a test stack)
aws lambda invoke --function-name RoboticksStack-MonthlyResetLambda-XXXXX response.json
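The `XXXXX` in the commands above stands for the suffix CloudFormation generates for each function name. This sketch resolves the full names from the stable prefix with boto3's `list_functions` paginator. `find_functions` is a hypothetical helper, and the client is injected so it can be tested without AWS.

```python
from typing import Any, List


def find_functions(lambda_client: Any, prefix: str) -> List[str]:
    """Return the sorted names of Lambda functions whose name starts with `prefix`."""
    names: List[str] = []
    paginator = lambda_client.get_paginator("list_functions")
    for page in paginator.paginate():
        for fn in page.get("Functions", []):
            if fn["FunctionName"].startswith(prefix):
                names.append(fn["FunctionName"])
    return sorted(names)


# Hypothetical usage (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("lambda")
#   for name in find_functions(client, "RoboticksStack-HourlyStatsLambda"):
#       client.invoke(FunctionName=name)  # synchronous (RequestResponse) by default
```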

Future Enhancements

High Priority

  1. Add retry logic with exponential backoff for AWS API throttling
  2. Implement dead-letter queues for failed Lambda invocations
  3. Add CloudWatch alarms for Lambda errors
  4. Create API endpoints to query daily_stats and monthly_stats

Medium Priority

  1. Add query-by-date-range logic in create_daily_snapshot() for more accurate counts
  2. Implement pagination for large S3 buckets and ECR repositories
  3. Add support for multiple regions
  4. Create admin dashboard for viewing aggregated stats

Low Priority

  1. Add support for custom retention policies (delete old daily_stats)
  2. Export monthly_stats to S3 for long-term archival
  3. Integrate with Stripe for automated billing
  4. Add anomaly detection for unusual usage patterns

Deployment Checklist

  • Database migrations created and reviewed
  • Lambda functions written and tested locally
  • CDK stack updated with Lambda definitions
  • EventBridge rules configured
  • IoT Rule for MQTT counter created
  • IAM permissions granted
  • Environment variables configured
  • Run database migrations: cd backend && alembic upgrade head
  • Deploy CDK stack: cd infrastructure && cdk deploy
  • Verify Lambda functions in AWS console
  • Check CloudWatch Logs for errors
  • Verify daily_stats records created after 00:00 UTC
  • Monitor MQTT message counters in real-time
  • Wait for 1st of month to verify monthly_stats creation

Monitoring

CloudWatch Metrics

Monitor invocations, errors, duration, and throttles with these metrics in the AWS/Lambda namespace:
  • Invocations
  • Errors
  • Duration
  • Throttles
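CloudWatch alarms for Lambda errors are listed as a future enhancement. One way to define one is to call `put_metric_alarm` on the `Errors` metric, dimensioned by function name, as sketched below. The alarm name, five-minute period, and one-error threshold are assumptions, and no alarm action such as an SNS topic is attached. In this project the alarm would more naturally live in the CDK stack.

```python
from typing import Any, Dict


def error_alarm_params(function_name: str, period_seconds: int = 300) -> Dict[str, Any]:
    """Arguments for CloudWatch put_metric_alarm: alarm on any error within one period."""
    return {
        "AlarmName": f"{function_name}-errors",  # naming scheme is an assumption
        "Namespace": "AWS/Lambda",
        "MetricName": "Errors",
        "Dimensions": [{"Name": "FunctionName", "Value": function_name}],
        "Statistic": "Sum",
        "Period": period_seconds,
        "EvaluationPeriods": 1,
        "Threshold": 1.0,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        # Scheduled functions emit no datapoints between runs; do not alarm on that.
        "TreatMissingData": "notBreaching",
    }


def create_error_alarm(cloudwatch_client: Any, function_name: str) -> None:
    """Create or update the alarm via an injected boto3 CloudWatch client."""
    cloudwatch_client.put_metric_alarm(**error_alarm_params(function_name))
```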

Database Queries

-- Check real-time stats
SELECT * FROM project_stats WHERE project_id = 1;

-- View daily snapshots
SELECT * FROM daily_stats
WHERE project_id = 1
ORDER BY date DESC
LIMIT 30;

-- View monthly history
SELECT * FROM monthly_stats
WHERE project_id = 1
ORDER BY year DESC, month DESC;

-- Check for missing daily snapshots
SELECT d::date AS expected_date
FROM generate_series(CURRENT_DATE - 30, CURRENT_DATE - 1, interval '1 day') AS d
EXCEPT
SELECT date FROM daily_stats WHERE project_id = 1
ORDER BY expected_date;

Troubleshooting

MQTT counters not incrementing

  • Check IoT Rule is active: aws iot get-topic-rule --rule-name MqttCounterRule
  • Verify the Lambda's resource-based policy allows iot.amazonaws.com to invoke it
  • Check CloudWatch Logs for MQTT counter Lambda
  • Verify topic format: roboticks/{org_id}/{project_id}/{device_id}/...

Hourly stats not updating

  • Check EventBridge rule is enabled
  • Verify Lambda timeout is sufficient (5 min)
  • Check AWS API rate limits
  • Verify VPC and security group configuration (a Lambda in a VPC needs a NAT gateway or VPC endpoints to reach the S3 and ECR APIs)

Daily snapshots missing

  • Verify EventBridge cron schedule
  • Check Lambda execution logs
  • Ensure database has active projects
  • Verify the snapshot date is computed in UTC (EventBridge rule cron expressions are evaluated in UTC)

Monthly reset failed

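  • Check that daily_stats exist for the previous month (the last day's snapshot is scheduled at the same time as the monthly reset, 00:00 UTC on the 1st)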
  • Verify Lambda memory is sufficient (1024 MB)
  • Check database transaction timeout
  • Review aggregation logic for edge cases

References

  • Backend service: backend/app/services/stats_updater.py
  • Limit checking: backend/app/services/limit_service.py
  • Lambda functions: infrastructure/lambda/*/
  • CDK stack: infrastructure/lib/roboticks-stack.ts
  • Database models: backend/app/models/daily_stats.py, backend/app/models/monthly_stats.py
  • Migrations: backend/alembic/versions/20251108_*.py