diff --git a/charger.go b/charger.go new file mode 100644 index 0000000..8a389f8 --- /dev/null +++ b/charger.go @@ -0,0 +1,43 @@ +package main + +import ( + "encoding/json" + "fmt" + "net/http" + "time" +) + +type ChargerReading struct { + Time time.Time + Power float32 // W total + EtoWh int64 // Wh cumulative +} + +type chargerStatus struct { + Nrg []float64 `json:"nrg"` // [0..15] voltages/currents/powers; [11] = total W + Eto int64 `json:"eto"` // cumulative Wh +} + +var chargerClient = &http.Client{Timeout: 5 * time.Second} + +func PollCharger(host string) (ChargerReading, error) { + url := "http://" + host + "/api/status?filter=nrg,eto" + resp, err := chargerClient.Get(url) + if err != nil { + return ChargerReading{}, fmt.Errorf("get: %w", err) + } + defer resp.Body.Close() + + var s chargerStatus + if err := json.NewDecoder(resp.Body).Decode(&s); err != nil { + return ChargerReading{}, fmt.Errorf("decode: %w", err) + } + if len(s.Nrg) < 12 { + return ChargerReading{}, fmt.Errorf("nrg array too short (%d)", len(s.Nrg)) + } + return ChargerReading{ + Time: time.Now(), + Power: float32(s.Nrg[11]), + EtoWh: s.Eto, + }, nil +} diff --git a/config.go b/config.go index 1935d2a..34a8e35 100644 --- a/config.go +++ b/config.go @@ -6,10 +6,15 @@ import ( ) type Config struct { - SampleRate int `json:"sample_rate"` // seconds - AlphaEss AlphaConf `json:"alphaess"` - MQTT MQTTConf `json:"mqtt"` - DB DBConf `json:"db"` + SampleRate int `json:"sample_rate"` // seconds + AlphaEss AlphaConf `json:"alphaess"` + MQTT MQTTConf `json:"mqtt"` + Charger ChargerConf `json:"charger"` + DB DBConf `json:"db"` +} + +type ChargerConf struct { + Host string `json:"host"` // hostname or IP of go-e charger } type AlphaConf struct { diff --git a/db.go b/db.go index 4e5b1a0..4db7bfe 100644 --- a/db.go +++ b/db.go @@ -50,6 +50,14 @@ func (db *DB) WriteMeter(ctx context.Context, t time.Time, r MeterReading) error return err } +func (db *DB) WriteCharger(ctx context.Context, r ChargerReading) error { + _, err := db.pool.Exec(ctx, + `INSERT INTO charger (time, power, eto_wh) VALUES ($1, $2, $3)`, + r.Time, r.Power, r.EtoWh, + ) + return err +} + func (db *DB) Close() { db.pool.Close() } diff --git a/main.go b/main.go index 2053f2f..49f9917 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ func main() { defer meters.Close() interval := time.Duration(cfg.SampleRate) * time.Second - go runPollLoop(ctx, alpha, meters, db, interval) + go runPollLoop(ctx, alpha, meters, cfg.Charger.Host, db, interval) sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) @@ -52,7 +52,7 @@ func main() { logger.Println("shutting down") } -func runPollLoop(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, db *DB, interval time.Duration) { +func runPollLoop(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, chargerHost string, db *DB, interval time.Duration) { ticker := time.NewTicker(interval) defer ticker.Stop() for { @@ -60,22 +60,22 @@ func runPollLoop(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, db * case <-ctx.Done(): return case <-ticker.C: - poll(ctx, alpha, meters, db) + poll(ctx, alpha, meters, chargerHost, db) } } } -func poll(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, db *DB) { +func poll(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, chargerHost string, db *DB) { t := time.Now() - // Poll AlphaEss and all SDM630 meters in parallel. var ( - inverterData *InverterData - meterData []MeterReading - wg sync.WaitGroup + inverterData *InverterData + meterData []MeterReading + chargerData *ChargerReading + wg sync.WaitGroup ) - wg.Add(2) + wg.Add(3) go func() { defer wg.Done() d, err := alpha.Poll() @@ -89,9 +89,21 @@ func poll(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, db *DB) { defer wg.Done() meterData = meters.PollAll() }() + go func() { + defer wg.Done() + if chargerHost == "" { + return + } + r, err := PollCharger(chargerHost) + if err != nil { + logger.Printf("charger: %v", err) + return + } + r.Time = t + chargerData = &r + }() wg.Wait() - // Write everything with the same timestamp t. if inverterData != nil { if err := db.WriteInverter(ctx, t, inverterData); err != nil { logger.Printf("write inverter: %v", err) @@ -102,4 +114,9 @@ func poll(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, db *DB) { logger.Printf("write meter %s: %v", r.Device, err) } } + if chargerData != nil { + if err := db.WriteCharger(ctx, *chargerData); err != nil { + logger.Printf("write charger: %v", err) + } + } } diff --git a/schema.sql b/schema.sql index 9ff122c..92cd388 100644 --- a/schema.sql +++ b/schema.sql @@ -98,10 +98,40 @@ SELECT time_bucket('1 day', time) AS bucket, FROM power_meter GROUP BY bucket, device; +-- Written by energy-collector every 10 s +CREATE TABLE charger ( + time TIMESTAMPTZ NOT NULL, + power REAL, -- W total charging power + eto_wh BIGINT -- Wh cumulative total energy (from go-e eto field) +); +SELECT create_hypertable('charger', 'time', chunk_time_interval => INTERVAL '7 days'); + +CREATE MATERIALIZED VIEW charger_10m +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('10 minutes', time) AS bucket, + AVG(power) AS power +FROM charger +GROUP BY bucket; + +CREATE MATERIALIZED VIEW charger_1h +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('1 hour', bucket) AS bucket, + AVG(power) AS power +FROM charger_10m +GROUP BY 1; + +CREATE MATERIALIZED VIEW charger_daily +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('1 day', time) AS bucket, + last(eto_wh, time) AS eto_wh +FROM charger +GROUP BY bucket; + -- ── Retention — keep 30 days of raw data; aggregates stay forever ───────────── SELECT add_retention_policy('inverter', INTERVAL '30 days'); SELECT add_retention_policy('power_meter', INTERVAL '30 days'); +SELECT add_retention_policy('charger', INTERVAL '30 days'); -- ── Refresh policies ────────────────────────────────────────────────────────── @@ -135,6 +165,22 @@ SELECT add_continuous_aggregate_policy('power_meter_daily', end_offset => INTERVAL '1 day', schedule_interval => INTERVAL '1 day'); --- Grant SELECT on all tables and views (including continuous aggregates) to energy. --- Run after all views are created so the grant covers them. +SELECT add_continuous_aggregate_policy('charger_10m', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '10 minutes', + schedule_interval => INTERVAL '10 minutes'); + +SELECT add_continuous_aggregate_policy('charger_1h', + start_offset => INTERVAL '3 hours', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '1 hour'); + +SELECT add_continuous_aggregate_policy('charger_daily', + start_offset => INTERVAL '3 days', + end_offset => INTERVAL '1 day', + schedule_interval => INTERVAL '1 day'); + +-- Grant privileges after all objects are created. +-- INSERT on raw hypertables (collector writes), SELECT on everything else (frontend reads). +GRANT INSERT ON inverter, power_meter, charger TO energy; GRANT SELECT ON ALL TABLES IN SCHEMA public TO energy;