package main import ( "context" "encoding/json" "flag" "log" "os" "strconv" "strings" "time" "github.com/simonvetter/modbus" influxdb2 "github.com/influxdata/influxdb-client-go/v2" ) type mod_bus struct { Protocol string `json:"protocol"` BaudRate int `json:"baud_rate"` SlaveAddress int `json:"slave_address"` Host string `json:"host"` Port int `json:"port"` } type register struct { Name string `json:"name"` Type string `json:"type"` Address int `json:"address"` Quantity int `json:"quantity"` Factor float32 `json:"factor"` Unit string `json:"unit"` } type device struct { Name string `json:"name"` Type string `json:"type"` Modbus mod_bus `json:"modbus"` Registers []register `json:"registers"` } type config struct { SampleRate int `json:"sample_rate"` InfluxdbHost string `json:"influxdb_host"` InfluxdbPort int `json:"influxdb_port"` InfluxdbToken string `json:"influxdb_token"` Devices []device `json:"devices"` } var ( logger log.Logger = *log.Default() config_path string config_cache config ) func init() { logger.SetFlags(log.Llongfile | log.Ltime) } func read_config() { data, err := os.ReadFile(config_path) if err != nil { logger.Printf("Unable to read %s", config_path) return } err = json.Unmarshal(data, &config_cache) if err != nil { logger.Print("Unable to evaluate config data") return } } func main() { flag.StringVar(&config_path, "c", "../config/config.json", "Specify path to find the config file. Default is ../config/config.json") flag.Parse() read_config() type remote struct { client *modbus.ModbusClient registers []register name string } var err error var remotes []remote var client *modbus.ModbusClient // setup influx connection influxdb_url := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort) db_client := influxdb2.NewClient(influxdb_url, config_cache.InfluxdbToken) defer db_client.Close() ctx := context.Background() write_api := db_client.WriteAPIBlocking("tkl", "home") for _, v := range config_cache.Devices { if v.Modbus.Protocol == "tcp" { url := v.Modbus.Protocol + "://" + v.Modbus.Host + ":" + strconv.Itoa(v.Modbus.Port) client, err = modbus.NewClient(&modbus.ClientConfiguration{URL: url}) if err != nil { logger.Printf("failed to create modbus client: %v\n", err) continue } client.SetUnitId(0x55) err = client.Open() if err != nil { logger.Printf("failed to connect: %v\n", err) continue } var tmp remote tmp.client = client tmp.registers = v.Registers tmp.name = v.Name remotes = append(remotes, tmp) } } for { // ever for _, remote := range remotes { point := influxdb2.NewPointWithMeasurement(remote.name) for _, reg := range remote.registers { res, err := remote.client.ReadRegisters(uint16(reg.Address), uint16(reg.Quantity), modbus.HOLDING_REGISTER) if err != nil { logger.Printf("failed to read: %v\n", err) error_strings := []string{ "connection reset by peer", "broken pipe", } for _, error_string := range error_strings { if strings.Contains(err.Error(), error_string) { logger.Print("trying to restart modbus client") remote.client.Close() err = remote.client.Open() if err != nil { logger.Printf("failed to restart: %v\n", err) os.Exit(1) } } } continue } var tmp int32 switch reg.Quantity { case 1: tmp = int32(res[0]) case 2: tmp = ((int32(res[0]) << 16) | int32(res[1])) default: logger.Printf("Unsupported quantity of registers (%d)", reg.Quantity) continue } result := float32(tmp) * reg.Factor point.AddField(reg.Name, result) } point.SetTime(time.Now()) err := write_api.WritePoint(ctx, point) if err != nil { logger.Print(err) } err = write_api.Flush(ctx) if err != nil { logger.Print(err) } } time.Sleep(time.Duration(config_cache.SampleRate * 1000000000)) } }