package api import ( "context" "fmt" "sort" "time" "github.com/jackc/pgx/v5/pgxpool" ) type EnergyBar struct { Time time.Time `json:"time"` Buy float64 `json:"buy"` Sell float64 `json:"sell"` Produce float64 `json:"produce"` Consume float64 `json:"consume"` } type PowerPoint struct { Time time.Time `json:"time"` PV float64 `json:"pv"` Barn float64 `json:"barn"` House float64 `json:"house"` } type BatteryPoint struct { Time time.Time `json:"time"` Level float64 `json:"level"` } var periodDefaults = map[string]int{ "day": 30, "week": 12, "month": 12, } // viewForRange picks the right continuous aggregate views for the given time range. func viewForRange(timeRange string) (invView, meterView, interval string) { switch timeRange { case "1h": return "inverter_10m", "power_meter_10m", "1 hour" case "6h": return "inverter_10m", "power_meter_10m", "6 hours" case "7d": return "inverter_1h", "power_meter_1h", "7 days" default: // "24h" and anything else return "inverter_10m", "power_meter_10m", "24 hours" } } func sortedUnion(maps ...map[time.Time]float64) []time.Time { seen := make(map[time.Time]bool) for _, m := range maps { for t := range m { seen[t] = true } } times := make([]time.Time, 0, len(seen)) for t := range seen { times = append(times, t) } sort.Slice(times, func(i, j int) bool { return times[i].Before(times[j]) }) return times } func getPower(ctx context.Context, pool *pgxpool.Pool, timeRange string) ([]PowerPoint, error) { invView, meterView, interval := viewForRange(timeRange) type colRow struct { bucket time.Time value float64 } queryCol := func(sql string) (map[time.Time]float64, error) { rows, err := pool.Query(ctx, sql) if err != nil { return nil, err } defer rows.Close() m := make(map[time.Time]float64) for rows.Next() { var r colRow if err := rows.Scan(&r.bucket, &r.value); err != nil { return nil, err } m[r.bucket] = r.value } return m, rows.Err() } pvSQL := fmt.Sprintf( `SELECT bucket, COALESCE(pv_power, 0) FROM %s WHERE bucket >= NOW() - '%s'::interval ORDER BY bucket`, invView, interval) houseSQL := fmt.Sprintf( `SELECT bucket, COALESCE(total_power, 0) FROM %s WHERE device = 'house' AND bucket >= NOW() - '%s'::interval ORDER BY bucket`, meterView, interval) barnSQL := fmt.Sprintf( `SELECT bucket, COALESCE(total_power, 0) FROM %s WHERE device = 'barn' AND bucket >= NOW() - '%s'::interval ORDER BY bucket`, meterView, interval) type result struct { m map[time.Time]float64 err error } pvCh := make(chan result, 1) houseCh := make(chan result, 1) barnCh := make(chan result, 1) go func() { m, e := queryCol(pvSQL); pvCh <- result{m, e} }() go func() { m, e := queryCol(houseSQL); houseCh <- result{m, e} }() go func() { m, e := queryCol(barnSQL); barnCh <- result{m, e} }() pvRes := <-pvCh houseRes := <-houseCh barnRes := <-barnCh if pvRes.err != nil { return nil, pvRes.err } if houseRes.err != nil { return nil, houseRes.err } if barnRes.err != nil { return nil, barnRes.err } times := sortedUnion(pvRes.m, houseRes.m, barnRes.m) pts := make([]PowerPoint, 0, len(times)) for _, t := range times { pts = append(pts, PowerPoint{ Time: t, PV: pvRes.m[t], House: houseRes.m[t], Barn: barnRes.m[t], }) } return pts, nil } func getBattery(ctx context.Context, pool *pgxpool.Pool, timeRange string) ([]BatteryPoint, error) { invView, _, interval := viewForRange(timeRange) sql := fmt.Sprintf( `SELECT bucket, COALESCE(battery_soc, 0) FROM %s WHERE bucket >= NOW() - '%s'::interval ORDER BY bucket`, invView, interval) rows, err := pool.Query(ctx, sql) if err != nil { return nil, err } defer rows.Close() var pts []BatteryPoint for rows.Next() { var p BatteryPoint if err := rows.Scan(&p.Time, &p.Level); err != nil { return nil, err } pts = append(pts, p) } return pts, rows.Err() } func getBars(ctx context.Context, pool *pgxpool.Pool, period string, count int) ([]EnergyBar, error) { var periodTrunc, rangeInterval string switch period { case "day": periodTrunc = "day" rangeInterval = fmt.Sprintf("%d days", count+1) case "week": periodTrunc = "week" rangeInterval = fmt.Sprintf("%d weeks", count+1) case "month": periodTrunc = "month" rangeInterval = fmt.Sprintf("%d months", count+1) default: return nil, fmt.Errorf("unknown period %q", period) } // DISTINCT ON picks the last-written daily row for each period bucket, // then LAG() computes the delta (energy consumed/produced/traded) over // that period. Fetching count+1 rows ensures LAG has a previous value // for the first bar; we trim to count at the end. sql := fmt.Sprintf(` WITH inv_last AS ( SELECT DISTINCT ON (date_trunc('%[1]s', bucket)) date_trunc('%[1]s', bucket) AS period, grid_import_kwh, grid_export_kwh, pv_energy_kwh FROM inverter_daily WHERE bucket >= NOW() - '%[2]s'::interval ORDER BY date_trunc('%[1]s', bucket), bucket DESC ), house_last AS ( SELECT DISTINCT ON (date_trunc('%[1]s', bucket)) date_trunc('%[1]s', bucket) AS period, import_kwh FROM power_meter_daily WHERE device = 'house' AND bucket >= NOW() - '%[2]s'::interval ORDER BY date_trunc('%[1]s', bucket), bucket DESC ), barn_last AS ( SELECT DISTINCT ON (date_trunc('%[1]s', bucket)) date_trunc('%[1]s', bucket) AS period, import_kwh FROM power_meter_daily WHERE device = 'barn' AND bucket >= NOW() - '%[2]s'::interval ORDER BY date_trunc('%[1]s', bucket), bucket DESC ), joined AS ( SELECT i.period, i.grid_import_kwh, LAG(i.grid_import_kwh) OVER (ORDER BY i.period) AS prev_import, i.grid_export_kwh, LAG(i.grid_export_kwh) OVER (ORDER BY i.period) AS prev_export, i.pv_energy_kwh, LAG(i.pv_energy_kwh) OVER (ORDER BY i.period) AS prev_pv, h.import_kwh AS house_kwh, LAG(h.import_kwh) OVER (ORDER BY i.period) AS prev_house, b.import_kwh AS barn_kwh, LAG(b.import_kwh) OVER (ORDER BY i.period) AS prev_barn FROM inv_last i LEFT JOIN house_last h ON h.period = i.period LEFT JOIN barn_last b ON b.period = i.period ) SELECT period, GREATEST(0, grid_import_kwh - prev_import) AS buy, GREATEST(0, grid_export_kwh - prev_export) AS sell, GREATEST(0, pv_energy_kwh - prev_pv) AS produce, GREATEST(0, COALESCE(house_kwh - prev_house, 0) + COALESCE(barn_kwh - prev_barn, 0)) AS consume FROM joined WHERE prev_import IS NOT NULL ORDER BY period `, periodTrunc, rangeInterval) rows, err := pool.Query(ctx, sql) if err != nil { return nil, err } defer rows.Close() var bars []EnergyBar for rows.Next() { var b EnergyBar if err := rows.Scan(&b.Time, &b.Buy, &b.Sell, &b.Produce, &b.Consume); err != nil { return nil, err } bars = append(bars, b) } if err := rows.Err(); err != nil { return nil, err } // Trim to the requested count (the LAG window requires one extra row) if len(bars) > count { bars = bars[len(bars)-count:] } return bars, nil }