chickenhouse/gate_guard/gate.py
2022-02-21 17:34:38 +01:00

366 lines
14 KiB
Python

'''
Created on Dec 19, 2016
@author: klaehn
'''
import datetime
import logging
import pytz
import socket
import ssl
import time
import paho.mqtt.client as mqtt
import gate_guard.data_buffer
import gate_guard.light_sensor
import gate_guard.engine
import gate_guard.power_sensor
from astral import Astral
# Names of the states of the gate state machine. They are used as keys of the
# handler table in Gate and are written to the log on every state change.
STATE_INIT_1 = "init_1"
STATE_INIT_2 = "init_2"
STATE_INIT_3 = "init_3"
STATE_INIT_4 = "init_4"
STATE_OPENED = "open"
STATE_CLOSED = "close"
STATE_OPENING_1 = "opening_1"
STATE_OPENING_2 = "opening_2"
STATE_CLOSING_1 = "closing_1"
STATE_CLOSING_2 = "closing_2"
# Light sampling: minimum delay between two light reads in seconds, size of
# the light averaging buffer, and the threshold the light average is compared
# against when deciding to open or close.
LIGHT_READ_DELAY_S = 30
LIGHT_CONSECUTIVE_READS = 10
LIGHT_LX_THRESHOLD = 0
# MQTT settings. In the code shown here only MQTT_CERTS is used (passed to
# tls_set); host, port and topic appear only in commented-out publish code.
MQTT_HOST = "mqtt.blackfinn.de"
MQTT_PORT = 8883
MQTT_CERTS = "/etc/ssl/certs/DST_Root_CA_X3.pem"
MQTT_TOPIC = "outdoor/chickenhouse/gate"
# I2C bus number and device address handed to the light sensor object.
LIGHT_SENSOR_I2C_BUS = 1
LIGHT_SENSOR_I2C_ADDRESS = 0x23
# I2C bus number and device address handed to the power sensor object, and
# the size of the power averaging buffer.
POWER_SENSOR_I2C_BUS = 1
POWER_SENSOR_I2C_ADDRESS = 0x40
POWER_CONSECUTIVE_READS = 10
# NOTE(review): SLOPE_COUNT and SLOPE_CNT_MIN are not referenced by the code
# shown here - confirm whether they are still needed.
SLOPE_COUNT = 10
SLOPE_CNT_MIN = 2
# Power limits for engine 1 and engine 2; compared against the value returned
# by PowerSensor.power_mw() while the respective engine is running.
MAX_POWER_1 = 450.0
MAX_POWER_2 = 300.0
# Pin numbers passed to the Engine objects as gpio_1 / gpio_2.
ENGINE_1_PIN_1 = 13
ENGINE_1_PIN_2 = 19
ENGINE_2_PIN_1 = 5
ENGINE_2_PIN_2 = 6
def check_to_open(light_avg):
    '''Tell whether the gate is due to be opened.

    Opening is requested only when the current local hour is 8 or later
    and the averaged light value lies above LIGHT_LX_THRESHOLD.
    Returns True or False.
    '''
    # Too early in the day: never open, whatever the light says.
    if datetime.datetime.now().hour < 8:
        return False
    if light_avg > LIGHT_LX_THRESHOLD:
        return True
    return False
def check_to_close(light_avg):
    '''Tell whether the gate is due to be closed.

    Closing is requested only when the current local hour is 16 or later
    and the averaged light value is at or below LIGHT_LX_THRESHOLD.
    Returns True or False.
    '''
    # Before the afternoon the gate is never closed, whatever the light says.
    if datetime.datetime.now().hour < 16:
        return False
    if light_avg <= LIGHT_LX_THRESHOLD:
        return True
    return False
