commit 066fa5ca51d51a8728cb9ff421bc52b994d8cd5e Author: Thomas Klaehn Date: Fri Apr 10 08:01:17 2026 +0200 Initial implementation: MQTT-based energy meter subscriber Subscribes to Eastron SDM630 power meter data via MQTT broker, decodes Modbus RTU frames, and writes readings to InfluxDB. Co-Authored-By: Claude Sonnet 4.6 diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..cafde36 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM golang:1.24-alpine AS builder +WORKDIR /app +COPY go.mod go.sum ./ +RUN go mod download +COPY *.go ./ +RUN CGO_ENABLED=0 go build -o energy-mqtt-sub . + +FROM alpine:3.21 +RUN apk add --no-cache ca-certificates tzdata +WORKDIR /app +COPY --from=builder /app/energy-mqtt-sub . +USER nobody:nobody +ENTRYPOINT ["./energy-mqtt-sub", "-c", "/srv/energy/meter/config.json"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..11015ff --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +BIN := bin/energy-mqtt-sub + +.PHONY: build run clean + +build: + go build -o $(BIN) . + +run: build + ./$(BIN) -c ./config/config.json + +clean: + rm -f $(BIN) diff --git a/bin/energy-mqtt-sub b/bin/energy-mqtt-sub new file mode 100755 index 0000000..fd6b597 Binary files /dev/null and b/bin/energy-mqtt-sub differ diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..5b80b24 --- /dev/null +++ b/config/config.json @@ -0,0 +1,41 @@ +{ + "broker": "EnergyMqtt-perimica-nxktq.local", + "port": 8883, + "client_id": "energy-subscriber", + "topic_prefix": "Energy", + "sample_rate": 10, + "influxdb_host": "p5.local", + "influxdb_port": 8086, + "influxdb_token": "hDu4JYvxciHsohn7zE0nyZfejZDik3s8fqxCkTebW1LRekckyGX_U0-wsfEcDuDV5WZER3MjQDss01jJJCeZBA==", + "influxdb_org": "tkl", + "influxdb_bucket": "home", + "ca_cert": "/srv/energy/meter/Energy-root-ca.crt", + "client_cert": "/srv/energy/meter/Energy-admin_user.crt", + "client_key": "/srv/energy/meter/Energy-admin_user.key", + "devices": [ + { + "slave_address": 1, + "name": "Power_House", + "segment": "modbus-segment", + "registers": [ + {"name": "L1PowerW", "type": "input", "address": 12, "count": 2, "factor": 1, "unit": "W"}, + {"name": "L2PowerW", "type": "input", "address": 14, "count": 2, "factor": 1, "unit": "W"}, + {"name": "L3PowerW", "type": "input", "address": 16, "count": 2, "factor": 1, "unit": "W"}, + {"name": "TotalImport", "type": "input", "address": 72, "count": 2, "factor": 1, "unit": "kWh"}, + {"name": "TotalExport", "type": "input", "address": 74, "count": 2, "factor": 1, "unit": "kWh"} + ] + }, + { + "slave_address": 2, + "name": "Power_Barn", + "segment": "modbus-segment", + "registers": [ + {"name": "L1PowerW", "type": "input", "address": 12, "count": 2, "factor": 1, "unit": "W"}, + {"name": "L2PowerW", "type": "input", "address": 14, "count": 2, "factor": 1, "unit": "W"}, + {"name": "L3PowerW", "type": "input", "address": 16, "count": 2, "factor": 1, "unit": "W"}, + {"name": "TotalImport", "type": "input", "address": 72, "count": 2, "factor": 1, "unit": "kWh"}, + {"name": "TotalExport", "type": "input", "address": 74, "count": 2, "factor": 1, "unit": "kWh"} + ] + } + ] +} diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..1dca028 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,8 @@ +services: + energy-meter: + build: . + container_name: energy-meter + restart: unless-stopped + network_mode: host + volumes: + - /srv/energy/meter:/srv/energy/meter:ro diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..dee8917 --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module energy-mqtt-sub + +go 1.24.5 + +require ( + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect + github.com/eclipse/paho.mqtt.golang v1.5.1 // indirect + github.com/google/uuid v1.3.1 // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/influxdata/influxdb-client-go/v2 v2.14.0 // indirect + github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/oapi-codegen/runtime v1.0.0 // indirect + golang.org/x/net v0.44.0 // indirect + golang.org/x/sync v0.17.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..fb24edd --- /dev/null +++ b/go.sum @@ -0,0 +1,26 @@ +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.1 h1:/VSOv3oDLlpqR2Epjn1Q7b2bSTplJIeV2ISgCl2W7nE= +github.com/eclipse/paho.mqtt.golang v1.5.1/go.mod h1:1/yJCneuyOoCOzKSsOTUc0AJfpsItBGWvYpBLimhArU= +github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4= +github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/influxdata/influxdb-client-go/v2 v2.14.0 h1:AjbBfJuq+QoaXNcrova8smSjwJdUHnwvfjMF71M1iI4= +github.com/influxdata/influxdb-client-go/v2 v2.14.0/go.mod h1:Ahpm3QXKMJslpXl3IftVLVezreAUtBOTZssDrjZEFHI= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= +github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= +github.com/oapi-codegen/runtime v1.0.0 h1:P4rqFX5fMFWqRzY9M/3YF9+aPSPPB06IzP2P7oOxrWo= +github.com/oapi-codegen/runtime v1.0.0/go.mod h1:LmCUMQuPB4M/nLXilQXhHw+BLZdDb18B34OO356yJ/A= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +golang.org/x/net v0.44.0 h1:evd8IRDyfNBMBTTY5XRF1vaZlD+EmWx6x8PkhR04H/I= +golang.org/x/net v0.44.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= diff --git a/main.go b/main.go new file mode 100644 index 0000000..7bb85e8 --- /dev/null +++ b/main.go @@ -0,0 +1,422 @@ +package main + +import ( + "crypto/tls" + "crypto/x509" + "encoding/binary" + "encoding/json" + "context" + "flag" + "fmt" + "log" + "math" + "os" + "os/signal" + "strconv" + "strings" + "sync" + "syscall" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" +) + +// ── Config ──────────────────────────────────────────────────────────────────── + +// Reading is a single decoded register value returned to the poller. +type Reading struct { + Name string + Value float32 + Unit string +} + +type Register struct { + Name string `json:"name"` + Type string `json:"type"` // "input" (FC4) or "holding" (FC3) + Address uint16 `json:"address"` // register word address + Count uint16 `json:"count"` // number of 16-bit registers to read (2 = one float32) + Factor float32 `json:"factor"` // scale factor applied to the decoded value + Unit string `json:"unit"` +} + +type DeviceConfig struct { + SlaveAddress int `json:"slave_address"` + Name string `json:"name"` + Segment string `json:"segment"` + Registers []Register `json:"registers"` +} + +type Config struct { + Broker string `json:"broker"` + Port int `json:"port"` + ClientID string `json:"client_id"` + TopicPrefix string `json:"topic_prefix"` + SampleRate int `json:"sample_rate"` // polling interval in seconds + CACert string `json:"ca_cert"` + ClientCert string `json:"client_cert"` + ClientKey string `json:"client_key"` + InfluxHost string `json:"influxdb_host"` + InfluxPort int `json:"influxdb_port"` + InfluxToken string `json:"influxdb_token"` + InfluxOrg string `json:"influxdb_org"` + InfluxBucket string `json:"influxdb_bucket"` + Devices []DeviceConfig `json:"devices"` +} + +// ── MQTT payload ────────────────────────────────────────────────────────────── + +type modbusFrame struct { + Value []int `json:"value"` // JSON number array, not base64 + Unit string `json:"unit"` +} + +type frameData struct { + ModbusFrame modbusFrame `json:"modbus_frame"` +} + +type mqttPayload struct { + Data frameData `json:"data"` +} + +// ── SDM 630 register map ────────────────────────────────────────────────────── + +type regInfo struct{ Name, Unit string } + +// Word (16-bit) addresses from the SDM 630 Modbus datasheet. +// Each measurement is a float32 spanning 2 consecutive registers (big-endian). +var sdm630 = map[uint16]regInfo{ + 0: {"L1 Voltage", "V"}, + 2: {"L2 Voltage", "V"}, + 4: {"L3 Voltage", "V"}, + 6: {"L1 Current", "A"}, + 8: {"L2 Current", "A"}, + 10: {"L3 Current", "A"}, + 12: {"L1 Active Power", "W"}, + 14: {"L2 Active Power", "W"}, + 16: {"L3 Active Power", "W"}, + 18: {"L1 Apparent Power", "VA"}, + 20: {"L2 Apparent Power", "VA"}, + 22: {"L3 Apparent Power", "VA"}, + 24: {"L1 Reactive Power", "VAr"}, + 26: {"L2 Reactive Power", "VAr"}, + 28: {"L3 Reactive Power", "VAr"}, + 30: {"L1 Power Factor", ""}, + 32: {"L2 Power Factor", ""}, + 34: {"L3 Power Factor", ""}, + 36: {"L1 Phase Angle", "deg"}, + 38: {"L2 Phase Angle", "deg"}, + 40: {"L3 Phase Angle", "deg"}, + 42: {"Average Voltage L-N", "V"}, + 44: {"Average Current", "A"}, + 46: {"Sum of Currents", "A"}, + 48: {"Total Active Power", "W"}, + 50: {"Total Apparent Power", "VA"}, + 52: {"Total Reactive Power", "VAr"}, + 54: {"Total Power Factor", ""}, + 56: {"Total Phase Angle", "deg"}, + 58: {"Frequency", "Hz"}, + 60: {"Total Import Active Energy", "kWh"}, + 62: {"Total Export Active Energy", "kWh"}, + 72: {"Total Import Energy", "kWh"}, + 74: {"Total Export Energy", "kWh"}, + 76: {"Total VAh", "kVAh"}, + 78: {"Ah", "Ah"}, + 80: {"Total System Power Demand", "W"}, + 84: {"Total System VA Demand", "VA"}, + 88: {"Max Total System Power Demand", "W"}, + 90: {"Max Total System VA Demand", "VA"}, + 100: {"Current Demand L1", "A"}, + 102: {"Current Demand L2", "A"}, + 104: {"Current Demand L3", "A"}, + 106: {"Max Current Demand L1", "A"}, + 108: {"Max Current Demand L2", "A"}, + 110: {"Max Current Demand L3", "A"}, + 200: {"Total Active Energy", "kWh"}, + 202: {"Total Reactive Energy", "kVArh"}, + 226: {"L1 Import Active Energy", "kWh"}, + 228: {"L2 Import Active Energy", "kWh"}, + 230: {"L3 Import Active Energy", "kWh"}, + 232: {"L1 Export Active Energy", "kWh"}, + 234: {"L2 Export Active Energy", "kWh"}, + 236: {"L3 Export Active Energy", "kWh"}, + 238: {"L1 Total Active Energy", "kWh"}, + 240: {"L2 Total Active Energy", "kWh"}, + 242: {"L3 Total Active Energy", "kWh"}, +} + +// ── Message handler ─────────────────────────────────────────────────────────── + +type slaveState struct { + mu sync.Mutex + lastRegAddr uint16 +} + +type handler struct { + deviceMap map[int]string // slave addr → device name + registerMap map[int]map[uint16]Register // slave addr → reg addr → Register + statesMu sync.Mutex + states map[int]*slaveState + waitsMu sync.Mutex + waits map[int]chan []Reading // slave addr → readings from next response +} + +func newHandler(cfg Config) *handler { + dm := make(map[int]string, len(cfg.Devices)) + rm := make(map[int]map[uint16]Register, len(cfg.Devices)) + for _, d := range cfg.Devices { + dm[d.SlaveAddress] = d.Name + regs := make(map[uint16]Register, len(d.Registers)) + for _, r := range d.Registers { + regs[r.Address] = r + } + rm[d.SlaveAddress] = regs + } + return &handler{ + deviceMap: dm, + registerMap: rm, + states: make(map[int]*slaveState), + waits: make(map[int]chan []Reading), + } +} + +// waitForResponse registers a one-shot channel that receives the decoded +// readings from the next FC4 response for the given slave. Must be called +// before publishing the request to avoid a race with a very fast response. +func (h *handler) waitForResponse(slave int) <-chan []Reading { + ch := make(chan []Reading, 1) // buffered so the handler never blocks + h.waitsMu.Lock() + h.waits[slave] = ch + h.waitsMu.Unlock() + return ch +} + +func (h *handler) state(slave int) *slaveState { + h.statesMu.Lock() + defer h.statesMu.Unlock() + if s, ok := h.states[slave]; ok { + return s + } + s := &slaveState{} + h.states[slave] = s + return s +} + +func (h *handler) Handle(_ mqtt.Client, msg mqtt.Message) { + slave, ok := slaveFromTopic(msg.Topic()) + if !ok { + return + } + + var p mqttPayload + if err := json.Unmarshal(msg.Payload(), &p); err != nil { + logger.Printf("json: %v", err) + return + } + + frame := intsToBytes(p.Data.ModbusFrame.Value) + if len(frame) < 4 || frame[1] != 0x04 { + return // only FC4 (Read Input Registers) handled + } + + st := h.state(slave) + devName := h.deviceMap[slave] + if devName == "" { + devName = fmt.Sprintf("slave-0x%02X", slave) + } + + switch { + case isFC4Request(frame): + // Store the requested register address so we can label the response. + regAddr := binary.BigEndian.Uint16(frame[2:4]) + st.mu.Lock() + st.lastRegAddr = regAddr + st.mu.Unlock() + + case isFC4Response(frame): + st.mu.Lock() + baseAddr := st.lastRegAddr + st.mu.Unlock() + + // data bytes: frame[3 .. 3+byte_count-1], CRC stripped + data := frame[3 : 3+int(frame[2])] + ts := time.Now().Format("15:04:05") + + var readings []Reading + for i := 0; i+4 <= len(data); i += 4 { + // Each float32 spans 2 registers; offset in word addresses = i/2 + addr := baseAddr + uint16(i/2) + raw := decodeFloat32(data[i : i+4]) + + // Label and scale: configured register takes precedence over generic SDM 630 map. + name, unit, value := labelAndScale(raw, addr, h.registerMap[slave]) + fmt.Printf("[%s] %-20s %-32s %10.3f %s\n", ts, devName, name, value, unit) + readings = append(readings, Reading{Name: name, Value: value, Unit: unit}) + } + + h.waitsMu.Lock() + if ch, ok := h.waits[slave]; ok { + delete(h.waits, slave) + ch <- readings + } + h.waitsMu.Unlock() + + default: + logger.Printf("unrecognised frame from %s: %v", devName, frame) + } +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +// FC4 request: [addr, 0x04, reg_hi, reg_lo, cnt_hi, cnt_lo, crc_lo, crc_hi] = 8 bytes +func isFC4Request(f []byte) bool { return len(f) == 8 } + +// FC4 response: [addr, 0x04, byte_count, data..., crc_lo, crc_hi] → total = 5 + byte_count +func isFC4Response(f []byte) bool { return len(f) >= 5 && len(f) == 5+int(f[2]) } + +// slaveFromTopic parses the last path component, e.g. "Energy/modbus-segment/0x01" → 1 +// labelAndScale returns the display name, unit, and scaled value for a register. +// The per-device config takes precedence; falls back to the generic SDM 630 map. +func labelAndScale(raw float32, addr uint16, regs map[uint16]Register) (name, unit string, value float32) { + if r, ok := regs[addr]; ok { + return r.Name, r.Unit, raw * r.Factor + } + if info, ok := sdm630[addr]; ok { + return info.Name, info.Unit, raw + } + return fmt.Sprintf("reg:%d", addr), "", raw +} + +func slaveFromTopic(topic string) (int, bool) { + parts := strings.Split(topic, "/") + if len(parts) == 0 { + return 0, false + } + s := strings.TrimPrefix(strings.ToLower(parts[len(parts)-1]), "0x") + v, err := strconv.ParseInt(s, 16, 32) + if err != nil { + return 0, false + } + return int(v), true +} + +func intsToBytes(ints []int) []byte { + b := make([]byte, len(ints)) + for i, v := range ints { + b[i] = byte(v) + } + return b +} + +func decodeFloat32(b []byte) float32 { + return math.Float32frombits(binary.BigEndian.Uint32(b)) +} + +// ── TLS / MQTT setup ────────────────────────────────────────────────────────── + +func newTLSConfig(cfg Config) (*tls.Config, error) { + caCert, err := os.ReadFile(cfg.CACert) + if err != nil { + return nil, fmt.Errorf("reading CA cert: %w", err) + } + pool := x509.NewCertPool() + if !pool.AppendCertsFromPEM(caCert) { + return nil, fmt.Errorf("parsing CA cert failed") + } + cert, err := tls.LoadX509KeyPair(cfg.ClientCert, cfg.ClientKey) + if err != nil { + return nil, fmt.Errorf("loading client cert/key: %w", err) + } + return &tls.Config{ + RootCAs: pool, + Certificates: []tls.Certificate{cert}, + }, nil +} + +var logger = log.New(os.Stderr, "", log.Ltime|log.Lshortfile) + +func main() { + configPath := flag.String("c", "./config/config.json", "Path to config file") + debugMode := flag.Bool("debug", false, "Poll only the first register of slave 0x01 every 5 s") + flag.Parse() + + data, err := os.ReadFile(*configPath) + if err != nil { + log.Fatalf("reading config: %v", err) + } + var cfg Config + if err := json.Unmarshal(data, &cfg); err != nil { + log.Fatalf("parsing config: %v", err) + } + + tlsCfg, err := newTLSConfig(cfg) + if err != nil { + log.Fatalf("TLS setup: %v", err) + } + + h := newHandler(cfg) + brokerURL := fmt.Sprintf("ssl://%s:%d", cfg.Broker, cfg.Port) + topic := cfg.TopicPrefix + "/#" + + opts := mqtt.NewClientOptions(). + AddBroker(brokerURL). + SetClientID(cfg.ClientID). + SetTLSConfig(tlsCfg). + SetCleanSession(true). + SetAutoReconnect(true). + SetOnConnectHandler(func(c mqtt.Client) { + logger.Printf("connected to %s", brokerURL) + token := c.Subscribe(topic, 0, h.Handle) + token.Wait() + if err := token.Error(); err != nil { + logger.Printf("subscribe error: %v", err) + } else { + logger.Printf("subscribed to %s", topic) + } + }). + SetConnectionLostHandler(func(_ mqtt.Client, err error) { + logger.Printf("connection lost: %v", err) + }) + + client := mqtt.NewClient(opts) + if token := client.Connect(); token.Wait() && token.Error() != nil { + log.Fatalf("connect: %v", token.Error()) + } + + influxURL := fmt.Sprintf("http://%s:%d", cfg.InfluxHost, cfg.InfluxPort) + influxClient := influxdb2.NewClient(influxURL, cfg.InfluxToken) + defer influxClient.Close() + writeAPI := influxClient.WriteAPIBlocking(cfg.InfluxOrg, cfg.InfluxBucket) + + ctx, cancel := context.WithCancel(context.Background()) + requester := NewRequester(client, cfg.TopicPrefix) + + pollDevices := cfg.Devices + pollInterval := time.Duration(cfg.SampleRate) * time.Second + if *debugMode { + pollInterval = 5 * time.Second + for _, d := range cfg.Devices { + if d.SlaveAddress == 0x01 && len(d.Registers) > 0 { + pollDevices = []DeviceConfig{{ + SlaveAddress: d.SlaveAddress, + Name: d.Name, + Segment: d.Segment, + Registers: d.Registers[:1], + }} + break + } + } + logger.Printf("debug mode: polling %s/%s every %s", pollDevices[0].Name, pollDevices[0].Registers[0].Name, pollInterval) + } + + poller := NewPoller(requester, h, writeAPI, cfg, pollDevices, pollInterval) + go poller.Run(ctx) + + sig := make(chan os.Signal, 1) + signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) + <-sig + + cancel() + client.Disconnect(250) + logger.Println("disconnected") +} diff --git a/poller.go b/poller.go new file mode 100644 index 0000000..058450e --- /dev/null +++ b/poller.go @@ -0,0 +1,93 @@ +package main + +import ( + "context" + "time" + + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" +) + +const responseTimeout = 2 * time.Second + +// Poller periodically publishes FC4 read requests for every configured register +// on every device, waiting for each response before sending the next request. +// After all registers for a device are collected, one InfluxDB point is written. +type Poller struct { + requester *Requester + handler *handler + writeAPI api.WriteAPIBlocking + org string + bucket string + devices []DeviceConfig + interval time.Duration +} + +func NewPoller(r *Requester, h *handler, writeAPI api.WriteAPIBlocking, cfg Config, devices []DeviceConfig, interval time.Duration) *Poller { + return &Poller{ + requester: r, + handler: h, + writeAPI: writeAPI, + org: cfg.InfluxOrg, + bucket: cfg.InfluxBucket, + devices: devices, + interval: interval, + } +} + +// Run blocks until ctx is cancelled, polling immediately on start. +func (p *Poller) Run(ctx context.Context) { + p.poll(ctx) + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + p.poll(ctx) + } + } +} + +func (p *Poller) poll(ctx context.Context) { + now := time.Now() + for _, dev := range p.devices { + point := influxdb2.NewPointWithMeasurement(dev.Name).SetTime(now) + hasFields := false + + for _, reg := range dev.Registers { + if reg.Type != "input" { + logger.Printf("skipping holding register %s/%s (FC3 not implemented)", dev.Name, reg.Name) + continue + } + + // Register the waiter before publishing to avoid a race with a fast response. + respCh := p.handler.waitForResponse(dev.SlaveAddress) + + if err := p.requester.ReadInputRegisters(dev.Segment, byte(dev.SlaveAddress), reg.Address, reg.Count); err != nil { + logger.Printf("poll %s/%s: %v", dev.Name, reg.Name, err) + continue + } + + select { + case readings := <-respCh: + for _, r := range readings { + point.AddField(r.Name, r.Value) + hasFields = true + } + case <-time.After(responseTimeout): + logger.Printf("timeout waiting for response: %s/%s", dev.Name, reg.Name) + case <-ctx.Done(): + return + } + } + + if !hasFields { + continue + } + if err := p.writeAPI.WritePoint(ctx, point); err != nil { + logger.Printf("influx write %s: %v", dev.Name, err) + } + } +} diff --git a/requester.go b/requester.go new file mode 100644 index 0000000..965dac7 --- /dev/null +++ b/requester.go @@ -0,0 +1,112 @@ +package main + +import ( + "encoding/binary" + "encoding/json" + "fmt" + "sync/atomic" + "time" + + mqtt "github.com/eclipse/paho.mqtt.golang" +) + +// ── CRC-16 / Modbus ─────────────────────────────────────────────────────────── + +// crc16 computes the Modbus RTU CRC-16 (poly 0xA001, initial value 0xFFFF). +// The result must be appended as [low_byte, high_byte]. +func crc16(data []byte) uint16 { + crc := uint16(0xFFFF) + for _, b := range data { + crc ^= uint16(b) + for range 8 { + if crc&1 != 0 { + crc = (crc >> 1) ^ 0xA001 + } else { + crc >>= 1 + } + } + } + return crc +} + +// ── Frame building ──────────────────────────────────────────────────────────── + +// buildFC4Frame builds a Modbus RTU FC4 (Read Input Registers) request frame. +// +// [slave, 0x04, regAddr_hi, regAddr_lo, count_hi, count_lo, crc_lo, crc_hi] +// +// regAddr and count are in register (word) units. +func buildFC4Frame(slave byte, regAddr, count uint16) []byte { + frame := make([]byte, 6) + frame[0] = slave + frame[1] = 0x04 + binary.BigEndian.PutUint16(frame[2:], regAddr) + binary.BigEndian.PutUint16(frame[4:], count) + crc := crc16(frame) + return append(frame, byte(crc), byte(crc>>8)) // little-endian CRC +} + +// ── MQTT payload ────────────────────────────────────────────────────────────── + +type requestPayload struct { + Data struct { + ModbusFrame struct { + Value []int `json:"value"` + Unit string `json:"unit"` + } `json:"modbus_frame"` + } `json:"data"` + Timestamp struct { + Incarnation int64 `json:"incarnation"` + SystemTime int64 `json:"system_time"` + } `json:"timestamp"` +} + +// ── Requester ───────────────────────────────────────────────────────────────── + +// Requester publishes Modbus FC4 read requests to the MQTT broker. +// Topic pattern: //0x +type Requester struct { + client mqtt.Client + topicPrefix string + incarnation atomic.Int64 + startTime time.Time +} + +func NewRequester(client mqtt.Client, topicPrefix string) *Requester { + return &Requester{ + client: client, + topicPrefix: topicPrefix, + startTime: time.Now(), + } +} + +// ReadInputRegisters sends an FC4 read request for `count` registers (words) +// starting at `regAddr` on the given slave within the named segment. +// count=2 reads one float32 value; use a multiple of 2 for more values. +func (r *Requester) ReadInputRegisters(segment string, slave byte, regAddr, count uint16) error { + frame := buildFC4Frame(slave, regAddr, count) + + var p requestPayload + p.Data.ModbusFrame.Unit = "RAW" + p.Data.ModbusFrame.Value = bytesToInts(frame) + p.Timestamp.Incarnation = r.incarnation.Add(1) + p.Timestamp.SystemTime = time.Since(r.startTime).Nanoseconds() + + payload, err := json.Marshal(p) + if err != nil { + return fmt.Errorf("marshalling request: %w", err) + } + + topic := fmt.Sprintf("%s/%s/0x%02x", r.topicPrefix, segment, slave) + token := r.client.Publish(topic, 0, false, payload) + token.Wait() + return token.Error() +} + +func bytesToInts(b []byte) []int { + ints := make([]int, len(b)) + for i, v := range b { + ints[i] = int(v) + } + return ints +}