166 lines
4.1 KiB
Go
166 lines
4.1 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/simonvetter/modbus"
|
|
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
)
|
|
|
|
// mod_bus holds the Modbus connection settings of one device, decoded from
// the "modbus" object of a device entry in the JSON config.
type mod_bus struct {
	// Protocol is the transport name. main only connects to devices whose
	// protocol is "tcp" and uses the value as the URL scheme.
	Protocol string `json:"protocol"`
	// BaudRate is decoded from the config but not referenced by the code
	// in this file.
	BaudRate int `json:"baud_rate"`
	// SlaveAddress is decoded from the config but not referenced by the
	// code in this file.
	SlaveAddress int `json:"slave_address"`
	// Host is the host part of the Modbus URL.
	Host string `json:"host"`
	// Port is the port part of the Modbus URL.
	Port int `json:"port"`
}
|
|
|
|
// register describes one value to poll from a device, decoded from an entry
// of the "registers" array in the JSON config.
type register struct {
	// Name is used as the InfluxDB field key for the polled value.
	Name string `json:"name"`
	// Type is decoded from the config but not referenced by the code in
	// this file.
	Type string `json:"type"`
	// Address is the first register address to read; main converts it to
	// uint16 before issuing the read.
	Address int `json:"address"`
	// Quantity is the number of consecutive 16-bit registers to read. main
	// only decodes quantities 1 and 2.
	Quantity int `json:"quantity"`
	// Factor is multiplied with the raw register value before it is stored.
	Factor float32 `json:"factor"`
	// Unit is decoded from the config but not referenced by the code in
	// this file.
	Unit string `json:"unit"`
}
|
|
|
|
type device struct {
|
|
Name string `json:"name"`
|
|
Type string `json:"type"`
|
|
Modbus mod_bus `json:"modbus"`
|
|
Registers []register `json:"registers"`
|
|
}
|
|
type config struct {
|
|
SampleRate int `json:"sample_rate"`
|
|
InfluxdbHost string `json:"influxdb_host"`
|
|
InfluxdbPort int `json:"influxdb_port"`
|
|
InfluxdbToken string `json:"influxdb_token"`
|
|
Devices []device `json:"devices"`
|
|
}
|
|
|
|
var (
|
|
logger log.Logger = *log.Default()
|
|
config_path string
|
|
config_cache config
|
|
)
|
|
|
|
func init() {
|
|
logger.SetFlags(log.Llongfile | log.Ltime)
|
|
}
|
|
|
|
func read_config() {
|
|
data, err := os.ReadFile(config_path)
|
|
if err != nil {
|
|
logger.Printf("Unable to read %s", config_path)
|
|
return
|
|
}
|
|
err = json.Unmarshal(data, &config_cache)
|
|
if err != nil {
|
|
logger.Print("Unable to evaluate config data")
|
|
return
|
|
}
|
|
}
|
|
|
|
func main() {
|
|
flag.StringVar(&config_path, "c", "../config/config.json", "Specify path to find the config file. Default is ../config/config.json")
|
|
flag.Parse()
|
|
read_config()
|
|
|
|
type remote struct {
|
|
client *modbus.ModbusClient
|
|
registers []register
|
|
name string
|
|
}
|
|
var err error
|
|
var remotes []remote
|
|
var client *modbus.ModbusClient
|
|
|
|
// setup influx connection
|
|
influxdb_url := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort)
|
|
db_client := influxdb2.NewClient(influxdb_url, config_cache.InfluxdbToken)
|
|
defer db_client.Close()
|
|
ctx := context.Background()
|
|
write_api := db_client.WriteAPIBlocking("tkl", "home")
|
|
|
|
for _, v := range config_cache.Devices {
|
|
if v.Modbus.Protocol == "tcp" {
|
|
url := v.Modbus.Protocol + "://" + v.Modbus.Host + ":" + strconv.Itoa(v.Modbus.Port)
|
|
client, err = modbus.NewClient(&modbus.ClientConfiguration{URL: url})
|
|
if err != nil {
|
|
logger.Printf("failed to create modbus client: %v\n", err)
|
|
continue
|
|
}
|
|
client.SetUnitId(0x55)
|
|
err = client.Open()
|
|
if err != nil {
|
|
logger.Printf("failed to connect: %v\n", err)
|
|
continue
|
|
}
|
|
|
|
var tmp remote
|
|
tmp.client = client
|
|
tmp.registers = v.Registers
|
|
tmp.name = v.Name
|
|
remotes = append(remotes, tmp)
|
|
}
|
|
}
|
|
|
|
for { // ever
|
|
for _, remote := range remotes {
|
|
point := influxdb2.NewPointWithMeasurement(remote.name)
|
|
for _, reg := range remote.registers {
|
|
res, err := remote.client.ReadRegisters(uint16(reg.Address), uint16(reg.Quantity), modbus.HOLDING_REGISTER)
|
|
if err != nil {
|
|
logger.Printf("failed to read: %v\n", err)
|
|
error_strings := []string{
|
|
"connection reset by peer",
|
|
"broken pipe",
|
|
}
|
|
for _, error_string := range error_strings {
|
|
if strings.Contains(err.Error(), error_string) {
|
|
logger.Print("trying to restart modbus client")
|
|
remote.client.Close()
|
|
err = remote.client.Open()
|
|
if err != nil {
|
|
logger.Printf("failed to restart: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
var tmp int32
|
|
switch reg.Quantity {
|
|
case 1:
|
|
tmp = int32(res[0])
|
|
case 2:
|
|
tmp = ((int32(res[0]) << 16) | int32(res[1]))
|
|
default:
|
|
logger.Printf("Unsupported quantity of registers (%d)", reg.Quantity)
|
|
continue
|
|
}
|
|
result := float32(tmp) * reg.Factor
|
|
point.AddField(reg.Name, result)
|
|
}
|
|
point.SetTime(time.Now())
|
|
err := write_api.WritePoint(ctx, point)
|
|
if err != nil {
|
|
logger.Print(err)
|
|
}
|
|
err = write_api.Flush(ctx)
|
|
if err != nil {
|
|
logger.Print(err)
|
|
}
|
|
}
|
|
time.Sleep(time.Duration(config_cache.SampleRate * 1000000000))
|
|
}
|
|
}
|