Initial commit

Signed-off-by: Thomas Klaehn <thomas.klaehn@perinet.io>
This commit is contained in:
Thomas Klaehn
2026-02-10 11:47:49 +01:00
commit f496eebe2a
17 changed files with 1835 additions and 0 deletions

90
internal/client/auth.go Normal file
View File

@@ -0,0 +1,90 @@
package client
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"time"
"golang.org/x/oauth2"
)
// TokenCache handles token persistence
type TokenCache struct {
filePath string
}
// NewTokenCache creates a new token cache
func NewTokenCache(filePath string) *TokenCache {
return &TokenCache{filePath: filePath}
}
// Load loads a token from cache
func (tc *TokenCache) Load() (*oauth2.Token, error) {
// Check if file exists
if _, err := os.Stat(tc.filePath); os.IsNotExist(err) {
return nil, nil // No cached token
}
// Read file
data, err := os.ReadFile(tc.filePath)
if err != nil {
return nil, fmt.Errorf("failed to read token cache: %w", err)
}
// Unmarshal token
var token oauth2.Token
if err := json.Unmarshal(data, &token); err != nil {
return nil, fmt.Errorf("failed to unmarshal token: %w", err)
}
// Check if token is expired
if token.Expiry.Before(time.Now()) {
return nil, nil // Token expired
}
return &token, nil
}
// Save saves a token to cache
func (tc *TokenCache) Save(token *oauth2.Token) error {
// Create directory if it doesn't exist
dir := filepath.Dir(tc.filePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create token cache directory: %w", err)
}
// Marshal token
data, err := json.MarshalIndent(token, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal token: %w", err)
}
// Write to file with restricted permissions
if err := os.WriteFile(tc.filePath, data, 0600); err != nil {
return fmt.Errorf("failed to write token cache: %w", err)
}
return nil
}
// Authenticate authenticates with Zwift using username and password
func Authenticate(ctx context.Context, username, password string, tokenCache *TokenCache) (*oauth2.Token, error) {
// Try to load cached token first
if tokenCache != nil {
token, err := tokenCache.Load()
if err == nil && token != nil && token.Valid() {
return token, nil
}
}
// Authenticate with username/password
// Note: The gravl library handles the actual OAuth2 flow
// We'll use the credentials directly with the client
// For now, we'll return nil as the gravl client handles authentication
// This function serves as a placeholder for explicit token management
return nil, fmt.Errorf("authentication handled by gravl client")
}

125
internal/client/client.go Normal file
View File

@@ -0,0 +1,125 @@
package client
import (
"context"
"fmt"
"io"
"net/http"
"time"
"github.com/bzimmer/activity"
"github.com/bzimmer/activity/zwift"
"golang.org/x/time/rate"
)
// Client wraps the Zwift API client with rate limiting and retry logic
type Client struct {
client *zwift.Client
rateLimiter *rate.Limiter
maxRetries int
athleteID int64 // Cached athlete ID
}
// NewClient creates a new Zwift client
func NewClient(username, password string, rateLimit, maxRetries int) (*Client, error) {
// Create HTTP client
httpClient := &http.Client{
Timeout: 30 * time.Second,
}
// Create rate limiter (requests per second)
limiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit*2)
// Create Zwift client with credentials
zwiftClient, err := zwift.NewClient(
zwift.WithHTTPClient(httpClient),
zwift.WithRateLimiter(limiter),
zwift.WithTokenRefresh(username, password),
)
if err != nil {
return nil, fmt.Errorf("failed to create Zwift client: %w", err)
}
return &Client{
client: zwiftClient,
rateLimiter: limiter,
maxRetries: maxRetries,
}, nil
}
// GetProfile gets the authenticated user's profile and caches the athlete ID
func (c *Client) GetProfile(ctx context.Context) (int64, error) {
// Return cached athlete ID if available
if c.athleteID != 0 {
return c.athleteID, nil
}
// Get profile from Zwift (using "me" as the profile ID)
profile, err := c.client.Profile.Profile(ctx, zwift.Me)
if err != nil {
return 0, fmt.Errorf("failed to get profile: %w", err)
}
// Cache athlete ID
c.athleteID = profile.ID
return profile.ID, nil
}
// ListRecentActivities lists recent activities for the athlete
func (c *Client) ListRecentActivities(ctx context.Context, athleteID int64, limit int) ([]*zwift.Activity, error) {
// Create pagination spec
spec := activity.Pagination{
Total: limit,
}
// List activities from Zwift
activities, err := c.client.Activity.Activities(ctx, athleteID, spec)
if err != nil {
return nil, fmt.Errorf("failed to list activities: %w", err)
}
return activities, nil
}
// DownloadFIT downloads the FIT file for an activity
func (c *Client) DownloadFIT(ctx context.Context, athleteID, activityID int64) ([]byte, string, error) {
// Retry logic
var lastErr error
for attempt := 0; attempt < c.maxRetries; attempt++ {
if attempt > 0 {
// Exponential backoff
backoff := time.Duration(1<<uint(attempt)) * time.Second
select {
case <-ctx.Done():
return nil, "", ctx.Err()
case <-time.After(backoff):
}
}
// Export FIT file
export, err := c.client.Activity.Export(ctx, activityID)
if err != nil {
lastErr = err
continue
}
// Read data from reader
data, err := io.ReadAll(export.Reader)
if err != nil {
lastErr = err
continue
}
// Return data and filename
return data, export.Filename, nil
}
return nil, "", fmt.Errorf("failed to download FIT file after %d attempts: %w", c.maxRetries, lastErr)
}
// Close closes the client
func (c *Client) Close() error {
// Nothing to close for now
return nil
}

