#!/usr/bin/env python3
"""Migrate historical energy data from InfluxDB to TimescaleDB.

Reads the AlphaEss, Power_House and Power_Barn measurements from InfluxDB,
pivots them to wide format, and bulk-inserts them into TimescaleDB via
`docker exec timescaledb psql`.

Usage:
    python3 migrate_influx.py
"""
import csv
import io
import subprocess
import sys
from datetime import datetime, timedelta

import requests

# ── Config ────────────────────────────────────────────────────────────────────
INFLUX_URL = "http://localhost:8086"
INFLUX_TOKEN = "hDu4JYvxciHsohn7zE0nyZfejZDik3s8fqxCkTebW1LRekckyGX_U0-wsfEcDuDV5WZER3MjQDss01jJJCeZBA=="
INFLUX_ORG = "tkl"
INFLUX_BUCKET = "home"

PG_CONTAINER = "timescaledb"
PG_DSN = "postgres://energy:changeme@localhost/energy"

# Process this much data per Flux query (keeps memory reasonable).
CHUNK_DAYS = 30

# ── Field mappings (InfluxDB field → TimescaleDB column) ───────────────────────
INVERTER_FIELDS = {
    "Pv1Power": "pv1_power",
    "Pv2Power": "pv2_power",
    "InverterPowerL1": "pv_l1_power",
    "InverterPowerL2": "pv_l2_power",
    "InverterPowerL3": "pv_l3_power",
    "BatteryStateOfCharge": "battery_soc",
    "TotalEnergyConsumeFromGridGrid": "grid_import_kwh",
    "TotalEnergyFeedToGridGrid": "grid_export_kwh",
    "InverterTotalPvEnergy": "pv_energy_kwh",
}

METER_FIELDS = {
    "L1PowerW": "l1_power",
    "L2PowerW": "l2_power",
    "L3PowerW": "l3_power",
    "TotalImport": "import_kwh",
    "TotalExport": "export_kwh",
}

# ── InfluxDB helpers ────────────────────────────────────────────────────────────


def flux_query(flux: str) -> str:
    """Run a Flux query and return the raw CSV response body."""
    resp = requests.post(
        f"{INFLUX_URL}/api/v2/query",
        params={"org": INFLUX_ORG},
        headers={
            "Authorization": f"Token {INFLUX_TOKEN}",
            "Content-Type": "application/vnd.flux",
            "Accept": "application/csv",
        },
        data=flux,
        timeout=300,
    )
    resp.raise_for_status()
    return resp.text


def time_range_of(measurement: str) -> tuple[datetime | None, datetime | None]:
    """Return (first, last) timestamps for a measurement, or Nones if it is empty."""
    flux = f'''
        from(bucket:"{INFLUX_BUCKET}")
          |> range(start: 2024-01-01T00:00:00Z)
          |> filter(fn:(r) => r._measurement == "{measurement}")
          |> first()
          |> keep(columns: ["_time"])
          |> min(column: "_time")
    '''
    text = flux_query(flux)
    rows = [r for r in csv.DictReader(io.StringIO(text)) if r.get("_time")]
    first = datetime.fromisoformat(rows[0]["_time"].replace("Z", "+00:00")) if rows else None

    flux2 = f'''
        from(bucket:"{INFLUX_BUCKET}")
          |> range(start: 2024-01-01T00:00:00Z)
          |> filter(fn:(r) => r._measurement == "{measurement}")
          |> last()
          |> keep(columns: ["_time"])
          |> max(column: "_time")
    '''
    text2 = flux_query(flux2)
    rows2 = [r for r in csv.DictReader(io.StringIO(text2)) if r.get("_time")]
    last = datetime.fromisoformat(rows2[0]["_time"].replace("Z", "+00:00")) if rows2 else None

    return first, last


def fetch_pivoted(measurement: str, start: datetime, stop: datetime, fields: dict) -> list[dict]:
    """Fetch measurement data for [start, stop) and return a list of wide-format dicts."""
    field_filter = " or ".join(f'r._field == "{f}"' for f in fields)
    flux = f'''
        from(bucket:"{INFLUX_BUCKET}")
          |> range(start: {start.strftime("%Y-%m-%dT%H:%M:%SZ")}, stop: {stop.strftime("%Y-%m-%dT%H:%M:%SZ")})
          |> filter(fn:(r) => r._measurement == "{measurement}")
          |> filter(fn:(r) => {field_filter})
          |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value")
          |> keep(columns: ["_time", {", ".join('"' + f + '"' for f in fields)}])
    '''
    text = flux_query(flux)
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        if not row.get("_time"):
            continue
        rows.append(row)
    return rows

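
# Optional sanity check — a sketch, not part of the migration flow. It counts
# how many points InfluxDB holds for a single field of a measurement so the
# totals printed by the migration below can be cross-checked. The helper name
# and the one-field-per-measurement idea are this sketch's assumptions; it only
# reuses flux_query() and the config above.
def count_field_rows(measurement: str, field: str) -> int:
    """Return the number of points stored for one field of a measurement."""
    flux = f'''
        from(bucket:"{INFLUX_BUCKET}")
          |> range(start: 2024-01-01T00:00:00Z)
          |> filter(fn:(r) => r._measurement == "{measurement}")
          |> filter(fn:(r) => r._field == "{field}")
          |> group()
          |> count()
    '''
    rows = [r for r in csv.DictReader(io.StringIO(flux_query(flux))) if r.get("_value")]
    return int(rows[0]["_value"]) if rows else 0
# Example: count_field_rows("AlphaEss", "Pv1Power") should be close to the
# inverter row total reported by migrate_inverter().
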
# ── PostgreSQL helpers ──────────────────────────────────────────────────────────


def psql(sql: str, copy_data: str | None = None) -> str:
    """Run SQL (and optionally COPY data) via docker exec psql."""
    cmd = ["docker", "exec", "-i", PG_CONTAINER, "psql", PG_DSN,
           "-v", "ON_ERROR_STOP=1", "-c", sql]
    proc = subprocess.run(cmd, input=copy_data, capture_output=True, text=True)
    if proc.returncode != 0:
        print(f"psql error: {proc.stderr}", file=sys.stderr)
        raise RuntimeError(f"psql failed: {proc.returncode}")
    return proc.stdout


def copy_csv(table: str, columns: list[str], rows: list[list]) -> str:
    """COPY rows into table using psql stdin."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerows(rows)
    csv_data = buf.getvalue()

    col_list = ", ".join(columns)
    cmd = [
        "docker", "exec", "-i", PG_CONTAINER, "psql", PG_DSN,
        "-v", "ON_ERROR_STOP=1",
        "-c", f"\\COPY {table} ({col_list}) FROM STDIN WITH (FORMAT CSV)",
    ]
    proc = subprocess.run(cmd, input=csv_data, capture_output=True, text=True)
    if proc.returncode != 0:
        print(f"COPY error: {proc.stderr}", file=sys.stderr)
        raise RuntimeError("COPY failed")
    return proc.stdout


# ── Migration tasks ─────────────────────────────────────────────────────────────


def migrate_inverter():
    """Migrate the AlphaEss measurement into the inverter hypertable."""
    print("── AlphaEss → inverter ──────────────────────────────────")
    first, last = time_range_of("AlphaEss")
    if not first or not last:
        print("  no data found")
        return
    print(f"  range: {first.date()} → {last.date()}")

    db_cols = ["time", "pv1_power", "pv2_power", "pv_l1_power", "pv_l2_power",
               "pv_l3_power", "battery_soc", "grid_import_kwh", "grid_export_kwh",
               "pv_energy_kwh"]
    total = 0
    start = first.replace(hour=0, minute=0, second=0, microsecond=0)
    while start <= last:
        stop = min(start + timedelta(days=CHUNK_DAYS), last + timedelta(seconds=1))
        rows_raw = fetch_pivoted("AlphaEss", start, stop, INVERTER_FIELDS)

        batch = []
        for r in rows_raw:
            try:
                batch.append([
                    r["_time"],
                    r.get("Pv1Power") or None,
                    r.get("Pv2Power") or None,
                    r.get("InverterPowerL1") or None,
                    r.get("InverterPowerL2") or None,
                    r.get("InverterPowerL3") or None,
                    r.get("BatteryStateOfCharge") or None,
                    r.get("TotalEnergyConsumeFromGridGrid") or None,
                    r.get("TotalEnergyFeedToGridGrid") or None,
                    r.get("InverterTotalPvEnergy") or None,
                ])
            except Exception as e:
                print(f"  skip row: {e}")

        if batch:
            copy_csv("inverter", db_cols, batch)
            total += len(batch)
        print(f"  {start.date()} – {stop.date()}: {len(batch)} rows (total {total})")
        start = stop

    print(f"  done: {total} rows inserted")


def migrate_meter(measurement: str, device: str):
    """Migrate a power-meter measurement into the power_meter hypertable."""
    print(f"── {measurement} → power_meter ({device}) ──────────────")
    first, last = time_range_of(measurement)
    if not first or not last:
        print("  no data found")
        return
    print(f"  range: {first.date()} → {last.date()}")

    db_cols = ["time", "device", "l1_power", "l2_power", "l3_power",
               "import_kwh", "export_kwh"]
    total = 0
    start = first.replace(hour=0, minute=0, second=0, microsecond=0)
    while start <= last:
        stop = min(start + timedelta(days=CHUNK_DAYS), last + timedelta(seconds=1))
        rows_raw = fetch_pivoted(measurement, start, stop, METER_FIELDS)

        batch = []
        for r in rows_raw:
            try:
                batch.append([
                    r["_time"],
                    device,
                    r.get("L1PowerW") or None,
                    r.get("L2PowerW") or None,
                    r.get("L3PowerW") or None,
                    r.get("TotalImport") or None,
                    r.get("TotalExport") or None,
                ])
            except Exception as e:
                print(f"  skip row: {e}")

        if batch:
            copy_csv("power_meter", db_cols, batch)
            total += len(batch)
        print(f"  {start.date()} – {stop.date()}: {len(batch)} rows (total {total})")
        start = stop

    print(f"  done: {total} rows inserted")

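
# Optional verification — a sketch, not called anywhere by default. It asks
# TimescaleDB for the row count of each target table after a migration run so
# the totals printed above can be cross-checked. The helper name is this
# sketch's invention; it only reuses psql() and the table names targeted by
# the COPY calls above, and it prints psql's formatted output as-is.
def report_row_counts():
    """Print the row count of each migrated hypertable."""
    for table in ("inverter", "power_meter"):
        print(f"── {table} row count ──")
        print(psql(f"SELECT count(*) FROM {table};"))
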
def refresh_aggregates():
    """Refresh the continuous aggregates built on the migrated tables."""
    print("── Refreshing continuous aggregates ─────────────────────")
    views = [
        ("inverter_10m", "2024-01-01", None),
        ("power_meter_10m", "2024-01-01", None),
        ("inverter_1h", "2024-01-01", None),
        ("power_meter_1h", "2024-01-01", None),
        ("inverter_daily", "2024-01-01", None),
        ("power_meter_daily", "2024-01-01", None),
    ]
    for view, start, stop in views:
        stop_clause = f"'{stop}'" if stop else "NULL"
        sql = (f"CALL refresh_continuous_aggregate('{view}', "
               f"'{start}', {stop_clause});")
        print(f"  {view}...", end=" ", flush=True)
        psql(sql)
        print("ok")
    print("  done")


# ── Main ────────────────────────────────────────────────────────────────────────

if __name__ == "__main__":
    # The COPY path has no conflict handling: hypertables carry no unique
    # constraint on time alone, so re-running over an already-migrated period
    # inserts duplicates (which the retention policy will eventually clean up).
    # Run against a fresh database, or truncate first to re-run safely:
    #   docker exec timescaledb psql -c "TRUNCATE inverter, power_meter;"
    migrate_inverter()
    # Device labels "house" and "barn" are assumed here; adjust them to match
    # whatever the power_meter.device column is expected to contain.
    migrate_meter("Power_House", "house")
    migrate_meter("Power_Barn", "barn")
    refresh_aggregates()
    print("\nMigration complete.")