package main import ( "context" "encoding/json" "flag" "log" "os" "strconv" "time" mqtt "github.com/eclipse/paho.mqtt.golang" influxdb2 "github.com/influxdata/influxdb-client-go/v2" "github.com/influxdata/influxdb-client-go/v2/api" ) type config struct { InfluxdbHost string `json:"influxdb_host"` InfluxdbPort int `json:"influxdb_port"` InfluxdbToken string `json:"influxdb_token"` MqttBroker string `json:"mqtt_broker"` MqttPort int `json:"mqtt_port"` } type data struct { Data struct { Dry bool `json:"dry"` Debug struct { Value float64 `json:"value"` Unit string `json:"unit"` } `json:"debug"` } `json:"data"` } var ( logger log.Logger = *log.Default() config_path string write_api api.WriteAPIBlocking ctx context.Context config_cache config ) var message_pub_handler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { logger.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) } var soil_handler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { logger.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) var dry_data data err := json.Unmarshal(msg.Payload(), &dry_data) if err != nil { logger.Print(err) return } point := influxdb2.NewPointWithMeasurement("garden") point.AddTag("soil", "moisture") point.AddField("value", dry_data.Data.Debug.Value) point.AddField("unit", dry_data.Data.Debug.Unit) point.SetTime(time.Now()) err = write_api.WritePoint(ctx, point) if err != nil { logger.Print(err.Error()) } } var connect_handler mqtt.OnConnectHandler = func(client mqtt.Client) { logger.Println("Connected") subscribe(client) } var connect_lost_handler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { logger.Printf("Connect lost: %v", err) } func init() { logger.SetPrefix("soil2db: ") } func read_config() { data, err := os.ReadFile(config_path) if err != nil { logger.Printf("Unable to read %s", config_path) return } err = json.Unmarshal(data, &config_cache) if err != nil { logger.Print("Unable to evaluate config data") return } } func connect() mqtt.Client { // MQTT connection opts := mqtt.NewClientOptions() broker := "tcp://" + config_cache.MqttBroker + ":" + strconv.Itoa(config_cache.MqttPort) opts.AddBroker(broker) opts.SetClientID("soil2db") opts.SetDefaultPublishHandler(message_pub_handler) opts.OnConnect = connect_handler opts.OnConnectionLost = connect_lost_handler client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { logger.Panic(token.Error()) } return client } func subscribe(client mqtt.Client) { // MQTT subscribtion topic := "garden/soil/moisture" token := client.Subscribe(topic, 1, soil_handler) token.Wait() logger.Printf("Subscribed to topic %s", topic) } func main() { flag.StringVar(&config_path, "c", "./config/config.json", "Specify path to find the config file. Default is ./config/config.json") flag.Parse() read_config() mqtt_client := connect() subscribe(mqtt_client) influxdb_url := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort) client := influxdb2.NewClient(influxdb_url, config_cache.InfluxdbToken) // always close client at the end defer client.Close() write_api = client.WriteAPIBlocking("tkl", "home") ctx = context.Background() for { time.Sleep(time.Second) } }