package main

import (
	"crypto/tls"
	"crypto/x509"
	"encoding/binary"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// MeterReading holds one poll's worth of SDM630 readings for one device.
// Power fields are instantaneous per-phase values; the energy fields are
// cumulative counters.
type MeterReading struct {
	Device    string  // matches DeviceConf.Name ("house" | "barn")
	L1Power   float32 // W
	L2Power   float32 // W
	L3Power   float32 // W
	ImportKwh float32 // kWh cumulative
	ExportKwh float32 // kWh cumulative
}

// ── MQTT message handler ──────────────────────────────────────────────────────
//
// The perinet bridge is a Modbus-RTU gateway. We operate in passive mode:
// we do NOT send any requests ourselves. Instead we subscribe to Energy/#
// and observe the request/response traffic generated by the existing
// energy-subscriber client.
//
// Request payload (energy-subscriber sends, we observe):
//
//	{"modbus_frame":{"value":[slave,4,addrH,addrL,cntH,cntL,crcL,crcH],"unit":"RAW"}}
//
// Response payload (bridge publishes):
//
//	{"data":{"modbus_frame":{"value":[slave,4,byteCount,...,crcL,crcH],
//	                         "unit":"RAW"}},"timestamp":{...}}
//
// When an FC4 request arrives we record the requested address for that slave.
// When the following FC4 response arrives we store the decoded float32 value
// keyed by that address. Once all five SDM630 registers have been collected
// for a slave, a complete MeterReading is pushed to readyCh.
//
// SDM630 register map (float32 input registers, 2 × 16-bit words each): // // addr 12 → L1 active power (W) // addr 14 → L2 active power (W) // addr 16 → L3 active power (W) // addr 72 → Total import energy (kWh) // addr 74 → Total export energy (kWh) var meterAddrs = []uint16{12, 14, 16, 72, 74} type slaveState struct { mu sync.Mutex lastReqAddr uint16 vals map[uint16]float32 // addr → decoded float32 } type meterHandler struct { devices map[int]string // slave address → device name statesMu sync.Mutex states map[int]*slaveState readyCh chan MeterReading // complete readings are sent here } func newMeterHandler(devices []DeviceConf) *meterHandler { dm := make(map[int]string, len(devices)) for _, d := range devices { dm[d.SlaveAddress] = d.Name } return &meterHandler{ devices: dm, states: make(map[int]*slaveState), readyCh: make(chan MeterReading, 16), } } func (h *meterHandler) getOrCreate(slave int) *slaveState { h.statesMu.Lock() defer h.statesMu.Unlock() if s, ok := h.states[slave]; ok { return s } s := &slaveState{vals: make(map[uint16]float32)} h.states[slave] = s return s } // decodeFrame extracts the raw Modbus byte frame from an MQTT payload. // It handles two JSON envelopes used by the perinet bridge: // // Request (energy-subscriber sends): {"modbus_frame":{"value":[...],"unit":"RAW"}} // Response (bridge): {"data":{"modbus_frame":{"value":[...],...}},...} func decodeFrame(payload []byte) []byte { // Try the bridge response format first ("data" wrapper). var outer struct { Data struct { ModbusFrame struct { Value []int `json:"value"` } `json:"modbus_frame"` } `json:"data"` } if json.Unmarshal(payload, &outer) == nil && len(outer.Data.ModbusFrame.Value) > 0 { return intsToBytes(outer.Data.ModbusFrame.Value) } // Fall back to the flat request format. 
var inner struct { ModbusFrame struct { Value []int `json:"value"` } `json:"modbus_frame"` } if json.Unmarshal(payload, &inner) == nil && len(inner.ModbusFrame.Value) > 0 { return intsToBytes(inner.ModbusFrame.Value) } return nil } func intsToBytes(vals []int) []byte { b := make([]byte, len(vals)) for i, v := range vals { b[i] = byte(v) } return b } // FC4 request: [slave, 0x04, addrH, addrL, countH, countL] — 6 bytes (no CRC) // // or [slave, 0x04, addrH, addrL, countH, countL, crcL, crcH] — 8 bytes (with CRC) func isFC4Request(f []byte) bool { return (len(f) == 6 || len(f) == 8) && f[1] == 0x04 } // FC4 response: [slave, 0x04, byteCount, data..., (crcL, crcH)] // The bridge appends 2 CRC bytes, so we accept both 3+N and 5+N lengths. func isFC4Response(f []byte) bool { if len(f) < 3 || f[1] != 0x04 || f[2] == 0 { return false } n := int(f[2]) return len(f) == 3+n || len(f) == 5+n } func (h *meterHandler) Handle(_ mqtt.Client, msg mqtt.Message) { slave, ok := slaveFromTopic(msg.Topic()) if !ok { return } frame := decodeFrame(msg.Payload()) if len(frame) < 3 || frame[1] != 0x04 { return } // Only process slaves we know about. if _, known := h.devices[slave]; !known { return } st := h.getOrCreate(slave) switch { case isFC4Request(frame): addr := binary.BigEndian.Uint16(frame[2:4]) st.mu.Lock() st.lastReqAddr = addr st.mu.Unlock() case isFC4Response(frame): n := int(frame[2]) data := frame[3 : 3+n] if len(data) < 4 { return } bits := binary.BigEndian.Uint32(data[0:4]) value := math.Float32frombits(bits) st.mu.Lock() addr := st.lastReqAddr st.vals[addr] = value // Check whether all required registers have been collected. 
complete := true for _, a := range meterAddrs { if _, ok := st.vals[a]; !ok { complete = false break } } if complete { r := MeterReading{ Device: h.devices[slave], L1Power: st.vals[12], L2Power: st.vals[14], L3Power: st.vals[16], ImportKwh: st.vals[72], ExportKwh: st.vals[74], } st.vals = make(map[uint16]float32) // reset for next cycle st.mu.Unlock() select { case h.readyCh <- r: default: log.Printf("meter: readyCh full, dropping reading for %s", r.Device) } return } st.mu.Unlock() } } // ── MeterPoller ─────────────────────────────────────────────────────────────── // MeterPoller subscribes to the perinet MQTT bridge and passively collects // SDM630 meter readings as they are published by the energy-subscriber client. type MeterPoller struct { client mqtt.Client handler *meterHandler cfg MQTTConf } func NewMeterPoller(cfg MQTTConf) (*MeterPoller, error) { tlsCfg, err := buildTLS(cfg) if err != nil { return nil, err } h := newMeterHandler(cfg.Devices) dialTarget := cfg.Broker if cfg.BrokerIP != "" { encoded := strings.ReplaceAll(cfg.BrokerIP, "%", "%25") dialTarget = "[" + encoded + "]" } brokerURL := fmt.Sprintf("ssl://%s:%d", dialTarget, cfg.Port) topic := cfg.TopicPrefix + "/#" opts := mqtt.NewClientOptions(). AddBroker(brokerURL). SetClientID(cfg.ClientID). SetTLSConfig(tlsCfg). SetCleanSession(true). SetAutoReconnect(true). SetOnConnectHandler(func(c mqtt.Client) { log.Printf("mqtt connected to %s", brokerURL) tok := c.Subscribe(topic, 0, h.Handle) tok.Wait() if err := tok.Error(); err != nil { log.Printf("mqtt subscribe: %v", err) } }). 
SetConnectionLostHandler(func(_ mqtt.Client, err error) { log.Printf("mqtt connection lost: %v", err) }) if cfg.BrokerTLSName != "" { tlsCfg.ServerName = cfg.BrokerTLSName } else if cfg.BrokerIP != "" { tlsCfg.ServerName = cfg.Broker } client := mqtt.NewClient(opts) if tok := client.Connect(); tok.Wait() && tok.Error() != nil { return nil, fmt.Errorf("mqtt connect: %w", tok.Error()) } return &MeterPoller{client: client, handler: h, cfg: cfg}, nil } // PollAll waits for a complete reading from every configured device and returns // them all. It blocks until all devices have reported or the 30-second // deadline expires. func (p *MeterPoller) PollAll() []MeterReading { want := len(p.cfg.Devices) seen := make(map[string]bool, want) var readings []MeterReading deadline := time.After(30 * time.Second) for len(readings) < want { select { case r := <-p.handler.readyCh: if !seen[r.Device] { seen[r.Device] = true readings = append(readings, r) } case <-deadline: if len(readings) == 0 { log.Printf("meter: no readings received within 30 s") } return readings } } return readings } func (p *MeterPoller) Close() { p.client.Disconnect(250) } // ── Helpers ─────────────────────────────────────────────────────────────────── func slaveFromTopic(topic string) (int, bool) { parts := strings.Split(topic, "/") if len(parts) == 0 { return 0, false } s := strings.TrimPrefix(strings.ToLower(parts[len(parts)-1]), "0x") v, err := strconv.ParseInt(s, 16, 32) if err != nil { return 0, false } return int(v), true } func buildTLS(cfg MQTTConf) (*tls.Config, error) { ca, err := os.ReadFile(cfg.CACert) if err != nil { return nil, fmt.Errorf("ca cert: %w", err) } pool := x509.NewCertPool() if !pool.AppendCertsFromPEM(ca) { return nil, fmt.Errorf("parsing CA cert failed") } cert, err := tls.LoadX509KeyPair(cfg.ClientCert, cfg.ClientKey) if err != nil { return nil, fmt.Errorf("client cert: %w", err) } return &tls.Config{RootCAs: pool, Certificates: []tls.Certificate{cert}}, nil }