soil2db/main.go
2024-05-07 10:43:41 +02:00

136 lines
3.4 KiB
Go

package main
import (
"context"
"encoding/json"
"flag"
"log"
"os"
"strconv"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
)
type config struct {
InfluxdbHost string `json:"influxdb_host"`
InfluxdbPort int `json:"influxdb_port"`
InfluxdbToken string `json:"influxdb_token"`
MqttBroker string `json:"mqtt_broker"`
MqttPort int `json:"mqtt_port"`
}
type data struct {
Data struct {
Dry bool `json:"dry"`
Debug struct {
Value float64 `json:"value"`
Unit string `json:"unit"`
} `json:"debug"`
} `json:"data"`
}
var (
logger log.Logger = *log.Default()
config_path string
write_api api.WriteAPIBlocking
ctx context.Context
config_cache config
)
var message_pub_handler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var soil_handler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
logger.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
var dry_data data
err := json.Unmarshal(msg.Payload(), &dry_data)
if err != nil {
logger.Print(err)
return
}
point := influxdb2.NewPointWithMeasurement("garden")
point.AddTag("soil", "moisture")
point.AddField("value", dry_data.Data.Debug.Value)
point.AddField("unit", dry_data.Data.Debug.Unit)
point.SetTime(time.Now())
err = write_api.WritePoint(ctx, point)
if err != nil {
logger.Print(err.Error())
}
}
var connect_handler mqtt.OnConnectHandler = func(client mqtt.Client) {
logger.Println("Connected")
subscribe(client)
}
var connect_lost_handler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
logger.Printf("Connect lost: %v", err)
}
func init() {
logger.SetPrefix("soil2db: ")
}
func read_config() {
data, err := os.ReadFile(config_path)
if err != nil {
logger.Printf("Unable to read %s", config_path)
return
}
err = json.Unmarshal(data, &config_cache)
if err != nil {
logger.Print("Unable to evaluate config data")
return
}
}
func connect() mqtt.Client {
// MQTT connection
opts := mqtt.NewClientOptions()
broker := "tcp://" + config_cache.MqttBroker + ":" + strconv.Itoa(config_cache.MqttPort)
opts.AddBroker(broker)
opts.SetClientID("soil2db")
opts.SetDefaultPublishHandler(message_pub_handler)
opts.OnConnect = connect_handler
opts.OnConnectionLost = connect_lost_handler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
logger.Panic(token.Error())
}
return client
}
func subscribe(client mqtt.Client) {
// MQTT subscribtion
topic := "garden/soil/moisture"
token := client.Subscribe(topic, 1, soil_handler)
token.Wait()
logger.Printf("Subscribed to topic %s", topic)
}
func main() {
flag.StringVar(&config_path, "c", "./config/config.json", "Specify path to find the config file. Default is ./config/config.json")
flag.Parse()
read_config()
mqtt_client := connect()
subscribe(mqtt_client)
influxdb_url := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort)
client := influxdb2.NewClient(influxdb_url, config_cache.InfluxdbToken)
// always close client at the end
defer client.Close()
write_api = client.WriteAPIBlocking("tkl", "home")
ctx = context.Background()
for {
time.Sleep(time.Second)
}
}