123
internal/config/config.go Normal file
View File

@@ -0,0 +1,123 @@
package config
import (
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/spf13/viper"
)
// Config holds all configuration for the Zwift monitor service
type Config struct {
// Authentication
Username string `mapstructure:"username"`
Password string `mapstructure:"password"`
// Storage
OutputDir string `mapstructure:"output_dir"`
StateFile string `mapstructure:"state_file"`
TokenCachePath string `mapstructure:"token_cache"`
// Monitoring
PollInterval time.Duration `mapstructure:"poll_interval"`
// API Configuration
RateLimit int `mapstructure:"rate_limit"`
MaxRetries int `mapstructure:"max_retries"`
// Logging
LogLevel string `mapstructure:"log_level"`
LogFile string `mapstructure:"log_file"`
}
// Load loads configuration from environment variables and optional config file
func Load() (*Config, error) {
v := viper.New()
// Set defaults
v.SetDefault("output_dir", "/data/activities")
v.SetDefault("state_file", "/data/zwift-state.json")
v.SetDefault("token_cache", "/data/.zwift-token.json")
v.SetDefault("poll_interval", "5m")
v.SetDefault("rate_limit", 5)
v.SetDefault("max_retries", 3)
v.SetDefault("log_level", "info")
v.SetDefault("log_file", "")
// Bind environment variables
v.SetEnvPrefix("ZWIFT")
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.AutomaticEnv()
// Try to load config file if it exists
configPaths := []string{
"config.yaml",
"/app/config.yaml",
filepath.Join(os.Getenv("HOME"), ".zwift-monitor.yaml"),
}
for _, path := range configPaths {
if _, err := os.Stat(path); err == nil {
v.SetConfigFile(path)
if err := v.ReadInConfig(); err != nil {
return nil, fmt.Errorf("failed to read config file %s: %w", path, err)
}
break
}
}
var cfg Config
if err := v.Unmarshal(&cfg); err != nil {
return nil, fmt.Errorf("failed to unmarshal config: %w", err)
}
// Validate required fields
if err := cfg.Validate(); err != nil {
return nil, err
}
// Expand paths (handle ~)
cfg.OutputDir = expandPath(cfg.OutputDir)
cfg.StateFile = expandPath(cfg.StateFile)
cfg.TokenCachePath = expandPath(cfg.TokenCachePath)
if cfg.LogFile != "" {
cfg.LogFile = expandPath(cfg.LogFile)
}
return &cfg, nil
}
// Validate checks if required configuration is present
func (c *Config) Validate() error {
if c.Username == "" {
return fmt.Errorf("username is required (set ZWIFT_USERNAME environment variable)")
}
if c.Password == "" {
return fmt.Errorf("password is required (set ZWIFT_PASSWORD environment variable)")
}
if c.PollInterval < time.Minute {
return fmt.Errorf("poll_interval must be at least 1 minute")
}
if c.RateLimit < 1 {
return fmt.Errorf("rate_limit must be at least 1")
}
if c.MaxRetries < 1 {
return fmt.Errorf("max_retries must be at least 1")
}
return nil
}
// expandPath expands ~ to home directory
func expandPath(path string) string {
if strings.HasPrefix(path, "~") {
home, err := os.UserHomeDir()
if err != nil {
return path
}
return filepath.Join(home, path[1:])
}
return path
}