class FakeLight(object):
    '''Time based stand-in for the (defective) hardware light sensor.

    Offers the same read() call as a light sensor but derives its value
    from the clock and the sunset time of Berlin instead of measuring.
    '''

    def __init__(self, bus, addr):
        '''Set up the sun calculation.

        bus and addr are accepted so the constructor can be called like
        the one of a real sensor; they are not used.
        '''
        astral = Astral()
        astral.solar_depression = 'civil'
        self.astral = astral
        self.city = astral['Berlin']
        # Earliest hour of the day at which "light" is reported.
        self.opening_hour = 8

    def read(self):
        '''Return 1 from the opening hour until 30 minutes past sunset, else 0.'''
        zone = pytz.timezone(self.city.timezone)
        now = datetime.datetime.now(zone)
        sunset = self.city.sun(now, local=True)['sunset']
        # Comparing "30 minutes ago" with sunset keeps the light on until
        # half an hour after sunset.
        half_hour_ago = now - datetime.timedelta(minutes=30)
        return 1 if now.hour >= self.opening_hour and half_hour_ago < sunset else 0
class Gate(object):
    '''Main class of the chickenhouse gates.

    A polled state machine that drives two engines. After start-up it runs
    a calibration sequence (init_1 .. init_4) in which a "down run time" is
    derived for each engine; afterwards it moves between the opened and the
    closed state depending on check_to_close() / check_to_open().

    Every state handler follows the same pattern: entry actions run once,
    on the first poll after the state was entered; the exit condition is
    evaluated on every poll; finally __update_state() is called so that the
    following poll is no longer treated as a transition.
    '''

    def __init__(self):
        '''Create sensors, engines and buffers and select the first state.'''
        self.__state_handler = {
            STATE_INIT_1: self.__init1_handler,
            STATE_INIT_2: self.__init2_handler,
            STATE_INIT_3: self.__init3_handler,
            STATE_INIT_4: self.__init4_handler,
            STATE_OPENED: self.__opened_handler,
            STATE_CLOSED: self.__closed_handler,
            STATE_OPENING_1: self.__opening_1_handler,
            STATE_OPENING_2: self.__opening_2_handler,
            STATE_CLOSING_1: self.__closing_1_handler,
            STATE_CLOSING_2: self.__closing_2_handler,
        }
        self.__next_state = STATE_INIT_1
        # Differs from the first state so that the very first poll counts
        # as a transition and the entry actions of init_1 are executed.
        self.__last_state = STATE_OPENED
        # The hardware light sensor is replaced by a time based simulation.
        # To switch back:
        # self.__light_sensor = gate_guard.light_sensor.LightSensor(
        #     LIGHT_SENSOR_I2C_BUS, LIGHT_SENSOR_I2C_ADDRESS)
        self.__light_sensor = FakeLight(LIGHT_SENSOR_I2C_BUS, LIGHT_SENSOR_I2C_ADDRESS)
        self.__light_data = gate_guard.data_buffer.DataBuffer(
            LIGHT_CONSECUTIVE_READS)
        self.__engine_1 = gate_guard.engine.Engine(
            gpio_1=ENGINE_1_PIN_1, gpio_2=ENGINE_1_PIN_2)
        self.__engine_2 = gate_guard.engine.Engine(
            gpio_1=ENGINE_2_PIN_1, gpio_2=ENGINE_2_PIN_2)
        self.__power_sensor = gate_guard.power_sensor.PowerSensor(
            POWER_SENSOR_I2C_BUS, POWER_SENSOR_I2C_ADDRESS)
        self.__power_data = gate_guard.data_buffer.DataBuffer(POWER_CONSECUTIVE_READS)
        # 0 makes the first poll read the light sensor immediately.
        self.__light_read_timeout = 0
        self.__down_run_time_1 = 0
        self.__down_run_time_2 = 0
        self.__gate_run_time = 0
        # NOTE: publishing the gate status via MQTT is currently disabled;
        # the client is only created and configured here. The disabled
        # sequence, formerly executed on entering a state, was:
        #     msg = str(time.time()) + " <Initialization|Opened|Closed|...>"
        #     try:
        #         self.__client.connect(MQTT_HOST, MQTT_PORT)
        #         self.__client.loop_start()
        #         self.__client.publish(MQTT_TOPIC, msg, qos=2, retain=True)
        #         self.__client.loop_stop()
        #     except (ValueError, TypeError, socket.error, ssl.CertificateError) as err:
        #         logging.info('unable to publish to mqtt ({})'.format(err))
        # NOTE(review): tls_set() still runs although nothing is published;
        # presumably it fails when MQTT_CERTS does not exist - confirm.
        self.__client = mqtt.Client()
        self.__client.tls_set(MQTT_CERTS)

    def poll(self):
        '''Poll function of the state machine.

        Reads the light sensor at most once per LIGHT_READ_DELAY_S seconds,
        reads the power sensor on every call, then runs the handler of the
        current state with the averaged light value.
        '''
        now = time.time()
        if now >= self.__light_read_timeout:
            self.__light_read_timeout = now + LIGHT_READ_DELAY_S
            self.__light_data.push(self.__light_sensor.read())
        self.__power_data.push(self.__power_sensor.power_mw())
        handler = self.__state_handler[self.__next_state]
        handler(self.__light_data.average())

    def __update_state(self, new_state):
        '''Remember the current state as the last one and select new_state.'''
        self.__last_state = self.__next_state
        self.__next_state = new_state

    def __is_transition(self):
        '''Return True, and log the change, if the state was just entered.'''
        if self.__last_state == self.__next_state:
            return False
        logging.info('STATE: %s -> %s', self.__last_state, self.__next_state)
        return True

    def __init1_handler(self, _):
        '''Run engine 1 down until its power exceeds MAX_POWER_1.'''
        next_state = self.__next_state
        if self.__is_transition():
            self.__engine_1.down()
            # workaround for high power after starting engine
            time.sleep(1)
        pwr = self.__power_sensor.power_mw()
        logging.debug('e1: %s mW', pwr)
        if pwr > MAX_POWER_1:
            next_state = STATE_INIT_2
        self.__update_state(next_state)

    def __init2_handler(self, _):
        '''Run engine 1 up and derive its down run time from the duration.

        The down run time is taken as half of the time the upward run
        needed until the power exceeded MAX_POWER_1. Results of 30 s or
        less are rejected and the calibration of engine 1 is repeated.
        '''
        next_state = self.__next_state
        if self.__is_transition():
            self.__engine_1.up()
            # Holds the start time for now; replaced by the duration below.
            self.__down_run_time_1 = time.time()
        pwr = self.__power_sensor.power_mw()
        logging.debug('e1: %s mW', pwr)
        if pwr > MAX_POWER_1:
            self.__down_run_time_1 = (time.time() - self.__down_run_time_1) / 2
            logging.info('calculated down time for engine 1: %s s',
                         self.__down_run_time_1)
            if self.__down_run_time_1 > 30:
                next_state = STATE_INIT_3
            else:
                logging.info("That's not very relaistic. Another try...")
                next_state = STATE_INIT_1
        self.__update_state(next_state)

    def __init3_handler(self, _):
        '''Back engine 1 off, then run engine 2 down until MAX_POWER_2.'''
        next_state = self.__next_state
        if self.__is_transition():
            # Run engine 1 down for one second and stop it.
            self.__engine_1.down()
            time.sleep(1)
            self.__engine_1.stop()
            self.__engine_2.down()
            # workaround for high power after starting engine
            time.sleep(1)
        pwr = self.__power_sensor.power_mw()
        logging.debug('e2: %s mW', pwr)
        if pwr > MAX_POWER_2:
            next_state = STATE_INIT_4
        self.__update_state(next_state)

    def __init4_handler(self, _):
        '''Run engine 2 up and derive its down run time from the duration.

        Same procedure as for engine 1; a rejected result (30 s or less)
        restarts the calibration of engine 2 at init_3.
        '''
        next_state = self.__next_state
        if self.__is_transition():
            self.__engine_2.up()
            # Holds the start time for now; replaced by the duration below.
            self.__down_run_time_2 = time.time()
        pwr = self.__power_sensor.power_mw()
        logging.debug('e2: %s mW', pwr)
        if pwr > MAX_POWER_2:
            self.__down_run_time_2 = (time.time() - self.__down_run_time_2) / 2
            logging.info('calculated down time for engine 2: %s s',
                         self.__down_run_time_2)
            if self.__down_run_time_2 > 30:
                next_state = STATE_OPENED
            else:
                logging.info("That's not very relaistic. Another try...")
                next_state = STATE_INIT_3
        self.__update_state(next_state)

    def __opened_handler(self, light_avg):
        '''Gate is open: wait until check_to_close() asks for closing.'''
        next_state = self.__next_state
        if self.__is_transition():
            # Run engine 2 down for one second and stop it.
            self.__engine_2.down()
            time.sleep(1)
            self.__engine_2.stop()
        if check_to_close(light_avg):
            next_state = STATE_CLOSING_1
        self.__update_state(next_state)

    def __closed_handler(self, light_avg):
        '''Gate is closed: wait until check_to_open() asks for opening.'''
        next_state = self.__next_state
        if self.__is_transition():
            self.__engine_2.stop()
        if check_to_open(light_avg):
            next_state = STATE_OPENING_1
        self.__update_state(next_state)

    def __opening_1_handler(self, light_avg):
        '''Run engine 1 up until its power exceeds MAX_POWER_1.'''
        next_state = self.__next_state
        if self.__is_transition():
            self.__engine_1.up()
            self.__gate_run_time = time.time()
            # workaround for high power after starting engine
            time.sleep(1)
        pwr = self.__power_sensor.power_mw()
        logging.debug('e1 - abs: %s mW\tavg: %s mW', pwr, self.__power_data.average())
        if pwr > MAX_POWER_1:
            deviation = abs(time.time() - self.__gate_run_time - self.__down_run_time_1)
            logging.info('runtime deviation of engine 1: %s s', deviation)
            next_state = STATE_OPENING_2
        self.__update_state(next_state)

    def __opening_2_handler(self, _):
        '''Back engine 1 off, then run engine 2 up until MAX_POWER_2.'''
        next_state = self.__next_state
        if self.__is_transition():
            # Run engine 1 down for one second and stop it.
            self.__engine_1.down()
            time.sleep(1)
            self.__engine_1.stop()
            self.__engine_2.up()
            self.__gate_run_time = time.time()
            # workaround for high power after starting engine
            time.sleep(1)
        pwr = self.__power_sensor.power_mw()
        logging.debug('e2 - abs: %s mW\tavg: %s mW', pwr, self.__power_data.average())
        if pwr > MAX_POWER_2:
            deviation = abs(time.time() - self.__gate_run_time - self.__down_run_time_2)
            logging.info('runtime deviation: %s', deviation)
            next_state = STATE_OPENED
        self.__update_state(next_state)

    def __closing_1_handler(self, light_avg):
        '''Run engine 1 down for its calibrated down run time.

        Once the run time is exceeded the machine continues with
        closing_2. If the power exceeds MAX_POWER_1 while closing, engine 1
        is stopped and the calibration is restarted at init_1; this check
        wins when both conditions are met in the same poll.
        '''
        next_state = self.__next_state
        if self.__is_transition():
            self.__engine_1.down()
            self.__gate_run_time = time.time()
            # workaround for high power after starting engine
            time.sleep(1)
        pwr = self.__power_sensor.power_mw()
        logging.debug('e1: %s mW', pwr)
        run_time = time.time() - self.__gate_run_time
        if run_time > self.__down_run_time_1:
            logging.info("Run time of gate 1 is bigger than calculated (%s vs. %s).",
                         run_time, self.__down_run_time_1)
            next_state = STATE_CLOSING_2
        if pwr > MAX_POWER_1:
            self.__engine_1.stop()
            next_state = STATE_INIT_1
        self.__update_state(next_state)

    def __closing_2_handler(self, _):
        '''Stop engine 1 and run engine 2 down for its calibrated run time.

        Once the run time is exceeded the gate counts as closed. If the
        power exceeds MAX_POWER_2 while closing, engine 2 is stopped and
        the calibration is restarted at init_1; this check wins when both
        conditions are met in the same poll.
        '''
        next_state = self.__next_state
        if self.__is_transition():
            self.__engine_1.stop()
            self.__engine_2.down()
            self.__gate_run_time = time.time()
            # workaround for high power after starting engine
            time.sleep(1)
        pwr = self.__power_sensor.power_mw()
        logging.debug('e2: %s mW', pwr)
        run_time = time.time() - self.__gate_run_time
        if run_time > self.__down_run_time_2:
            # This handler reports on gate 2 (the message used to say gate 1).
            logging.info("Run time of gate 2 is bigger than calculated (%s vs. %s).",
                         run_time, self.__down_run_time_2)
            next_state = STATE_CLOSED
        if pwr > MAX_POWER_2:
            self.__engine_2.stop()
            next_state = STATE_INIT_1
        self.__update_state(next_state)