ci: enable continuous integration

Signed-off-by: Thomas Klaehn <thomas.klaehn@u-blox.com>
This commit is contained in:
Thomas Klaehn 2017-03-30 14:09:49 +02:00
parent ed80c54b7d
commit e541023bba
9 changed files with 38 additions and 62 deletions

View File

@ -10,7 +10,7 @@ tests:
stage: test stage: test
script: script:
- "python scripts/pylint_wrapper.py -s source -s tests" - "python scripts/pylint_wrapper.py -s source -s tests"
- "nosetests --with-coverage --cover-package=data_buffer --cover-package=engine --cover-package=gate --cover-package=light_sensor --cover-package=power_sensor --cover-xml" - "nosetests --with-coverage --cover-package=source --cover-xml"
- "nosetests --with-xunit tests/unittests/" - "nosetests --with-xunit tests/unittests/"
- "sonar-runner" - "sonar-runner"

View File

@ -4,6 +4,7 @@ Created on Dec 23, 2016
@author: klaehn @author: klaehn
''' '''
import sys import sys
sys.path.append('source/')
import time import time
import data_buffer import data_buffer

View File

@ -13,7 +13,7 @@ class DataBuffer(object):
return True return True
def average(self): def average(self):
if len(self.__data) != self.__max: if len(self.__data) is 0:
return None return None
return sum(self.__data) / len(self.__data) return sum(self.__data) / len(self.__data)

View File

