gate: post open/close msg via telegram
Signed-off-by: Thomas Klaehn <thomas.klaehn@u-blox.com>
This commit is contained in:
parent
69d37ce5d9
commit
16931c50ba
17
.project
17
.project
@ -1,17 +0,0 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8"?>
|
|
||||||
<projectDescription>
|
|
||||||
<name>chickenhouse</name>
|
|
||||||
<comment></comment>
|
|
||||||
<projects>
|
|
||||||
</projects>
|
|
||||||
<buildSpec>
|
|
||||||
<buildCommand>
|
|
||||||
<name>org.python.pydev.PyDevBuilder</name>
|
|
||||||
<arguments>
|
|
||||||
</arguments>
|
|
||||||
</buildCommand>
|
|
||||||
</buildSpec>
|
|
||||||
<natures>
|
|
||||||
<nature>org.python.pydev.pythonNature</nature>
|
|
||||||
</natures>
|
|
||||||
</projectDescription>
|
|
@ -1,10 +0,0 @@
|
|||||||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
|
||||||
<?eclipse-pydev version="1.0"?><pydev_project>
|
|
||||||
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
|
|
||||||
<path>/${PROJECT_DIR_NAME}/scripts</path>
|
|
||||||
<path>/${PROJECT_DIR_NAME}/tests</path>
|
|
||||||
<path>/${PROJECT_DIR_NAME}/gate_guard</path>
|
|
||||||
</pydev_pathproperty>
|
|
||||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
|
|
||||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
|
|
||||||
</pydev_project>
|
|
@ -9,7 +9,7 @@ import logging
|
|||||||
import gate_guard.gate
|
import gate_guard.gate
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
logging.basicConfig(filename='/var/log/gate_guard.log', level=logging.DEBUG)
|
logging.basicConfig(filename='/var/log/gate_guard.log', level=logging.DEBUG, format='%(asctime)s %(message)s')
|
||||||
gate_state = gate_guard.gate.Gate()
|
gate_state = gate_guard.gate.Gate()
|
||||||
try:
|
try:
|
||||||
while True:
|
while True:
|
||||||
|
@ -17,13 +17,13 @@ class Engine(object):
|
|||||||
self.stop()
|
self.stop()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
self.gpio_1.set(0)
|
self.gpio_1.write(0)
|
||||||
self.gpio_2.set(0)
|
self.gpio_2.write(0)
|
||||||
|
|
||||||
def up(self):
|
def up(self):
|
||||||
self.gpio_1.set(1)
|
self.gpio_1.write(1)
|
||||||
self.gpio_2.set(0)
|
self.gpio_2.write(0)
|
||||||
|
|
||||||
def down(self):
|
def down(self):
|
||||||
self.gpio_1.set(0)
|
self.gpio_1.write(0)
|
||||||
self.gpio_2.set(1)
|
self.gpio_2.write(1)
|
||||||
|
@ -3,28 +3,33 @@ Created on Dec 19, 2016
|
|||||||
|
|
||||||
@author: klaehn
|
@author: klaehn
|
||||||
'''
|
'''
|
||||||
|
import datetime
|
||||||
|
import logging
|
||||||
|
import socket
|
||||||
|
import ssl
|
||||||
import time
|
import time
|
||||||
import mqtt
|
import paho.mqtt.client as mqtt
|
||||||
|
import scipy.stats
|
||||||
import gate_guard.data_buffer
|
import gate_guard.data_buffer
|
||||||
import gate_guard.light_sensor
|
import gate_guard.light_sensor
|
||||||
import gate_guard.engine
|
import gate_guard.engine
|
||||||
import gate_guard.power_sensor
|
import gate_guard.power_sensor
|
||||||
import scipy
|
|
||||||
import scipy.stats
|
|
||||||
import logging
|
|
||||||
|
|
||||||
STATE_INIT = "init"
|
STATE_INIT_1 = "init_1"
|
||||||
|
STATE_INIT_2 = "init_2"
|
||||||
|
|
||||||
STATE_OPENED = "open"
|
STATE_OPENED = "open"
|
||||||
STATE_CLOSED = "close"
|
STATE_CLOSED = "close"
|
||||||
STATE_OPENING = "opening"
|
STATE_OPENING = "opening"
|
||||||
STATE_CLOSING = "closing"
|
STATE_CLOSING = "closing"
|
||||||
STATE_ERROR = "error"
|
|
||||||
|
|
||||||
LIGHT_READ_DELAY_S = 30
|
LIGHT_READ_DELAY_S = 30
|
||||||
LIGHT_CONSECUTIVE_READS = 10
|
LIGHT_CONSECUTIVE_READS = 10
|
||||||
LIGHT_LX_THRESHOLD = {"open":10, "close":5}
|
LIGHT_LX_THRESHOLD = {"open":1, "close":0}
|
||||||
|
|
||||||
MQTT_HOST = "proxy"
|
MQTT_HOST = "mqtt.blackfinn.de"
|
||||||
|
MQTT_PORT = 8883
|
||||||
|
MQTT_CERTS = "/etc/ssl/certs/DST_Root_CA_X3.pem"
|
||||||
MQTT_TOPIC = "outdoor/chickenhouse/gate"
|
MQTT_TOPIC = "outdoor/chickenhouse/gate"
|
||||||
|
|
||||||
LIGHT_SENSOR_I2C_BUS = 1
|
LIGHT_SENSOR_I2C_BUS = 1
|
||||||
@ -33,50 +38,52 @@ LIGHT_SENSOR_I2C_ADDRESS = 0x23
|
|||||||
POWER_SENSOR_I2C_BUS = 1
|
POWER_SENSOR_I2C_BUS = 1
|
||||||
POWER_SENSOR_I2C_ADDRESS = 0x40
|
POWER_SENSOR_I2C_ADDRESS = 0x40
|
||||||
|
|
||||||
|
POWER_CONSECUTIVE_READS = 10
|
||||||
|
|
||||||
SLOPE_COUNT = 10
|
SLOPE_COUNT = 10
|
||||||
SLOPE_CNT_MIN = 2
|
SLOPE_CNT_MIN = 2
|
||||||
|
MAX_POWER = 400.0
|
||||||
MAX_GATE_RUNTIME = {"open":70, "close":60}
|
|
||||||
MAX_POWER_SLOPE = {"up":40, "down":15}
|
|
||||||
|
|
||||||
class Gate(object):
|
class Gate(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.__state_handler = {STATE_INIT:self.__init_handler, \
|
self.__state_handler = {STATE_INIT_1:self.__init1_handler, \
|
||||||
|
STATE_INIT_2:self.__init2_handler, \
|
||||||
STATE_OPENED:self.__opened_handler, \
|
STATE_OPENED:self.__opened_handler, \
|
||||||
STATE_CLOSED:self.__closed_handler, \
|
STATE_CLOSED:self.__closed_handler, \
|
||||||
STATE_OPENING:self.__opening_handler, \
|
STATE_OPENING:self.__opening_handler, \
|
||||||
STATE_CLOSING:self.__closing_handler, \
|
STATE_CLOSING:self.__closing_handler}
|
||||||
STATE_ERROR:self.__error_handler}
|
self.__next_state = STATE_INIT_1
|
||||||
self.__next_state = STATE_INIT
|
self.__last_state = STATE_OPENED
|
||||||
self.__last_state = STATE_ERROR
|
|
||||||
|
|
||||||
self.__light_sensor = gate_guard.light_sensor.LightSensor(
|
self.__light_sensor = gate_guard.light_sensor.LightSensor(
|
||||||
LIGHT_SENSOR_I2C_BUS, LIGHT_SENSOR_I2C_ADDRESS)
|
LIGHT_SENSOR_I2C_BUS, LIGHT_SENSOR_I2C_ADDRESS)
|
||||||
self.__light_data = gate_guard.data_buffer.DataBuffer(
|
self.__light_data = gate_guard.data_buffer.DataBuffer(
|
||||||
LIGHT_CONSECUTIVE_READS)
|
LIGHT_CONSECUTIVE_READS)
|
||||||
self.__comserver = mqtt.Mqtt(MQTT_HOST)
|
|
||||||
self.__engine = gate_guard.engine.Engine(gpio_1=13, gpio_2=19)
|
self.__engine = gate_guard.engine.Engine(gpio_1=13, gpio_2=19)
|
||||||
self.__power_sensor = gate_guard.power_sensor.PowerSensor(
|
self.__power_sensor = gate_guard.power_sensor.PowerSensor(
|
||||||
POWER_SENSOR_I2C_BUS, POWER_SENSOR_I2C_ADDRESS)
|
POWER_SENSOR_I2C_BUS, POWER_SENSOR_I2C_ADDRESS)
|
||||||
self.__gate_move_timeout = 0
|
self.__power_data = gate_guard.data_buffer.DataBuffer(POWER_CONSECUTIVE_READS)
|
||||||
self.__light_read_timeout = 0
|
self.__light_read_timeout = 0
|
||||||
self.__error_count = 0
|
self.__down_run_time = 0
|
||||||
|
self.__gate_run_time = 0
|
||||||
self.slope_power = gate_guard.data_buffer.DataBuffer(SLOPE_COUNT)
|
self.__client = mqtt.Client()
|
||||||
self.slope_time = gate_guard.data_buffer.DataBuffer(SLOPE_COUNT)
|
self.__client.tls_set(MQTT_CERTS)
|
||||||
|
|
||||||
def poll(self):
|
def poll(self):
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
if current_time >= self.__light_read_timeout:
|
if current_time >= self.__light_read_timeout:
|
||||||
self.__light_read_timeout = current_time + LIGHT_READ_DELAY_S
|
self.__light_read_timeout = current_time + LIGHT_READ_DELAY_S
|
||||||
self.__light_data.push(self.__light_sensor.read())
|
light_read = self.__light_sensor.read()
|
||||||
|
self.__light_data.push(light_read)
|
||||||
|
logging.info('light - abs: ' + str(light_read) + ', avg: ' + \
|
||||||
|
str(self.__light_data.average()))
|
||||||
|
power_read = self.__power_sensor.power_mw()
|
||||||
|
self.__power_data.push(power_read)
|
||||||
self.__state_handler[self.__next_state](self.__light_data.average())
|
self.__state_handler[self.__next_state](self.__light_data.average())
|
||||||
|
|
||||||
def __update_state(self, new_state):
|
def __update_state(self, new_state):
|
||||||
self.__last_state = self.__next_state
|
self.__last_state = self.__next_state
|
||||||
self.__next_state = new_state
|
self.__next_state = new_state
|
||||||
|
|
||||||
|
|
||||||
def __is_transition(self):
|
def __is_transition(self):
|
||||||
if self.__last_state != self.__next_state:
|
if self.__last_state != self.__next_state:
|
||||||
logging.info('STATE: ' + self.__last_state + ' -> ' + \
|
logging.info('STATE: ' + self.__last_state + ' -> ' + \
|
||||||
@ -84,80 +91,123 @@ class Gate(object):
|
|||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
def __init1_handler(self, _):
|
||||||
|
next_state = self.__next_state
|
||||||
|
if self.__is_transition():
|
||||||
|
self.__engine.down()
|
||||||
|
# workaround for high power after starting engine
|
||||||
|
time.sleep(1)
|
||||||
|
msg = str(time.time()) + " Initialization"
|
||||||
|
try:
|
||||||
|
self.__client.connect(MQTT_HOST, MQTT_PORT)
|
||||||
|
self.__client.loop_start()
|
||||||
|
self.__client.publish(MQTT_TOPIC, msg, qos=2, retain=True)
|
||||||
|
self.__client.loop_stop()
|
||||||
|
except (ValueError, TypeError, socket.error, ssl.CertificateError):
|
||||||
|
logging.info('unable to publish to mqtt')
|
||||||
|
pwr = self.__power_sensor.power_mw()
|
||||||
|
logging.info('pwr: ' + str(pwr) + ' mW')
|
||||||
|
if pwr > MAX_POWER:
|
||||||
|
next_state = STATE_INIT_2
|
||||||
|
self.__update_state(next_state)
|
||||||
|
|
||||||
def __init_handler(self, light_avg):
|
def __init2_handler(self, _):
|
||||||
'''
|
next_state = self.__next_state
|
||||||
In init we don't know anything neither about gate state nor about
|
if self.__is_transition():
|
||||||
light. So first we try to reach STATE_OPENED.
|
self.__engine.up()
|
||||||
'''
|
self.__down_run_time = time.time()
|
||||||
#pylint: disable=unused-argument
|
pwr = self.__power_sensor.power_mw()
|
||||||
self.__comserver.connect()
|
logging.info('pwr: ' + str(pwr) + ' mW')
|
||||||
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
|
|
||||||
" gate gard initiated")
|
|
||||||
self.__comserver.disconnect()
|
|
||||||
self.__update_state(STATE_OPENING)
|
|
||||||
|
|
||||||
|
if pwr > MAX_POWER:
|
||||||
|
self.__down_run_time = (time.time() - self.__down_run_time) / 2
|
||||||
|
logging.info('calculated down time: ' + str(self.__down_run_time) + ' s')
|
||||||
|
if self.__down_run_time > 30:
|
||||||
|
next_state = STATE_CLOSING
|
||||||
|
else:
|
||||||
|
logging.info("That's not very relaistic. Another try...")
|
||||||
|
next_state = STATE_INIT_1
|
||||||
|
self.__update_state(next_state)
|
||||||
|
|
||||||
|
def __check_to_open(self, light_avg):
|
||||||
|
ret = False
|
||||||
|
if (light_avg != None) and (light_avg > LIGHT_LX_THRESHOLD["open"]):
|
||||||
|
ret = True
|
||||||
|
current_date = datetime.datetime.now()
|
||||||
|
if (current_date.hour <= 12) and (current_date.hour >= 8):
|
||||||
|
ret = True
|
||||||
|
return ret
|
||||||
|
|
||||||
|
def __check_to_close(self, light_avg):
|
||||||
|
ret = False
|
||||||
|
if (light_avg != None) and (light_avg <= LIGHT_LX_THRESHOLD["close"]):
|
||||||
|
current_date = datetime.datetime.now()
|
||||||
|
if (current_date.hour >= 16) and (current_date.minute >= 0):
|
||||||
|
ret = True
|
||||||
|
return ret
|
||||||
|
|
||||||
def __opened_handler(self, light_avg):
|
def __opened_handler(self, light_avg):
|
||||||
next_state = self.__next_state
|
next_state = self.__next_state
|
||||||
if self.__is_transition():
|
if self.__is_transition():
|
||||||
self.__engine.down()
|
self.__engine.down()
|
||||||
time.sleep(0.5)
|
time.sleep(1)
|
||||||
self.__engine.stop()
|
self.__engine.stop()
|
||||||
slope, _, _, _, _ = scipy.stats.linregress(self.slope_time.get(),
|
msg = str(time.time()) + " Opened"
|
||||||
self.slope_power.get())
|
try:
|
||||||
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
|
self.__client.connect(MQTT_HOST, MQTT_PORT)
|
||||||
" Opened " + \
|
self.__client.loop_start()
|
||||||
str(slope))
|
self.__client.publish(MQTT_TOPIC, msg, qos=2, retain=True)
|
||||||
self.slope_power.clear()
|
self.__client.loop_stop()
|
||||||
self.slope_time.clear()
|
except (ValueError, TypeError, socket.error, ssl.CertificateError):
|
||||||
|
logging.info('unable to publish to mqtt')
|
||||||
if (light_avg != None) and (light_avg <= LIGHT_LX_THRESHOLD["close"]):
|
if self.__check_to_close(light_avg) is True:
|
||||||
next_state = STATE_CLOSING
|
next_state = STATE_CLOSING
|
||||||
|
|
||||||
self.__update_state(next_state)
|
self.__update_state(next_state)
|
||||||
|
|
||||||
|
|
||||||
def __closed_handler(self, light_avg):
|
def __closed_handler(self, light_avg):
|
||||||
next_state = self.__next_state
|
next_state = self.__next_state
|
||||||
if self.__is_transition():
|
if self.__is_transition():
|
||||||
self.__engine.up()
|
self.__engine.up()
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
self.__engine.stop()
|
self.__engine.stop()
|
||||||
slope, _, _, _, _ = scipy.stats.linregress(self.slope_time.get(),
|
msg = str(time.time()) + " Closed"
|
||||||
self.slope_power.get())
|
try:
|
||||||
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
|
self.__client.connect(MQTT_HOST, MQTT_PORT)
|
||||||
" Closed " + \
|
self.__client.loop_start()
|
||||||
str(slope))
|
self.__client.publish(MQTT_TOPIC, msg, qos=2, retain=True)
|
||||||
self.slope_power.clear()
|
self.__client.loop_stop()
|
||||||
self.slope_time.clear()
|
except (ValueError, TypeError, socket.error, ssl.CertificateError):
|
||||||
|
logging.info('unable to publish to mqtt')
|
||||||
if (light_avg != None) and (light_avg > LIGHT_LX_THRESHOLD["open"]):
|
if self.__check_to_open(light_avg) is True:
|
||||||
next_state = STATE_OPENING
|
next_state = STATE_OPENING
|
||||||
|
|
||||||
self.__update_state(next_state)
|
self.__update_state(next_state)
|
||||||
|
|
||||||
|
|
||||||
def __opening_handler(self, light_avg):
|
def __opening_handler(self, light_avg):
|
||||||
next_state = self.__next_state
|
next_state = self.__next_state
|
||||||
if self.__is_transition():
|
if self.__is_transition():
|
||||||
self.__engine.up()
|
self.__engine.up()
|
||||||
self.__gate_move_timeout = time.time() + MAX_GATE_RUNTIME["open"]
|
msg = str(time.time()) + " Opening " + str(light_avg) + " lx"
|
||||||
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
|
try:
|
||||||
" Opening " + str(light_avg) + " lx")
|
self.__client.connect(MQTT_HOST, MQTT_PORT)
|
||||||
tm = time.time()
|
self.__client.loop_start()
|
||||||
if tm > self.__gate_move_timeout:
|
self.__client.publish(MQTT_TOPIC, msg, qos=2, retain=True)
|
||||||
next_state = STATE_ERROR
|
self.__client.loop_stop()
|
||||||
else:
|
except (ValueError, TypeError, socket.error, ssl.CertificateError):
|
||||||
pwr = self.__power_sensor.power_mw()
|
logging.info('unable to publish to mqtt')
|
||||||
self.slope_power.push(pwr)
|
self.__gate_run_time = time.time()
|
||||||
self.slope_time.push(tm)
|
|
||||||
slope = 0
|
# workaround for high power after starting engine
|
||||||
if self.slope_power.length() >= SLOPE_CNT_MIN:
|
time.sleep(1)
|
||||||
slope, _, _, _, _ = scipy.stats.linregress(
|
|
||||||
self.slope_time.get(), self.slope_power.get())
|
pwr = self.__power_sensor.power_mw()
|
||||||
logging.debug('up: ' + str(tm) + ' ' + str(pwr) + ' ' + str(slope))
|
logging.info('pwr - abs: ' + str(pwr) + ' mW\tavg: ' + str(self.__power_data.average()) + ' mW')
|
||||||
if slope > MAX_POWER_SLOPE["up"]:
|
if pwr > MAX_POWER:
|
||||||
|
deviation = abs(time.time() - self.__gate_run_time - self.__down_run_time)
|
||||||
|
logging.info('runtime deviation: ' + str(deviation))
|
||||||
|
if deviation > (self.__down_run_time / 10):
|
||||||
|
logging.info('Deviation too big. Re-initializing...')
|
||||||
|
next_state = STATE_INIT_1
|
||||||
|
else:
|
||||||
next_state = STATE_OPENED
|
next_state = STATE_OPENED
|
||||||
self.__update_state(next_state)
|
self.__update_state(next_state)
|
||||||
|
|
||||||
@ -165,31 +215,27 @@ class Gate(object):
|
|||||||
next_state = self.__next_state
|
next_state = self.__next_state
|
||||||
if self.__is_transition():
|
if self.__is_transition():
|
||||||
self.__engine.down()
|
self.__engine.down()
|
||||||
self.__gate_move_timeout = time.time() + MAX_GATE_RUNTIME["close"]
|
msg = str(time.time()) + " Closing " + str(light_avg) + " lx"
|
||||||
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
|
try:
|
||||||
" Closing " + str(light_avg) + " lx")
|
self.__client.connect(MQTT_HOST, MQTT_PORT)
|
||||||
|
self.__client.loop_start()
|
||||||
|
self.__client.publish(MQTT_TOPIC, msg, qos=2, retain=True)
|
||||||
|
self.__client.loop_stop()
|
||||||
|
except (ValueError, TypeError, socket.error, ssl.CertificateError):
|
||||||
|
logging.info('unable to publish to mqtt')
|
||||||
|
self.__gate_run_time = time.time()
|
||||||
|
|
||||||
tm = time.time()
|
# workaround for high power after starting engine
|
||||||
if tm > self.__gate_move_timeout:
|
time.sleep(1)
|
||||||
next_state = STATE_ERROR
|
|
||||||
else:
|
pwr = self.__power_sensor.power_mw()
|
||||||
pwr = self.__power_sensor.power_mw()
|
logging.info('pwr: ' + str(pwr) + ' mW')
|
||||||
self.slope_power.push(pwr)
|
opening_time = time.time() - self.__gate_run_time
|
||||||
self.slope_time.push(tm)
|
if opening_time > self.__down_run_time:
|
||||||
slope = 0
|
logging.info("actual running time bigger than calculated (" + str(opening_time) + " vs. " + str(self.__down_run_time) + ").")
|
||||||
if self.slope_power.length() >= SLOPE_CNT_MIN:
|
next_state = STATE_CLOSED
|
||||||
slope, _, _, _, _ = scipy.stats.linregress(
|
if pwr > MAX_POWER:
|
||||||
self.slope_time.get(), self.slope_power.get())
|
self.__engine.stop()
|
||||||
logging.debug('dw: ' + str(tm) + ' ' + str(pwr) + ' ' + str(slope))
|
next_state = STATE_INIT_1
|
||||||
if slope > MAX_POWER_SLOPE["down"]:
|
|
||||||
next_state = STATE_CLOSED
|
|
||||||
self.__update_state(next_state)
|
self.__update_state(next_state)
|
||||||
|
|
||||||
|
|
||||||
def __error_handler(self, light_avg):
|
|
||||||
#pylint: disable=unused-argument
|
|
||||||
if self.__is_transition():
|
|
||||||
self.__engine.stop()
|
|
||||||
self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
|
|
||||||
" Error handler!!!")
|
|
||||||
self.__update_state(STATE_INIT)
|
|
||||||
|
@ -10,7 +10,7 @@ import sys
|
|||||||
DAEMON_START_SCRIPT_SRC = 'gate_guard.service'
|
DAEMON_START_SCRIPT_SRC = 'gate_guard.service'
|
||||||
DAEMON_START_SCRIPT_DST = '/lib/systemd/system/' + DAEMON_START_SCRIPT_SRC
|
DAEMON_START_SCRIPT_DST = '/lib/systemd/system/' + DAEMON_START_SCRIPT_SRC
|
||||||
|
|
||||||
def main(argv):
|
def main(_):
|
||||||
project_version = ''
|
project_version = ''
|
||||||
project_name = ''
|
project_name = ''
|
||||||
project_namespace = ''
|
project_namespace = ''
|
||||||
@ -62,4 +62,3 @@ def main(argv):
|
|||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
sys.exit(main(sys.argv[1:]))
|
sys.exit(main(sys.argv[1:]))
|
||||||
|
|
||||||
|
@ -10,7 +10,6 @@ import gate_guard.data_buffer
|
|||||||
|
|
||||||
class Test(unittest.TestCase):
|
class Test(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
def test_zero_space(self):
|
def test_zero_space(self):
|
||||||
dut = gate_guard.data_buffer.DataBuffer(0)
|
dut = gate_guard.data_buffer.DataBuffer(0)
|
||||||
result = dut.push(10)
|
result = dut.push(10)
|
||||||
@ -26,5 +25,4 @@ class Test(unittest.TestCase):
|
|||||||
str(buffer_size) + " res: " + str(result))
|
str(buffer_size) + " res: " + str(result))
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
#import sys;sys.argv = ['', 'Test.testName']
|
|
||||||
unittest.main()
|
unittest.main()
|
Loading…
Reference in New Issue
Block a user