Initial implementation: MQTT-based energy meter subscriber
Subscribes to Eastron SDM630 power meter data via MQTT broker, decodes Modbus RTU frames, and writes readings to InfluxDB. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
422
main.go
Normal file
422
main.go
Normal file
@@ -0,0 +1,422 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/binary"
|
||||
"encoding/json"
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"log"
|
||||
"math"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
mqtt "github.com/eclipse/paho.mqtt.golang"
|
||||
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
||||
)
|
||||
|
||||
// ── Config ────────────────────────────────────────────────────────────────────
|
||||
|
||||
// Reading is a single decoded register value returned to the poller.
|
||||
type Reading struct {
|
||||
Name string
|
||||
Value float32
|
||||
Unit string
|
||||
}
|
||||
|
||||
type Register struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"` // "input" (FC4) or "holding" (FC3)
|
||||
Address uint16 `json:"address"` // register word address
|
||||
Count uint16 `json:"count"` // number of 16-bit registers to read (2 = one float32)
|
||||
Factor float32 `json:"factor"` // scale factor applied to the decoded value
|
||||
Unit string `json:"unit"`
|
||||
}
|
||||
|
||||
type DeviceConfig struct {
|
||||
SlaveAddress int `json:"slave_address"`
|
||||
Name string `json:"name"`
|
||||
Segment string `json:"segment"`
|
||||
Registers []Register `json:"registers"`
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Broker string `json:"broker"`
|
||||
Port int `json:"port"`
|
||||
ClientID string `json:"client_id"`
|
||||
TopicPrefix string `json:"topic_prefix"`
|
||||
SampleRate int `json:"sample_rate"` // polling interval in seconds
|
||||
CACert string `json:"ca_cert"`
|
||||
ClientCert string `json:"client_cert"`
|
||||
ClientKey string `json:"client_key"`
|
||||
InfluxHost string `json:"influxdb_host"`
|
||||
InfluxPort int `json:"influxdb_port"`
|
||||
InfluxToken string `json:"influxdb_token"`
|
||||
InfluxOrg string `json:"influxdb_org"`
|
||||
InfluxBucket string `json:"influxdb_bucket"`
|
||||
Devices []DeviceConfig `json:"devices"`
|
||||
}
|
||||
|
||||
// ── MQTT payload ──────────────────────────────────────────────────────────────
|
||||
|
||||
type modbusFrame struct {
|
||||
Value []int `json:"value"` // JSON number array, not base64
|
||||
Unit string `json:"unit"`
|
||||
}
|
||||
|
||||
type frameData struct {
|
||||
ModbusFrame modbusFrame `json:"modbus_frame"`
|
||||
}
|
||||
|
||||
type mqttPayload struct {
|
||||
Data frameData `json:"data"`
|
||||
}
|
||||
|
||||
// ── SDM 630 register map ──────────────────────────────────────────────────────
|
||||
|
||||
type regInfo struct{ Name, Unit string }
|
||||
|
||||
// Word (16-bit) addresses from the SDM 630 Modbus datasheet.
|
||||
// Each measurement is a float32 spanning 2 consecutive registers (big-endian).
|
||||
var sdm630 = map[uint16]regInfo{
|
||||
0: {"L1 Voltage", "V"},
|
||||
2: {"L2 Voltage", "V"},
|
||||
4: {"L3 Voltage", "V"},
|
||||
6: {"L1 Current", "A"},
|
||||
8: {"L2 Current", "A"},
|
||||
10: {"L3 Current", "A"},
|
||||
12: {"L1 Active Power", "W"},
|
||||
14: {"L2 Active Power", "W"},
|
||||
16: {"L3 Active Power", "W"},
|
||||
18: {"L1 Apparent Power", "VA"},
|
||||
20: {"L2 Apparent Power", "VA"},
|
||||
22: {"L3 Apparent Power", "VA"},
|
||||
24: {"L1 Reactive Power", "VAr"},
|
||||
26: {"L2 Reactive Power", "VAr"},
|
||||
28: {"L3 Reactive Power", "VAr"},
|
||||
30: {"L1 Power Factor", ""},
|
||||
32: {"L2 Power Factor", ""},
|
||||
34: {"L3 Power Factor", ""},
|
||||
36: {"L1 Phase Angle", "deg"},
|
||||
38: {"L2 Phase Angle", "deg"},
|
||||
40: {"L3 Phase Angle", "deg"},
|
||||
42: {"Average Voltage L-N", "V"},
|
||||
44: {"Average Current", "A"},
|
||||
46: {"Sum of Currents", "A"},
|
||||
48: {"Total Active Power", "W"},
|
||||
50: {"Total Apparent Power", "VA"},
|
||||
52: {"Total Reactive Power", "VAr"},
|
||||
54: {"Total Power Factor", ""},
|
||||
56: {"Total Phase Angle", "deg"},
|
||||
58: {"Frequency", "Hz"},
|
||||
60: {"Total Import Active Energy", "kWh"},
|
||||
62: {"Total Export Active Energy", "kWh"},
|
||||
72: {"Total Import Energy", "kWh"},
|
||||
74: {"Total Export Energy", "kWh"},
|
||||
76: {"Total VAh", "kVAh"},
|
||||
78: {"Ah", "Ah"},
|
||||
80: {"Total System Power Demand", "W"},
|
||||
84: {"Total System VA Demand", "VA"},
|
||||
88: {"Max Total System Power Demand", "W"},
|
||||
90: {"Max Total System VA Demand", "VA"},
|
||||
100: {"Current Demand L1", "A"},
|
||||
102: {"Current Demand L2", "A"},
|
||||
104: {"Current Demand L3", "A"},
|
||||
106: {"Max Current Demand L1", "A"},
|
||||
108: {"Max Current Demand L2", "A"},
|
||||
110: {"Max Current Demand L3", "A"},
|
||||
200: {"Total Active Energy", "kWh"},
|
||||
202: {"Total Reactive Energy", "kVArh"},
|
||||
226: {"L1 Import Active Energy", "kWh"},
|
||||
228: {"L2 Import Active Energy", "kWh"},
|
||||
230: {"L3 Import Active Energy", "kWh"},
|
||||
232: {"L1 Export Active Energy", "kWh"},
|
||||
234: {"L2 Export Active Energy", "kWh"},
|
||||
236: {"L3 Export Active Energy", "kWh"},
|
||||
238: {"L1 Total Active Energy", "kWh"},
|
||||
240: {"L2 Total Active Energy", "kWh"},
|
||||
242: {"L3 Total Active Energy", "kWh"},
|
||||
}
|
||||
|
||||
// ── Message handler ───────────────────────────────────────────────────────────
|
||||
|
||||
type slaveState struct {
|
||||
mu sync.Mutex
|
||||
lastRegAddr uint16
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
deviceMap map[int]string // slave addr → device name
|
||||
registerMap map[int]map[uint16]Register // slave addr → reg addr → Register
|
||||
statesMu sync.Mutex
|
||||
states map[int]*slaveState
|
||||
waitsMu sync.Mutex
|
||||
waits map[int]chan []Reading // slave addr → readings from next response
|
||||
}
|
||||
|
||||
func newHandler(cfg Config) *handler {
|
||||
dm := make(map[int]string, len(cfg.Devices))
|
||||
rm := make(map[int]map[uint16]Register, len(cfg.Devices))
|
||||
for _, d := range cfg.Devices {
|
||||
dm[d.SlaveAddress] = d.Name
|
||||
regs := make(map[uint16]Register, len(d.Registers))
|
||||
for _, r := range d.Registers {
|
||||
regs[r.Address] = r
|
||||
}
|
||||
rm[d.SlaveAddress] = regs
|
||||
}
|
||||
return &handler{
|
||||
deviceMap: dm,
|
||||
registerMap: rm,
|
||||
states: make(map[int]*slaveState),
|
||||
waits: make(map[int]chan []Reading),
|
||||
}
|
||||
}
|
||||
|
||||
// waitForResponse registers a one-shot channel that receives the decoded
|
||||
// readings from the next FC4 response for the given slave. Must be called
|
||||
// before publishing the request to avoid a race with a very fast response.
|
||||
func (h *handler) waitForResponse(slave int) <-chan []Reading {
|
||||
ch := make(chan []Reading, 1) // buffered so the handler never blocks
|
||||
h.waitsMu.Lock()
|
||||
h.waits[slave] = ch
|
||||
h.waitsMu.Unlock()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (h *handler) state(slave int) *slaveState {
|
||||
h.statesMu.Lock()
|
||||
defer h.statesMu.Unlock()
|
||||
if s, ok := h.states[slave]; ok {
|
||||
return s
|
||||
}
|
||||
s := &slaveState{}
|
||||
h.states[slave] = s
|
||||
return s
|
||||
}
|
||||
|
||||
func (h *handler) Handle(_ mqtt.Client, msg mqtt.Message) {
|
||||
slave, ok := slaveFromTopic(msg.Topic())
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
var p mqttPayload
|
||||
if err := json.Unmarshal(msg.Payload(), &p); err != nil {
|
||||
logger.Printf("json: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
frame := intsToBytes(p.Data.ModbusFrame.Value)
|
||||
if len(frame) < 4 || frame[1] != 0x04 {
|
||||
return // only FC4 (Read Input Registers) handled
|
||||
}
|
||||
|
||||
st := h.state(slave)
|
||||
devName := h.deviceMap[slave]
|
||||
if devName == "" {
|
||||
devName = fmt.Sprintf("slave-0x%02X", slave)
|
||||
}
|
||||
|
||||
switch {
|
||||
case isFC4Request(frame):
|
||||
// Store the requested register address so we can label the response.
|
||||
regAddr := binary.BigEndian.Uint16(frame[2:4])
|
||||
st.mu.Lock()
|
||||
st.lastRegAddr = regAddr
|
||||
st.mu.Unlock()
|
||||
|
||||
case isFC4Response(frame):
|
||||
st.mu.Lock()
|
||||
baseAddr := st.lastRegAddr
|
||||
st.mu.Unlock()
|
||||
|
||||
// data bytes: frame[3 .. 3+byte_count-1], CRC stripped
|
||||
data := frame[3 : 3+int(frame[2])]
|
||||
ts := time.Now().Format("15:04:05")
|
||||
|
||||
var readings []Reading
|
||||
for i := 0; i+4 <= len(data); i += 4 {
|
||||
// Each float32 spans 2 registers; offset in word addresses = i/2
|
||||
addr := baseAddr + uint16(i/2)
|
||||
raw := decodeFloat32(data[i : i+4])
|
||||
|
||||
// Label and scale: configured register takes precedence over generic SDM 630 map.
|
||||
name, unit, value := labelAndScale(raw, addr, h.registerMap[slave])
|
||||
fmt.Printf("[%s] %-20s %-32s %10.3f %s\n", ts, devName, name, value, unit)
|
||||
readings = append(readings, Reading{Name: name, Value: value, Unit: unit})
|
||||
}
|
||||
|
||||
h.waitsMu.Lock()
|
||||
if ch, ok := h.waits[slave]; ok {
|
||||
delete(h.waits, slave)
|
||||
ch <- readings
|
||||
}
|
||||
h.waitsMu.Unlock()
|
||||
|
||||
default:
|
||||
logger.Printf("unrecognised frame from %s: %v", devName, frame)
|
||||
}
|
||||
}
|
||||
|
||||
// ── Helpers ───────────────────────────────────────────────────────────────────
|
||||
|
||||
// FC4 request: [addr, 0x04, reg_hi, reg_lo, cnt_hi, cnt_lo, crc_lo, crc_hi] = 8 bytes
|
||||
func isFC4Request(f []byte) bool { return len(f) == 8 }
|
||||
|
||||
// FC4 response: [addr, 0x04, byte_count, data..., crc_lo, crc_hi] → total = 5 + byte_count
|
||||
func isFC4Response(f []byte) bool { return len(f) >= 5 && len(f) == 5+int(f[2]) }
|
||||
|
||||
// slaveFromTopic parses the last path component, e.g. "Energy/modbus-segment/0x01" → 1
|
||||
// labelAndScale returns the display name, unit, and scaled value for a register.
|
||||
// The per-device config takes precedence; falls back to the generic SDM 630 map.
|
||||
func labelAndScale(raw float32, addr uint16, regs map[uint16]Register) (name, unit string, value float32) {
|
||||
if r, ok := regs[addr]; ok {
|
||||
return r.Name, r.Unit, raw * r.Factor
|
||||
}
|
||||
if info, ok := sdm630[addr]; ok {
|
||||
return info.Name, info.Unit, raw
|
||||
}
|
||||
return fmt.Sprintf("reg:%d", addr), "", raw
|
||||
}
|
||||
|
||||
func slaveFromTopic(topic string) (int, bool) {
|
||||
parts := strings.Split(topic, "/")
|
||||
if len(parts) == 0 {
|
||||
return 0, false
|
||||
}
|
||||
s := strings.TrimPrefix(strings.ToLower(parts[len(parts)-1]), "0x")
|
||||
v, err := strconv.ParseInt(s, 16, 32)
|
||||
if err != nil {
|
||||
return 0, false
|
||||
}
|
||||
return int(v), true
|
||||
}
|
||||
|
||||
func intsToBytes(ints []int) []byte {
|
||||
b := make([]byte, len(ints))
|
||||
for i, v := range ints {
|
||||
b[i] = byte(v)
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func decodeFloat32(b []byte) float32 {
|
||||
return math.Float32frombits(binary.BigEndian.Uint32(b))
|
||||
}
|
||||
|
||||
// ── TLS / MQTT setup ──────────────────────────────────────────────────────────
|
||||
|
||||
func newTLSConfig(cfg Config) (*tls.Config, error) {
|
||||
caCert, err := os.ReadFile(cfg.CACert)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading CA cert: %w", err)
|
||||
}
|
||||
pool := x509.NewCertPool()
|
||||
if !pool.AppendCertsFromPEM(caCert) {
|
||||
return nil, fmt.Errorf("parsing CA cert failed")
|
||||
}
|
||||
cert, err := tls.LoadX509KeyPair(cfg.ClientCert, cfg.ClientKey)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("loading client cert/key: %w", err)
|
||||
}
|
||||
return &tls.Config{
|
||||
RootCAs: pool,
|
||||
Certificates: []tls.Certificate{cert},
|
||||
}, nil
|
||||
}
|
||||
|
||||
var logger = log.New(os.Stderr, "", log.Ltime|log.Lshortfile)
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("c", "./config/config.json", "Path to config file")
|
||||
debugMode := flag.Bool("debug", false, "Poll only the first register of slave 0x01 every 5 s")
|
||||
flag.Parse()
|
||||
|
||||
data, err := os.ReadFile(*configPath)
|
||||
if err != nil {
|
||||
log.Fatalf("reading config: %v", err)
|
||||
}
|
||||
var cfg Config
|
||||
if err := json.Unmarshal(data, &cfg); err != nil {
|
||||
log.Fatalf("parsing config: %v", err)
|
||||
}
|
||||
|
||||
tlsCfg, err := newTLSConfig(cfg)
|
||||
if err != nil {
|
||||
log.Fatalf("TLS setup: %v", err)
|
||||
}
|
||||
|
||||
h := newHandler(cfg)
|
||||
brokerURL := fmt.Sprintf("ssl://%s:%d", cfg.Broker, cfg.Port)
|
||||
topic := cfg.TopicPrefix + "/#"
|
||||
|
||||
opts := mqtt.NewClientOptions().
|
||||
AddBroker(brokerURL).
|
||||
SetClientID(cfg.ClientID).
|
||||
SetTLSConfig(tlsCfg).
|
||||
SetCleanSession(true).
|
||||
SetAutoReconnect(true).
|
||||
SetOnConnectHandler(func(c mqtt.Client) {
|
||||
logger.Printf("connected to %s", brokerURL)
|
||||
token := c.Subscribe(topic, 0, h.Handle)
|
||||
token.Wait()
|
||||
if err := token.Error(); err != nil {
|
||||
logger.Printf("subscribe error: %v", err)
|
||||
} else {
|
||||
logger.Printf("subscribed to %s", topic)
|
||||
}
|
||||
}).
|
||||
SetConnectionLostHandler(func(_ mqtt.Client, err error) {
|
||||
logger.Printf("connection lost: %v", err)
|
||||
})
|
||||
|
||||
client := mqtt.NewClient(opts)
|
||||
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
||||
log.Fatalf("connect: %v", token.Error())
|
||||
}
|
||||
|
||||
influxURL := fmt.Sprintf("http://%s:%d", cfg.InfluxHost, cfg.InfluxPort)
|
||||
influxClient := influxdb2.NewClient(influxURL, cfg.InfluxToken)
|
||||
defer influxClient.Close()
|
||||
writeAPI := influxClient.WriteAPIBlocking(cfg.InfluxOrg, cfg.InfluxBucket)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
requester := NewRequester(client, cfg.TopicPrefix)
|
||||
|
||||
pollDevices := cfg.Devices
|
||||
pollInterval := time.Duration(cfg.SampleRate) * time.Second
|
||||
if *debugMode {
|
||||
pollInterval = 5 * time.Second
|
||||
for _, d := range cfg.Devices {
|
||||
if d.SlaveAddress == 0x01 && len(d.Registers) > 0 {
|
||||
pollDevices = []DeviceConfig{{
|
||||
SlaveAddress: d.SlaveAddress,
|
||||
Name: d.Name,
|
||||
Segment: d.Segment,
|
||||
Registers: d.Registers[:1],
|
||||
}}
|
||||
break
|
||||
}
|
||||
}
|
||||
logger.Printf("debug mode: polling %s/%s every %s", pollDevices[0].Name, pollDevices[0].Registers[0].Name, pollInterval)
|
||||
}
|
||||
|
||||
poller := NewPoller(requester, h, writeAPI, cfg, pollDevices, pollInterval)
|
||||
go poller.Run(ctx)
|
||||
|
||||
sig := make(chan os.Signal, 1)
|
||||
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
|
||||
<-sig
|
||||
|
||||
cancel()
|
||||
client.Disconnect(250)
|
||||
logger.Println("disconnected")
|
||||
}
|
||||
Reference in New Issue
Block a user