282 lines
11 KiB
Python
282 lines
11 KiB
Python
#!/usr/bin/env python3
|
||
"""Migrate historical energy data from InfluxDB to TimescaleDB.
|
||
|
||
Reads AlphaEss, Power_House, Power_Barn measurements from InfluxDB,
|
||
pivots them to wide format, and bulk-inserts into TimescaleDB via
|
||
`docker exec timescaledb psql`.
|
||
|
||
Usage:
|
||
python3 migrate_influx.py
|
||
"""
|
||
|
||
import csv
|
||
import io
|
||
import subprocess
|
||
import sys
|
||
from datetime import datetime, timezone, timedelta
|
||
|
||
import requests
|
||
|
||
# ── Config ────────────────────────────────────────────────────────────────────
|
||
|
||
INFLUX_URL = "http://localhost:8086"
|
||
INFLUX_TOKEN = "hDu4JYvxciHsohn7zE0nyZfejZDik3s8fqxCkTebW1LRekckyGX_U0-wsfEcDuDV5WZER3MjQDss01jJJCeZBA=="
|
||
INFLUX_ORG = "tkl"
|
||
INFLUX_BUCKET = "home"
|
||
|
||
PG_CONTAINER = "timescaledb"
|
||
PG_DSN = "postgres://energy:changeme@localhost/energy"
|
||
|
||
# Process this much data per Flux query (keeps memory reasonable).
|
||
CHUNK_DAYS = 30
|
||
|
||
# ── Field mappings ────────────────────────────────────────────────────────────
|
||
|
||
INVERTER_FIELDS = {
|
||
"Pv1Power": "pv1_power",
|
||
"Pv2Power": "pv2_power",
|
||
"InverterPowerL1": "pv_l1_power",
|
||
"InverterPowerL2": "pv_l2_power",
|
||
"InverterPowerL3": "pv_l3_power",
|
||
"BatteryStateOfCharge": "battery_soc",
|
||
"TotalEnergyConsumeFromGridGrid": "grid_import_kwh",
|
||
"TotalEnergyFeedToGridGrid": "grid_export_kwh",
|
||
"InverterTotalPvEnergy": "pv_energy_kwh",
|
||
}
|
||
|
||
METER_FIELDS = {
|
||
"L1PowerW": "l1_power",
|
||
"L2PowerW": "l2_power",
|
||
"L3PowerW": "l3_power",
|
||
"TotalImport": "import_kwh",
|
||
"TotalExport": "export_kwh",
|
||
}
|
||
|
||
# ── InfluxDB helpers ──────────────────────────────────────────────────────────
|
||
|
||
def flux_query(flux: str) -> str:
|
||
resp = requests.post(
|
||
f"{INFLUX_URL}/api/v2/query",
|
||
params={"org": INFLUX_ORG},
|
||
headers={
|
||
"Authorization": f"Token {INFLUX_TOKEN}",
|
||
"Content-Type": "application/vnd.flux",
|
||
"Accept": "application/csv",
|
||
},
|
||
data=flux,
|
||
timeout=300,
|
||
)
|
||
resp.raise_for_status()
|
||
return resp.text
|
||
|
||
|
||
def time_range_of(measurement: str) -> tuple[datetime, datetime]:
|
||
"""Return (first, last) timestamps for a measurement."""
|
||
flux = f'''
|
||
from(bucket:"{INFLUX_BUCKET}")
|
||
|> range(start: 2024-01-01T00:00:00Z)
|
||
|> filter(fn:(r) => r._measurement == "{measurement}")
|
||
|> first()
|
||
|> keep(columns: ["_time"])
|
||
|> min(column: "_time")
|
||
'''
|
||
text = flux_query(flux)
|
||
rows = [r for r in csv.DictReader(io.StringIO(text)) if r.get("_time")]
|
||
first = datetime.fromisoformat(rows[0]["_time"].replace("Z", "+00:00")) if rows else None
|
||
|
||
flux2 = f'''
|
||
from(bucket:"{INFLUX_BUCKET}")
|
||
|> range(start: 2024-01-01T00:00:00Z)
|
||
|> filter(fn:(r) => r._measurement == "{measurement}")
|
||
|> last()
|
||
|> keep(columns: ["_time"])
|
||
|> max(column: "_time")
|
||
'''
|
||
text2 = flux_query(flux2)
|
||
rows2 = [r for r in csv.DictReader(io.StringIO(text2)) if r.get("_time")]
|
||
last = datetime.fromisoformat(rows2[0]["_time"].replace("Z", "+00:00")) if rows2 else None
|
||
|
||
return first, last
|
||
|
||
|
||
def fetch_pivoted(measurement: str, start: datetime, stop: datetime, fields: dict) -> list[dict]:
|
||
"""Fetch measurement data for [start, stop) and return list of wide-format dicts."""
|
||
field_filter = " or ".join(
|
||
f'r._field == "{f}"' for f in fields
|
||
)
|
||
flux = f'''
|
||
from(bucket:"{INFLUX_BUCKET}")
|
||
|> range(start: {start.strftime("%Y-%m-%dT%H:%M:%SZ")},
|
||
stop: {stop.strftime("%Y-%m-%dT%H:%M:%SZ")})
|
||
|> filter(fn:(r) => r._measurement == "{measurement}")
|
||
|> filter(fn:(r) => {field_filter})
|
||
|> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn:"_value")
|
||
|> keep(columns: ["_time", {", ".join('"' + f + '"' for f in fields)}])
|
||
'''
|
||
text = flux_query(flux)
|
||
rows = []
|
||
for row in csv.DictReader(io.StringIO(text)):
|
||
if not row.get("_time"):
|
||
continue
|
||
rows.append(row)
|
||
return rows
|
||
|
||
# ── PostgreSQL helpers ────────────────────────────────────────────────────────
|
||
|
||
def psql(sql: str, copy_data: str | None = None):
|
||
"""Run SQL (and optionally COPY data) via docker exec psql."""
|
||
cmd = ["docker", "exec", "-i", PG_CONTAINER,
|
||
"psql", PG_DSN, "-v", "ON_ERROR_STOP=1", "-c", sql]
|
||
proc = subprocess.run(cmd, input=copy_data, capture_output=True, text=True)
|
||
if proc.returncode != 0:
|
||
print(f"psql error: {proc.stderr}", file=sys.stderr)
|
||
raise RuntimeError(f"psql failed: {proc.returncode}")
|
||
return proc.stdout
|
||
|
||
|
||
def copy_csv(table: str, columns: list[str], rows: list[list]):
|
||
"""COPY rows into table using psql stdin."""
|
||
buf = io.StringIO()
|
||
writer = csv.writer(buf)
|
||
writer.writerows(rows)
|
||
csv_data = buf.getvalue()
|
||
|
||
col_list = ", ".join(columns)
|
||
cmd = [
|
||
"docker", "exec", "-i", PG_CONTAINER,
|
||
"psql", PG_DSN, "-v", "ON_ERROR_STOP=1",
|
||
"-c", f"\\COPY {table} ({col_list}) FROM STDIN WITH (FORMAT CSV)",
|
||
]
|
||
proc = subprocess.run(cmd, input=csv_data, capture_output=True, text=True)
|
||
if proc.returncode != 0:
|
||
print(f"COPY error: {proc.stderr}", file=sys.stderr)
|
||
raise RuntimeError(f"COPY failed")
|
||
return proc.stdout
|
||
|
||
# ── Migration tasks ───────────────────────────────────────────────────────────
|
||
|
||
def migrate_inverter():
|
||
print("── AlphaEss → inverter ──────────────────────────────────")
|
||
first, last = time_range_of("AlphaEss")
|
||
if not first or not last:
|
||
print(" no data found"); return
|
||
print(f" range: {first.date()} → {last.date()}")
|
||
|
||
db_cols = ["time", "pv1_power", "pv2_power", "pv_l1_power", "pv_l2_power",
|
||
"pv_l3_power", "battery_soc", "grid_import_kwh",
|
||
"grid_export_kwh", "pv_energy_kwh"]
|
||
|
||
total = 0
|
||
start = first.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
while start <= last:
|
||
stop = min(start + timedelta(days=CHUNK_DAYS), last + timedelta(seconds=1))
|
||
rows_raw = fetch_pivoted("AlphaEss", start, stop, INVERTER_FIELDS)
|
||
|
||
batch = []
|
||
for r in rows_raw:
|
||
try:
|
||
batch.append([
|
||
r["_time"],
|
||
r.get("Pv1Power") or None,
|
||
r.get("Pv2Power") or None,
|
||
r.get("InverterPowerL1") or None,
|
||
r.get("InverterPowerL2") or None,
|
||
r.get("InverterPowerL3") or None,
|
||
r.get("BatteryStateOfCharge") or None,
|
||
r.get("TotalEnergyConsumeFromGridGrid") or None,
|
||
r.get("TotalEnergyFeedToGridGrid") or None,
|
||
r.get("InverterTotalPvEnergy") or None,
|
||
])
|
||
except Exception as e:
|
||
print(f" skip row: {e}")
|
||
|
||
if batch:
|
||
copy_csv("inverter", db_cols, batch)
|
||
total += len(batch)
|
||
print(f" {start.date()} – {stop.date()}: {len(batch)} rows (total {total})")
|
||
start = stop
|
||
|
||
print(f" done: {total} rows inserted")
|
||
|
||
|
||
def migrate_meter(measurement: str, device: str):
|
||
print(f"── {measurement} → power_meter ({device}) ──────────────")
|
||
first, last = time_range_of(measurement)
|
||
if not first or not last:
|
||
print(" no data found"); return
|
||
print(f" range: {first.date()} → {last.date()}")
|
||
|
||
db_cols = ["time", "device", "l1_power", "l2_power", "l3_power",
|
||
"import_kwh", "export_kwh"]
|
||
|
||
total = 0
|
||
start = first.replace(hour=0, minute=0, second=0, microsecond=0)
|
||
while start <= last:
|
||
stop = min(start + timedelta(days=CHUNK_DAYS), last + timedelta(seconds=1))
|
||
rows_raw = fetch_pivoted(measurement, start, stop, METER_FIELDS)
|
||
|
||
batch = []
|
||
for r in rows_raw:
|
||
try:
|
||
batch.append([
|
||
r["_time"],
|
||
device,
|
||
r.get("L1PowerW") or None,
|
||
r.get("L2PowerW") or None,
|
||
r.get("L3PowerW") or None,
|
||
r.get("TotalImport") or None,
|
||
r.get("TotalExport") or None,
|
||
])
|
||
except Exception as e:
|
||
print(f" skip row: {e}")
|
||
|
||
if batch:
|
||
copy_csv("power_meter", db_cols, batch)
|
||
total += len(batch)
|
||
print(f" {start.date()} – {stop.date()}: {len(batch)} rows (total {total})")
|
||
start = stop
|
||
|
||
print(f" done: {total} rows inserted")
|
||
|
||
|
||
def refresh_aggregates():
|
||
print("── Refreshing continuous aggregates ─────────────────────")
|
||
views = [
|
||
("inverter_10m", "2024-01-01", None),
|
||
("power_meter_10m", "2024-01-01", None),
|
||
("inverter_1h", "2024-01-01", None),
|
||
("power_meter_1h", "2024-01-01", None),
|
||
("inverter_daily", "2024-01-01", None),
|
||
("power_meter_daily", "2024-01-01", None),
|
||
]
|
||
for view, start, stop in views:
|
||
stop_clause = f"'{stop}'" if stop else "NULL"
|
||
sql = (f"CALL refresh_continuous_aggregate('{view}', "
|
||
f"'{start}', {stop_clause});")
|
||
print(f" {view}...", end=" ", flush=True)
|
||
cmd = ["docker", "exec", "-i", PG_CONTAINER,
|
||
"psql", "-U", "fitdata", "-d", "energy",
|
||
"-v", "ON_ERROR_STOP=1", "-c", sql]
|
||
proc = subprocess.run(cmd, capture_output=True, text=True)
|
||
if proc.returncode != 0:
|
||
print(f"error: {proc.stderr}", file=sys.stderr)
|
||
raise RuntimeError(f"refresh failed: {proc.returncode}")
|
||
print("ok")
|
||
print(" done")
|
||
|
||
|
||
# ── Main ──────────────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
|
||
# Skip rows already in TimescaleDB to avoid duplicates.
|
||
# The simplest approach: delete nothing, use ON CONFLICT DO NOTHING.
|
||
# TimescaleDB hypertables don't have a unique constraint on time alone,
|
||
# so we rely on the data not being present (fresh DB) or accept duplicates
|
||
# for any overlap period, which the retention policy will eventually clean.
|
||
#
|
||
# If you need to re-run safely, truncate first:
|
||
# docker exec timescaledb psql <DSN> -c "TRUNCATE inverter, power_meter;"
|
||
|
||
refresh_aggregates()
|
||
print("\nMigration complete.")
|