220 lines
6.7 KiB
220 lines
6.7 KiB
import os
import site
import json
import datetime
import logging
from threading import Thread
from time import sleep
from flask import Flask
from flask import render_template
from flask import make_response
from flask import request
from w1thermsensor import W1ThermSensor
import RPi.GPIO as GPIO
sensor = W1ThermSensor()
water_pin = 22 #17/27/22
heat_pin = 26
heat_state = False
PACKAGE_PATH = site.getsitepackages()[0]
CONFIG_FILE = os.path.join(PACKAGE_PATH, "greenhouse/config/greenhouse.json")
GPIO.setup(water_pin, GPIO.OUT)
log_level = logging.INFO
LOG_FILE = "/var/log/greenhouse.log"
LOG_FORMAT = "%(asctime)s %(levelname)s %(message)s"
# logging.basicConfig(format=LOG_FORMAT, level=log_level, filename=LOG_FILE)
logging.basicConfig(format=LOG_FORMAT, level=log_level)
log = logging.getLogger('greenhouse')
class Heat(Thread):
def __init__(self, pin):
super(Heat, self).__init__()
self.__pin = pin
self.__state = False
GPIO.setup(pin, GPIO.OUT)
if GPIO.input(pin):
self.__state = True
self.__run_condition = True
self.__next_update = datetime.datetime.now()
def on(self):
self.__state = True
GPIO.output(self.__pin, 0)
def off(self):
self.__state = False
GPIO.output(self.__pin, 1)
def run(self):
self.__next_update = datetime.datetime.now()
while self.__run_condition:
now = datetime.datetime.now()
if now >= self.__next_update:
if self.__state:
# Do a power cycle to prevent auto-poweroff
GPIO.output(self.__pin, 1)
GPIO.output(self.__pin, 0)
self.__next_update = now + datetime.timedelta(minutes=5)
def stop(self):
self.__run_condition = False
def state(self):
return self.__state
heat = Heat(heat_pin)
class GreenControl(Thread):
def __init__(self, cfg_file:str):
super(GreenControl, self).__init__()
self.cfg_file = cfg_file
self.config = {}
self.__run_condition = True
self.__water_state = False
self.__trigger_read_config = True
def reload_config(self):
self.__trigger_read_config = True
def run(self):
global sensor
global heat
next_update = datetime.datetime.now()
while self.__run_condition:
if self.__trigger_read_config:
self.__trigger_read_config = False
with open(self.cfg_file, "r") as f:
self.config = json.load(f)
now = datetime.datetime.now()
if now >= next_update:
if self.config['heat']['state']:
temperature = sensor.get_temperature()
if float(temperature) < float(self.config['heat']['on_temperature']) and not heat.state():
log.info("Switch heat on by temperature level: %.1f °C", float(temperature))
elif float(temperature) > float(self.config['heat']['off_temperature']) and heat.state():
log.info("Switch heat off by temperature level: %.1f °C", float(temperature))
elif heat.state():
# Do a power cycle to prevent auto-poweroff
next_update = now + datetime.timedelta(minutes=5)
if self.config['water']['state']:
index = 0
if int(now.hour) >= 12:
index = 1
on_time_pattern = self.config['water']['times'][index]['on_time']
on_time_pattern = on_time_pattern.split(':')
on_time = now.replace(hour=int(on_time_pattern[0]), minute=int(on_time_pattern[1]), second=0, microsecond=0)
off_time_pattern = self.config['water']['times'][index]['off_time']
off_time_pattern = off_time_pattern.split(':')
off_time = now.replace(hour=int(off_time_pattern[0]), minute=int(off_time_pattern[1]), second=0, microsecond=0)
if now > on_time and now <= off_time and not self.__water_state:
GPIO.output(water_pin, 0)
self.__water_state = True
log.info("Switch water on by time")
elif now > off_time and self.__water_state:
GPIO.output(water_pin, 1)
self.__water_state = False
log.info("Switch water off by time")
green_ctrl = GreenControl(CONFIG_FILE)
app = Flask(__name__)
def index():
return render_template('index.html')
def config():
return render_template('config.html')
@app.route('/cfg', methods=['GET'])
def get_config():
res = {}
with open(CONFIG_FILE, "r") as f:
res = json.load(f)
return res
@app.route('/cfg', methods=['PATCH'])
def patch_config():
global green_ctrl
res = make_response("", 500)
cfg = json.loads(request.data)
with open(CONFIG_FILE, "w") as f:
json.dump(cfg, f)
res = make_response("", 204)
return res
@app.route('/sample', methods=['GET'])
def get_sample():
global heat
global sensor
temperature = f"{float(sensor.get_temperature()):.1f}"
except Exception:
temperature = None
water_state = False
if not GPIO.input(water_pin):
water_state = True
res = {}
res["id"] = str(1)
if temperature:
res["temperature"] = temperature
res["water"] = water_state
res["heat"] = heat.state()
return res
@app.route('/sample', methods=['PATCH'])
def patch_sample():
global heat
record = json.loads(request.data)
if "water" in record:
if record["water"]:
GPIO.output(water_pin, 0)
log.info("Switch water on by button: {}".format(datetime.datetime.now()))
GPIO.output(water_pin, 1)
log.info("Switch water off by button: {}".format(datetime.datetime.now()))
if "heat" in record:
if record["heat"]:
log.info("Switch heat on by button: {}".format(datetime.datetime.now()))
log.info("Switch heat off by button: {}".format(datetime.datetime.now()))
res = make_response("", 204)
return res
if __name__ == '__main__':
app.run(debug=True, host='', port=8000)