149
internal/monitor/monitor.go Normal file
View File

@@ -0,0 +1,149 @@
package monitor
import (
"context"
"fmt"
"time"
"zwift-activity-loader/internal/client"
"zwift-activity-loader/internal/state"
"zwift-activity-loader/internal/storage"
"github.com/bzimmer/activity/zwift"
"go.uber.org/zap"
)
// Monitor handles activity monitoring and downloading
type Monitor struct {
client *client.Client
state *state.State
storage *storage.Storage
logger *zap.Logger
athleteID int64 // Cached athlete ID
}
// New creates a new Monitor instance
func New(client *client.Client, state *state.State, storage *storage.Storage, logger *zap.Logger) *Monitor {
return &Monitor{
client: client,
state: state,
storage: storage,
logger: logger,
}
}
// CheckAndDownload checks for new activities and downloads them
func (m *Monitor) CheckAndDownload(ctx context.Context) error {
m.logger.Info("Checking for new activities...")
// Get athlete ID (cached after first call)
if m.athleteID == 0 {
athleteID, err := m.client.GetProfile(ctx)
if err != nil {
return fmt.Errorf("failed to get profile: %w", err)
}
m.athleteID = athleteID
m.logger.Debug("Got athlete profile", zap.Int64("athlete_id", athleteID))
}
// List recent activities (last 100)
activities, err := m.client.ListRecentActivities(ctx, m.athleteID, 100)
if err != nil {
return fmt.Errorf("failed to list activities: %w", err)
}
m.logger.Debug("Listed recent activities", zap.Int("count", len(activities)))
// Filter new activities
newActivities := m.filterNewActivities(activities)
if len(newActivities) == 0 {
m.logger.Info("No new activities found")
if err := m.state.UpdateLastCheck(); err != nil {
m.logger.Warn("Failed to update last check time", zap.Error(err))
}
return nil
}
m.logger.Info("Found new activities", zap.Int("count", len(newActivities)))
// Download new activities
downloaded := 0
for _, act := range newActivities {
if err := m.downloadActivity(ctx, act); err != nil {
m.logger.Error("Failed to download activity",
zap.Int64("activity_id", act.ID),
zap.String("name", act.Name),
zap.Error(err))
continue
}
downloaded++
}
m.logger.Info("Download complete",
zap.Int("downloaded", downloaded),
zap.Int("total_new", len(newActivities)))
return nil
}
// filterNewActivities filters activities that haven't been downloaded yet
func (m *Monitor) filterNewActivities(activities []*zwift.Activity) []*zwift.Activity {
var newActivities []*zwift.Activity
for _, act := range activities {
if !m.state.IsDownloaded(act.ID) {
newActivities = append(newActivities, act)
}
}
return newActivities
}
// downloadActivity downloads a single activity
func (m *Monitor) downloadActivity(ctx context.Context, act *zwift.Activity) error {
m.logger.Info("Downloading activity",
zap.Int64("activity_id", act.ID),
zap.String("name", act.Name),
zap.Time("date", act.StartDate.Time))
// Download FIT file
data, filename, err := m.client.DownloadFIT(ctx, m.athleteID, act.ID)
if err != nil {
return fmt.Errorf("failed to download FIT file: %w", err)
}
m.logger.Debug("Downloaded FIT file",
zap.Int64("activity_id", act.ID),
zap.String("filename", filename),
zap.Int("size_bytes", len(data)))
// Save to storage
filepath, err := m.storage.Save(act.ID, act.Name, act.StartDate.Time, data)
if err != nil {
return fmt.Errorf("failed to save FIT file: %w", err)
}
// Mark as downloaded in state
info := state.ActivityInfo{
ID: act.ID,
Name: act.Name,
Date: act.StartDate.Time,
FilePath: filepath,
DownloadedAt: time.Now(),
SportType: act.Sport,
}
if err := m.state.MarkDownloaded(info); err != nil {
m.logger.Error("Failed to mark activity as downloaded",
zap.Int64("activity_id", act.ID),
zap.Error(err))
// Don't return error here - file is saved, just state update failed
}
m.logger.Info("Successfully downloaded activity",
zap.Int64("activity_id", act.ID),
zap.String("name", act.Name),
zap.String("filepath", filepath))
return nil
}

