From 8295d1cf47833b8f08b8df337b632a3dcdd45430 Mon Sep 17 00:00:00 2001 From: Thomas Klaehn Date: Sat, 18 Apr 2026 11:12:40 +0200 Subject: [PATCH] =?UTF-8?q?Initial=20commit=20=E2=80=94=20energy=20collect?= =?UTF-8?q?or=20(AlphaEss=20+=20SDM630=20=E2=86=92=20TimescaleDB)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + Dockerfile | 13 ++ alphaess.go | 137 ++++++++++++++++++ config.go | 52 +++++++ db.go | 55 ++++++++ docker-compose.yml | 9 ++ go.mod | 22 +++ go.sum | 36 +++++ main.go | 105 ++++++++++++++ meter.go | 335 +++++++++++++++++++++++++++++++++++++++++++++ migrate_influx.py | 281 +++++++++++++++++++++++++++++++++++++ schema.sql | 140 +++++++++++++++++++ 12 files changed, 1186 insertions(+) create mode 100644 .gitignore create mode 100644 Dockerfile create mode 100644 alphaess.go create mode 100644 config.go create mode 100644 db.go create mode 100644 docker-compose.yml create mode 100644 go.mod create mode 100644 go.sum create mode 100644 main.go create mode 100644 meter.go create mode 100644 migrate_influx.py create mode 100644 schema.sql diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ef9f631 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +energy-collector diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..375fdbe --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.24-bookworm AS builder +WORKDIR /app +COPY go.mod go.sum ./ +RUN go mod download +COPY *.go ./ +RUN CGO_ENABLED=0 go build -o energy-collector . + +FROM alpine:3.21 +RUN apk add --no-cache ca-certificates tzdata +WORKDIR /app +COPY --from=builder /app/energy-collector . +USER nobody:nobody +ENTRYPOINT ["./energy-collector", "-c", "/srv/energy/collector/config.json"] diff --git a/alphaess.go b/alphaess.go new file mode 100644 index 0000000..95d45de --- /dev/null +++ b/alphaess.go @@ -0,0 +1,137 @@ +package main + +import ( + "fmt" + "log" + + "github.com/simonvetter/modbus" +) + +// InverterData holds one poll's worth of AlphaEss readings. +type InverterData struct { + Pv1Power float32 // W + Pv2Power float32 // W + PvL1Power float32 // W + PvL2Power float32 // W + PvL3Power float32 // W + BatterySoC float32 // % + GridImportKwh float32 // kWh cumulative + GridExportKwh float32 // kWh cumulative + PvEnergyKwh float32 // kWh cumulative +} + +// alphaReg describes one AlphaEss Modbus holding register. +// All AlphaEss values are signed int32 (qty=2 → two 16-bit words, big-endian) +// or unsigned int16 (qty=1), then multiplied by Factor. +type alphaReg struct { + addr uint16 + qty uint16 + factor float32 +} + +// Register addresses taken from the AlphaEss Modbus spec (and validated by pvcollect). +var alphaRegs = struct { + gridExport, gridImport alphaReg + pv1, pv2 alphaReg + pvL1, pvL2, pvL3 alphaReg + pvEnergy alphaReg + batterySoC alphaReg +}{ + gridExport: alphaReg{16, 2, 0.01}, + gridImport: alphaReg{18, 2, 0.01}, + pv1: alphaReg{1055, 2, 1}, + pv2: alphaReg{1059, 2, 1}, + pvL1: alphaReg{1030, 2, 1}, + pvL2: alphaReg{1032, 2, 1}, + pvL3: alphaReg{1034, 2, 1}, + pvEnergy: alphaReg{1086, 2, 0.1}, + batterySoC: alphaReg{258, 1, 0.1}, +} + +// AlphaEss polls the AlphaEss inverter via Modbus TCP. +type AlphaEss struct { + cfg AlphaConf + client *modbus.ModbusClient +} + +func NewAlphaEss(cfg AlphaConf) (*AlphaEss, error) { + a := &AlphaEss{cfg: cfg} + if err := a.connect(); err != nil { + return nil, err + } + return a, nil +} + +func (a *AlphaEss) connect() error { + url := fmt.Sprintf("tcp://%s:%d", a.cfg.Host, a.cfg.Port) + c, err := modbus.NewClient(&modbus.ClientConfiguration{URL: url}) + if err != nil { + return fmt.Errorf("modbus client: %w", err) + } + c.SetUnitId(a.cfg.SlaveID) + if err := c.Open(); err != nil { + return fmt.Errorf("modbus open: %w", err) + } + a.client = c + return nil +} + +func (a *AlphaEss) reconnect() { + if a.client != nil { + a.client.Close() + } + if err := a.connect(); err != nil { + log.Printf("alphaess reconnect: %v", err) + } +} + +func (a *AlphaEss) readReg(r alphaReg) (float32, error) { + regs, err := a.client.ReadRegisters(r.addr, r.qty, modbus.HOLDING_REGISTER) + if err != nil { + return 0, err + } + var raw int32 + switch r.qty { + case 1: + raw = int32(regs[0]) + case 2: + raw = int32(regs[0])<<16 | int32(regs[1]) + } + return float32(raw) * r.factor, nil +} + +// Poll reads all AlphaEss registers. On connection error it attempts one +// reconnect before returning the error. +func (a *AlphaEss) Poll() (*InverterData, error) { + data, err := a.poll() + if err != nil { + log.Printf("alphaess poll error, reconnecting: %v", err) + a.reconnect() + data, err = a.poll() + } + return data, err +} + +func (a *AlphaEss) poll() (*InverterData, error) { + r := alphaRegs + read := func(reg alphaReg) (float32, error) { return a.readReg(reg) } + + d := &InverterData{} + var err error + if d.GridExportKwh, err = read(r.gridExport); err != nil { return nil, err } + if d.GridImportKwh, err = read(r.gridImport); err != nil { return nil, err } + if d.Pv1Power, err = read(r.pv1); err != nil { return nil, err } + if d.Pv2Power, err = read(r.pv2); err != nil { return nil, err } + if d.PvL1Power, err = read(r.pvL1); err != nil { return nil, err } + if d.PvL2Power, err = read(r.pvL2); err != nil { return nil, err } + if d.PvL3Power, err = read(r.pvL3); err != nil { return nil, err } + if d.PvEnergyKwh, err = read(r.pvEnergy); err != nil { return nil, err } + if d.BatterySoC, err = read(r.batterySoC); err != nil { return nil, err } + return d, nil +} + +func (a *AlphaEss) Close() { + if a.client != nil { + a.client.Close() + } +} diff --git a/config.go b/config.go new file mode 100644 index 0000000..1935d2a --- /dev/null +++ b/config.go @@ -0,0 +1,52 @@ +package main + +import ( + "encoding/json" + "os" +) + +type Config struct { + SampleRate int `json:"sample_rate"` // seconds + AlphaEss AlphaConf `json:"alphaess"` + MQTT MQTTConf `json:"mqtt"` + DB DBConf `json:"db"` +} + +type AlphaConf struct { + Host string `json:"host"` + Port int `json:"port"` + SlaveID uint8 `json:"slave_id"` +} + +type MQTTConf struct { + Broker string `json:"broker"` + BrokerIP string `json:"broker_ip"` // optional scoped IPv6 e.g. fe80::1%eth0 + BrokerTLSName string `json:"broker_tls_name"` // TLS ServerName when broker_ip is set + Port int `json:"port"` + ClientID string `json:"client_id"` + TopicPrefix string `json:"topic_prefix"` + CACert string `json:"ca_cert"` + ClientCert string `json:"client_cert"` + ClientKey string `json:"client_key"` + Devices []DeviceConf `json:"devices"` +} + +// DeviceConf describes one SDM630 meter reachable via the MQTT/Modbus bridge. +type DeviceConf struct { + SlaveAddress int `json:"slave_address"` + Name string `json:"name"` // used as the 'device' column value in power_meter + Segment string `json:"segment"` // MQTT topic segment +} + +type DBConf struct { + DSN string `json:"dsn"` // PostgreSQL connection string +} + +func loadConfig(path string) (Config, error) { + data, err := os.ReadFile(path) + if err != nil { + return Config{}, err + } + var cfg Config + return cfg, json.Unmarshal(data, &cfg) +} diff --git a/db.go b/db.go new file mode 100644 index 0000000..4e5b1a0 --- /dev/null +++ b/db.go @@ -0,0 +1,55 @@ +package main + +import ( + "context" + "fmt" + "time" + + "github.com/jackc/pgx/v5/pgxpool" +) + +// DB wraps a pgx connection pool for writing energy data. +type DB struct { + pool *pgxpool.Pool +} + +func NewDB(ctx context.Context, dsn string) (*DB, error) { + pool, err := pgxpool.New(ctx, dsn) + if err != nil { + return nil, fmt.Errorf("pgxpool: %w", err) + } + if err := pool.Ping(ctx); err != nil { + return nil, fmt.Errorf("db ping: %w", err) + } + return &DB{pool: pool}, nil +} + +func (db *DB) WriteInverter(ctx context.Context, t time.Time, d *InverterData) error { + _, err := db.pool.Exec(ctx, ` + INSERT INTO inverter + (time, pv1_power, pv2_power, pv_l1_power, pv_l2_power, pv_l3_power, + battery_soc, grid_import_kwh, grid_export_kwh, pv_energy_kwh) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)`, + t, + d.Pv1Power, d.Pv2Power, + d.PvL1Power, d.PvL2Power, d.PvL3Power, + d.BatterySoC, + d.GridImportKwh, d.GridExportKwh, d.PvEnergyKwh, + ) + return err +} + +func (db *DB) WriteMeter(ctx context.Context, t time.Time, r MeterReading) error { + _, err := db.pool.Exec(ctx, ` + INSERT INTO power_meter (time, device, l1_power, l2_power, l3_power, import_kwh, export_kwh) + VALUES ($1,$2,$3,$4,$5,$6,$7)`, + t, r.Device, + r.L1Power, r.L2Power, r.L3Power, + r.ImportKwh, r.ExportKwh, + ) + return err +} + +func (db *DB) Close() { + db.pool.Close() +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..fe5d104 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,9 @@ +services: + energy-collector: + build: . + container_name: energy-collector + restart: unless-stopped + network_mode: host + volumes: + - /srv/energy/collector:/srv/energy/collector:ro + - /srv/energy/meter:/srv/energy/meter:ro diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..a718657 --- /dev/null +++ b/go.mod @@ -0,0 +1,22 @@ +module energy-collector + +go 1.24.0 + +toolchain go1.24.5 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.1 + github.com/jackc/pgx/v5 v5.8.0 + github.com/simonvetter/modbus v1.6.0 +) + +require ( + github.com/goburrow/serial v0.1.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect + golang.org/x/text v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3ba5695 --- /dev/null +++ b/go.sum @@ -0,0 +1,36 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= +github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA= +github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= +github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/simonvetter/modbus v1.6.0 h1:RDHJevtc7LDIVoHAbhDun8fy+QwnGe+ZU+sLm9ZZzjc= +github.com/simonvetter/modbus v1.6.0/go.mod h1:hh90ZaTaPLcK2REj6/fpTbiV0J6S7GWmd8q+GVRObPw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/text v0.29.0 h1:1neNs90w9YzJ9BocxfsQNHKuAT4pkghyXc4nhZ6sJvk= +golang.org/x/text v0.29.0/go.mod h1:7MhJOA9CD2qZyOKYazxdYMF85OwPdEr9jTtBpO7ydH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go new file mode 100644 index 0000000..2053f2f --- /dev/null +++ b/main.go @@ -0,0 +1,105 @@ +package main + +import ( + "context" + "flag" + "log" + "os" + "os/signal" + "sync" + "syscall" + "time" +) + +var logger = log.New(os.Stderr, "", log.Ltime|log.Lshortfile) + +func main() { + cfgPath := flag.String("c", "/srv/energy/collector/config.json", "path to config file") + flag.Parse() + + cfg, err := loadConfig(*cfgPath) + if err != nil { + log.Fatalf("config: %v", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + db, err := NewDB(ctx, cfg.DB.DSN) + if err != nil { + log.Fatalf("db: %v", err) + } + defer db.Close() + + alpha, err := NewAlphaEss(cfg.AlphaEss) + if err != nil { + log.Fatalf("alphaess: %v", err) + } + defer alpha.Close() + + meters, err := NewMeterPoller(cfg.MQTT) + if err != nil { + log.Fatalf("meters: %v", err) + } + defer meters.Close() + + interval := time.Duration(cfg.SampleRate) * time.Second + go runPollLoop(ctx, alpha, meters, db, interval) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + logger.Println("shutting down") +} + +func runPollLoop(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, db *DB, interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + poll(ctx, alpha, meters, db) + } + } +} + +func poll(ctx context.Context, alpha *AlphaEss, meters *MeterPoller, db *DB) { + t := time.Now() + + // Poll AlphaEss and all SDM630 meters in parallel. + var ( + inverterData *InverterData + meterData []MeterReading + wg sync.WaitGroup + ) + + wg.Add(2) + go func() { + defer wg.Done() + d, err := alpha.Poll() + if err != nil { + logger.Printf("alphaess: %v", err) + return + } + inverterData = d + }() + go func() { + defer wg.Done() + meterData = meters.PollAll() + }() + wg.Wait() + + // Write everything with the same timestamp t. + if inverterData != nil { + if err := db.WriteInverter(ctx, t, inverterData); err != nil { + logger.Printf("write inverter: %v", err) + } + } + for _, r := range meterData { + if err := db.WriteMeter(ctx, t, r); err != nil { + logger.Printf("write meter %s: %v", r.Device, err) + } + } +} diff --git a/meter.go b/meter.go new file mode 100644 index 0000000..0b678fc --- /dev/null +++ b/meter.go @@ -0,0 +1,335 @@ +package main + +import ( + "crypto/tls" + "crypto/x509" + "encoding/binary" + "encoding/json" + "fmt" + "log" + "math" + "os" + "strconv" + "strings" + "sync" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// MeterReading holds one poll's worth of SDM630 readings for one device. +type MeterReading struct { + Device string // matches DeviceConf.Name ("house" | "barn") + L1Power float32 // W + L2Power float32 // W + L3Power float32 // W + ImportKwh float32 // kWh cumulative + ExportKwh float32 // kWh cumulative +} + +// ── MQTT message handler ────────────────────────────────────────────────────── +// +// The perinet bridge is a Modbus-RTU gateway. We operate in passive mode: +// we do NOT send any requests ourselves. Instead we subscribe to Energy/# +// and observe the request/response traffic generated by the existing +// energy-subscriber client. +// +// Request payload (energy-subscriber sends, we observe): +// +// {"modbus_frame":{"value":[slave,4,addrH,addrL,cntH,cntL,crcL,crcH],"unit":"RAW"}} +// +// Response payload (bridge publishes): +// +// {"data":{"modbus_frame":{"value":[slave,4,byteCount,...,crcL,crcH], +// "unit":"RAW"}},"timestamp":{...}} +// +// When an FC4 request arrives we record the requested address for that slave. +// When the following FC4 response arrives we store the decoded float32 value +// keyed by that address. Once all five SDM630 registers have been collected +// for a slave a complete MeterReading is pushed to readyCh. + +// SDM630 register map (float32 input registers, 2 × 16-bit words each): +// +// addr 12 → L1 active power (W) +// addr 14 → L2 active power (W) +// addr 16 → L3 active power (W) +// addr 72 → Total import energy (kWh) +// addr 74 → Total export energy (kWh) +var meterAddrs = []uint16{12, 14, 16, 72, 74} + +type slaveState struct { + mu sync.Mutex + lastReqAddr uint16 + vals map[uint16]float32 // addr → decoded float32 +} + +type meterHandler struct { + devices map[int]string // slave address → device name + + statesMu sync.Mutex + states map[int]*slaveState + + readyCh chan MeterReading // complete readings are sent here +} + +func newMeterHandler(devices []DeviceConf) *meterHandler { + dm := make(map[int]string, len(devices)) + for _, d := range devices { + dm[d.SlaveAddress] = d.Name + } + return &meterHandler{ + devices: dm, + states: make(map[int]*slaveState), + readyCh: make(chan MeterReading, 16), + } +} + +func (h *meterHandler) getOrCreate(slave int) *slaveState { + h.statesMu.Lock() + defer h.statesMu.Unlock() + if s, ok := h.states[slave]; ok { + return s + } + s := &slaveState{vals: make(map[uint16]float32)} + h.states[slave] = s + return s +} + +// decodeFrame extracts the raw Modbus byte frame from an MQTT payload. +// It handles two JSON envelopes used by the perinet bridge: +// +// Request (energy-subscriber sends): {"modbus_frame":{"value":[...],"unit":"RAW"}} +// Response (bridge): {"data":{"modbus_frame":{"value":[...],...}},...} +func decodeFrame(payload []byte) []byte { + // Try the bridge response format first ("data" wrapper). + var outer struct { + Data struct { + ModbusFrame struct { + Value []int `json:"value"` + } `json:"modbus_frame"` + } `json:"data"` + } + if json.Unmarshal(payload, &outer) == nil && len(outer.Data.ModbusFrame.Value) > 0 { + return intsToBytes(outer.Data.ModbusFrame.Value) + } + // Fall back to the flat request format. + var inner struct { + ModbusFrame struct { + Value []int `json:"value"` + } `json:"modbus_frame"` + } + if json.Unmarshal(payload, &inner) == nil && len(inner.ModbusFrame.Value) > 0 { + return intsToBytes(inner.ModbusFrame.Value) + } + return nil +} + +func intsToBytes(vals []int) []byte { + b := make([]byte, len(vals)) + for i, v := range vals { + b[i] = byte(v) + } + return b +} + +// FC4 request: [slave, 0x04, addrH, addrL, countH, countL] — 6 bytes (no CRC) +// +// or [slave, 0x04, addrH, addrL, countH, countL, crcL, crcH] — 8 bytes (with CRC) +func isFC4Request(f []byte) bool { return (len(f) == 6 || len(f) == 8) && f[1] == 0x04 } + +// FC4 response: [slave, 0x04, byteCount, data..., (crcL, crcH)] +// The bridge appends 2 CRC bytes, so we accept both 3+N and 5+N lengths. +func isFC4Response(f []byte) bool { + if len(f) < 3 || f[1] != 0x04 || f[2] == 0 { + return false + } + n := int(f[2]) + return len(f) == 3+n || len(f) == 5+n +} + +func (h *meterHandler) Handle(_ mqtt.Client, msg mqtt.Message) { + slave, ok := slaveFromTopic(msg.Topic()) + if !ok { + return + } + frame := decodeFrame(msg.Payload()) + if len(frame) < 3 || frame[1] != 0x04 { + return + } + + // Only process slaves we know about. + if _, known := h.devices[slave]; !known { + return + } + + st := h.getOrCreate(slave) + + switch { + case isFC4Request(frame): + addr := binary.BigEndian.Uint16(frame[2:4]) + st.mu.Lock() + st.lastReqAddr = addr + st.mu.Unlock() + + case isFC4Response(frame): + n := int(frame[2]) + data := frame[3 : 3+n] + if len(data) < 4 { + return + } + bits := binary.BigEndian.Uint32(data[0:4]) + value := math.Float32frombits(bits) + + st.mu.Lock() + addr := st.lastReqAddr + st.vals[addr] = value + + // Check whether all required registers have been collected. + complete := true + for _, a := range meterAddrs { + if _, ok := st.vals[a]; !ok { + complete = false + break + } + } + if complete { + r := MeterReading{ + Device: h.devices[slave], + L1Power: st.vals[12], + L2Power: st.vals[14], + L3Power: st.vals[16], + ImportKwh: st.vals[72], + ExportKwh: st.vals[74], + } + st.vals = make(map[uint16]float32) // reset for next cycle + st.mu.Unlock() + + select { + case h.readyCh <- r: + default: + log.Printf("meter: readyCh full, dropping reading for %s", r.Device) + } + return + } + st.mu.Unlock() + } +} + +// ── MeterPoller ─────────────────────────────────────────────────────────────── + +// MeterPoller subscribes to the perinet MQTT bridge and passively collects +// SDM630 meter readings as they are published by the energy-subscriber client. +type MeterPoller struct { + client mqtt.Client + handler *meterHandler + cfg MQTTConf +} + +func NewMeterPoller(cfg MQTTConf) (*MeterPoller, error) { + tlsCfg, err := buildTLS(cfg) + if err != nil { + return nil, err + } + + h := newMeterHandler(cfg.Devices) + + dialTarget := cfg.Broker + if cfg.BrokerIP != "" { + encoded := strings.ReplaceAll(cfg.BrokerIP, "%", "%25") + dialTarget = "[" + encoded + "]" + } + brokerURL := fmt.Sprintf("ssl://%s:%d", dialTarget, cfg.Port) + + topic := cfg.TopicPrefix + "/#" + opts := mqtt.NewClientOptions(). + AddBroker(brokerURL). + SetClientID(cfg.ClientID). + SetTLSConfig(tlsCfg). + SetCleanSession(true). + SetAutoReconnect(true). + SetOnConnectHandler(func(c mqtt.Client) { + log.Printf("mqtt connected to %s", brokerURL) + tok := c.Subscribe(topic, 0, h.Handle) + tok.Wait() + if err := tok.Error(); err != nil { + log.Printf("mqtt subscribe: %v", err) + } + }). + SetConnectionLostHandler(func(_ mqtt.Client, err error) { + log.Printf("mqtt connection lost: %v", err) + }) + + if cfg.BrokerTLSName != "" { + tlsCfg.ServerName = cfg.BrokerTLSName + } else if cfg.BrokerIP != "" { + tlsCfg.ServerName = cfg.Broker + } + + client := mqtt.NewClient(opts) + if tok := client.Connect(); tok.Wait() && tok.Error() != nil { + return nil, fmt.Errorf("mqtt connect: %w", tok.Error()) + } + + return &MeterPoller{client: client, handler: h, cfg: cfg}, nil +} + +// PollAll waits for a complete reading from every configured device and returns +// them all. It blocks until all devices have reported or the 30-second +// deadline expires. +func (p *MeterPoller) PollAll() []MeterReading { + want := len(p.cfg.Devices) + seen := make(map[string]bool, want) + var readings []MeterReading + deadline := time.After(30 * time.Second) + + for len(readings) < want { + select { + case r := <-p.handler.readyCh: + if !seen[r.Device] { + seen[r.Device] = true + readings = append(readings, r) + } + case <-deadline: + if len(readings) == 0 { + log.Printf("meter: no readings received within 30 s") + } + return readings + } + } + return readings +} + +func (p *MeterPoller) Close() { + p.client.Disconnect(250) +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +func slaveFromTopic(topic string) (int, bool) { + parts := strings.Split(topic, "/") + if len(parts) == 0 { + return 0, false + } + s := strings.TrimPrefix(strings.ToLower(parts[len(parts)-1]), "0x") + v, err := strconv.ParseInt(s, 16, 32) + if err != nil { + return 0, false + } + return int(v), true +} + +func buildTLS(cfg MQTTConf) (*tls.Config, error) { + ca, err := os.ReadFile(cfg.CACert) + if err != nil { + return nil, fmt.Errorf("ca cert: %w", err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(ca) { + return nil, fmt.Errorf("parsing CA cert failed") + } + cert, err := tls.LoadX509KeyPair(cfg.ClientCert, cfg.ClientKey) + if err != nil { + return nil, fmt.Errorf("client cert: %w", err) + } + return &tls.Config{RootCAs: pool, Certificates: []tls.Certificate{cert}}, nil +} diff --git a/migrate_influx.py b/migrate_influx.py new file mode 100644 index 0000000..6c919c9 --- /dev/null +++ b/migrate_influx.py @@ -0,0 +1,281 @@ +#!/usr/bin/env python3 +"""Migrate historical energy data from InfluxDB to TimescaleDB. + +Reads AlphaEss, Power_House, Power_Barn measurements from InfluxDB, +pivots them to wide format, and bulk-inserts into TimescaleDB via +`docker exec timescaledb psql`. + +Usage: + python3 migrate_influx.py +""" + +import csv +import io +import subprocess +import sys +from datetime import datetime, timezone, timedelta + +import requests + +# ── Config ──────────────────────────────────────────────────────────────────── + +INFLUX_URL = "http://localhost:8086" +INFLUX_TOKEN = "hDu4JYvxciHsohn7zE0nyZfejZDik3s8fqxCkTebW1LRekckyGX_U0-wsfEcDuDV5WZER3MjQDss01jJJCeZBA==" +INFLUX_ORG = "tkl" +INFLUX_BUCKET = "home" + +PG_CONTAINER = "timescaledb" +PG_DSN = "postgres://energy:changeme@localhost/energy" + +# Process this much data per Flux query (keeps memory reasonable). +CHUNK_DAYS = 30 + +# ── Field mappings ──────────────────────────────────────────────────────────── + +INVERTER_FIELDS = { + "Pv1Power": "pv1_power", + "Pv2Power": "pv2_power", + "InverterPowerL1": "pv_l1_power", + "InverterPowerL2": "pv_l2_power", + "InverterPowerL3": "pv_l3_power", + "BatteryStateOfCharge": "battery_soc", + "TotalEnergyConsumeFromGridGrid": "grid_import_kwh", + "TotalEnergyFeedToGridGrid": "grid_export_kwh", + "InverterTotalPvEnergy": "pv_energy_kwh", +} + +METER_FIELDS = { + "L1PowerW": "l1_power", + "L2PowerW": "l2_power", + "L3PowerW": "l3_power", + "TotalImport": "import_kwh", + "TotalExport": "export_kwh", +} + +# ── InfluxDB helpers ────────────────────────────────────────────────────────── + +def flux_query(flux: str) -> str: + resp = requests.post( + f"{INFLUX_URL}/api/v2/query", + params={"org": INFLUX_ORG}, + headers={ + "Authorization": f"Token {INFLUX_TOKEN}", + "Content-Type": "application/vnd.flux", + "Accept": "application/csv", + }, + data=flux, + timeout=300, + ) + resp.raise_for_status() + return resp.text + + +def time_range_of(measurement: str) -> tuple[datetime, datetime]: + """Return (first, last) timestamps for a measurement.""" + flux = f''' +from(bucket:"{INFLUX_BUCKET}") + |> range(start: 2024-01-01T00:00:00Z) + |> filter(fn:(r) => r._measurement == "{measurement}") + |> first() + |> keep(columns: ["_time"]) + |> min(column: "_time") +''' + text = flux_query(flux) + rows = [r for r in csv.DictReader(io.StringIO(text)) if r.get("_time")] + first = datetime.fromisoformat(rows[0]["_time"].replace("Z", "+00:00")) if rows else None + + flux2 = f''' +from(bucket:"{INFLUX_BUCKET}") + |> range(start: 2024-01-01T00:00:00Z) + |> filter(fn:(r) => r._measurement == "{measurement}") + |> last() + |> keep(columns: ["_time"]) + |> max(column: "_time") +''' + text2 = flux_query(flux2) + rows2 = [r for r in csv.DictReader(io.StringIO(text2)) if r.get("_time")] + last = datetime.fromisoformat(rows2[0]["_time"].replace("Z", "+00:00")) if rows2 else None + + return first, last + + +def fetch_pivoted(measurement: str, start: datetime, stop: datetime, fields: dict) -> list[dict]: + """Fetch measurement data for [start, stop) and return list of wide-format dicts.""" + field_filter = " or ".join( + f'r._field == "{f}"' for f in fields + ) + flux = f''' +from(bucket:"{INFLUX_BUCKET}") + |> range(start: {start.strftime("%Y-%m-%dT%H:%M:%SZ")}, + stop: {stop.strftime("%Y-%m-%dT%H:%M:%SZ")}) + |> filter(fn:(r) => r._measurement == "{measurement}") + |> filter(fn:(r) => {field_filter}) + |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value") + |> keep(columns: ["_time", {", ".join('"' + f + '"' for f in fields)}]) +''' + text = flux_query(flux) + rows = [] + for row in csv.DictReader(io.StringIO(text)): + if not row.get("_time"): + continue + rows.append(row) + return rows + +# ── PostgreSQL helpers ──────────────────────────────────────────────────────── + +def psql(sql: str, copy_data: str | None = None): + """Run SQL (and optionally COPY data) via docker exec psql.""" + cmd = ["docker", "exec", "-i", PG_CONTAINER, + "psql", PG_DSN, "-v", "ON_ERROR_STOP=1", "-c", sql] + proc = subprocess.run(cmd, input=copy_data, capture_output=True, text=True) + if proc.returncode != 0: + print(f"psql error: {proc.stderr}", file=sys.stderr) + raise RuntimeError(f"psql failed: {proc.returncode}") + return proc.stdout + + +def copy_csv(table: str, columns: list[str], rows: list[list]): + """COPY rows into table using psql stdin.""" + buf = io.StringIO() + writer = csv.writer(buf) + writer.writerows(rows) + csv_data = buf.getvalue() + + col_list = ", ".join(columns) + cmd = [ + "docker", "exec", "-i", PG_CONTAINER, + "psql", PG_DSN, "-v", "ON_ERROR_STOP=1", + "-c", f"\\COPY {table} ({col_list}) FROM STDIN WITH (FORMAT CSV)", + ] + proc = subprocess.run(cmd, input=csv_data, capture_output=True, text=True) + if proc.returncode != 0: + print(f"COPY error: {proc.stderr}", file=sys.stderr) + raise RuntimeError(f"COPY failed") + return proc.stdout + +# ── Migration tasks ─────────────────────────────────────────────────────────── + +def migrate_inverter(): + print("── AlphaEss → inverter ──────────────────────────────────") + first, last = time_range_of("AlphaEss") + if not first or not last: + print(" no data found"); return + print(f" range: {first.date()} → {last.date()}") + + db_cols = ["time", "pv1_power", "pv2_power", "pv_l1_power", "pv_l2_power", + "pv_l3_power", "battery_soc", "grid_import_kwh", + "grid_export_kwh", "pv_energy_kwh"] + + total = 0 + start = first.replace(hour=0, minute=0, second=0, microsecond=0) + while start <= last: + stop = min(start + timedelta(days=CHUNK_DAYS), last + timedelta(seconds=1)) + rows_raw = fetch_pivoted("AlphaEss", start, stop, INVERTER_FIELDS) + + batch = [] + for r in rows_raw: + try: + batch.append([ + r["_time"], + r.get("Pv1Power") or None, + r.get("Pv2Power") or None, + r.get("InverterPowerL1") or None, + r.get("InverterPowerL2") or None, + r.get("InverterPowerL3") or None, + r.get("BatteryStateOfCharge") or None, + r.get("TotalEnergyConsumeFromGridGrid") or None, + r.get("TotalEnergyFeedToGridGrid") or None, + r.get("InverterTotalPvEnergy") or None, + ]) + except Exception as e: + print(f" skip row: {e}") + + if batch: + copy_csv("inverter", db_cols, batch) + total += len(batch) + print(f" {start.date()} – {stop.date()}: {len(batch)} rows (total {total})") + start = stop + + print(f" done: {total} rows inserted") + + +def migrate_meter(measurement: str, device: str): + print(f"── {measurement} → power_meter ({device}) ──────────────") + first, last = time_range_of(measurement) + if not first or not last: + print(" no data found"); return + print(f" range: {first.date()} → {last.date()}") + + db_cols = ["time", "device", "l1_power", "l2_power", "l3_power", + "import_kwh", "export_kwh"] + + total = 0 + start = first.replace(hour=0, minute=0, second=0, microsecond=0) + while start <= last: + stop = min(start + timedelta(days=CHUNK_DAYS), last + timedelta(seconds=1)) + rows_raw = fetch_pivoted(measurement, start, stop, METER_FIELDS) + + batch = [] + for r in rows_raw: + try: + batch.append([ + r["_time"], + device, + r.get("L1PowerW") or None, + r.get("L2PowerW") or None, + r.get("L3PowerW") or None, + r.get("TotalImport") or None, + r.get("TotalExport") or None, + ]) + except Exception as e: + print(f" skip row: {e}") + + if batch: + copy_csv("power_meter", db_cols, batch) + total += len(batch) + print(f" {start.date()} – {stop.date()}: {len(batch)} rows (total {total})") + start = stop + + print(f" done: {total} rows inserted") + + +def refresh_aggregates(): + print("── Refreshing continuous aggregates ─────────────────────") + views = [ + ("inverter_10m", "2024-01-01", None), + ("power_meter_10m", "2024-01-01", None), + ("inverter_1h", "2024-01-01", None), + ("power_meter_1h", "2024-01-01", None), + ("inverter_daily", "2024-01-01", None), + ("power_meter_daily", "2024-01-01", None), + ] + for view, start, stop in views: + stop_clause = f"'{stop}'" if stop else "NULL" + sql = (f"CALL refresh_continuous_aggregate('{view}', " + f"'{start}', {stop_clause});") + print(f" {view}...", end=" ", flush=True) + cmd = ["docker", "exec", "-i", PG_CONTAINER, + "psql", "-U", "fitdata", "-d", "energy", + "-v", "ON_ERROR_STOP=1", "-c", sql] + proc = subprocess.run(cmd, capture_output=True, text=True) + if proc.returncode != 0: + print(f"error: {proc.stderr}", file=sys.stderr) + raise RuntimeError(f"refresh failed: {proc.returncode}") + print("ok") + print(" done") + + +# ── Main ────────────────────────────────────────────────────────────────────── + +if __name__ == "__main__": + # Skip rows already in TimescaleDB to avoid duplicates. + # The simplest approach: delete nothing, use ON CONFLICT DO NOTHING. + # TimescaleDB hypertables don't have a unique constraint on time alone, + # so we rely on the data not being present (fresh DB) or accept duplicates + # for any overlap period, which the retention policy will eventually clean. + # + # If you need to re-run safely, truncate first: + # docker exec timescaledb psql -c "TRUNCATE inverter, power_meter;" + + refresh_aggregates() + print("\nMigration complete.") diff --git a/schema.sql b/schema.sql new file mode 100644 index 0000000..9ff122c --- /dev/null +++ b/schema.sql @@ -0,0 +1,140 @@ +-- Run once as the TimescaleDB superuser (fitdata): +-- psql -h localhost -p 5433 -U fitdata -f schema.sql + +CREATE DATABASE energy; + +\c energy + +CREATE EXTENSION IF NOT EXISTS timescaledb; + +CREATE USER energy WITH PASSWORD 'changeme'; +GRANT ALL ON DATABASE energy TO energy; +GRANT ALL ON SCHEMA public TO energy; + +-- ── Raw hypertables ─────────────────────────────────────────────────────────── + +-- Written by energy-collector (pvcollect replacement) every 10 s +CREATE TABLE inverter ( + time TIMESTAMPTZ NOT NULL, + pv1_power REAL, -- W + pv2_power REAL, -- W + pv_l1_power REAL, -- W (inverter AC output phase 1) + pv_l2_power REAL, -- W + pv_l3_power REAL, -- W + battery_soc REAL, -- % + grid_import_kwh REAL, -- kWh cumulative + grid_export_kwh REAL, -- kWh cumulative + pv_energy_kwh REAL -- kWh cumulative +); +SELECT create_hypertable('inverter', 'time', chunk_time_interval => INTERVAL '7 days'); + +-- Written by energy-collector (meter replacement) every 10 s, one row per device +CREATE TABLE power_meter ( + time TIMESTAMPTZ NOT NULL, + device TEXT NOT NULL, -- 'house' | 'barn' + l1_power REAL, -- W + l2_power REAL, -- W + l3_power REAL, -- W + import_kwh REAL, -- kWh cumulative + export_kwh REAL -- kWh cumulative +); +SELECT create_hypertable('power_meter', 'time', chunk_time_interval => INTERVAL '7 days'); +CREATE INDEX ON power_meter (device, time DESC); + +GRANT ALL ON ALL TABLES IN SCHEMA public TO energy; + +-- ── Continuous aggregates ───────────────────────────────────────────────────── + +-- 10-minute buckets — live chart (1 h, 6 h, 24 h ranges) +CREATE MATERIALIZED VIEW inverter_10m +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('10 minutes', time) AS bucket, + AVG(pv1_power + pv2_power) AS pv_power, + AVG(battery_soc) AS battery_soc +FROM inverter +GROUP BY bucket; + +CREATE MATERIALIZED VIEW power_meter_10m +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('10 minutes', time) AS bucket, + device, + AVG(l1_power + l2_power + l3_power) AS total_power +FROM power_meter +GROUP BY bucket, device; + +-- 1-hour buckets — live chart (7 d range), built on 10 m (hierarchical) +CREATE MATERIALIZED VIEW inverter_1h +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('1 hour', bucket) AS bucket, + AVG(pv_power) AS pv_power, + AVG(battery_soc) AS battery_soc +FROM inverter_10m +GROUP BY 1; + +CREATE MATERIALIZED VIEW power_meter_1h +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('1 hour', bucket) AS bucket, + device, + AVG(total_power) AS total_power +FROM power_meter_10m +GROUP BY 1, device; + +-- Daily last-value snapshots — basis for bar-chart energy deltas +-- Query with LAG() to get per-period consumption. +CREATE MATERIALIZED VIEW inverter_daily +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('1 day', time) AS bucket, + last(grid_import_kwh, time) AS grid_import_kwh, + last(grid_export_kwh, time) AS grid_export_kwh, + last(pv_energy_kwh, time) AS pv_energy_kwh +FROM inverter +GROUP BY bucket; + +CREATE MATERIALIZED VIEW power_meter_daily +WITH (timescaledb.continuous, timescaledb.materialized_only = false) AS +SELECT time_bucket('1 day', time) AS bucket, + device, + last(import_kwh, time) AS import_kwh +FROM power_meter +GROUP BY bucket, device; + +-- ── Retention — keep 30 days of raw data; aggregates stay forever ───────────── + +SELECT add_retention_policy('inverter', INTERVAL '30 days'); +SELECT add_retention_policy('power_meter', INTERVAL '30 days'); + +-- ── Refresh policies ────────────────────────────────────────────────────────── + +SELECT add_continuous_aggregate_policy('inverter_10m', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '10 minutes', + schedule_interval => INTERVAL '10 minutes'); + +SELECT add_continuous_aggregate_policy('power_meter_10m', + start_offset => INTERVAL '1 hour', + end_offset => INTERVAL '10 minutes', + schedule_interval => INTERVAL '10 minutes'); + +SELECT add_continuous_aggregate_policy('inverter_1h', + start_offset => INTERVAL '3 hours', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '1 hour'); + +SELECT add_continuous_aggregate_policy('power_meter_1h', + start_offset => INTERVAL '3 hours', + end_offset => INTERVAL '1 hour', + schedule_interval => INTERVAL '1 hour'); + +SELECT add_continuous_aggregate_policy('inverter_daily', + start_offset => INTERVAL '3 days', + end_offset => INTERVAL '1 day', + schedule_interval => INTERVAL '1 day'); + +SELECT add_continuous_aggregate_policy('power_meter_daily', + start_offset => INTERVAL '3 days', + end_offset => INTERVAL '1 day', + schedule_interval => INTERVAL '1 day'); + +-- Grant SELECT on all tables and views (including continuous aggregates) to energy. +-- Run after all views are created so the grant covers them. +GRANT SELECT ON ALL TABLES IN SCHEMA public TO energy;