gardencontrol/control/_Control.py
2022-04-12 09:23:38 +02:00

154 lines
5.7 KiB
Python

import datetime
import json
import logging
import os
import shutil
import threading
import time
from w1thermsensor import W1ThermSensor
import RPi.GPIO as GPIO
import heat
class Control(threading.Thread):
    """Worker thread that controls one heater and several water outputs.

    Once per second the thread reads the temperature sensor, switches the
    heater around the configured temperature thresholds and switches the
    water outputs according to their configured time slots.

    The JSON configuration is expected to provide (as accessed by this class):

    * ``heat``: a list whose first entry has ``pin``, ``autostate``,
      ``on_temperature`` and ``off_temperature``.
    * ``water``: a list of entries with ``pin``, ``autostate`` and ``times``;
      ``times[0]`` is used before 12:00 and ``times[1]`` from 12:00 on, each
      holding ``on_time`` and ``off_time`` as ``"HH:MM"`` strings.

    Water outputs are addressed by 1-based identifiers in the public methods.
    """

    # Pin levels written for the water outputs: 0 switches water on and 1
    # switches it off (presumably an active-low relay board -- confirm
    # against the hardware).
    _WATER_ON = 0
    _WATER_OFF = 1

    def __init__(self, configfile):
        """Create the controller; the configuration is loaded once the thread runs.

        Args:
            configfile: Path of the JSON configuration file.
        """
        super().__init__()
        self.__run_condition = True
        self.__config_file = configfile
        self.__config = None
        self.__log = logging.getLogger()
        self.__sensor = W1ThermSensor()
        self.__heat = None
        # Start with a pending (re)load so the first loop cycle reads the config.
        self.__trigger_read_config = True
        self.__water_state = []
        self.__temperature = None

    def reload_config(self):
        """Request that the control loop re-reads the configuration file."""
        self.__log.info("Reloading configuration triggered")
        self.__trigger_read_config = True

    def _read_config_file(self):
        """Return the parsed JSON content of the configuration file."""
        with open(self.__config_file, "r", encoding="UTF-8") as handle:
            return json.load(handle)

    def load_config(self):
        """Load the configuration and (re)initialise heater and water outputs.

        If the configuration file does not exist, ``config/config.json`` is
        copied to its location first. Any previous heater object is stopped
        and replaced, and every water pin is configured as an output and
        switched off.
        """
        try:
            self.__config = self._read_config_file()
        except FileNotFoundError:
            # Create the default config. A bare file name has no directory
            # part, and os.makedirs("") would fail, so only create one if set.
            config_dir = os.path.dirname(self.__config_file)
            if config_dir:
                os.makedirs(config_dir, exist_ok=True)
            shutil.copyfile("config/config.json", self.__config_file)
            self.__config = self._read_config_file()

        if self.__heat is not None:
            self.__heat.stop()
        self.__heat = heat.Heat(int(self.__config['heat'][0]['pin']))

        water = self.__config['water']
        # Rebuild the state list instead of appending to it: every pin is
        # switched off below, and the list must keep exactly one flag per
        # configured output across reloads.
        self.__water_state = [False] * len(water)

        # Configure all water pins
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        for entry in water:
            pin = int(entry['pin'])
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, self._WATER_OFF)

    def run(self):
        """Run the control loop until :meth:`stop` is called."""
        while self.__run_condition:
            if self.__trigger_read_config:
                self.__trigger_read_config = False
                self.load_config()
                self.__heat.start()
            self._update_temperature()
            self._control_heat()
            self._control_water()
            time.sleep(1)
        # The heater object only exists after the first successful load.
        if self.__heat is not None:
            self.__heat.stop()

    def _update_temperature(self):
        """Read the sensor and log the value when it changed (0.1 resolution)."""
        reading = round(float(self.__sensor.get_temperature()), 1)
        if reading != self.__temperature:
            self.__temperature = reading
            self.__log.info("Temperature: %.1f °C", self.__temperature)

    def _control_heat(self):
        """Switch the heater by temperature thresholds when autostate is enabled."""
        heat_config = self.__config['heat'][0]
        if not heat_config['autostate']:
            return
        on_temperature = float(heat_config['on_temperature'])
        off_temperature = float(heat_config['off_temperature'])
        if self.__temperature < on_temperature and not self.__heat.state():
            self.__heat.on()
            self.__log.info("heat on")
        elif self.__temperature > off_temperature and self.__heat.state():
            self.__heat.off()
            self.__log.info("heat off")

    def _control_water(self):
        """Apply the time schedule to every water output with autostate enabled."""
        now = datetime.datetime.now()
        for index, entry in enumerate(self.__config['water']):
            change = self._scheduled_water_change(
                entry, now, self.__water_state[index])
            if change is None:
                continue
            self._switch_water(index, change, "water on" if change else "water off")

    @staticmethod
    def _time_today(now, pattern):
        """Return *now* with its time replaced by the ``"HH:MM"`` in *pattern*."""
        parts = pattern.split(':')
        return now.replace(hour=int(parts[0]),
                           minute=int(parts[1]),
                           second=0,
                           microsecond=0)

    @staticmethod
    def _scheduled_water_change(entry, now, is_on):
        """Decide whether the schedule requires switching one water output.

        Args:
            entry: One element of the ``water`` configuration list.
            now: Current local time as ``datetime.datetime``.
            is_on: Currently tracked state of the output.

        Returns:
            ``True`` to switch on, ``False`` to switch off, ``None`` to leave
            the output unchanged (also when ``autostate`` is disabled).
        """
        if not entry['autostate']:
            return None
        # The first slot applies before 12:00, the second from 12:00 on.
        # NOTE(review): because the slot is chosen by the current hour, a
        # morning slot whose off_time is at or after 12:00 is not switched
        # off here until the afternoon slot's off_time has passed.
        slot = entry['times'][0 if now.hour < 12 else 1]
        on_time = Control._time_today(now, slot['on_time'])
        off_time = Control._time_today(now, slot['off_time'])
        if on_time < now <= off_time and not is_on:
            return True
        # This only looks at the tracked state, so it also switches off an
        # output that was opened by button once off_time has passed.
        if now > off_time and is_on:
            return False
        return None

    def _switch_water(self, index, on, message):
        """Drive the pin of water output *index* (0-based), record and log it."""
        pin = int(self.__config['water'][index]['pin'])
        GPIO.output(pin, self._WATER_ON if on else self._WATER_OFF)
        self.__water_state[index] = on
        self.__log.info(message)

    def _water_index(self, ident):
        """Return the 0-based index for 1-based *ident*, or ``None`` if unknown."""
        number = int(ident)
        if 0 < number <= len(self.__water_state):
            return number - 1
        return None

    def stop(self):
        """Ask the control loop to end and wait for the thread to finish."""
        self.__run_condition = False
        # join() raises RuntimeError for a thread that was never started.
        if self.is_alive():
            self.join()

    def get_current_temperature(self, ident: int):
        """Return the last temperature as a string with one decimal.

        Returns ``None`` while no reading has been taken yet. *ident* is
        accepted for interface symmetry and not used.
        """
        if self.__temperature is None:
            return None
        return f"{self.__temperature:.1f}"

    def get_current_heat_state(self, ident: int):
        """Return the heater state, or ``None`` before the config was loaded.

        *ident* is accepted for interface symmetry and not used.
        """
        if self.__heat is None:
            return None
        return self.__heat.state()

    def get_current_water_state(self, ident: int):
        """Return the tracked state of water output *ident* (1-based).

        Returns ``None`` when *ident* does not address a configured output.
        """
        index = self._water_index(ident)
        if index is None:
            return None
        return self.__water_state[index]

    def set_heat_state(self, ident: str):
        """Switch the heater on manually; *ident* is not used."""
        if self.__heat is None:
            self.__log.warning("heat on by button ignored: configuration not loaded")
            return
        self.__heat.on()
        self.__log.info("heat on by button")

    def clear_heat_state(self, ident: str):
        """Switch the heater off manually; *ident* is not used."""
        if self.__heat is None:
            self.__log.warning("heat off by button ignored: configuration not loaded")
            return
        self.__heat.off()
        self.__log.info("heat off by button")

    def set_water_state(self, ident: str):
        """Switch water output *ident* (1-based) on; unknown idents are ignored."""
        index = self._water_index(ident)
        if index is not None:
            self._switch_water(index, True, "water on by button")

    def clear_water_state(self, ident: str):
        """Switch water output *ident* (1-based) off; unknown idents are ignored."""
        index = self._water_index(ident)
        if index is not None:
            self._switch_water(index, False, "water off by button")