math helper functions started
This commit is contained in:
parent
e311b47727
commit
7e96d60ed1
@ -31,9 +31,13 @@ class mqtt:
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
def transmit(self, topic, payload):
|
def transmit(self, topic, payload):
|
||||||
|
was_connected = True
|
||||||
if not self.__is_connected:
|
if not self.__is_connected:
|
||||||
return False
|
was_connected = False
|
||||||
|
self.connect()
|
||||||
result = self.__client.publish(topic, payload, self.__qos, self.__retain)
|
result = self.__client.publish(topic, payload, self.__qos, self.__retain)
|
||||||
|
if not was_connected:
|
||||||
|
self.disconnect()
|
||||||
if result == 0:
|
if result == 0:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
25
src/data_buffer/__init__.py
Normal file
25
src/data_buffer/__init__.py
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
import mymath
|
||||||
|
|
||||||
|
class DataBuffer(object):
|
||||||
|
def __init__(self, length):
|
||||||
|
self.__max = length
|
||||||
|
self.__data = []
|
||||||
|
|
||||||
|
def push(self, element):
|
||||||
|
if self.__max == 0:
|
||||||
|
return False
|
||||||
|
if len(self.__data) == self.__max:
|
||||||
|
_ = self.__data.pop(0)
|
||||||
|
self.__data.append(element)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def average(self):
|
||||||
|
if len(self.__data) != self.__max:
|
||||||
|
return None
|
||||||
|
return mymath.mean(self.__data)
|
||||||
|
|
||||||
|
def length(self):
|
||||||
|
return len(self.__data)
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.__data = []
|
@ -5,20 +5,20 @@ Created on Dec 19, 2016
|
|||||||
'''
|
'''
|
||||||
import unittest
|
import unittest
|
||||||
import random
|
import random
|
||||||
from light_data.light_data import light_data
|
from data_buffer import DataBuffer
|
||||||
|
|
||||||
|
|
||||||
class Test(unittest.TestCase):
|
class Test(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
def test_zero_space(self):
|
def test_zero_space(self):
|
||||||
dut = light_data(0)
|
dut = DataBuffer(0)
|
||||||
result = dut.push(10)
|
result = dut.push(10)
|
||||||
self.assertFalse(result, "test_zero_space - exp: False res: True")
|
self.assertFalse(result, "test_zero_space - exp: False res: True")
|
||||||
|
|
||||||
def test_max_data(self):
|
def test_max_data(self):
|
||||||
buffer_size = 10
|
buffer_size = 10
|
||||||
dut = light_data(buffer_size)
|
dut = DataBuffer(buffer_size)
|
||||||
for _ in range(11):
|
for _ in range(11):
|
||||||
dut.push(random.random())
|
dut.push(random.random())
|
||||||
result = dut.length()
|
result = dut.length()
|
||||||
@ -26,7 +26,7 @@ class Test(unittest.TestCase):
|
|||||||
str(buffer_size) + " res: " + str(result))
|
str(buffer_size) + " res: " + str(result))
|
||||||
|
|
||||||
def test_average_min(self):
|
def test_average_min(self):
|
||||||
dut = light_data(10)
|
dut = DataBuffer(10)
|
||||||
rnd = random.random()
|
rnd = random.random()
|
||||||
dut.push(rnd)
|
dut.push(rnd)
|
||||||
result = dut.average()
|
result = dut.average()
|
||||||
@ -36,7 +36,7 @@ class Test(unittest.TestCase):
|
|||||||
def test_average_mid(self):
|
def test_average_mid(self):
|
||||||
buffer_size = 10
|
buffer_size = 10
|
||||||
res_buffer = []
|
res_buffer = []
|
||||||
dut = light_data(buffer_size)
|
dut = DataBuffer(buffer_size)
|
||||||
for _ in range(buffer_size / 2):
|
for _ in range(buffer_size / 2):
|
||||||
rnd = random.random()
|
rnd = random.random()
|
||||||
dut.push(rnd)
|
dut.push(rnd)
|
||||||
@ -48,7 +48,7 @@ class Test(unittest.TestCase):
|
|||||||
def test_average_max(self):
|
def test_average_max(self):
|
||||||
buffer_size = 10
|
buffer_size = 10
|
||||||
res_buffer = []
|
res_buffer = []
|
||||||
dut = light_data(buffer_size)
|
dut = DataBuffer(buffer_size)
|
||||||
for _ in range(buffer_size):
|
for _ in range(buffer_size):
|
||||||
rnd = random.random()
|
rnd = random.random()
|
||||||
dut.push(rnd)
|
dut.push(rnd)
|
@ -0,0 +1,24 @@
|
|||||||
|
from gpio import Gpio
|
||||||
|
|
||||||
|
class Engine:
|
||||||
|
def __init__(self, gpio_1 = 13, gpio_2 = 19):
|
||||||
|
self.gpio_1 = Gpio(gpio_1)
|
||||||
|
self.gpio_2 = Gpio(gpio_2)
|
||||||
|
|
||||||
|
self.gpio_1.export()
|
||||||
|
self.gpio_1.direction(Gpio.DIRECTION_OUT)
|
||||||
|
self.gpio_2.export()
|
||||||
|
self.gpio_2.direction(Gpio.DIRECTION_OUT)
|
||||||
|
self.stop()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
self.gpio_1.set(0)
|
||||||
|
self.gpio_2.set(0)
|
||||||
|
|
||||||
|
def up(self):
|
||||||
|
self.gpio_1.set(1)
|
||||||
|
self.gpio_2.set(0)
|
||||||
|
|
||||||
|
def down(self):
|
||||||
|
self.gpio_1.set(0)
|
||||||
|
self.gpio_2.set(1)
|
@ -1,24 +0,0 @@
|
|||||||
from gpio.gpio import gpio
|
|
||||||
|
|
||||||
class engine:
|
|
||||||
def __init__(self, gpio_1 = 13, gpio_2 = 19):
|
|
||||||
self.gpio_1 = gpio(gpio_1)
|
|
||||||
self.gpio_2 = gpio(gpio_2)
|
|
||||||
|
|
||||||
self.gpio_1.export()
|
|
||||||
self.gpio_1.direction("out")
|
|
||||||
self.gpio_2.export()
|
|
||||||
self.gpio_2.direction("out")
|
|
||||||
self.stop()
|
|
||||||
|
|
||||||
def stop(self):
|
|
||||||
self.gpio_1.set(0)
|
|
||||||
self.gpio_2.set(0)
|
|
||||||
|
|
||||||
def run_up(self):
|
|
||||||
self.gpio_1.set(1)
|
|
||||||
self.gpio_2.set(0)
|
|
||||||
|
|
||||||
def run_down(self):
|
|
||||||
self.gpio_1.set(0)
|
|
||||||
self.gpio_2.set(1)
|
|
@ -4,37 +4,21 @@ Created on Dec 19, 2016
|
|||||||
@author: klaehn
|
@author: klaehn
|
||||||
'''
|
'''
|
||||||
|
|
||||||
from time import time
|
from engine import Engine
|
||||||
import numpy as np
|
|
||||||
from engine.engine import engine
|
|
||||||
from power_sensor.power_sensor import power_sensor
|
|
||||||
|
|
||||||
class GateHandler(object):
|
class GateHandler(object):
|
||||||
"""Gate handler class"""
|
"""Gate handler class"""
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.__engine = engine(13, 19)
|
self.__engine = Engine(13, 19)
|
||||||
self.__power_sensor = power_sensor(1, 0x40)
|
|
||||||
self.__max_current = {"up":315, "down":280}
|
|
||||||
self.__max_time = {"open":250, "close":250}
|
|
||||||
return
|
|
||||||
|
|
||||||
def open(self):
|
def open(self):
|
||||||
"""Open the gate"""
|
"""Open the gate"""
|
||||||
timeout_open = time() + self.__max_time["open"]
|
self.__engine.up()
|
||||||
self.__engine.run_up()
|
|
||||||
power = np.array([])
|
|
||||||
timestamps = np.array([])
|
|
||||||
while timeout_open > time():
|
|
||||||
timestamps = np.append(timestamps, time())
|
|
||||||
power = np.append(power, self.__power_sensor.power_mw())
|
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
"""Close the gate"""
|
"""Close the gate"""
|
||||||
timeout_open = time() + self.__max_time["close"]
|
self.__engine.down()
|
||||||
self.__engine.run_down()
|
|
||||||
power = np.array([])
|
def stop(self):
|
||||||
timestamps = np.array([])
|
"""Stop the Engine"""
|
||||||
while timeout_open > time():
|
self.__engine.stop()
|
||||||
timestamps = np.append(timestamps, time())
|
|
||||||
power = np.append(power, self.__power_sensor.power_mw())
|
|
||||||
|
|
||||||
|
@ -3,17 +3,168 @@ Created on Dec 19, 2016
|
|||||||
|
|
||||||
@author: klaehn
|
@author: klaehn
|
||||||
'''
|
'''
|
||||||
class gate_state:
|
from time import time
|
||||||
def __init__(self):
|
from communiate.protocol import mqtt
|
||||||
self.possible_states = ["unknown", "open", "close", "opening", "closing"]
|
from data_buffer import DataBuffer
|
||||||
self.state = "unknown"
|
from light_sensor.light_sensor import light_sensor
|
||||||
return
|
from gate.gate_handler import GateHandler
|
||||||
|
from power_sensor import PowerSensor
|
||||||
|
|
||||||
def set_state(self, new_state):
|
STATE_INIT = "init"
|
||||||
if new_state in self.possible_states:
|
STATE_OPENED = "open"
|
||||||
self.state = new_state
|
STATE_CLOSED = "close"
|
||||||
|
STATE_OPENING = "opening"
|
||||||
|
STATE_CLOSING = "closing"
|
||||||
|
STATE_ERROR = "error"
|
||||||
|
|
||||||
|
LIGHT_READ_DELAY_S = 60
|
||||||
|
LIGHT_CONSECUTIVE_READS = 10
|
||||||
|
LIGHT_LX_THRESHOLD = {"open":0, "close":0}
|
||||||
|
|
||||||
|
MQTT_HOST = "gitlab"
|
||||||
|
MQTT_TOPIC = "outdoor/chickenhouse/gate"
|
||||||
|
|
||||||
|
LIGHT_SENSOR_I2C_BUS = 1
|
||||||
|
LIGHT_SENSOR_I2C_ADDRESS = 0x23
|
||||||
|
|
||||||
|
POWER_SENSOR_I2C_BUS = 1
|
||||||
|
POWER_SENSOR_I2C_ADDRESS = 0x40
|
||||||
|
CONSECUTIVE_CURRENT_READS = 100
|
||||||
|
|
||||||
|
MAX_ENGINE_POWER = {"up":320, "down":290}
|
||||||
|
MAX_GATE_RUNTIME = {"open":250, "close":250}
|
||||||
|
|
||||||
|
|
||||||
|
class GateState(object):
|
||||||
|
def __init__(self):
|
||||||
|
self.__state_handler = {STATE_INIT:self.__init_handler, \
|
||||||
|
STATE_OPENED:self.__opened_handler, \
|
||||||
|
STATE_CLOSED:self.__closed_handler, \
|
||||||
|
STATE_OPENING:self.__opening_handler, \
|
||||||
|
STATE_CLOSING:self.__closing_handler, \
|
||||||
|
STATE_ERROR:self.__error_handler}
|
||||||
|
self.__next_state = "init"
|
||||||
|
self.__last_state = "error"
|
||||||
|
|
||||||
|
self.__light_sensor = light_sensor(LIGHT_SENSOR_I2C_BUS, \
|
||||||
|
LIGHT_SENSOR_I2C_ADDRESS)
|
||||||
|
self.__light_data = DataBuffer(LIGHT_CONSECUTIVE_READS)
|
||||||
|
self.__comserver = mqtt(MQTT_HOST)
|
||||||
|
self.__gate_handler = GateHandler()
|
||||||
|
self.__power_sensor = PowerSensor(POWER_SENSOR_I2C_BUS, \
|
||||||
|
POWER_SENSOR_I2C_ADDRESS)
|
||||||
|
self.__power_data = DataBuffer(CONSECUTIVE_CURRENT_READS)
|
||||||
|
self.__gate_move_timeout = 0
|
||||||
|
self.__light_read_timeout = 0
|
||||||
|
|
||||||
|
def poll(self):
|
||||||
|
current_time = time()
|
||||||
|
if current_time >= self.__light_read_timeout:
|
||||||
|
self.__light_read_timeout = current_time + LIGHT_READ_DELAY_S
|
||||||
|
self.__light_data.push(self.__light_sensor.read())
|
||||||
|
self.__state_handler[self.__next_state](self.__light_data.average())
|
||||||
|
|
||||||
|
def __update_state(self, new_state):
|
||||||
|
self.__last_state = self.__next_state
|
||||||
|
self.__next_state = new_state
|
||||||
|
|
||||||
|
|
||||||
|
def __is_transition(self):
|
||||||
|
if self.__last_state != self.__next_state:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_state(self):
|
|
||||||
return self.state
|
def __init_handler(self, light_avg):
|
||||||
|
'''
|
||||||
|
In init we don't know anything neither about gate state nor about
|
||||||
|
light. So first we try to reach STATE_CLOSED.
|
||||||
|
'''
|
||||||
|
#pylint: disable=unused-argument
|
||||||
|
self.__comserver.connect()
|
||||||
|
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \
|
||||||
|
" gate gard initiated")
|
||||||
|
self.__comserver.disconnect()
|
||||||
|
self.__update_state(STATE_CLOSING)
|
||||||
|
|
||||||
|
|
||||||
|
def __opened_handler(self, light_avg):
|
||||||
|
next_state = self.__next_state
|
||||||
|
if self.__is_transition():
|
||||||
|
self.__gate_handler.stop()
|
||||||
|
self.__power_data.clear()
|
||||||
|
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \
|
||||||
|
" Opening gate finished")
|
||||||
|
|
||||||
|
if light_avg <= LIGHT_LX_THRESHOLD["close"]:
|
||||||
|
next_state = STATE_CLOSING
|
||||||
|
|
||||||
|
self.__update_state(next_state)
|
||||||
|
|
||||||
|
|
||||||
|
def __closed_handler(self, light_avg):
|
||||||
|
next_state = self.__next_state
|
||||||
|
if self.__is_transition():
|
||||||
|
self.__gate_handler.stop()
|
||||||
|
self.__power_data.clear()
|
||||||
|
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \
|
||||||
|
" Closing gate finished")
|
||||||
|
|
||||||
|
if light_avg > LIGHT_LX_THRESHOLD["open"]:
|
||||||
|
next_state = STATE_OPENING
|
||||||
|
|
||||||
|
self.__update_state(next_state)
|
||||||
|
|
||||||
|
|
||||||
|
def __opening_handler(self, light_avg):
|
||||||
|
#pylint: disable=unused-argument
|
||||||
|
next_state = self.__next_state
|
||||||
|
if self.__is_transition():
|
||||||
|
self.__gate_handler.open()
|
||||||
|
self.__gate_move_timeout = time() + MAX_GATE_RUNTIME["open"]
|
||||||
|
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \
|
||||||
|
" Opening gate beginning")
|
||||||
|
|
||||||
|
if time() > self.__gate_move_timeout:
|
||||||
|
next_state = STATE_ERROR
|
||||||
|
else:
|
||||||
|
self.__power_data.push(self.__power_sensor.power_mw())
|
||||||
|
current_avg = self.__power_data.average()
|
||||||
|
if current_avg != None:
|
||||||
|
print "current_avg: " + str(current_avg)
|
||||||
|
if current_avg > MAX_ENGINE_POWER["up"]:
|
||||||
|
next_state = STATE_OPENED
|
||||||
|
|
||||||
|
self.__update_state(next_state)
|
||||||
|
|
||||||
|
def __closing_handler(self, light_avg):
|
||||||
|
#pylint: disable=unused-argument
|
||||||
|
next_state = self.__next_state
|
||||||
|
if self.__is_transition():
|
||||||
|
self.__gate_handler.close()
|
||||||
|
self.__gate_move_timeout = time() + MAX_GATE_RUNTIME["close"]
|
||||||
|
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \
|
||||||
|
" Closing gate beginning")
|
||||||
|
|
||||||
|
if time() > self.__gate_move_timeout:
|
||||||
|
next_state = STATE_ERROR
|
||||||
|
else:
|
||||||
|
self.__power_data.push(self.__power_sensor.power_mw())
|
||||||
|
current_avg = self.__power_data.average()
|
||||||
|
if current_avg != None:
|
||||||
|
print "current_avg: " + str(current_avg)
|
||||||
|
if current_avg > MAX_ENGINE_POWER["down"]:
|
||||||
|
next_state = STATE_CLOSED
|
||||||
|
elif current_avg > MAX_ENGINE_POWER["up"]:
|
||||||
|
''' TODO: Screwed up! while closing we are reaching the upper limit!!!'''
|
||||||
|
|
||||||
|
self.__update_state(next_state)
|
||||||
|
|
||||||
|
|
||||||
|
def __error_handler(self, light_avg):
|
||||||
|
#pylint: disable=unused-argument
|
||||||
|
if self.__is_transition():
|
||||||
|
self.__gate_handler.stop()
|
||||||
|
self.__comserver.transmit(MQTT_TOPIC, str(time()) + \
|
||||||
|
" Error handler!!!")
|
||||||
|
self.__update_state(STATE_INIT)
|
||||||
|
@ -1,67 +1,22 @@
|
|||||||
from communiate.protocol import mqtt
|
'''
|
||||||
from gate.gate_state import gate_state
|
Created on Dec 19, 2016
|
||||||
from light_data.light_data import light_data
|
|
||||||
from light_sensor.light_sensor import light_sensor
|
|
||||||
|
|
||||||
|
@author: klaehn
|
||||||
|
'''
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from gate.gate_state import GateState
|
||||||
|
|
||||||
class gate_guard:
|
def main():
|
||||||
def __init__(self):
|
gate_state = GateState()
|
||||||
self.__light_read_delay_s = 60
|
try:
|
||||||
self.__consequtive_light_reads = 10
|
while True:
|
||||||
self.__light_lx_close = 0
|
gate_state.poll()
|
||||||
self.__light_lx_open = 0
|
time.sleep(0.001)
|
||||||
|
|
||||||
self.__light_sensor = light_sensor(1, 0x23)
|
except KeyboardInterrupt:
|
||||||
self.__light_data = light_data(self.__consequtive_light_reads)
|
print "key exit"
|
||||||
self.__comserver = mqtt("gitlab")
|
return None
|
||||||
self.__gate_state = gate_state()
|
|
||||||
|
|
||||||
def close_gate(self):
|
|
||||||
topic = "outdoor/chickenhouse/gate"
|
|
||||||
payload = str(time.time()) + " closing"
|
|
||||||
self.__comserver.connect()
|
|
||||||
self.__comserver.transmit(topic, payload)
|
|
||||||
self.__comserver.disconnect()
|
|
||||||
self.__gate_state.set_state("close")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def open_gate(self):
|
|
||||||
topic = "outdoor/chickenhouse/gate"
|
|
||||||
payload = str(time.time()) + " opening"
|
|
||||||
self.__comserver.connect()
|
|
||||||
self.__comserver.transmit(topic, payload)
|
|
||||||
self.__comserver.disconnect()
|
|
||||||
self.__gate_state.set_state("open")
|
|
||||||
return None
|
|
||||||
|
|
||||||
def handle_gate_state(self, light_avg=0):
|
|
||||||
if self.__gate_state.get_state() == "open":
|
|
||||||
if light_avg <= self.__light_lx_close:
|
|
||||||
self.close_gate()
|
|
||||||
elif self.__gate_state.get_state() == "close":
|
|
||||||
if light_avg > self.__light_lx_open:
|
|
||||||
self.open_gate()
|
|
||||||
elif self.__gate_state.get_state() == "unknown":
|
|
||||||
'''TODO: bring gate in a defined position'''
|
|
||||||
|
|
||||||
def run(self):
|
|
||||||
if self.__gate_state.get_state() == "unknown":
|
|
||||||
self.__gate_state.set_state("open")
|
|
||||||
|
|
||||||
try:
|
|
||||||
while True:
|
|
||||||
rd = self.__light_sensor.read()
|
|
||||||
self.__light_data.push(rd)
|
|
||||||
light_avg = self.__light_data.average()
|
|
||||||
if light_avg != None:
|
|
||||||
self.handle_gate_state(light_avg)
|
|
||||||
time.sleep(self.__light_read_delay_s)
|
|
||||||
|
|
||||||
except KeyboardInterrupt:
|
|
||||||
return None
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
gate_guard = gate_guard()
|
sys.exit(main())
|
||||||
sys.exit(gate_guard.run())
|
|
||||||
|
@ -0,0 +1,32 @@
|
|||||||
|
from os.path import islink, isfile
|
||||||
|
|
||||||
|
class Gpio:
|
||||||
|
DIRECTION_OUT = "out"
|
||||||
|
DIRECTION_IN = "in"
|
||||||
|
|
||||||
|
def __init__(self, pin):
|
||||||
|
self.pin = pin
|
||||||
|
|
||||||
|
def export(self):
|
||||||
|
if not islink("/sys/class/gpio/gpio" + str(self.pin)):
|
||||||
|
f = open("/sys/class/gpio/export", "w")
|
||||||
|
f.write(str(self.pin))
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
def unexport(self):
|
||||||
|
if islink("/sys/class/gpio/gpio" + str(self.pin)):
|
||||||
|
f = open("/sys/class/gpio/unexport", "w")
|
||||||
|
f.write(str(self.pin))
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
def direction(self, direction = DIRECTION_OUT):
|
||||||
|
if isfile("/sys/class/gpio/gpio" + str(self.pin) + "/direction"):
|
||||||
|
f = open("/sys/class/gpio/gpio" + str(self.pin) + "/direction", "w")
|
||||||
|
f.write(direction)
|
||||||
|
f.close()
|
||||||
|
|
||||||
|
def set(self, value = 0):
|
||||||
|
if isfile("/sys/class/gpio/gpio" + str(self.pin) + "/value"):
|
||||||
|
f = open("/sys/class/gpio/gpio" + str(self.pin) + "/value", "w")
|
||||||
|
f.write(str(value))
|
||||||
|
f.close()
|
@ -1,25 +0,0 @@
|
|||||||
from os.path import islink, isfile
|
|
||||||
|
|
||||||
class gpio:
|
|
||||||
def __init__(self, pin):
|
|
||||||
self.pin = pin
|
|
||||||
|
|
||||||
def export(self):
|
|
||||||
if not islink("/sys/class/gpio/gpio" + str(self.pin)):
|
|
||||||
f = open("/sys/class/gpio/export", "w")
|
|
||||||
f.write(str(self.pin))
|
|
||||||
|
|
||||||
def unexport(self):
|
|
||||||
if islink("/sys/class/gpio/gpio" + str(self.pin)):
|
|
||||||
f = open("/sys/class/gpio/unexport", "w")
|
|
||||||
f.write(str(self.pin))
|
|
||||||
|
|
||||||
def direction(self, direction = "out"):
|
|
||||||
if isfile("/sys/class/gpio/gpio" + str(self.pin) + "/direction"):
|
|
||||||
f = open("/sys/class/gpio/gpio" + str(self.pin) + "/direction", "w")
|
|
||||||
f.write(direction)
|
|
||||||
|
|
||||||
def set(self, value = 0):
|
|
||||||
if isfile("/sys/class/gpio/gpio" + str(self.pin) + "/value"):
|
|
||||||
f = open("/sys/class/gpio/gpio" + str(self.pin) + "/value", "w")
|
|
||||||
f.write(str(value))
|
|
@ -1,20 +0,0 @@
|
|||||||
class light_data:
|
|
||||||
def __init__(self, length=10):
|
|
||||||
self.max = length
|
|
||||||
self.data = []
|
|
||||||
|
|
||||||
def push(self, element):
|
|
||||||
if self.max == 0:
|
|
||||||
return False
|
|
||||||
if len(self.data) == self.max:
|
|
||||||
_ = self.data.pop(0)
|
|
||||||
self.data.append(element)
|
|
||||||
return True
|
|
||||||
|
|
||||||
def average(self):
|
|
||||||
if len(self.data) != self.max:
|
|
||||||
return None
|
|
||||||
return sum(self.data) / self.max
|
|
||||||
|
|
||||||
def length(self):
|
|
||||||
return len(self.data)
|
|
@ -2,10 +2,10 @@ import smbus
|
|||||||
|
|
||||||
class light_sensor:
|
class light_sensor:
|
||||||
def __init__(self, bus = 1, addr = 0x23):
|
def __init__(self, bus = 1, addr = 0x23):
|
||||||
self.bus = bus
|
self.__i2c_device = bus
|
||||||
self.addr = addr
|
self.__i2c_addr = addr
|
||||||
self.bus = smbus.SMBus(self.bus)
|
self.__i2c_device = smbus.SMBus(self.__i2c_device)
|
||||||
|
|
||||||
def read(self):
|
def read(self):
|
||||||
data = self.bus.read_i2c_block_data(self.addr, 0x10)
|
data = self.__i2c_device.read_i2c_block_data(self.__i2c_addr, 0x10)
|
||||||
return int(round((data[0] * 256 + data[1]) / 1.2, 0))
|
return int(round((data[0] * 256 + data[1]) / 1.2, 0))
|
||||||
|
@ -1,15 +0,0 @@
|
|||||||
'''
|
|
||||||
Created on Dec 20, 2016
|
|
||||||
|
|
||||||
@author: klaehn
|
|
||||||
'''
|
|
||||||
|
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
def calc_slope(x, y, x_dist):
|
|
||||||
"""
|
|
||||||
Calculate the slope of a curve given by x and y arrays. The x_distance
|
|
||||||
gives the start and end point for the slope.
|
|
||||||
Returns x and y arrays of the slope
|
|
||||||
"""
|
|
||||||
return
|
|
13
src/mymath/__init__.py
Normal file
13
src/mymath/__init__.py
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
"""
|
||||||
|
Math helper functions
|
||||||
|
"""
|
||||||
|
|
||||||
|
def mean(x_array):
|
||||||
|
"""
|
||||||
|
Calculate the mean.
|
||||||
|
Input: x: array of x-values
|
||||||
|
Return: mean
|
||||||
|
"""
|
||||||
|
if len(x_array) > 0:
|
||||||
|
return sum(x_array) / len(x_array)
|
||||||
|
return None
|
29
src/mymath/unittest/__init__.py
Normal file
29
src/mymath/unittest/__init__.py
Normal file
@ -0,0 +1,29 @@
|
|||||||
|
""" Unit tests for mymath module """
|
||||||
|
import unittest
|
||||||
|
import mymath
|
||||||
|
|
||||||
|
class Test(unittest.TestCase):
|
||||||
|
""" Test class for untit tests """
|
||||||
|
def test_mean_even(self):
|
||||||
|
""" Unit test for mean function """
|
||||||
|
test_arr = [1,2,3,4,5]
|
||||||
|
result = mymath.mean(test_arr)
|
||||||
|
self.assertEqual(result, 3, "test_mean_even - exp: 3, res: " + str(result))
|
||||||
|
|
||||||
|
def test_mean_odd(self):
|
||||||
|
""" Unit test for mean function """
|
||||||
|
test_arr = [-1,2,-3,4,-5]
|
||||||
|
result = mymath.mean(test_arr)
|
||||||
|
self.assertEqual(result, -1, "test_mean_odd - exp: -1, res: " + str(result))
|
||||||
|
|
||||||
|
def test_mean_float(self):
|
||||||
|
""" Unit test for mean function """
|
||||||
|
test_arr = [1.9,2.007,3.4,4,50.678]
|
||||||
|
result = mymath.mean(test_arr)
|
||||||
|
self.assertEqual(result, 12.397, "test_mean_float - exp: 12.397, res: " + str(result))
|
||||||
|
|
||||||
|
def test_mean_empty(self):
|
||||||
|
""" Unit test for mean function """
|
||||||
|
test_arr = []
|
||||||
|
result = mymath.mean(test_arr)
|
||||||
|
self.assertIsNone(result)
|
@ -0,0 +1,49 @@
|
|||||||
|
'''
|
||||||
|
Created on Dec 19, 2016
|
||||||
|
|
||||||
|
@author: klaehn
|
||||||
|
'''
|
||||||
|
|
||||||
|
import smbus
|
||||||
|
|
||||||
|
class PowerSensor(object):
|
||||||
|
'''
|
||||||
|
Power sensor wrapper
|
||||||
|
'''
|
||||||
|
def __init__(self, bus = 1, addr = 0x40):
|
||||||
|
self.__bus = smbus.SMBus(bus)
|
||||||
|
self.__addr = addr
|
||||||
|
|
||||||
|
value = [(0x1000 >> 8) & 0xFF, 0x1000 & 0xFF]
|
||||||
|
self.__bus.write_i2c_block_data(self.__addr, 0x05, value)
|
||||||
|
config = 0x2000 | 0x1800 | 0x0400 | 0x0018 | 0x0007
|
||||||
|
value = [(config >> 8) & 0xFF, config & 0xFF]
|
||||||
|
self.__bus.write_i2c_block_data(self.__addr, 0x00, value)
|
||||||
|
|
||||||
|
def shunt_voltage_mv(self):
|
||||||
|
''' Read the voltage at the shunt resistor [mV] '''
|
||||||
|
data = self.__bus.read_i2c_block_data(self.__addr, 0x01)
|
||||||
|
voltage = data[0] * 256 + data[1]
|
||||||
|
return voltage * 0.01
|
||||||
|
|
||||||
|
def current_ma(self):
|
||||||
|
''' Read the current [mA] '''
|
||||||
|
data = self.__bus.read_i2c_block_data(self.__addr, 0x04)
|
||||||
|
if data[0] >> 7 == 1:
|
||||||
|
current = data[0] * 256 + data[1]
|
||||||
|
if current & (1 << 15):
|
||||||
|
current = current - (1 << 16)
|
||||||
|
else:
|
||||||
|
current = (data[0] << 8) | (data[1])
|
||||||
|
return current / 10
|
||||||
|
|
||||||
|
def power_mw(self):
|
||||||
|
''' Read the power [mW] '''
|
||||||
|
data = self.__bus.read_i2c_block_data(self.__addr, 0x03)
|
||||||
|
if data[0] >> 7 == 1:
|
||||||
|
power = data[0] * 256 + data[1]
|
||||||
|
if power & (1 << 15):
|
||||||
|
power = power - (1 << 16)
|
||||||
|
else:
|
||||||
|
power = (data[0] << 8) | (data[1])
|
||||||
|
return power / 2
|
@ -1,37 +0,0 @@
|
|||||||
import smbus
|
|
||||||
|
|
||||||
class power_sensor:
|
|
||||||
def __init__(self, bus = 1, addr = 0x40):
|
|
||||||
self.bus = smbus.SMBus(bus)
|
|
||||||
self.addr = addr
|
|
||||||
|
|
||||||
bytes = [(0x1000 >> 8) & 0xFF, 0x1000 & 0xFF]
|
|
||||||
self.bus.write_i2c_block_data(self.addr, 0x05, bytes)
|
|
||||||
config = 0x2000 | 0x1800 | 0x0400 | 0x0018 | 0x0007
|
|
||||||
bytes = [(config >> 8) & 0xFF, config & 0xFF]
|
|
||||||
self.bus.write_i2c_block_data(self.addr, 0x00, bytes)
|
|
||||||
|
|
||||||
def shunt_voltage_mv(self):
|
|
||||||
data = self.bus.read_i2c_block_data(self.addr, 0x01)
|
|
||||||
voltage = data[0] * 256 + data[1]
|
|
||||||
return voltage * 0.01
|
|
||||||
|
|
||||||
def current_ma(self):
|
|
||||||
data = self.bus.read_i2c_block_data(self.addr, 0x04)
|
|
||||||
if(data[0] >> 7 == 1):
|
|
||||||
current = data[0] * 256 + data[1]
|
|
||||||
if(current & (1 << 15)):
|
|
||||||
current = current - (1 << 16)
|
|
||||||
else:
|
|
||||||
current = (data[0] << 8) | (data[1])
|
|
||||||
return current / 10
|
|
||||||
|
|
||||||
def power_mw(self):
|
|
||||||
data = self.bus.read_i2c_block_data(self.addr, 0x03)
|
|
||||||
if(data[0] >> 7 == 1):
|
|
||||||
power = data[0] * 256 + data[1]
|
|
||||||
if(power & (1 << 15)):
|
|
||||||
power = power - (1 << 16)
|
|
||||||
else:
|
|
||||||
power = (data[0] << 8) | (data[1])
|
|
||||||
return power / 2
|
|
Loading…
Reference in New Issue
Block a user