activitycollect/main.go
2022-11-11 14:00:54 +01:00

162 lines
3.8 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"io/fs"
"log"
"os"
"path/filepath"
"strconv"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/tormoder/fit"
)
// config mirrors the JSON configuration file. Every field is filled from the
// JSON key named in its struct tag.
type config struct {
	// InFolder is the directory that is scanned for incoming files.
	InFolder string `json:"in_folder"`
	// OutFolder is the directory that processed files are moved to.
	OutFolder string `json:"out_folder"`
	// InfluxdbHost is the host part of the InfluxDB server URL.
	InfluxdbHost string `json:"influxdb_host"`
	// InfluxdbPort is the port part of the InfluxDB server URL.
	InfluxdbPort int `json:"influxdb_port"`
	// InfluxdbToken is the token handed to the InfluxDB client.
	InfluxdbToken string `json:"influxdb_token"`
}
var (
	// logger is the application's own logger. It is created with log.New
	// instead of dereferencing log.Default(): a log.Logger contains a mutex
	// and must not be copied by value (go vet's copylocks check reports such
	// a copy). It writes to stderr with the standard flags, like the default
	// logger, while its prefix stays separate from the package-level log
	// functions.
	logger = log.New(os.Stderr, "", log.LstdFlags)

	// config_path is the location of the JSON configuration file; main binds
	// it to the -c command line flag.
	config_path string

	// config_cache holds the active configuration. It starts out with the
	// defaults below and is overwritten by read_config. Fields not listed
	// start at their zero value.
	config_cache = config{
		InfluxdbPort: 8086,
	}
)
// read_config loads the JSON file named by config_path into config_cache.
//
// Keys that are absent from the file leave the corresponding config_cache
// fields untouched. Failures are not fatal: the cause is logged and the
// function returns, leaving the caller with whatever config_cache holds at
// that point.
func read_config() {
	data, err := os.ReadFile(config_path)
	if err != nil {
		// Include the cause so a missing file can be told apart from, for
		// example, a permission problem.
		logger.Printf("Unable to read %s (%s)", config_path, err)
		return
	}

	if err := json.Unmarshal(data, &config_cache); err != nil {
		logger.Printf("Unable to evaluate config data in %s (%s)", config_path, err)
	}
}
// init puts the program prefix in front of everything written through logger
// and then logs a start-up message.
func init() {
	const prefix = "FitDecoder: "

	logger.SetPrefix(prefix)
	logger.Println("Starting")
}
// fit_decode reads the FIT file at path and returns its cycling sessions.
//
// The file content is decoded with fit.Decode and must contain an activity.
// Of that activity's sessions only those whose sport is fit.SportCycling are
// returned, each as a copy. A file without any cycling session yields a nil
// slice and a nil error.
//
// When reading, decoding or fetching the activity fails, the cause is logged
// and returned together with a nil slice.
func fit_decode(path string) ([]fit.SessionMsg, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		logger.Printf("Cannot read file %s (%s).", path, err.Error())
		return nil, err
	}

	// The decoded file gets its own name so that the fit package is not
	// shadowed and stays usable further down.
	decoded, err := fit.Decode(bytes.NewReader(data))
	if err != nil {
		logger.Printf("Cannot decode file %s (%s).", path, err.Error())
		return nil, err
	}

	// Get the actual activity.
	activity, err := decoded.Activity()
	if err != nil {
		logger.Printf("Cannot get activity from %s (%s).", path, err.Error())
		return nil, err
	}

	var res []fit.SessionMsg
	for _, session := range activity.Sessions {
		if session.Sport == fit.SportCycling {
			res = append(res, *session)
		}
	}
	return res, nil
}
// main is the program entry point.
//
// It reads the configuration (path given by the -c flag), makes sure the in
// folder is readable and the out folder exists, and then polls the in folder
// once per second. Every regular entry is passed to fit_decode; each returned
// session is written to InfluxDB as one point of measurement "activity"
// tagged activity=bicycle, with the session's JSON fields as point fields and
// the session timestamp as point time. A file is moved to the out folder only
// after all of its points were written and flushed without error; otherwise
// it stays in the in folder and is tried again on the next pass.
//
// The polling loop never terminates on its own.
func main() {
	if wd, err := os.Getwd(); err != nil {
		logger.Printf("Cannot determine working directory (%s)", err)
	} else {
		logger.Printf("Working directory: %s", wd)
	}

	flag.StringVar(&config_path, "c", "./config/config.json", "Specify path to find the config file. Default is ./config/config.json")
	flag.Parse()
	read_config()

	// Check both folders before the InfluxDB client is created: Fatalf ends
	// the process through os.Exit, which would skip the deferred
	// client.Close().
	if _, err := os.ReadDir(config_cache.InFolder); err != nil {
		logger.Fatalf("Cannot open directory %s (%s)", config_cache.InFolder, err)
	}
	if err := os.MkdirAll(config_cache.OutFolder, os.ModePerm); err != nil {
		logger.Fatalf("Cannot create directory %s (%s)", config_cache.OutFolder, err)
	}

	influxURL := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort)
	client := influxdb2.NewClient(influxURL, config_cache.InfluxdbToken)
	// Only runs if main ever returns, which the loop below does not do.
	defer client.Close()
	// Blocking write client for organisation "tkl" and bucket "home".
	writeAPI := client.WriteAPIBlocking("tkl", "home")
	ctx := context.Background()

	for {
		// List the folder afresh on every pass. Repeated Readdir calls on a
		// single open directory handle continue from the previous read
		// position, so files arriving after the first pass could go
		// unnoticed.
		var (
			entries []fs.DirEntry
			err     error
		)
		entries, err = os.ReadDir(config_cache.InFolder)
		if err != nil {
			logger.Printf("Cannot read %s (%s).", config_cache.InFolder, err)
		}

		for _, entry := range entries {
			// Sub-directories cannot be FIT files.
			if entry.IsDir() {
				continue
			}

			path := filepath.Join(config_cache.InFolder, entry.Name())
			sessions, err := fit_decode(path)
			if err != nil {
				logger.Printf("Cannot process %s (%s).", path, err)
				continue
			}

			// stored turns false as soon as a write fails, so the file is
			// kept for another attempt.
			stored := true
			for i := range sessions {
				session := &sessions[i]

				// Round-trip through JSON to obtain the session as a generic
				// key/value map.
				ser, err := json.Marshal(session)
				if err != nil {
					logger.Printf("Cannot serialize session from %s (%s).", path, err)
					continue
				}
				var data map[string]interface{}
				if err := json.Unmarshal(ser, &data); err != nil {
					logger.Printf("Cannot evaluate session from %s (%s).", path, err)
					continue
				}

				point := influxdb2.NewPointWithMeasurement("activity")
				point.AddTag("activity", "bicycle")
				for k, v := range data {
					point.AddField(k, v)
					logger.Printf("key: %v, value: %v", k, v)
				}
				point.SetTime(session.Timestamp)

				if err := writeAPI.WritePoint(ctx, point); err != nil {
					logger.Print(err)
					stored = false
				}
			}

			if err := writeAPI.Flush(ctx); err != nil {
				logger.Print(err)
				stored = false
			}

			if !stored {
				logger.Printf("Keeping %s for retry, not all sessions were stored.", path)
				continue
			}

			newPath := filepath.Join(config_cache.OutFolder, entry.Name())
			if err := os.Rename(path, newPath); err != nil {
				logger.Printf("Cannot move file %s to %s (%s).", path, newPath, err)
			}
		}

		time.Sleep(time.Second)
	}
}