package main import ( "crypto/tls" "crypto/x509" "encoding/binary" "encoding/json" "context" "flag" "fmt" "log" "math" "os" "os/signal" "strconv" "strings" "sync" "syscall" "time" mqtt "github.com/eclipse/paho.mqtt.golang" influxdb2 "github.com/influxdata/influxdb-client-go/v2" ) // ── Config ──────────────────────────────────────────────────────────────────── // Reading is a single decoded register value returned to the poller. type Reading struct { Name string Value float32 Unit string } type Register struct { Name string `json:"name"` Type string `json:"type"` // "input" (FC4) or "holding" (FC3) Address uint16 `json:"address"` // register word address Count uint16 `json:"count"` // number of 16-bit registers to read (2 = one float32) Factor float32 `json:"factor"` // scale factor applied to the decoded value Unit string `json:"unit"` } type DeviceConfig struct { SlaveAddress int `json:"slave_address"` Name string `json:"name"` Segment string `json:"segment"` Registers []Register `json:"registers"` } type Config struct { Broker string `json:"broker"` BrokerIP string `json:"broker_ip"` // optional: scoped IPv6 dial address (e.g. fe80::1%eth0) BrokerTLSName string `json:"broker_tls_name"` // optional: TLS ServerName when broker_ip is set Port int `json:"port"` ClientID string `json:"client_id"` TopicPrefix string `json:"topic_prefix"` SampleRate int `json:"sample_rate"` // polling interval in seconds CACert string `json:"ca_cert"` ClientCert string `json:"client_cert"` ClientKey string `json:"client_key"` InfluxHost string `json:"influxdb_host"` InfluxPort int `json:"influxdb_port"` InfluxToken string `json:"influxdb_token"` InfluxOrg string `json:"influxdb_org"` InfluxBucket string `json:"influxdb_bucket"` Devices []DeviceConfig `json:"devices"` } // ── MQTT payload ────────────────────────────────────────────────────────────── type modbusFrame struct { Value []int `json:"value"` // JSON number array, not base64 Unit string `json:"unit"` } type frameData struct { ModbusFrame modbusFrame `json:"modbus_frame"` } type mqttPayload struct { Data frameData `json:"data"` } // ── SDM 630 register map ────────────────────────────────────────────────────── type regInfo struct{ Name, Unit string } // Word (16-bit) addresses from the SDM 630 Modbus datasheet. // Each measurement is a float32 spanning 2 consecutive registers (big-endian). var sdm630 = map[uint16]regInfo{ 0: {"L1 Voltage", "V"}, 2: {"L2 Voltage", "V"}, 4: {"L3 Voltage", "V"}, 6: {"L1 Current", "A"}, 8: {"L2 Current", "A"}, 10: {"L3 Current", "A"}, 12: {"L1 Active Power", "W"}, 14: {"L2 Active Power", "W"}, 16: {"L3 Active Power", "W"}, 18: {"L1 Apparent Power", "VA"}, 20: {"L2 Apparent Power", "VA"}, 22: {"L3 Apparent Power", "VA"}, 24: {"L1 Reactive Power", "VAr"}, 26: {"L2 Reactive Power", "VAr"}, 28: {"L3 Reactive Power", "VAr"}, 30: {"L1 Power Factor", ""}, 32: {"L2 Power Factor", ""}, 34: {"L3 Power Factor", ""}, 36: {"L1 Phase Angle", "deg"}, 38: {"L2 Phase Angle", "deg"}, 40: {"L3 Phase Angle", "deg"}, 42: {"Average Voltage L-N", "V"}, 44: {"Average Current", "A"}, 46: {"Sum of Currents", "A"}, 48: {"Total Active Power", "W"}, 50: {"Total Apparent Power", "VA"}, 52: {"Total Reactive Power", "VAr"}, 54: {"Total Power Factor", ""}, 56: {"Total Phase Angle", "deg"}, 58: {"Frequency", "Hz"}, 60: {"Total Import Active Energy", "kWh"}, 62: {"Total Export Active Energy", "kWh"}, 72: {"Total Import Energy", "kWh"}, 74: {"Total Export Energy", "kWh"}, 76: {"Total VAh", "kVAh"}, 78: {"Ah", "Ah"}, 80: {"Total System Power Demand", "W"}, 84: {"Total System VA Demand", "VA"}, 88: {"Max Total System Power Demand", "W"}, 90: {"Max Total System VA Demand", "VA"}, 100: {"Current Demand L1", "A"}, 102: {"Current Demand L2", "A"}, 104: {"Current Demand L3", "A"}, 106: {"Max Current Demand L1", "A"}, 108: {"Max Current Demand L2", "A"}, 110: {"Max Current Demand L3", "A"}, 200: {"Total Active Energy", "kWh"}, 202: {"Total Reactive Energy", "kVArh"}, 226: {"L1 Import Active Energy", "kWh"}, 228: {"L2 Import Active Energy", "kWh"}, 230: {"L3 Import Active Energy", "kWh"}, 232: {"L1 Export Active Energy", "kWh"}, 234: {"L2 Export Active Energy", "kWh"}, 236: {"L3 Export Active Energy", "kWh"}, 238: {"L1 Total Active Energy", "kWh"}, 240: {"L2 Total Active Energy", "kWh"}, 242: {"L3 Total Active Energy", "kWh"}, } // ── Message handler ─────────────────────────────────────────────────────────── type slaveState struct { mu sync.Mutex lastRegAddr uint16 } type handler struct { deviceMap map[int]string // slave addr → device name registerMap map[int]map[uint16]Register // slave addr → reg addr → Register statesMu sync.Mutex states map[int]*slaveState waitsMu sync.Mutex waits map[int]chan []Reading // slave addr → readings from next response } func newHandler(cfg Config) *handler { dm := make(map[int]string, len(cfg.Devices)) rm := make(map[int]map[uint16]Register, len(cfg.Devices)) for _, d := range cfg.Devices { dm[d.SlaveAddress] = d.Name regs := make(map[uint16]Register, len(d.Registers)) for _, r := range d.Registers { regs[r.Address] = r } rm[d.SlaveAddress] = regs } return &handler{ deviceMap: dm, registerMap: rm, states: make(map[int]*slaveState), waits: make(map[int]chan []Reading), } } // waitForResponse registers a one-shot channel that receives the decoded // readings from the next FC4 response for the given slave. Must be called // before publishing the request to avoid a race with a very fast response. func (h *handler) waitForResponse(slave int) <-chan []Reading { ch := make(chan []Reading, 1) // buffered so the handler never blocks h.waitsMu.Lock() h.waits[slave] = ch h.waitsMu.Unlock() return ch } func (h *handler) state(slave int) *slaveState { h.statesMu.Lock() defer h.statesMu.Unlock() if s, ok := h.states[slave]; ok { return s } s := &slaveState{} h.states[slave] = s return s } func (h *handler) Handle(_ mqtt.Client, msg mqtt.Message) { slave, ok := slaveFromTopic(msg.Topic()) if !ok { return } var p mqttPayload if err := json.Unmarshal(msg.Payload(), &p); err != nil { logger.Printf("json: %v", err) return } frame := intsToBytes(p.Data.ModbusFrame.Value) if len(frame) < 4 || frame[1] != 0x04 { return // only FC4 (Read Input Registers) handled } st := h.state(slave) devName := h.deviceMap[slave] if devName == "" { devName = fmt.Sprintf("slave-0x%02X", slave) } switch { case isFC4Request(frame): // Store the requested register address so we can label the response. regAddr := binary.BigEndian.Uint16(frame[2:4]) st.mu.Lock() st.lastRegAddr = regAddr st.mu.Unlock() case isFC4Response(frame): st.mu.Lock() baseAddr := st.lastRegAddr st.mu.Unlock() // data bytes: frame[3 .. 3+byte_count-1], CRC stripped data := frame[3 : 3+int(frame[2])] ts := time.Now().Format("15:04:05") var readings []Reading for i := 0; i+4 <= len(data); i += 4 { // Each float32 spans 2 registers; offset in word addresses = i/2 addr := baseAddr + uint16(i/2) raw := decodeFloat32(data[i : i+4]) // Label and scale: configured register takes precedence over generic SDM 630 map. name, unit, value := labelAndScale(raw, addr, h.registerMap[slave]) fmt.Printf("[%s] %-20s %-32s %10.3f %s\n", ts, devName, name, value, unit) readings = append(readings, Reading{Name: name, Value: value, Unit: unit}) } h.waitsMu.Lock() if ch, ok := h.waits[slave]; ok { delete(h.waits, slave) ch <- readings } h.waitsMu.Unlock() default: logger.Printf("unrecognised frame from %s: %v", devName, frame) } } // ── Helpers ─────────────────────────────────────────────────────────────────── // FC4 request: [addr, 0x04, reg_hi, reg_lo, cnt_hi, cnt_lo, crc_lo, crc_hi] = 8 bytes func isFC4Request(f []byte) bool { return len(f) == 8 } // FC4 response: [addr, 0x04, byte_count, data..., crc_lo, crc_hi] → total = 5 + byte_count func isFC4Response(f []byte) bool { return len(f) >= 5 && len(f) == 5+int(f[2]) } // slaveFromTopic parses the last path component, e.g. "Energy/modbus-segment/0x01" → 1 // labelAndScale returns the display name, unit, and scaled value for a register. // The per-device config takes precedence; falls back to the generic SDM 630 map. func labelAndScale(raw float32, addr uint16, regs map[uint16]Register) (name, unit string, value float32) { if r, ok := regs[addr]; ok { return r.Name, r.Unit, raw * r.Factor } if info, ok := sdm630[addr]; ok { return info.Name, info.Unit, raw } return fmt.Sprintf("reg:%d", addr), "", raw } func slaveFromTopic(topic string) (int, bool) { parts := strings.Split(topic, "/") if len(parts) == 0 { return 0, false } s := strings.TrimPrefix(strings.ToLower(parts[len(parts)-1]), "0x") v, err := strconv.ParseInt(s, 16, 32) if err != nil { return 0, false } return int(v), true } func intsToBytes(ints []int) []byte { b := make([]byte, len(ints)) for i, v := range ints { b[i] = byte(v) } return b } func decodeFloat32(b []byte) float32 { return math.Float32frombits(binary.BigEndian.Uint32(b)) } // ── TLS / MQTT setup ────────────────────────────────────────────────────────── func newTLSConfig(cfg Config) (*tls.Config, error) { caCert, err := os.ReadFile(cfg.CACert) if err != nil { return nil, fmt.Errorf("reading CA cert: %w", err) } pool := x509.NewCertPool() if !pool.AppendCertsFromPEM(caCert) { return nil, fmt.Errorf("parsing CA cert failed") } cert, err := tls.LoadX509KeyPair(cfg.ClientCert, cfg.ClientKey) if err != nil { return nil, fmt.Errorf("loading client cert/key: %w", err) } return &tls.Config{ RootCAs: pool, Certificates: []tls.Certificate{cert}, }, nil } var logger = log.New(os.Stderr, "", log.Ltime|log.Lshortfile) func main() { configPath := flag.String("c", "./config/config.json", "Path to config file") debugMode := flag.Bool("debug", false, "Poll only the first register of slave 0x01 every 5 s") flag.Parse() data, err := os.ReadFile(*configPath) if err != nil { log.Fatalf("reading config: %v", err) } var cfg Config if err := json.Unmarshal(data, &cfg); err != nil { log.Fatalf("parsing config: %v", err) } tlsCfg, err := newTLSConfig(cfg) if err != nil { log.Fatalf("TLS setup: %v", err) } // broker_ip overrides the TCP dial target (needed for IPv6 link-local with scope). // broker_tls_name sets the TLS ServerName so certificate verification still uses the hostname. if cfg.BrokerIP != "" { tlsCfg.ServerName = cfg.BrokerTLSName if tlsCfg.ServerName == "" { tlsCfg.ServerName = cfg.Broker } } h := newHandler(cfg) dialTarget := cfg.Broker if cfg.BrokerIP != "" { dialTarget = "[" + cfg.BrokerIP + "]" } brokerURL := fmt.Sprintf("ssl://%s:%d", dialTarget, cfg.Port) topic := cfg.TopicPrefix + "/#" opts := mqtt.NewClientOptions(). AddBroker(brokerURL). SetClientID(cfg.ClientID). SetTLSConfig(tlsCfg). SetCleanSession(true). SetAutoReconnect(true). SetOnConnectHandler(func(c mqtt.Client) { logger.Printf("connected to %s", brokerURL) token := c.Subscribe(topic, 0, h.Handle) token.Wait() if err := token.Error(); err != nil { logger.Printf("subscribe error: %v", err) } else { logger.Printf("subscribed to %s", topic) } }). SetConnectionLostHandler(func(_ mqtt.Client, err error) { logger.Printf("connection lost: %v", err) }) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("connect: %v", token.Error()) } influxURL := fmt.Sprintf("http://%s:%d", cfg.InfluxHost, cfg.InfluxPort) influxClient := influxdb2.NewClient(influxURL, cfg.InfluxToken) defer influxClient.Close() writeAPI := influxClient.WriteAPIBlocking(cfg.InfluxOrg, cfg.InfluxBucket) ctx, cancel := context.WithCancel(context.Background()) requester := NewRequester(client, cfg.TopicPrefix) pollDevices := cfg.Devices pollInterval := time.Duration(cfg.SampleRate) * time.Second if *debugMode { pollInterval = 5 * time.Second for _, d := range cfg.Devices { if d.SlaveAddress == 0x01 && len(d.Registers) > 0 { pollDevices = []DeviceConfig{{ SlaveAddress: d.SlaveAddress, Name: d.Name, Segment: d.Segment, Registers: d.Registers[:1], }} break } } logger.Printf("debug mode: polling %s/%s every %s", pollDevices[0].Name, pollDevices[0].Registers[0].Name, pollInterval) } poller := NewPoller(requester, h, writeAPI, cfg, pollDevices, pollInterval) go poller.Run(ctx) sig := make(chan os.Signal, 1) signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM) <-sig cancel() client.Disconnect(250) logger.Println("disconnected") }