160
internal/service/service.go Normal file
View File

@@ -0,0 +1,160 @@
package service
import (
"context"
"fmt"
"time"
"zwift-activity-loader/internal/client"
"zwift-activity-loader/internal/config"
"zwift-activity-loader/internal/monitor"
"zwift-activity-loader/internal/state"
"zwift-activity-loader/internal/storage"
"go.uber.org/zap"
)
// Service represents the main monitoring service
type Service struct {
config *config.Config
client *client.Client
monitor *monitor.Monitor
state *state.State
storage *storage.Storage
logger *zap.Logger
ticker *time.Ticker
ctx context.Context
cancel context.CancelFunc
}
// New creates a new Service instance
func New(cfg *config.Config, logger *zap.Logger) (*Service, error) {
// Create client
zwiftClient, err := client.NewClient(cfg.Username, cfg.Password, cfg.RateLimit, cfg.MaxRetries)
if err != nil {
return nil, fmt.Errorf("failed to create Zwift client: %w", err)
}
// Create storage
stor, err := storage.New(cfg.OutputDir)
if err != nil {
return nil, fmt.Errorf("failed to create storage: %w", err)
}
// Create state
st := state.New(cfg.StateFile)
if err := st.Load(); err != nil {
return nil, fmt.Errorf("failed to load state: %w", err)
}
// Create monitor
mon := monitor.New(zwiftClient, st, stor, logger)
// Create context
ctx, cancel := context.WithCancel(context.Background())
return &Service{
config: cfg,
client: zwiftClient,
monitor: mon,
state: st,
storage: stor,
logger: logger,
ticker: time.NewTicker(cfg.PollInterval),
ctx: ctx,
cancel: cancel,
}, nil
}
// Run starts the service loop
func (s *Service) Run() error {
s.logger.Info("Starting Zwift Activity Monitor",
zap.Duration("poll_interval", s.config.PollInterval),
zap.String("output_dir", s.config.OutputDir))
// Get initial stats
totalDownloaded, lastCheck := s.state.GetStats()
s.logger.Info("Loaded state",
zap.Int("total_downloaded", totalDownloaded),
zap.Time("last_check", lastCheck))
// Run initial check immediately
s.logger.Info("Running initial check...")
if err := s.runCheck(); err != nil {
s.logger.Error("Initial check failed", zap.Error(err))
}
// Start ticker loop
for {
select {
case <-s.ctx.Done():
s.logger.Info("Service context cancelled, shutting down")
return nil
case <-s.ticker.C:
if err := s.runCheck(); err != nil {
s.logger.Error("Check failed", zap.Error(err))
}
}
}
}
// runCheck runs a single check for new activities
func (s *Service) runCheck() error {
// Recover from panics
defer func() {
if r := recover(); r != nil {
s.logger.Error("Panic recovered in runCheck",
zap.Any("panic", r),
zap.Stack("stack"))
}
}()
// Create context with timeout
ctx, cancel := context.WithTimeout(s.ctx, 5*time.Minute)
defer cancel()
// Run check and download
return s.monitor.CheckAndDownload(ctx)
}
// Shutdown gracefully shuts down the service
func (s *Service) Shutdown() error {
s.logger.Info("Shutting down gracefully...")
// Stop ticker
s.ticker.Stop()
// Cancel context (with timeout for current operation)
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Wait for current operation to complete
done := make(chan struct{})
go func() {
s.cancel()
close(done)
}()
select {
case <-done:
s.logger.Info("Service stopped")
case <-shutdownCtx.Done():
s.logger.Warn("Shutdown timeout, forcing exit")
}
// Save final state
if err := s.state.Save(); err != nil {
s.logger.Error("Failed to save state on shutdown", zap.Error(err))
return err
}
// Close client
if err := s.client.Close(); err != nil {
s.logger.Error("Failed to close client", zap.Error(err))
return err
}
s.logger.Info("Shutdown complete")
return nil
}

