190 lines
4.5 KiB
Go
190 lines
4.5 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"io/fs"
|
|
"log"
|
|
"os"
|
|
"path/filepath"
|
|
"strconv"
|
|
"time"
|
|
|
|
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
|
|
"github.com/tormoder/fit"
|
|
)
|
|
|
|
// config mirrors the JSON configuration file loaded by read_config.
type config struct {
	// InFolder is the directory main scans for incoming files.
	InFolder string `json:"in_folder"`
	// OutFolder is the directory files are moved to once they were handled.
	OutFolder string `json:"out_folder"`
	// InfluxdbHost is the host part of the InfluxDB URL built in main.
	InfluxdbHost string `json:"influxdb_host"`
	// InfluxdbPort is the port part of the InfluxDB URL built in main.
	InfluxdbPort int `json:"influxdb_port"`
	// InfluxdbToken is the token handed to the InfluxDB client.
	InfluxdbToken string `json:"influxdb_token"`
}
|
|
|
|
var (
|
|
logger log.Logger = *log.Default()
|
|
config_path string
|
|
config_cache = config{
|
|
InFolder: "",
|
|
OutFolder: "",
|
|
InfluxdbHost: "",
|
|
InfluxdbPort: 8086,
|
|
InfluxdbToken: "",
|
|
}
|
|
)
|
|
|
|
func read_config() {
|
|
data, err := os.ReadFile(config_path)
|
|
if err != nil {
|
|
logger.Printf("Unable to read %s", config_path)
|
|
return
|
|
}
|
|
err = json.Unmarshal(data, &config_cache)
|
|
if err != nil {
|
|
logger.Print("Unable to evaluate config data")
|
|
return
|
|
}
|
|
}
|
|
|
|
func init() {
|
|
logger.SetPrefix("FitDecoder: ")
|
|
logger.Println("Starting")
|
|
|
|
}
|
|
|
|
// move_file moves source_path to dest_path by copying the content and then
// deleting the original.
//
// The original is only removed after the destination has been written AND
// closed without error, so a failed copy never loses data. Returned errors
// wrap the underlying cause and can be inspected with errors.Is / errors.As.
func move_file(source_path, dest_path string) error {
	if err := copy_file(source_path, dest_path); err != nil {
		return err
	}

	// The copy was successful, so now delete the original file. copy_file has
	// already closed it, which some platforms require before removal.
	if err := os.Remove(source_path); err != nil {
		return fmt.Errorf("failed removing original file: %w", err)
	}
	return nil
}

// copy_file streams the content of source_path into a newly created (or
// truncated) dest_path. A destination left incomplete by a failure is removed.
func copy_file(source_path, dest_path string) error {
	inputFile, err := os.Open(source_path)
	if err != nil {
		return fmt.Errorf("couldn't open source file: %w", err)
	}
	defer inputFile.Close()

	outputFile, err := os.Create(dest_path)
	if err != nil {
		return fmt.Errorf("couldn't open dest file: %w", err)
	}

	_, err = io.Copy(outputFile, inputFile)
	// Close reports write errors that only surface when buffers are flushed,
	// so its result counts as part of the copy.
	if closeErr := outputFile.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		// Best effort: do not leave a truncated file at the destination.
		os.Remove(dest_path)
		return fmt.Errorf("writing to output file failed: %w", err)
	}
	return nil
}
|
|
|
|
func fit_decode(path string) ([]fit.SessionMsg, error) {
|
|
var res []fit.SessionMsg
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
logger.Printf("cannot read file %s (%s).", path, err.Error())
|
|
return res, err
|
|
}
|
|
|
|
// Decode the FIT file data
|
|
fit, err := fit.Decode(bytes.NewReader(data))
|
|
if err != nil {
|
|
logger.Printf("cannot decode file %s (%s).", path, err.Error())
|
|
return res, err
|
|
}
|
|
// Get the actual activity
|
|
activity, err := fit.Activity()
|
|
if err != nil {
|
|
logger.Printf("cannot get activity from %s (%s).", path, err.Error())
|
|
return res, err
|
|
}
|
|
|
|
for _, session := range activity.Sessions {
|
|
if session.Sport == 2 {
|
|
res = append(res, *session)
|
|
}
|
|
}
|
|
return res, nil
|
|
}
|
|
|
|
func main() {
|
|
|
|
flag.StringVar(&config_path, "c", "./config/config.json", "Specify path to find the config file. Default is ./config/config.json")
|
|
flag.Parse()
|
|
read_config()
|
|
|
|
influxdb_url := "http://" + config_cache.InfluxdbHost + ":" + strconv.Itoa(config_cache.InfluxdbPort)
|
|
client := influxdb2.NewClient(influxdb_url, config_cache.InfluxdbToken)
|
|
// always close client at the end
|
|
defer client.Close()
|
|
|
|
// get blocking write client
|
|
writeAPI := client.WriteAPIBlocking("tkl", "home")
|
|
|
|
ctx := context.Background()
|
|
|
|
// create out folder
|
|
err := os.MkdirAll(config_cache.OutFolder, os.ModePerm)
|
|
if err != nil {
|
|
logger.Fatalf("cannot create directory %s (%s)", config_cache.OutFolder, err.Error())
|
|
}
|
|
|
|
for {
|
|
time.Sleep(time.Second)
|
|
|
|
// open in folder
|
|
dir, err := os.Open(config_cache.InFolder)
|
|
if err != nil {
|
|
logger.Fatalf("cannot open directory %s (%s)", config_cache.InFolder, err.Error())
|
|
}
|
|
|
|
var files []fs.FileInfo
|
|
files, err = dir.Readdir(0)
|
|
if err != nil {
|
|
logger.Printf("cannot read %s (%s).", config_cache.InFolder, err)
|
|
}
|
|
dir.Close()
|
|
for _, file := range files {
|
|
// Read our FIT test file data
|
|
path := filepath.Join(config_cache.InFolder, file.Name())
|
|
var sessions []fit.SessionMsg
|
|
sessions, err = fit_decode(path)
|
|
if err != nil {
|
|
log.Printf("cannot process %s (%s).", path, err.Error())
|
|
} else {
|
|
for _, session := range sessions {
|
|
ser, _ := json.Marshal(&session)
|
|
var data map[string]interface{}
|
|
json.Unmarshal(ser, &data)
|
|
|
|
point := influxdb2.NewPointWithMeasurement("activity")
|
|
point.AddTag("activity", "bicycle")
|
|
for k, v := range data {
|
|
point.AddField(k, v)
|
|
}
|
|
point.SetTime(session.Timestamp)
|
|
err = writeAPI.WritePoint(ctx, point)
|
|
if err != nil {
|
|
logger.Print(err)
|
|
continue
|
|
}
|
|
}
|
|
err = writeAPI.Flush(ctx)
|
|
if err != nil {
|
|
logger.Print(err)
|
|
continue
|
|
}
|
|
new_path := filepath.Join(config_cache.OutFolder, file.Name())
|
|
err = move_file(path, new_path)
|
|
if err != nil {
|
|
logger.Printf("cannot move file %s to %s (%s).", path, new_path, err.Error())
|
|
}
|
|
logger.Printf("%s processed.", path)
|
|
}
|
|
}
|
|
}
|
|
}
|