import datetime
import json
import logging
import os
import shutil
import threading
import time

from w1thermsensor import W1ThermSensor
import RPi.GPIO as GPIO

import heat


class Control(threading.Thread):
    """Background thread that drives one heat output and several water outputs.

    Once per second the thread reads the temperature sensor, switches the
    heat output between the configured on/off thresholds and switches every
    water output according to its configured time windows. The configuration
    is a JSON file that is (re)loaded inside the thread whenever
    ``reload_config`` has been called.

    Water outputs are addressed by 1-based identifiers in the public methods;
    identifier ``n`` maps to ``config['water'][n - 1]``.
    """

    # The water pins are driven low to switch on and high to switch off.
    __WATER_ON = 0
    __WATER_OFF = 1

    def __init__(self, configfile):
        """Prepare the controller; nothing is loaded until the thread runs.

        Args:
            configfile: Path of the JSON configuration file. If it does not
                exist when loading, it is created from ``config/config.json``.
        """
        super().__init__()
        self.__run_condition = True
        self.__config_file = configfile
        self.__config = None
        self.__log = logging.getLogger()
        self.__sensor = W1ThermSensor()
        self.__heat = None
        # Start with a pending load so the first loop iteration reads the file.
        self.__trigger_read_config = True
        self.__water_state = []
        self.__temperature = None

    def reload_config(self):
        """Request a configuration reload on the next loop iteration."""
        self.__log.info("Reloading configuration triggered")
        self.__trigger_read_config = True

    def load_config(self):
        """Read the configuration file and (re)initialise all outputs.

        A missing configuration file is replaced by a copy of
        ``config/config.json``. Any existing heat object is stopped and a new
        one is created for the configured pin (it is not started here). All
        water pins are configured as outputs and switched off, and the
        tracked water states are reset accordingly.
        """
        try:
            with open(self.__config_file, "r", encoding="UTF-8") as handle:
                self.__config = json.load(handle)
        except FileNotFoundError:
            # create default config
            directory = os.path.dirname(self.__config_file)
            # A bare file name has no directory part; os.makedirs("") would
            # raise FileNotFoundError.
            if directory:
                os.makedirs(directory, exist_ok=True)
            shutil.copyfile("config/config.json", self.__config_file)
            with open(self.__config_file, "r", encoding="UTF-8") as handle:
                self.__config = json.load(handle)

        if self.__heat is not None:
            self.__heat.stop()
        self.__heat = heat.Heat(int(self.__config['heat'][0]['pin']))

        # Rebuild instead of appending: every pin is switched off below, so
        # all states are False, and the list must not grow on each reload.
        self.__water_state = [False] * len(self.__config['water'])

        # Configure all water pins
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        for entry in self.__config['water']:
            pin = int(entry['pin'])
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, self.__WATER_OFF)

    def run(self):
        """Thread main loop: poll the sensor and update outputs every second."""
        while self.__run_condition:
            if self.__trigger_read_config:
                self.__trigger_read_config = False
                self.load_config()
                self.__heat.start()

            reading = round(float(self.__sensor.get_temperature()), 1)
            if reading != self.__temperature:
                # Only log when the rounded value actually changed.
                self.__temperature = reading
                self.__log.info("Temperature: %.1f °C", self.__temperature)

            self.__handle_heat()
            self.__handle_water()
            time.sleep(1)

        # The loop may end before any configuration was loaded.
        if self.__heat is not None:
            self.__heat.stop()

    def __handle_heat(self):
        """Switch the heat output between the configured thresholds."""
        settings = self.__config['heat'][0]
        if not settings['autostate']:
            return
        on_temperature = float(settings['on_temperature'])
        off_temperature = float(settings['off_temperature'])
        if self.__temperature < on_temperature and not self.__heat.state():
            self.__heat.on()
            self.__log.info("heat on")
        elif self.__temperature > off_temperature and self.__heat.state():
            self.__heat.off()
            self.__log.info("heat off")

    def __handle_water(self):
        """Switch every automatic water output according to its time window."""
        for index, entry in enumerate(self.__config['water']):
            now = datetime.datetime.now()
            if not entry['autostate']:
                continue
            # Two windows per entry: index 0 before noon, index 1 from noon on.
            window = entry['times'][1 if now.hour >= 12 else 0]
            on_time = self.__time_today(now, window['on_time'])
            off_time = self.__time_today(now, window['off_time'])
            pin = int(entry['pin'])
            if on_time < now <= off_time and not self.__water_state[index]:
                GPIO.output(pin, self.__WATER_ON)
                self.__water_state[index] = True
                self.__log.info("water on")
            elif now > off_time and self.__water_state[index]:
                GPIO.output(pin, self.__WATER_OFF)
                self.__water_state[index] = False
                self.__log.info("water off")

    @staticmethod
    def __time_today(now, pattern):
        """Return *now* with hour and minute taken from an ``H:M`` string."""
        parts = pattern.split(':')
        return now.replace(hour=int(parts[0]), minute=int(parts[1]),
                           second=0, microsecond=0)

    def __water_index(self, ident):
        """Map a 1-based water identifier to a list index.

        Returns:
            The 0-based index, or ``None`` if the identifier is out of range.
        """
        number = int(ident)
        if 0 < number <= len(self.__water_state):
            return number - 1
        return None

    def stop(self):
        """Ask the loop to finish and wait for the thread to terminate."""
        self.__run_condition = False
        self.join()

    def get_current_temperature(self, ident: int):
        """Return the last temperature reading formatted with one decimal.

        ``ident`` is not used. Raises ``TypeError`` if no reading has been
        taken yet (the stored temperature is still ``None``).
        """
        return f"{self.__temperature:.1f}"

    def get_current_heat_state(self, ident: int):
        """Return what the heat object reports as its state; ``ident`` is unused."""
        return self.__heat.state()

    def get_current_water_state(self, ident: int):
        """Return the tracked state of water output *ident* (1-based).

        Returns:
            ``True``/``False`` for a valid identifier, ``None`` otherwise.
        """
        index = self.__water_index(ident)
        if index is None:
            return None
        return self.__water_state[index]

    def set_heat_state(self, ident: str):
        """Switch the heat output on manually; ``ident`` is unused."""
        self.__heat.on()
        self.__log.info("heat on by button")

    def clear_heat_state(self, ident: str):
        """Switch the heat output off manually; ``ident`` is unused."""
        self.__heat.off()
        self.__log.info("heat off by button")

    def set_water_state(self, ident: str):
        """Switch water output *ident* (1-based) on; ignore invalid identifiers."""
        index = self.__water_index(ident)
        if index is None:
            return
        pin = int(self.__config['water'][index]['pin'])
        self.__water_state[index] = True
        self.__log.info("water on by button")
        GPIO.output(pin, self.__WATER_ON)

    def clear_water_state(self, ident: str):
        """Switch water output *ident* (1-based) off; ignore invalid identifiers."""
        index = self.__water_index(ident)
        if index is None:
            return
        pin = int(self.__config['water'][index]['pin'])
        self.__water_state[index] = False
        self.__log.info("water off by button")
        GPIO.output(pin, self.__WATER_OFF)