Files
energy-meter/poller.go
Thomas Klaehn 066fa5ca51 Initial implementation: MQTT-based energy meter subscriber
Subscribes to Eastron SDM630 power meter data via MQTT broker,
decodes Modbus RTU frames, and writes readings to InfluxDB.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-04-10 08:01:17 +02:00

94 lines
2.3 KiB
Go

package main
import (
"context"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
// responseTimeout bounds how long poll waits for the reply to a single
// register read request before logging a timeout and moving on to the next
// register.
const responseTimeout = 2 * time.Second
// Poller periodically issues FC4 (read input registers) requests for every
// configured input register on every device, waiting for each response (or
// for responseTimeout to elapse) before sending the next request. Registers
// of any other type are skipped. Once a device's registers have been
// processed, one InfluxDB point is written for it, provided at least one
// field was collected.
type Poller struct {
// requester sends the input-register read requests.
requester *Requester
// handler hands out the channel on which the readings for a pending
// request are delivered.
handler *handler
// writeAPI is the blocking InfluxDB writer used to store each device's point.
writeAPI api.WriteAPIBlocking
// org is copied from Config.InfluxOrg by NewPoller; it is not read by the
// methods in this part of the file.
org string
// bucket is copied from Config.InfluxBucket by NewPoller; it is not read by
// the methods in this part of the file.
bucket string
// devices lists the devices (and their registers) visited on every round.
devices []DeviceConfig
// interval is the ticker period between polling rounds in Run.
interval time.Duration
}
// NewPoller assembles a Poller from its collaborators. The InfluxDB
// organisation and bucket names are taken from cfg, devices lists what to
// poll, and interval is the ticker period Run uses between polling rounds.
func NewPoller(r *Requester, h *handler, writeAPI api.WriteAPIBlocking, cfg Config, devices []DeviceConfig, interval time.Duration) *Poller {
	p := new(Poller)
	p.requester, p.handler = r, h
	p.writeAPI = writeAPI
	p.org, p.bucket = cfg.InfluxOrg, cfg.InfluxBucket
	p.devices = devices
	p.interval = interval
	return p
}
// Run performs one polling round right away and then another on every tick
// of a ticker with period p.interval. It returns only once ctx is cancelled.
// The interval must be positive: time.NewTicker panics otherwise.
func (p *Poller) Run(ctx context.Context) {
	p.poll(ctx)

	tick := time.NewTicker(p.interval)
	defer tick.Stop()

	done := ctx.Done()
	for {
		select {
		case <-tick.C:
			p.poll(ctx)
		case <-done:
			return
		}
	}
}
// poll runs one polling round over all configured devices, stamping every
// point with the time the round started.
//
// For each device the input registers are read one at a time: a response
// waiter is registered, the read request is sent, and the round then blocks
// until readings arrive, responseTimeout elapses, or ctx is cancelled.
// Cancellation aborts the whole round without writing the current device's
// point. Registers whose type is not "input" are skipped with a log message;
// request errors and timeouts are logged and skip only that register. A point
// is written only for devices that yielded at least one field.
func (p *Poller) poll(ctx context.Context) {
	started := time.Now()

	for _, device := range p.devices {
		pt := influxdb2.NewPointWithMeasurement(device.Name).SetTime(started)
		fields := 0

		for _, register := range device.Registers {
			if register.Type != "input" {
				logger.Printf("skipping holding register %s/%s (FC3 not implemented)", device.Name, register.Name)
				continue
			}

			// The waiter must exist before the request goes out, otherwise a
			// fast response could race with its registration.
			// NOTE(review): on a request error or a timeout the waiter obtained
			// here is abandoned; confirm the handler tolerates stale waiters.
			pending := p.handler.waitForResponse(device.SlaveAddress)

			err := p.requester.ReadInputRegisters(device.Segment, byte(device.SlaveAddress), register.Address, register.Count)
			if err != nil {
				logger.Printf("poll %s/%s: %v", device.Name, register.Name, err)
				continue
			}

			select {
			case <-ctx.Done():
				return
			case <-time.After(responseTimeout):
				logger.Printf("timeout waiting for response: %s/%s", device.Name, register.Name)
			case batch := <-pending:
				for _, reading := range batch {
					pt.AddField(reading.Name, reading.Value)
					fields++
				}
			}
		}

		// Only devices that produced at least one field get a point written.
		if fields > 0 {
			if err := p.writeAPI.WritePoint(ctx, pt); err != nil {
				logger.Printf("influx write %s: %v", device.Name, err)
			}
		}
	}
}