chickenhouse/gate_guard/gate.py

179 lines
6.3 KiB
Python
Raw Normal View History

'''
Created on Dec 19, 2016
@author: klaehn
'''
import time
import mqtt
import data_buffer
import light_sensor
import engine
import power_sensor
STATE_INIT = "init"
STATE_OPENED = "open"
STATE_CLOSED = "close"
STATE_OPENING = "opening"
STATE_CLOSING = "closing"
STATE_ERROR = "error"
LIGHT_READ_DELAY_S = 30
LIGHT_CONSECUTIVE_READS = 10
LIGHT_LX_THRESHOLD = {"open":10, "close":5}
MQTT_HOST = "proxy"
MQTT_TOPIC = "outdoor/chickenhouse/newgate"
LIGHT_SENSOR_I2C_BUS = 1
LIGHT_SENSOR_I2C_ADDRESS = 0x23
POWER_SENSOR_I2C_BUS = 1
POWER_SENSOR_I2C_ADDRESS = 0x40
POWER_CONSECUTIVE_READS = 1000
MAX_ENGINE_POWER = {"up":330, "down":280}
MAX_GATE_RUNTIME = {"open":300, "close":300}
MIN_GATE_RUNTIME = {"open":10, "close":10}
class Gate(object):
def __init__(self):
self.__state_handler = {STATE_INIT:self.__init_handler, \
STATE_OPENED:self.__opened_handler, \
STATE_CLOSED:self.__closed_handler, \
STATE_OPENING:self.__opening_handler, \
STATE_CLOSING:self.__closing_handler, \
STATE_ERROR:self.__error_handler}
self.__next_state = STATE_INIT
self.__last_state = STATE_ERROR
self.__light_sensor = light_sensor.LightSensor(LIGHT_SENSOR_I2C_BUS, \
LIGHT_SENSOR_I2C_ADDRESS)
self.__light_data = data_buffer.DataBuffer(LIGHT_CONSECUTIVE_READS)
self.__comserver = mqtt.Mqtt(MQTT_HOST)
self.__engine = engine.Engine(gpio_1=13, gpio_2=19)
self.__power_sensor = power_sensor.PowerSensor(POWER_SENSOR_I2C_BUS, \
POWER_SENSOR_I2C_ADDRESS)
self.__power_data = data_buffer.DataBuffer(POWER_CONSECUTIVE_READS)
self.__gate_move_timeout = 0
self.__light_read_timeout = 0
self.__error_count = 0
self.__runtime_open = 0
self.__runtime_close = 0
def poll(self):
current_time = time.time()
if current_time >= self.__light_read_timeout:
self.__light_read_timeout = current_time + LIGHT_READ_DELAY_S
self.__light_data.push(self.__light_sensor.read())
self.__state_handler[self.__next_state](self.__light_data.average())
def __update_state(self, new_state):
self.__last_state = self.__next_state
self.__next_state = new_state
def __is_transition(self):
if self.__last_state != self.__next_state:
return True
return False
def __init_handler(self, light_avg):
'''
In init we don't know anything neither about gate state nor about
light. So first we try to reach STATE_OPENED.
'''
#pylint: disable=unused-argument
self.__comserver.connect()
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" gate gard initiated")
self.__comserver.disconnect()
self.__update_state(STATE_OPENING)
def __opened_handler(self, light_avg):
next_state = self.__next_state
if self.__is_transition():
self.__engine.down()
time.sleep(5)
self.__engine.stop()
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Opened " + \
str(self.__power_data.average()) + " mW")
self.__power_data.clear()
if (light_avg != None) and (light_avg <= LIGHT_LX_THRESHOLD["close"]):
next_state = STATE_CLOSING
self.__update_state(next_state)
def __closed_handler(self, light_avg):
next_state = self.__next_state
if self.__is_transition():
self.__engine.up()
time.sleep(5)
self.__engine.stop()
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Closed " + \
str(self.__power_data.average()) + " mW")
self.__power_data.clear()
if (light_avg != None) and (light_avg > LIGHT_LX_THRESHOLD["open"]):
next_state = STATE_OPENING
self.__update_state(next_state)
def __opening_handler(self, light_avg):
next_state = self.__next_state
if self.__is_transition():
self.__runtime_open = time.time() + MIN_GATE_RUNTIME["open"]
self.__engine.up()
self.__gate_move_timeout = time.time() + MAX_GATE_RUNTIME["open"]
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Opening " + str(light_avg) + " lx")
if time.time() > self.__gate_move_timeout:
next_state = STATE_ERROR
else:
self.__power_data.push(self.__power_sensor.power_mw())
current_avg = self.__power_data.average()
if current_avg != None:
if current_avg > MAX_ENGINE_POWER["up"]:
if time.time() > self.__runtime_open:
next_state = STATE_OPENED
self.__update_state(next_state)
def __closing_handler(self, light_avg):
next_state = self.__next_state
if self.__is_transition():
self.__runtime_close = time.time() + MIN_GATE_RUNTIME["close"]
self.__engine.down()
self.__gate_move_timeout = time.time() + MAX_GATE_RUNTIME["close"]
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Closing " + str(light_avg) + " lx")
if time.time() > self.__gate_move_timeout:
next_state = STATE_ERROR
else:
self.__power_data.push(self.__power_sensor.power_mw())
current_avg = self.__power_data.average()
if current_avg != None:
if current_avg > MAX_ENGINE_POWER["down"]:
if time.time() > self.__runtime_close:
next_state = STATE_CLOSED
self.__update_state(next_state)
def __error_handler(self, light_avg):
#pylint: disable=unused-argument
if self.__is_transition():
self.__engine.stop()
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Error handler!!!")
self.__update_state(STATE_INIT)