Flows Database
The Flows platform uses TimescaleDB (PostgreSQL extension) for efficient time-series data storage and querying.
Schema Organization
All meter and infrastructure tables live in the flows schema:
flows.
├── meter_* # Meter-related tables
├── register_* # Consumption data
├── circuit_* # Circuit definitions and data
├── service_head_* # Electrical topology
├── properties # Physical locations
├── sites_new # Site groupings
└── escos # Energy communities
Key Technologies
TimescaleDB
- Hypertables for time-series data partitioning
- Continuous aggregates for real-time materialized views
- Compression policies for historical data
- Retention policies for data lifecycle
PostgREST
- Automatic REST API generation from schema
- Row-level security (RLS)
- JWT authentication
- OpenAPI documentation
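As a minimal sketch of how JWT authentication and row-level security can meet in SQL: PostgREST exposes the verified token's claims to the database in the request.jwt.claims setting as JSON. The helper name flows.jwt_esco and the claim name esco are hypothetical; the source does not say which claim carries the ESCO identifier.

```sql
-- Hypothetical helper: returns the ESCO id carried in the caller's JWT,
-- or NULL when no token (or no such claim) is present.
-- Assumes the token has a claim named "esco" holding a UUID.
CREATE OR REPLACE FUNCTION flows.jwt_esco() RETURNS uuid
LANGUAGE sql STABLE AS $$
  SELECT (
    NULLIF(current_setting('request.jwt.claims', true), '')::json ->> 'esco'
  )::uuid
$$;
```

A policy could then compare a table's esco column against flows.jwt_esco() instead of a manually set session variable.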
Table Categories
Configuration Tables
Static or slowly changing data:
- meter_registry - Meter configurations
- circuits - Circuit definitions
- properties - Property information
- escos - Organization data
State Tables
Current state snapshots:
- meter_shadows - Real-time meter state
- meter_prepay_balance - Current balances
Time-Series Tables
High-frequency measurement data:
- register_import - Consumption readings
- register_export - Generation readings
- register_interval_hh - Half-hourly aggregates
- meter_voltage - Voltage measurements
- meter_csq - Signal quality
Junction Tables
Many-to-many relationships:
- circuit_register - Circuit to register mapping
- service_head_meter - Service head to meter
- properties_service_head - Property connections
Performance Optimizations
Indexing Strategy
-- Time-series indexes
CREATE INDEX idx_register_import_time
ON flows.register_import(timestamp DESC);
-- Foreign key indexes
CREATE INDEX idx_meter_shadows_id
ON flows.meter_shadows(id);
-- Composite indexes for common queries
CREATE INDEX idx_register_import_composite
ON flows.register_import(register_id, timestamp DESC);
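The composite index is aimed at lookups that filter or group by register and order by time. A typical query of that shape, sketched here using only the register_id and timestamp columns named in the index definitions above, fetches the most recent reading per register:

```sql
-- Latest reading per register over the last day.
-- DISTINCT ON keeps the first row of each register_id group,
-- and the ORDER BY makes that first row the newest one.
SELECT DISTINCT ON (register_id) *
FROM flows.register_import
WHERE "timestamp" > now() - INTERVAL '1 day'
ORDER BY register_id, "timestamp" DESC;

-- Inspect the plan to see whether the planner picks the composite index.
EXPLAIN (ANALYZE, BUFFERS)
SELECT *
FROM flows.register_import
WHERE register_id = 1            -- placeholder value; the column type is an assumption
ORDER BY "timestamp" DESC
LIMIT 1;
```

Whether the index is actually chosen depends on table statistics and data volume, so the EXPLAIN output is the thing to check.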
Partitioning
TimescaleDB automatically partitions a hypertable into time-based chunks:
-- Convert to hypertable
SELECT create_hypertable('flows.register_import', 'timestamp');
-- Set chunk interval to 1 day (applies to chunks created from now on)
SELECT set_chunk_time_interval('flows.register_import', INTERVAL '1 day');
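The two steps above can also be combined, and the resulting chunks inspected afterwards. The following is a sketch assuming the TimescaleDB 2.x form of create_hypertable; the interval is the same 1 day used above.

```sql
-- Create the hypertable with the chunk interval set up front.
-- if_not_exists makes the call safe to repeat in migrations.
SELECT create_hypertable(
  'flows.register_import',
  'timestamp',
  chunk_time_interval => INTERVAL '1 day',
  if_not_exists       => TRUE
);

-- List the five most recent chunks and the time range each one covers.
SELECT chunk_name, range_start, range_end, is_compressed
FROM timescaledb_information.chunks
WHERE hypertable_schema = 'flows'
  AND hypertable_name   = 'register_import'
ORDER BY range_start DESC
LIMIT 5;
```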
Compression
Historical data compression:
-- Enable compression on the hypertable
ALTER TABLE flows.register_import
SET (timescaledb.compress);
-- Add a policy that compresses chunks older than 7 days
SELECT add_compression_policy('flows.register_import', INTERVAL '7 days');
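Compression usually works better when rows are grouped by the column most queries filter on. The sketch below assumes register_id is that column (it appears in the composite index above); the right choice for this schema is not stated in the source.

```sql
-- Enable compression with explicit grouping and ordering.
-- segmentby: rows with the same register_id are compressed together.
-- orderby:   rows inside each segment are stored newest first.
ALTER TABLE flows.register_import SET (
  timescaledb.compress,
  timescaledb.compress_segmentby = 'register_id',
  timescaledb.compress_orderby   = '"timestamp" DESC'
);

-- Confirm the compression policy job exists and see its schedule.
SELECT job_id, schedule_interval, config, next_start
FROM timescaledb_information.jobs
WHERE proc_name       = 'policy_compression'
  AND hypertable_name = 'register_import';
```

Depending on the TimescaleDB version, changing these settings on a hypertable that already has compressed chunks may require decompressing those chunks first.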
Views and Aggregations
Continuous Aggregates
Real-time materialized views:
CREATE MATERIALIZED VIEW flows.circuit_interval_daily
WITH (timescaledb.continuous) AS
SELECT
circuit_id,
time_bucket('1 day', timestamp) AS day,
SUM(import_kwh) as total_import_kwh,
SUM(export_kwh) as total_export_kwh
FROM flows.register_interval_hh rih
JOIN flows.circuit_register cr ON rih.register_id = cr.register
GROUP BY circuit_id, day;
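A continuous aggregate is only kept up to date if something refreshes it. A minimal sketch of a refresh policy follows; the offsets and schedule are illustrative assumptions, not values from this deployment.

```sql
-- Refresh the daily aggregate every hour, covering data between
-- 3 days ago and 1 hour ago. The window must span at least two buckets.
SELECT add_continuous_aggregate_policy(
  'flows.circuit_interval_daily',
  start_offset      => INTERVAL '3 days',
  end_offset        => INTERVAL '1 hour',
  schedule_interval => INTERVAL '1 hour'
);

-- Query the aggregate like an ordinary view.
SELECT circuit_id, day, total_import_kwh, total_export_kwh
FROM flows.circuit_interval_daily
WHERE day >= now() - INTERVAL '7 days'
ORDER BY circuit_id, day;
```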
Standard Views
Logical data representations:
- meter_shadows_tariffs - Denormalized tariff data
- meters_low_balance - Meters needing top-up
- meters_offline_recently - Connectivity monitoring
Access Control
Roles
- tsdbadmin - Full administrative access
- flows - Application read/write access
- grafanareader - Read-only for monitoring
- public_backend - API access
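As an illustration of what read-only access for a role such as grafanareader typically involves, the grants might look like the sketch below. The actual grants in this database are not shown in the source, so treat these statements as an assumption.

```sql
-- Allow the role to resolve objects in the schema.
GRANT USAGE ON SCHEMA flows TO grafanareader;

-- Read access to every table and view that exists today.
GRANT SELECT ON ALL TABLES IN SCHEMA flows TO grafanareader;

-- Read access to tables created later by the role running this statement.
ALTER DEFAULT PRIVILEGES IN SCHEMA flows
  GRANT SELECT ON TABLES TO grafanareader;
```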
Row-Level Security
-- Enable RLS
ALTER TABLE flows.meter_registry ENABLE ROW LEVEL SECURITY;
-- Create policy
CREATE POLICY meter_esco_isolation ON flows.meter_registry
FOR ALL
TO flows
USING (esco = current_setting('app.current_esco')::uuid);
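The policy above reads app.current_esco from the session, so a caller has to set it before querying. A sketch of that, using a placeholder UUID and assuming the connected user is a member of the flows role:

```sql
BEGIN;
-- Act as the role the policy is written for.
SET LOCAL ROLE flows;
-- The third argument (true) scopes the setting to this transaction only.
SELECT set_config('app.current_esco',
                  '00000000-0000-0000-0000-000000000001',  -- placeholder ESCO id
                  true);
-- Only rows whose esco matches the setting are visible here.
SELECT count(*) FROM flows.meter_registry;
COMMIT;
```

Superusers and roles with BYPASSRLS skip policies entirely, and the table owner does too unless FORCE ROW LEVEL SECURITY is set on the table. If app.current_esco is not set at all, current_setting raises an error rather than returning no rows.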
Backup and Recovery
Backup Strategy
- Continuous backups via WAL archiving
- Daily snapshots of full database
- Retention: 30 days of point-in-time recovery
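Point-in-time recovery depends on WAL archiving keeping up. On a self-managed PostgreSQL instance, the built-in pg_stat_archiver view gives a quick health signal; managed services may expose this differently or not at all.

```sql
-- A growing failed_count or a stale last_archived_time suggests
-- that WAL segments are not reaching the archive.
SELECT archived_count,
       last_archived_wal,
       last_archived_time,
       failed_count,
       last_failed_wal,
       last_failed_time
FROM pg_stat_archiver;
```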
Recovery Procedures
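# Full restore from a logical dump (connect to postgres so the flows database can be dropped and recreated)
# Note: this restores the dump's snapshot only; point-in-time recovery replays archived WAL onto a base backup
pg_restore --dbname=postgres --clean --create flows_backup.dump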
# Specific table recovery
pg_restore --dbname=flows --table=meter_registry flows_backup.dump
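When restoring a full logical dump of a TimescaleDB database into an existing, empty database, Timescale's documented procedure wraps pg_restore in a pre-restore and post-restore call. The sketch below assumes that workflow (restore without --create) rather than describing this project's actual runbook.

```sql
-- Run inside the target database before pg_restore.
CREATE EXTENSION IF NOT EXISTS timescaledb;
SELECT timescaledb_pre_restore();   -- puts the database in restore mode

-- ... run pg_restore from the shell at this point ...

SELECT timescaledb_post_restore();  -- returns the database to normal operation
ANALYZE;                            -- rebuild planner statistics after the load
```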
Monitoring
Key Metrics
- Table sizes and growth rates
- Query performance (pg_stat_statements)
- Connection pool usage
- Replication lag
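Each of these metrics can be sampled with a short query. The following sketch assumes PostgreSQL 13 or later (for the pg_stat_statements column names) and that the pg_stat_statements extension is installed.

```sql
-- Total on-disk size of one hypertable (table, indexes, toast).
SELECT pg_size_pretty(hypertable_size('flows.register_import')) AS register_import_size;

-- Ten statements with the highest cumulative execution time.
SELECT query, calls, round(mean_exec_time::numeric, 2) AS mean_ms
FROM pg_stat_statements
ORDER BY total_exec_time DESC
LIMIT 10;

-- Connections grouped by state (active, idle, idle in transaction, ...).
SELECT state, count(*) AS connections
FROM pg_stat_activity
GROUP BY state
ORDER BY connections DESC;

-- Replication lag per standby, as time and as bytes behind the primary.
SELECT client_addr,
       replay_lag,
       pg_wal_lsn_diff(pg_current_wal_lsn(), replay_lsn) AS bytes_behind
FROM pg_stat_replication;
```

Growth rate is not directly queryable; recording the size query's result on a schedule is one way to derive it.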
Health Checks
-- Check chunk sizes (largest first)
SELECT
chunk_name,
pg_size_pretty(total_bytes) AS size
FROM chunks_detailed_size('flows.register_import')
ORDER BY total_bytes DESC
LIMIT 10;
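-- Check compression status
SELECT
pg_size_pretty(before_compression_total_bytes) AS uncompressed_size,
pg_size_pretty(after_compression_total_bytes) AS compressed_size,
round(before_compression_total_bytes::numeric / NULLIF(after_compression_total_bytes, 0), 2) AS compression_ratio
FROM hypertable_compression_stats('flows.register_import');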
Maintenance Tasks
Regular Operations
-- Update statistics
ANALYZE flows.register_import;
-- Reindex for performance
REINDEX TABLE flows.meter_shadows;
-- Vacuum to reclaim space
VACUUM ANALYZE flows.register_interval_hh;
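Before running these operations by hand, it can help to see which tables actually need them. A sketch using the standard statistics views, plus a less disruptive reindex form that assumes PostgreSQL 12 or later:

```sql
-- Tables in the flows schema with the most dead rows,
-- alongside when autovacuum and analyze last ran.
SELECT relname,
       n_live_tup,
       n_dead_tup,
       last_autovacuum,
       last_analyze
FROM pg_stat_user_tables
WHERE schemaname = 'flows'
ORDER BY n_dead_tup DESC
LIMIT 10;

-- Rebuild indexes without blocking writes for the duration.
-- Cannot be run inside a transaction block.
REINDEX TABLE CONCURRENTLY flows.meter_shadows;
```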
Data Retention
-- Add retention policy (keep 2 years)
SELECT add_retention_policy('flows.register_import', INTERVAL '2 years');
-- Manual data pruning
DELETE FROM flows.meter_event_log
WHERE timestamp < NOW() - INTERVAL '90 days';
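For hypertables, removing whole chunks is generally cheaper than a row-by-row DELETE, because it drops the underlying tables instead of leaving dead rows to vacuum. A sketch using register_import (the source does not say whether meter_event_log is a hypertable):

```sql
-- Preview which chunks fall entirely outside the 2-year window.
SELECT show_chunks('flows.register_import', older_than => INTERVAL '2 years');

-- Drop those chunks immediately instead of waiting for the policy job.
SELECT drop_chunks('flows.register_import', older_than => INTERVAL '2 years');

-- Confirm the retention policy job is registered and see its settings.
SELECT job_id, schedule_interval, config, next_start
FROM timescaledb_information.jobs
WHERE proc_name       = 'policy_retention'
  AND hypertable_name = 'register_import';
```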
Next Steps
- Schema Reference - Documentation coming soon
- Query Patterns - Documentation coming soon
- Migration Guide - Documentation coming soon