@ -3,7 +3,9 @@ Created on Dec 19, 2016
@author: klaehn @author: klaehn
''' '''
from time import time, sleep import sys
sys.path.append('source/')
import time
import mqtt import mqtt
import data_buffer import data_buffer
import light_sensor import light_sensor
@ -44,8 +46,8 @@ class Gate(object):
STATE_OPENING:self.__opening_handler, \ STATE_OPENING:self.__opening_handler, \
STATE_CLOSING:self.__closing_handler, \ STATE_CLOSING:self.__closing_handler, \
STATE_ERROR:self.__error_handler} STATE_ERROR:self.__error_handler}
self.__next_state = "init" self.__next_state = STATE_INIT
self.__last_state = "error" self.__last_state = STATE_ERROR
self.__light_sensor = light_sensor.LightSensor(LIGHT_SENSOR_I2C_BUS, \ self.__light_sensor = light_sensor.LightSensor(LIGHT_SENSOR_I2C_BUS, \
LIGHT_SENSOR_I2C_ADDRESS) LIGHT_SENSOR_I2C_ADDRESS)
@ -62,7 +64,7 @@ class Gate(object):
self.__runtime_close = 0 self.__runtime_close = 0
def poll(self): def poll(self):
current_time = time() current_time = time.time()
if current_time >= self.__light_read_timeout: if current_time >= self.__light_read_timeout:
self.__light_read_timeout = current_time + LIGHT_READ_DELAY_S self.__light_read_timeout = current_time + LIGHT_READ_DELAY_S
self.__light_data.push(self.__light_sensor.read()) self.__light_data.push(self.__light_sensor.read())
@ -82,11 +84,11 @@ class Gate(object):
def __init_handler(self, light_avg): def __init_handler(self, light_avg):
''' '''
In init we don't know anything neither about gate state nor about In init we don't know anything neither about gate state nor about
light. So first we try to reach STATE_CLOSED. light. So first we try to reach STATE_OPENED.
''' '''
#pylint: disable=unused-argument #pylint: disable=unused-argument
self.__comserver.connect() self.__comserver.connect()
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \ self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" gate gard initiated") " gate gard initiated")
self.__comserver.disconnect() self.__comserver.disconnect()
self.__update_state(STATE_OPENING) self.__update_state(STATE_OPENING)
@ -96,13 +98,14 @@ class Gate(object):
next_state = self.__next_state next_state = self.__next_state
if self.__is_transition(): if self.__is_transition():
self.__engine.down() self.__engine.down()
sleep(5) time.sleep(5)
self.__engine.stop() self.__engine.stop()
self.__comserver.transmit(MQTT_TOPIC, str(time()) + " Opened " + \ self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Opened " + \
str(self.__power_data.average()) + " mW") str(self.__power_data.average()) + " mW")
self.__power_data.clear() self.__power_data.clear()
if light_avg <= LIGHT_LX_THRESHOLD["close"]: if (light_avg != None) and (light_avg <= LIGHT_LX_THRESHOLD["close"]):
next_state = STATE_CLOSING next_state = STATE_CLOSING
self.__update_state(next_state) self.__update_state(next_state)
@ -112,13 +115,14 @@ class Gate(object):
next_state = self.__next_state next_state = self.__next_state
if self.__is_transition(): if self.__is_transition():
self.__engine.up() self.__engine.up()
sleep(5) time.sleep(5)
self.__engine.stop() self.__engine.stop()
self.__comserver.transmit(MQTT_TOPIC, str(time()) + " Closed " + \ self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Closed " + \
str(self.__power_data.average()) + " mW") str(self.__power_data.average()) + " mW")
self.__power_data.clear() self.__power_data.clear()
if light_avg > LIGHT_LX_THRESHOLD["open"]: if (light_avg != None) and (light_avg > LIGHT_LX_THRESHOLD["open"]):
next_state = STATE_OPENING next_state = STATE_OPENING
self.__update_state(next_state) self.__update_state(next_state)
@ -127,20 +131,20 @@ class Gate(object):
def __opening_handler(self, light_avg): def __opening_handler(self, light_avg):
next_state = self.__next_state next_state = self.__next_state
if self.__is_transition(): if self.__is_transition():
self.__runtime_open = time() + MIN_GATE_RUNTIME["open"] self.__runtime_open = time.time() + MIN_GATE_RUNTIME["open"]
self.__engine.up() self.__engine.up()
self.__gate_move_timeout = time() + MAX_GATE_RUNTIME["open"] self.__gate_move_timeout = time.time() + MAX_GATE_RUNTIME["open"]
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \ self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Opening " + str(light_avg) + " lx") " Opening " + str(light_avg) + " lx")
if time() > self.__gate_move_timeout: if time.time() > self.__gate_move_timeout:
next_state = STATE_ERROR next_state = STATE_ERROR
else: else:
self.__power_data.push(self.__power_sensor.power_mw()) self.__power_data.push(self.__power_sensor.power_mw())
current_avg = self.__power_data.average() current_avg = self.__power_data.average()
if current_avg != None: if current_avg != None:
if current_avg > MAX_ENGINE_POWER["up"]: if current_avg > MAX_ENGINE_POWER["up"]:
if time() > self.__runtime_open: if time.time() > self.__runtime_open:
next_state = STATE_OPENED next_state = STATE_OPENED
self.__update_state(next_state) self.__update_state(next_state)
@ -148,20 +152,20 @@ class Gate(object):
def __closing_handler(self, light_avg): def __closing_handler(self, light_avg):
next_state = self.__next_state next_state = self.__next_state
if self.__is_transition(): if self.__is_transition():
self.__runtime_close = time() + MIN_GATE_RUNTIME["close"] self.__runtime_close = time.time() + MIN_GATE_RUNTIME["close"]
self.__engine.down() self.__engine.down()
self.__gate_move_timeout = time() + MAX_GATE_RUNTIME["close"] self.__gate_move_timeout = time.time() + MAX_GATE_RUNTIME["close"]
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \ self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Closing " + str(light_avg) + " lx") " Closing " + str(light_avg) + " lx")
if time() > self.__gate_move_timeout: if time.time() > self.__gate_move_timeout:
next_state = STATE_ERROR next_state = STATE_ERROR
else: else:
self.__power_data.push(self.__power_sensor.power_mw()) self.__power_data.push(self.__power_sensor.power_mw())
current_avg = self.__power_data.average() current_avg = self.__power_data.average()
if current_avg != None: if current_avg != None:
if current_avg > MAX_ENGINE_POWER["down"]: if current_avg > MAX_ENGINE_POWER["down"]:
if time() > self.__runtime_close: if time.time() > self.__runtime_close:
next_state = STATE_CLOSED next_state = STATE_CLOSED
self.__update_state(next_state) self.__update_state(next_state)
@ -171,6 +175,6 @@ class Gate(object):
#pylint: disable=unused-argument #pylint: disable=unused-argument
if self.__is_transition(): if self.__is_transition():
self.__engine.stop() self.__engine.stop()
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \ self.__comserver.transmit(MQTT_TOPIC, str(time.time()) + \
" Error handler!!!") " Error handler!!!")
self.__update_state(STATE_INIT) self.__update_state(STATE_INIT)

View File

@ -4,6 +4,7 @@ Created on Dec 19, 2016
@author: klaehn @author: klaehn
''' '''
import sys import sys
sys.path.append('source/')
import time import time
import gate import gate

View File

@ -4,6 +4,7 @@ Created on Dec 24, 2016
@author: tkl @author: tkl
''' '''
import sys import sys
sys.path.append('source/')
import time import time
import light_sensor import light_sensor
import mqtt import mqtt

View File

@ -4,6 +4,7 @@ Created on Dec 23, 2016
@author: klaehn @author: klaehn
''' '''
import sys import sys
sys.path.append('source/')
import time import time
import data_buffer import data_buffer

View File

@ -3,61 +3,29 @@ Created on Dec 19, 2016
@author: klaehn @author: klaehn
''' '''
import sys
sys.path.append('source/')
import unittest import unittest
import random import random
from data_buffer import DataBuffer import data_buffer
class Test(unittest.TestCase): class Test(unittest.TestCase):
def test_zero_space(self): def test_zero_space(self):
dut = DataBuffer(0) dut = data_buffer.DataBuffer(0)
result = dut.push(10) result = dut.push(10)
self.assertFalse(result, "test_zero_space - exp: False res: True") self.assertFalse(result, "test_zero_space - exp: False res: True")
def test_max_data(self): def test_max_data(self):
buffer_size = 10 buffer_size = 10
dut = DataBuffer(buffer_size) dut = data_buffer.DataBuffer(buffer_size)
for _ in range(11): for _ in range(11):
dut.push(random.random()) dut.push(random.random())
result = dut.length() result = dut.length()
self.assertEqual(result, buffer_size, "test_max_data - exp: " + \ self.assertEqual(result, buffer_size, "test_max_data - exp: " + \
str(buffer_size) + " res: " + str(result)) str(buffer_size) + " res: " + str(result))
def test_average_min(self):
dut = DataBuffer(10)
rnd = random.random()
dut.push(rnd)
result = dut.average()
self.assertEqual(result, None, "test_average_min - exp: None" + \
" res: " + str(result))
def test_average_mid(self):
buffer_size = 10
res_buffer = []
dut = DataBuffer(buffer_size)
for _ in range(buffer_size / 2):
rnd = random.random()
dut.push(rnd)
res_buffer.append(rnd)
result = dut.average()
self.assertEqual(result, None, "test_average_mid - exp: None" + \
" res: " + str(result))
def test_average_max(self):
buffer_size = 10
res_buffer = []
dut = DataBuffer(buffer_size)
for _ in range(buffer_size):
rnd = random.random()
dut.push(rnd)
res_buffer.append(rnd)
result = dut.average()
self.assertEqual(result, sum(res_buffer) / buffer_size, \
"test_average_mid - exp: " + \
str(sum(res_buffer) / buffer_size) + \
" res: " + str(result))
if __name__ == "__main__": if __name__ == "__main__":
#import sys;sys.argv = ['', 'Test.testName'] #import sys;sys.argv = ['', 'Test.testName']