166
internal/state/state.go Normal file
View File

@@ -0,0 +1,166 @@
package state
import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// ActivityInfo holds information about a downloaded activity
type ActivityInfo struct {
ID int64 `json:"id"`
Name string `json:"name"`
Date time.Time `json:"date"`
FilePath string `json:"file_path"`
DownloadedAt time.Time `json:"downloaded_at"`
SportType string `json:"sport_type,omitempty"`
}
// State manages the persistent state of downloaded activities
type State struct {
DownloadedActivities map[int64]ActivityInfo `json:"downloaded_activities"`
LastCheck time.Time `json:"last_check"`
TotalDownloaded int `json:"total_downloaded"`
filePath string
mu sync.RWMutex
}
// New creates a new State instance
func New(filePath string) *State {
return &State{
DownloadedActivities: make(map[int64]ActivityInfo),
LastCheck: time.Time{},
TotalDownloaded: 0,
filePath: filePath,
}
}
// Load loads state from JSON file
func (s *State) Load() error {
s.mu.Lock()
defer s.mu.Unlock()
// Check if file exists
if _, err := os.Stat(s.filePath); os.IsNotExist(err) {
// File doesn't exist, start with empty state
return nil
}
// Read file
data, err := os.ReadFile(s.filePath)
if err != nil {
return fmt.Errorf("failed to read state file: %w", err)
}
// Unmarshal JSON
if err := json.Unmarshal(data, s); err != nil {
// Try to backup corrupted file
backupPath := s.filePath + ".backup"
_ = os.WriteFile(backupPath, data, 0600)
return fmt.Errorf("failed to unmarshal state file (backed up to %s): %w", backupPath, err)
}
return nil
}
// Save saves state to JSON file
func (s *State) Save() error {
s.mu.RLock()
defer s.mu.RUnlock()
// Create directory if it doesn't exist
dir := filepath.Dir(s.filePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create state directory: %w", err)
}
// Marshal to JSON
data, err := json.MarshalIndent(s, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal state: %w", err)
}
// Write to temporary file first (atomic write)
tmpFile := s.filePath + ".tmp"
if err := os.WriteFile(tmpFile, data, 0600); err != nil {
return fmt.Errorf("failed to write temporary state file: %w", err)
}
// Rename temporary file to actual file (atomic operation)
if err := os.Rename(tmpFile, s.filePath); err != nil {
return fmt.Errorf("failed to rename temporary state file: %w", err)
}
return nil
}
// IsDownloaded checks if an activity has already been downloaded
func (s *State) IsDownloaded(activityID int64) bool {
s.mu.RLock()
defer s.mu.RUnlock()
_, exists := s.DownloadedActivities[activityID]
return exists
}
// MarkDownloaded marks an activity as downloaded
func (s *State) MarkDownloaded(info ActivityInfo) error {
s.mu.Lock()
defer s.mu.Unlock()
s.DownloadedActivities[info.ID] = info
s.TotalDownloaded++
s.LastCheck = time.Now()
// Save state after marking downloaded
return s.save()
}
// UpdateLastCheck updates the last check timestamp
func (s *State) UpdateLastCheck() error {
s.mu.Lock()
defer s.mu.Unlock()
s.LastCheck = time.Now()
return s.save()
}
// GetStats returns statistics about downloaded activities
func (s *State) GetStats() (totalDownloaded int, lastCheck time.Time) {
s.mu.RLock()
defer s.mu.RUnlock()
return s.TotalDownloaded, s.LastCheck
}
// save is an internal method that saves without locking (assumes lock is held)
func (s *State) save() error {
// Create directory if it doesn't exist
dir := filepath.Dir(s.filePath)
if err := os.MkdirAll(dir, 0755); err != nil {
return fmt.Errorf("failed to create state directory: %w", err)
}
// Marshal to JSON
data, err := json.MarshalIndent(s, "", " ")
if err != nil {
return fmt.Errorf("failed to marshal state: %w", err)
}
// Write to temporary file first (atomic write)
tmpFile := s.filePath + ".tmp"
if err := os.WriteFile(tmpFile, data, 0600); err != nil {
return fmt.Errorf("failed to write temporary state file: %w", err)
}
// Rename temporary file to actual file (atomic operation)
if err := os.Rename(tmpFile, s.filePath); err != nil {
return fmt.Errorf("failed to rename temporary state file: %w", err)
}
return nil
}

