Companion device for perinode-modbusbridge
Signed-off-by: Thomas Klaehn <thomas.klaehn@perinet.io>
This commit is contained in:
		
							
								
								
									
										2
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.vscode/launch.json
									
									
									
									
										vendored
									
									
								
							@@ -10,7 +10,7 @@
 | 
			
		||||
            "request": "launch",
 | 
			
		||||
            "mode": "auto",
 | 
			
		||||
            "program": "${fileDirname}",
 | 
			
		||||
            "args": ["-c", "/etc/powercollect/config.json"]
 | 
			
		||||
            // "args": ["-c", "/etc/powercollect/config.json"]
 | 
			
		||||
        }
 | 
			
		||||
    ]
 | 
			
		||||
}
 | 
			
		||||
@@ -1,7 +1,7 @@
 | 
			
		||||
{
 | 
			
		||||
    "rtu_address": 1,
 | 
			
		||||
    "sample_rate": 10,
 | 
			
		||||
    "modbus_host": "wels.local",
 | 
			
		||||
    "modbus_host": "periCORE-i7dv5.local",
 | 
			
		||||
    "modbus_port": 502,
 | 
			
		||||
    "influxdb_host": "barsch.local",
 | 
			
		||||
    "influxdb_port": 8086,
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										15
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										15
									
								
								go.mod
									
									
									
									
									
								
							@@ -2,17 +2,6 @@ module powercollect
 | 
			
		||||
 | 
			
		||||
go 1.19
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	actshad.dev/modbus v0.2.0
 | 
			
		||||
	github.com/influxdata/influxdb-client-go/v2 v2.12.0
 | 
			
		||||
)
 | 
			
		||||
require actshad.dev/modbus v0.2.0
 | 
			
		||||
 | 
			
		||||
require (
 | 
			
		||||
	github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
 | 
			
		||||
	github.com/deepmap/oapi-codegen v1.12.2 // indirect
 | 
			
		||||
	github.com/goburrow/serial v0.1.0 // indirect
 | 
			
		||||
	github.com/google/uuid v1.3.0 // indirect
 | 
			
		||||
	github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf // indirect
 | 
			
		||||
	github.com/pkg/errors v0.9.1 // indirect
 | 
			
		||||
	golang.org/x/net v0.1.0 // indirect
 | 
			
		||||
)
 | 
			
		||||
require github.com/goburrow/serial v0.1.0 // indirect
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										26
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										26
									
								
								go.sum
									
									
									
									
									
								
							@@ -1,30 +1,4 @@
 | 
			
		||||
actshad.dev/modbus v0.2.0 h1:saCi1iAZOHlmhDiFAY2a8fjouS63mWQVoVVhitEkFFQ=
 | 
			
		||||
actshad.dev/modbus v0.2.0/go.mod h1:8IHuLxLYjoX4NomWdAJkEOsAVJnncMAoppE7wRlIYME=
 | 
			
		||||
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
 | 
			
		||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
 | 
			
		||||
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
 | 
			
		||||
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
 | 
			
		||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 | 
			
		||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 | 
			
		||||
github.com/deepmap/oapi-codegen v1.12.2 h1:F7SMEn0UMpJV6kWwDYqfDmnnOYHIcU7ETV8qTVFdyI0=
 | 
			
		||||
github.com/deepmap/oapi-codegen v1.12.2/go.mod h1:ao2aFwsl/muMHbez870+KelJ1yusV01RznwAFFrVjDc=
 | 
			
		||||
github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA=
 | 
			
		||||
github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA=
 | 
			
		||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
 | 
			
		||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 | 
			
		||||
github.com/influxdata/influxdb-client-go/v2 v2.12.0 h1:LGct9uIp36IT+8RAJdmJGQbNonGi26YfYYSpDIyq8fI=
 | 
			
		||||
github.com/influxdata/influxdb-client-go/v2 v2.12.0/go.mod h1:YteV91FiQxRdccyJ2cHvj2f/5sq4y4Njqu1fQzsQCOU=
 | 
			
		||||
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf h1:7JTmneyiNEwVBOHSjoMxiWAqB992atOeepeFYegn5RU=
 | 
			
		||||
github.com/influxdata/line-protocol v0.0.0-20210922203350-b1ad95c89adf/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
 | 
			
		||||
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
 | 
			
		||||
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
 | 
			
		||||
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 | 
			
		||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 | 
			
		||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 | 
			
		||||
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
 | 
			
		||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 | 
			
		||||
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 | 
			
		||||
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
 | 
			
		||||
golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
 | 
			
		||||
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
 | 
			
		||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										118
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										118
									
								
								main.go
									
									
									
									
									
								
							@@ -1,16 +1,15 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"flag"
 | 
			
		||||
	"log"
 | 
			
		||||
	"math"
 | 
			
		||||
	"os"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
 | 
			
		||||
	"actshad.dev/modbus"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type sdm630Register struct {
 | 
			
		||||
@@ -18,16 +17,6 @@ type sdm630Register struct {
 | 
			
		||||
	Unit    string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type measure struct {
 | 
			
		||||
	Value float32 `json:"value"`
 | 
			
		||||
	Unit  string  `json:"unit"`
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type sampleCache struct {
 | 
			
		||||
	sample map[string]measure
 | 
			
		||||
	mutex  sync.Mutex
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type config struct {
 | 
			
		||||
	RtuAddress    int    `json:"rtu_address"`
 | 
			
		||||
	SampleRate    int    `json:"sample_rate"`
 | 
			
		||||
@@ -57,11 +46,9 @@ var (
 | 
			
		||||
		"TotalExport":  {0x4A, "kWh"},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	logger            log.Logger = *log.Default()
 | 
			
		||||
	last_total_import float32    = 0.0
 | 
			
		||||
	last_total_export float32    = 0.0
 | 
			
		||||
	config_path       string
 | 
			
		||||
	config_cache      = config{
 | 
			
		||||
	logger       log.Logger = *log.Default()
 | 
			
		||||
	config_path  string
 | 
			
		||||
	config_cache = config{
 | 
			
		||||
		RtuAddress:    1,    // modbus device slave address (0x01)
 | 
			
		||||
		SampleRate:    1,    // sec
 | 
			
		||||
		ModbusHost:    "",   // hostname of the modbus tcp to rtu bridge
 | 
			
		||||
@@ -86,45 +73,8 @@ func read_config() {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func init() {
 | 
			
		||||
	logger.SetPrefix("PowerMeterSDM630: ")
 | 
			
		||||
	logger.SetFlags(log.Llongfile | log.Ltime)
 | 
			
		||||
	logger.Println("Starting")
 | 
			
		||||
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func sample_is_valid(sample map[string]measure) bool {
 | 
			
		||||
	if len(sample) == 0 {
 | 
			
		||||
		logger.Print("Sample invalid - contains no data")
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	result := float32(0.0)
 | 
			
		||||
	// sample is invalid when all values equals 0.0
 | 
			
		||||
	for _, value := range sample {
 | 
			
		||||
		result += value.Value
 | 
			
		||||
	}
 | 
			
		||||
	if result == 0.0 {
 | 
			
		||||
		logger.Print("Sample invalid - all values equals 0.0")
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// total import and total export can never decrease
 | 
			
		||||
	for key, value := range sample {
 | 
			
		||||
		if key == "TotalImport" {
 | 
			
		||||
			if value.Value < last_total_import {
 | 
			
		||||
				logger.Printf("Sample invalid - Total import lower than previous one (%f vs. %f)", value.Value, last_total_import)
 | 
			
		||||
				return false
 | 
			
		||||
			}
 | 
			
		||||
			last_total_import = value.Value
 | 
			
		||||
		}
 | 
			
		||||
		if key == "TotalExport" {
 | 
			
		||||
			if value.Value < last_total_export {
 | 
			
		||||
				logger.Printf("Sample invalid - Total export lower than previous one (%f vs. %f)", value.Value, last_total_export)
 | 
			
		||||
				return false
 | 
			
		||||
			}
 | 
			
		||||
			last_total_export = value.Value
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
@@ -132,50 +82,30 @@ func main() {
 | 
			
		||||
	flag.Parse()
 | 
			
		||||
	read_config()
 | 
			
		||||
 | 
			
		||||
	go keep_mbus_connection()
 | 
			
		||||
	go collect_mbus_samples()
 | 
			
		||||
	tmp := config_cache.ModbusHost + ":" + strconv.Itoa(config_cache.ModbusPort)
 | 
			
		||||
	handler := modbus.NewTCPClientHandler(tmp)
 | 
			
		||||
	// handler.Timeout = 500 * time.Millisecond
 | 
			
		||||
	handler.Timeout = 500 * time.Second
 | 
			
		||||
	handler.SlaveId = byte(config_cache.RtuAddress)
 | 
			
		||||
 | 
			
		||||
	influxdb_url := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort)
 | 
			
		||||
	client := influxdb2.NewClient(influxdb_url, config_cache.InfluxdbToken)
 | 
			
		||||
	// always close client at the end
 | 
			
		||||
	defer client.Close()
 | 
			
		||||
 | 
			
		||||
	// get non-blocking write client
 | 
			
		||||
	writeAPI := client.WriteAPIBlocking("tkl", "home")
 | 
			
		||||
 | 
			
		||||
	ctx := context.Background()
 | 
			
		||||
	err := handler.Connect()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Panic(err)
 | 
			
		||||
	}
 | 
			
		||||
	defer handler.Close()
 | 
			
		||||
	modbus_client := modbus.NewClient(handler)
 | 
			
		||||
 | 
			
		||||
	for {
 | 
			
		||||
		time.Sleep(time.Duration(config_cache.SampleRate * 1000000000))
 | 
			
		||||
 | 
			
		||||
		sample_cache.mutex.Lock()
 | 
			
		||||
		sample := sample_cache.sample
 | 
			
		||||
		sample_cache.mutex.Unlock()
 | 
			
		||||
 | 
			
		||||
		if sample_is_valid(sample) {
 | 
			
		||||
			point := influxdb2.NewPointWithMeasurement("power")
 | 
			
		||||
			point.AddTag("sensor", "powermeter")
 | 
			
		||||
			for key, value := range sample {
 | 
			
		||||
				point.AddField(key, value.Value)
 | 
			
		||||
				point.AddField("unit", value.Unit)
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// calculate TotalPowerW
 | 
			
		||||
			total_power := sample["L1PowerW"].Value + sample["L2PowerW"].Value + sample["L3PowerW"].Value
 | 
			
		||||
			point.AddField("TotalPowerW", total_power)
 | 
			
		||||
			point.AddField("unit", "W")
 | 
			
		||||
 | 
			
		||||
			point.SetTime(time.Now())
 | 
			
		||||
			err := writeAPI.WritePoint(ctx, point)
 | 
			
		||||
		for key, register := range sdm_registers {
 | 
			
		||||
			res, err := modbus_client.ReadInputRegisters(register.Address, 2)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				logger.Print(err.Error())
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			err = writeAPI.Flush(ctx)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				logger.Print(err.Error())
 | 
			
		||||
				logger.Printf("Could not read from %s (%s)", key, err)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			var ieee754 uint32 = (uint32(res[0])<<24 + (uint32(res[1]) << 16) + (uint32(res[2]) << 8) + uint32(res[3]))
 | 
			
		||||
			result := math.Float32frombits(ieee754)
 | 
			
		||||
			logger.Printf("%s: %f %s\n", key, result, register.Unit)
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(time.Second)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										91
									
								
								modbus.go
									
									
									
									
									
								
							
							
						
						
									
										91
									
								
								modbus.go
									
									
									
									
									
								
							@@ -1,91 +0,0 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"math"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"actshad.dev/modbus"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var (
 | 
			
		||||
	modbus_client modbus.Client
 | 
			
		||||
	sample_cache  sampleCache
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func modbus_is_connected() bool {
 | 
			
		||||
	return modbus_client != nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Read modbus register and convert result into float
 | 
			
		||||
func modbus_read_float_register(register_address uint16) (float32, error) {
 | 
			
		||||
	var result float32 = 0.0
 | 
			
		||||
	if modbus_client != nil {
 | 
			
		||||
		res, err := modbus_client.ReadInputRegisters(register_address, 2)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			logger.Print("Modbus connection lost")
 | 
			
		||||
			modbus_client = nil
 | 
			
		||||
			return 0.0, err
 | 
			
		||||
		}
 | 
			
		||||
		var ieee754 uint32 = (uint32(res[0])<<24 + (uint32(res[1]) << 16) + (uint32(res[2]) << 8) + uint32(res[3]))
 | 
			
		||||
		result = math.Float32frombits(ieee754)
 | 
			
		||||
	}
 | 
			
		||||
	return result, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Connect to modbus tcp bridge
 | 
			
		||||
func modbus_connect(host string, port uint, slave_id byte) error {
 | 
			
		||||
	connection := host + ":" + strconv.FormatUint(uint64(port), 10)
 | 
			
		||||
	handler := modbus.NewTCPClientHandler(connection)
 | 
			
		||||
	handler.Timeout = 10 * time.Second
 | 
			
		||||
	handler.SlaveId = slave_id
 | 
			
		||||
 | 
			
		||||
	err := handler.Connect()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		logger.Print(err)
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	logger.Print("Modbus connected")
 | 
			
		||||
 | 
			
		||||
	defer handler.Close()
 | 
			
		||||
	modbus_client = modbus.NewClient(handler)
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// connect to modbus tcp bridge host. If connection is lost try to re-connect.
 | 
			
		||||
func keep_mbus_connection() {
 | 
			
		||||
 | 
			
		||||
	for { // ever
 | 
			
		||||
		time.Sleep(time.Second)
 | 
			
		||||
		if !modbus_is_connected() {
 | 
			
		||||
			modbus_connect(config_cache.ModbusHost,
 | 
			
		||||
				uint(config_cache.ModbusPort),
 | 
			
		||||
				byte(config_cache.RtuAddress),
 | 
			
		||||
			)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Collect data of SDM-630 power meter
 | 
			
		||||
func collect_mbus_samples() {
 | 
			
		||||
	for { // ever
 | 
			
		||||
		if modbus_is_connected() {
 | 
			
		||||
			sample := make(map[string]measure)
 | 
			
		||||
			for key, register := range sdm_registers {
 | 
			
		||||
				res, err := modbus_read_float_register(register.Address)
 | 
			
		||||
				if err != nil {
 | 
			
		||||
					logger.Printf("Could not read from %s (%s)", key, err)
 | 
			
		||||
					continue
 | 
			
		||||
				}
 | 
			
		||||
				var mes measure
 | 
			
		||||
				mes.Value = res
 | 
			
		||||
				mes.Unit = register.Unit
 | 
			
		||||
				sample[key] = mes
 | 
			
		||||
			}
 | 
			
		||||
			sample_cache.mutex.Lock()
 | 
			
		||||
			sample_cache.sample = sample
 | 
			
		||||
			sample_cache.mutex.Unlock()
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(time.Duration(config_cache.SampleRate * 1000000000))
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user