powercollect/main.go

182 lines
4.5 KiB
Go
Raw Permalink Normal View History

package main
import (
"context"
"encoding/json"
"flag"
"log"
"os"
"strconv"
"sync"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
)
type sdm630Register struct {
Address uint16
Unit string
}
type measure struct {
Value float32 `json:"value"`
Unit string `json:"unit"`
}
type sampleCache struct {
sample map[string]measure
mutex sync.Mutex
}
type config struct {
RtuAddress int `json:"rtu_address"`
SampleRate int `json:"sample_rate"`
ModbusHost string `json:"modbus_host"`
ModbusPort int `json:"modbus_port"`
InfluxdbHost string `json:"influxdb_host"`
InfluxdbPort int `json:"influxdb_port"`
InfluxdbToken string `json:"influxdb_token"`
}
var (
// SDM630 input registers
sdm_registers = map[string]sdm630Register{
"L1Voltage": {0x00, "V"},
"L2Voltage": {0x02, "V"},
"L3Voltage": {0x04, "V"},
"L1Current": {0x06, "A"},
"L2Current": {0x08, "A"},
"L3Current": {0x0A, "A"},
"L1PowerW": {0x0C, "W"},
"L2PowerW": {0x0E, "W"},
"L3PowerW": {0x10, "W"},
"L1PhaseAngle": {0x24, "°"},
"L2PhaseAngle": {0x26, "°"},
"L3PhaseAngle": {0x28, "°"},
"TotalImport": {0x48, "kWh"},
"TotalExport": {0x4A, "kWh"},
}
logger log.Logger = *log.Default()
last_total_import float32 = 0.0
last_total_export float32 = 0.0
config_path string
config_cache = config{
RtuAddress: 1, // modbus device slave address (0x01)
SampleRate: 1, // sec
ModbusHost: "", // hostname of the modbus tcp to rtu bridge
ModbusPort: 502, // port of the modbus tcp to rtu bridge
InfluxdbHost: "", // hostname of the influxdb server
InfluxdbPort: 8086, // port of the influxdb server
InfluxdbToken: "", // access token for the influx db
}
)
func read_config() {
data, err := os.ReadFile(config_path)
if err != nil {
logger.Printf("Unable to read %s", config_path)
return
}
err = json.Unmarshal(data, &config_cache)
if err != nil {
logger.Print("Unable to evaluate config data")
return
}
}
func init() {
logger.SetPrefix("PowerMeterSDM630: ")
logger.Println("Starting")
}
func sample_is_valid(sample map[string]measure) bool {
if len(sample) == 0 {
logger.Print("Sample invalid - contains no data")
return false
}
result := float32(0.0)
// sample is invalid when all values equals 0.0
for _, value := range sample {
result += value.Value
}
if result == 0.0 {
logger.Print("Sample invalid - all values equals 0.0")
return false
}
// total import and total export can never decrease
for key, value := range sample {
if key == "TotalImport" {
if value.Value < last_total_import {
logger.Printf("Sample invalid - Total import lower than previous one (%f vs. %f)", value.Value, last_total_import)
return false
}
last_total_import = value.Value
}
if key == "TotalExport" {
if value.Value < last_total_export {
logger.Printf("Sample invalid - Total export lower than previous one (%f vs. %f)", value.Value, last_total_export)
return false
}
last_total_export = value.Value
}
}
return true
}
func main() {
flag.StringVar(&config_path, "c", "./config/config.json", "Specify path to find the config file. Default is ./config/config.json")
flag.Parse()
read_config()
go keep_mbus_connection()
go collect_mbus_samples()
influxdb_url := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort)
client := influxdb2.NewClient(influxdb_url, config_cache.InfluxdbToken)
// always close client at the end
defer client.Close()
// get non-blocking write client
writeAPI := client.WriteAPIBlocking("tkl", "home")
ctx := context.Background()
for {
time.Sleep(time.Duration(config_cache.SampleRate * 1000000000))
sample_cache.mutex.Lock()
sample := sample_cache.sample
sample_cache.mutex.Unlock()
if sample_is_valid(sample) {
point := influxdb2.NewPointWithMeasurement("power")
point.AddTag("sensor", "powermeter")
for key, value := range sample {
point.AddField(key, value.Value)
point.AddField("unit", value.Unit)
}
// calculate TotalPowerW
total_power := sample["L1PowerW"].Value + sample["L2PowerW"].Value + sample["L3PowerW"].Value
point.AddField("TotalPowerW", total_power)
point.AddField("unit", "W")
point.SetTime(time.Now())
err := writeAPI.WritePoint(ctx, point)
if err != nil {
logger.Print(err.Error())
continue
}
err = writeAPI.Flush(ctx)
if err != nil {
logger.Print(err.Error())
continue
}
}
}
}