136 lines
3.4 KiB
Go
136 lines
3.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"log"
|
|
"os"
|
|
"strconv"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
"github.com/influxdata/influxdb-client-go/v2/api"
|
|
)
|
|
|
|
type config struct {
|
|
InfluxdbHost string `json:"influxdb_host"`
|
|
InfluxdbPort int `json:"influxdb_port"`
|
|
InfluxdbToken string `json:"influxdb_token"`
|
|
MqttBroker string `json:"mqtt_broker"`
|
|
MqttPort int `json:"mqtt_port"`
|
|
}
|
|
|
|
type data struct {
|
|
Data struct {
|
|
Dry bool `json:"dry"`
|
|
Debug struct {
|
|
Value float64 `json:"value"`
|
|
Unit string `json:"unit"`
|
|
} `json:"debug"`
|
|
} `json:"data"`
|
|
}
|
|
|
|
var (
|
|
logger log.Logger = *log.Default()
|
|
config_path string
|
|
write_api api.WriteAPIBlocking
|
|
ctx context.Context
|
|
config_cache config
|
|
)
|
|
|
|
var message_pub_handler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
|
|
logger.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
|
|
}
|
|
|
|
var soil_handler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
|
|
logger.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
|
|
var dry_data data
|
|
err := json.Unmarshal(msg.Payload(), &dry_data)
|
|
if err != nil {
|
|
logger.Print(err)
|
|
return
|
|
}
|
|
point := influxdb2.NewPointWithMeasurement("garden")
|
|
point.AddTag("soil", "moisture")
|
|
point.AddField("value", dry_data.Data.Debug.Value)
|
|
point.AddField("unit", dry_data.Data.Debug.Unit)
|
|
point.SetTime(time.Now())
|
|
err = write_api.WritePoint(ctx, point)
|
|
if err != nil {
|
|
logger.Print(err.Error())
|
|
}
|
|
}
|
|
|
|
var connect_handler mqtt.OnConnectHandler = func(client mqtt.Client) {
|
|
logger.Println("Connected")
|
|
subscribe(client)
|
|
}
|
|
|
|
var connect_lost_handler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
|
|
logger.Printf("Connect lost: %v", err)
|
|
}
|
|
|
|
func init() {
|
|
logger.SetPrefix("soil2db: ")
|
|
}
|
|
|
|
func read_config() {
|
|
data, err := os.ReadFile(config_path)
|
|
if err != nil {
|
|
logger.Printf("Unable to read %s", config_path)
|
|
return
|
|
}
|
|
err = json.Unmarshal(data, &config_cache)
|
|
if err != nil {
|
|
logger.Print("Unable to evaluate config data")
|
|
return
|
|
}
|
|
}
|
|
|
|
func connect() mqtt.Client {
|
|
// MQTT connection
|
|
opts := mqtt.NewClientOptions()
|
|
broker := "tcp://" + config_cache.MqttBroker + ":" + strconv.Itoa(config_cache.MqttPort)
|
|
opts.AddBroker(broker)
|
|
opts.SetClientID("soil2db")
|
|
opts.SetDefaultPublishHandler(message_pub_handler)
|
|
opts.OnConnect = connect_handler
|
|
opts.OnConnectionLost = connect_lost_handler
|
|
client := mqtt.NewClient(opts)
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
logger.Panic(token.Error())
|
|
}
|
|
return client
|
|
}
|
|
|
|
func subscribe(client mqtt.Client) {
|
|
// MQTT subscribtion
|
|
topic := "garden/soil/moisture"
|
|
token := client.Subscribe(topic, 1, soil_handler)
|
|
token.Wait()
|
|
logger.Printf("Subscribed to topic %s", topic)
|
|
}
|
|
|
|
func main() {
|
|
flag.StringVar(&config_path, "c", "./config/config.json", "Specify path to find the config file. Default is ./config/config.json")
|
|
flag.Parse()
|
|
read_config()
|
|
|
|
mqtt_client := connect()
|
|
subscribe(mqtt_client)
|
|
|
|
influxdb_url := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort)
|
|
client := influxdb2.NewClient(influxdb_url, config_cache.InfluxdbToken)
|
|
// always close client at the end
|
|
defer client.Close()
|
|
|
|
write_api = client.WriteAPIBlocking("tkl", "home")
|
|
ctx = context.Background()
|
|
|
|
for {
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|