117
internal/storage/storage.go Normal file
View File

@@ -0,0 +1,117 @@
package storage
import (
"fmt"
"os"
"path/filepath"
"regexp"
"strings"
"time"
)
// Storage handles local file system operations for FIT files
type Storage struct {
outputDir string
}
// New creates a new Storage instance
func New(outputDir string) (*Storage, error) {
// Create output directory if it doesn't exist
if err := os.MkdirAll(outputDir, 0755); err != nil {
return nil, fmt.Errorf("failed to create output directory: %w", err)
}
return &Storage{
outputDir: outputDir,
}, nil
}
// Save saves a FIT file to the output directory
func (s *Storage) Save(activityID int64, activityName string, date time.Time, data []byte) (string, error) {
// Generate filename
filename := s.generateFilename(activityID, activityName, date)
filepath := filepath.Join(s.outputDir, filename)
// Check if file already exists
if _, err := os.Stat(filepath); err == nil {
return filepath, nil // File already exists, no need to save
}
// Write to temporary file first (atomic write)
tmpFile := filepath + ".tmp"
if err := os.WriteFile(tmpFile, data, 0644); err != nil {
return "", fmt.Errorf("failed to write temporary FIT file: %w", err)
}
// Verify file integrity (non-zero size)
if len(data) == 0 {
_ = os.Remove(tmpFile)
return "", fmt.Errorf("FIT file is empty")
}
// Rename temporary file to actual file (atomic operation)
if err := os.Rename(tmpFile, filepath); err != nil {
_ = os.Remove(tmpFile)
return "", fmt.Errorf("failed to rename temporary FIT file: %w", err)
}
return filepath, nil
}
// Exists checks if a FIT file already exists for an activity
func (s *Storage) Exists(activityID int64) bool {
// List files in output directory
files, err := os.ReadDir(s.outputDir)
if err != nil {
return false
}
// Check if any file starts with the activity ID
prefix := fmt.Sprintf("%d_", activityID)
for _, file := range files {
if strings.HasPrefix(file.Name(), prefix) {
return true
}
}
return false
}
// generateFilename generates a filename for a FIT file
// Format: {activityID}_{date}_{name}.fit
func (s *Storage) generateFilename(activityID int64, activityName string, date time.Time) string {
// Format date as YYYY-MM-DD
dateStr := date.Format("2006-01-02")
// Sanitize activity name (remove special characters)
name := sanitizeFilename(activityName)
if name == "" {
name = "activity"
}
// Limit name length
if len(name) > 50 {
name = name[:50]
}
return fmt.Sprintf("%d_%s_%s.fit", activityID, dateStr, name)
}
// sanitizeFilename removes special characters from filename
func sanitizeFilename(name string) string {
// Replace spaces with dashes
name = strings.ReplaceAll(name, " ", "-")
// Remove special characters (keep alphanumeric, dash, underscore)
re := regexp.MustCompile(`[^a-zA-Z0-9_-]`)
name = re.ReplaceAllString(name, "")
// Remove multiple consecutive dashes
re = regexp.MustCompile(`-+`)
name = re.ReplaceAllString(name, "-")
// Trim dashes from start and end
name = strings.Trim(name, "-")
return name
}