150 lines
5.6 KiB
Python
150 lines
5.6 KiB
Python
|
|
||
|
import datetime
|
||
|
import json
|
||
|
import logging
|
||
|
import os
|
||
|
import shutil
|
||
|
import threading
|
||
|
import time
|
||
|
|
||
|
from w1thermsensor import W1ThermSensor
|
||
|
import RPi.GPIO as GPIO
|
||
|
|
||
|
import heat
|
||
|
|
||
|
class Control(threading.Thread):
    """Background thread that drives the heater and the water outputs.

    Once per second the loop reads the temperature sensor, applies the
    heater thresholds from the configuration and switches every water
    output according to its configured time window.  The remaining public
    methods let other threads query the current state or switch the
    outputs manually.

    Water outputs are addressed by a 1-based ``ident``; the loop and the
    manual setters drive a pin to 0 to switch water on and to 1 to switch
    it off.
    """

    def __init__(self, configfile):
        """Prepare the controller; the configuration is read in ``run()``.

        Args:
            configfile: Path of the JSON configuration file.
        """
        super().__init__()
        self.__run_condition = True
        self.__config_file = configfile
        self.__config = None
        self.__log = logging.getLogger()
        self.__sensor = W1ThermSensor()
        self.__heat = None
        # Pin the current heat object was created for (None = no object yet).
        self.__heat_pin = None
        # True while run() has started the heat object and not yet stopped it.
        self.__heat_running = False
        self.__trigger_read_config = True
        self.__water_state = []
        self.__temperature = None

    def reload_config(self):
        """Ask the control loop to re-read the configuration file."""
        self.__trigger_read_config = True

    def load_config(self):
        """Read the configuration file and (re)initialise the outputs.

        When the file does not exist, ``config/config.json`` is copied to
        the configured location first.  All water pins are set up as
        outputs and driven to 1, and the cached water states are reset to
        ``False`` to match.  The heat object is only replaced when no
        object exists yet or the configured heat pin changed.
        """
        try:
            with open(self.__config_file, "r") as handle:
                config = json.load(handle)
        except FileNotFoundError:
            # create default config
            config_dir = os.path.dirname(self.__config_file)
            # A bare file name has no directory part; os.makedirs("") raises.
            if config_dir:
                os.makedirs(config_dir, exist_ok=True)
            shutil.copyfile("config/config.json", self.__config_file)
            with open(self.__config_file, "r") as handle:
                config = json.load(handle)
        self.__config = config

        self.__apply_heat_pin(int(config['heat'][0]['pin']))

        # Configure all water pins
        GPIO.setwarnings(False)
        GPIO.setmode(GPIO.BCM)
        for entry in config['water']:
            pin = int(entry['pin'])
            GPIO.setup(pin, GPIO.OUT)
            GPIO.output(pin, 1)
        # Rebuild instead of appending: the list must not grow on every
        # reload, and every pin was just driven to 1 (off).
        self.__water_state = [False] * len(config['water'])

    def __apply_heat_pin(self, pin):
        """Make sure the heat object matches ``pin``.

        An existing object for the same pin is reused so that a reload does
        not swap the started object for a fresh, unstarted one.  When the
        pin changed while the loop is running, the replacement is brought
        up and the previous object shut down with the same calls ``run()``
        uses (``off()``/``start()`` and ``stop()``).
        NOTE(review): heat.Heat is not visible here - confirm that this
        hand-over is what Heat expects.
        """
        if self.__heat is not None and pin == self.__heat_pin:
            return
        previous = self.__heat
        replacement = heat.Heat(pin)
        if self.__heat_running:
            replacement.off()
            replacement.start()
        self.__heat = replacement
        self.__heat_pin = pin
        if previous is not None and self.__heat_running:
            previous.off()
            previous.stop()

    def run(self):
        """Run the control loop until ``stop()`` is called."""
        # Clear the flag before the initial load so that a reload requested
        # while loading is not lost and the file is not read twice in a row.
        self.__trigger_read_config = False
        self.load_config()
        self.__heat.off()
        self.__heat.start()
        self.__heat_running = True
        try:
            while self.__run_condition:
                if self.__trigger_read_config:
                    self.__trigger_read_config = False
                    self.load_config()

                self.__temperature = float(self.__sensor.get_temperature())

                self.__handle_heat()
                self.__handle_water()

                time.sleep(1)
        finally:
            # Also reached when the loop dies with an exception (for example
            # a failed sensor read), so the heat object is always stopped.
            self.__heat_running = False
            self.__heat.stop()

    def __handle_heat(self):
        """Switch the heater by temperature when its autostate is enabled."""
        heat_config = self.__config['heat'][0]
        if not heat_config['autostate']:
            return
        on_temperature = float(heat_config['on_temperature'])
        off_temperature = float(heat_config['off_temperature'])
        if self.__temperature < on_temperature and not self.__heat.state():
            self.__heat.on()
            self.__log.info("Switch heat on by temperature level: %.1f °C", self.__temperature)
        elif self.__temperature > off_temperature and self.__heat.state():
            self.__heat.off()
            self.__log.info("Switch heat off by temperature level: %.1f °C", self.__temperature)

    def __handle_water(self):
        """Switch every water output with autostate by its time window."""
        for water_index, entry in enumerate(self.__config['water']):
            now = datetime.datetime.now()
            if not entry['autostate']:
                continue
            # The first time slot is used before 12:00, the second one after.
            slot = entry['times'][1 if now.hour >= 12 else 0]
            on_time = self.__time_today(now, slot['on_time'])
            off_time = self.__time_today(now, slot['off_time'])
            pin = int(entry['pin'])

            if on_time < now <= off_time and not self.__water_state[water_index]:
                GPIO.output(pin, 0)
                self.__water_state[water_index] = True
                self.__log.info("Switch water on by time")
            elif now > off_time and self.__water_state[water_index]:
                GPIO.output(pin, 1)
                self.__water_state[water_index] = False
                self.__log.info("Switch water off by time")

    @staticmethod
    def __time_today(now, pattern):
        """Return ``now`` with its time replaced by the ``HH:MM`` pattern."""
        parts = pattern.split(':')
        return now.replace(hour=int(parts[0]),
                           minute=int(parts[1]),
                           second=0,
                           microsecond=0)

    def stop(self):
        """Request the loop to end and wait for the thread to finish."""
        self.__run_condition = False
        self.join()

    def get_current_temperature(self, ident: int):
        """Return the last temperature reading formatted with one decimal.

        Args:
            ident: Unused.

        Returns:
            The reading as a string such as ``"21.5"``, or ``None`` while
            no reading has been taken yet.
        """
        if self.__temperature is None:
            return None
        return f"{self.__temperature:.1f}"

    def get_current_heat_state(self, ident: int):
        """Return the value of the heat object's ``state()``; ``ident`` is unused."""
        return self.__heat.state()

    def get_current_water_state(self, ident: int):
        """Return the cached state of a water output.

        Args:
            ident: 1-based number of the water entry (int or numeric string).

        Returns:
            ``True``/``False`` for a known entry, ``None`` otherwise.
        """
        ident = int(ident)
        # ident is 1-based, so the last valid value equals the list length.
        if 0 < ident <= len(self.__water_state):
            return self.__water_state[ident - 1]
        return None

    def set_heat_state(self, ident: str):
        """Switch the heater on manually; ``ident`` is unused."""
        self.__heat.on()
        self.__log.info("Switch heat on by button")

    def clear_heat_state(self, ident: str):
        """Switch the heater off manually; ``ident`` is unused."""
        self.__heat.off()
        self.__log.info("Switch heat off by button")

    def set_water_state(self, ident: str):
        """Switch a water output on manually.

        Args:
            ident: 1-based number of the water entry; unknown numbers are
                ignored.
        """
        self.__switch_water(int(ident), True, "Switch water on by button")

    def clear_water_state(self, ident: str):
        """Switch a water output off manually.

        Args:
            ident: 1-based number of the water entry; unknown numbers are
                ignored.
        """
        self.__switch_water(int(ident), False, "Switch water off by button")

    def __switch_water(self, ident, switched_on, message):
        """Record and apply a manual state change for water entry ``ident``."""
        # ident is 1-based, so the last valid value equals the list length.
        if not 0 < ident <= len(self.__water_state):
            return
        pin = int(self.__config['water'][ident - 1]['pin'])
        self.__water_state[ident - 1] = switched_on
        self.__log.info(message)
        GPIO.output(pin, 0 if switched_on else 1)