reorder structure
This commit is contained in:
parent
2fc4339f61
commit
4d76bc4e88
17
.project
Normal file
17
.project
Normal file
@ -0,0 +1,17 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<projectDescription>
|
||||
<name>chicks.py</name>
|
||||
<comment></comment>
|
||||
<projects>
|
||||
</projects>
|
||||
<buildSpec>
|
||||
<buildCommand>
|
||||
<name>org.python.pydev.PyDevBuilder</name>
|
||||
<arguments>
|
||||
</arguments>
|
||||
</buildCommand>
|
||||
</buildSpec>
|
||||
<natures>
|
||||
<nature>org.python.pydev.pythonNature</nature>
|
||||
</natures>
|
||||
</projectDescription>
|
8
.pydevproject
Normal file
8
.pydevproject
Normal file
@ -0,0 +1,8 @@
|
||||
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||
<?eclipse-pydev version="1.0"?><pydev_project>
|
||||
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
|
||||
<path>/${PROJECT_DIR_NAME}/src</path>
|
||||
</pydev_pathproperty>
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
|
||||
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
|
||||
</pydev_project>
|
26
engine.py
26
engine.py
@ -1,26 +0,0 @@
|
||||
from gpio import gpio
|
||||
|
||||
class engine:
|
||||
|
||||
def __init__(self, gpio_1 = 13, gpio_2 = 19):
|
||||
self.gpio_1 = gpio(gpio_1)
|
||||
self.gpio_2 = gpio(gpio_2)
|
||||
|
||||
self.gpio_1.export()
|
||||
self.gpio_1.direction("out")
|
||||
self.gpio_2.export()
|
||||
self.gpio_2.direction("out")
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
self.gpio_1.set(0)
|
||||
self.gpio_2.set(0)
|
||||
|
||||
def run_up(self):
|
||||
self.gpio_1.set(1)
|
||||
self.gpio_2.set(0)
|
||||
|
||||
def run_down(self):
|
||||
self.gpio_1.set(0)
|
||||
self.gpio_2.set(1)
|
||||
|
@ -1,50 +0,0 @@
|
||||
#!/usr/bin/python2
|
||||
from engine import engine
|
||||
from power_sensor import power_sensor
|
||||
from time import sleep, time
|
||||
from datetime import datetime
|
||||
from getopt import getopt
|
||||
from sys import argv
|
||||
|
||||
eng = engine(13, 19)
|
||||
ps = power_sensor(1, 0x40)
|
||||
|
||||
def run_engine(direction):
|
||||
if direction in ("u", "up"):
|
||||
eng.run_up()
|
||||
elif direction in ("d", "down"):
|
||||
eng.run_down()
|
||||
else:
|
||||
return
|
||||
log = open("power.log", "w")
|
||||
try:
|
||||
while True:
|
||||
power = ps.power_mw()
|
||||
ts = datetime.fromtimestamp(time()).microsecond
|
||||
log.write(str(ts) + " ms " + str(power) + " mW")
|
||||
log.write(str(ts) + " ms " + str(power) + " mW")
|
||||
log.write(str(ts) + " ms " + str(power) + " mW")
|
||||
print str(ts) + " ms " + str(power) + " mW"
|
||||
except KeyboardInterrupt:
|
||||
eng.stop()
|
||||
log.close()
|
||||
|
||||
def print_help():
|
||||
print "help screen..."
|
||||
|
||||
|
||||
def main(argv):
|
||||
dir_list = ["u", "up", "d", "down"]
|
||||
options, remainder = getopt(argv, "hd:", ["help", "direction="])
|
||||
for opt, args in options:
|
||||
if opt in ("-h", "--help"):
|
||||
print_help()
|
||||
elif opt in ("-d", "--direction"):
|
||||
if args in dir_list:
|
||||
run_engine(args)
|
||||
else:
|
||||
print_help()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main(argv[1:])
|
29
gpio.py
29
gpio.py
@ -1,29 +0,0 @@
|
||||
from os.path import islink, isfile
|
||||
class gpio:
|
||||
def __init__(self, pin):
|
||||
self.pin = pin
|
||||
|
||||
def export(self):
|
||||
if not islink("/sys/class/gpio/gpio" + str(self.pin)):
|
||||
f = open("/sys/class/gpio/export", "w")
|
||||
f.write(str(self.pin))
|
||||
|
||||
def unexport(self):
|
||||
if islink("/sys/class/gpio/gpio" + str(self.pin)):
|
||||
f = open("/sys/class/gpio/unexport", "w")
|
||||
f.write(str(self.pin))
|
||||
|
||||
def direction(self, direction = "out"):
|
||||
if isfile("/sys/class/gpio/gpio" + str(self.pin) + \
|
||||
"/direction"):
|
||||
f = open("/sys/class/gpio/gpio" + str(self.pin) + \
|
||||
"/direction", "w")
|
||||
f.write(direction)
|
||||
|
||||
def set(self, value = 0):
|
||||
if isfile("/sys/class/gpio/gpio" + str(self.pin) + \
|
||||
"/value"):
|
||||
f = open("/sys/class/gpio/gpio" + str(self.pin) + \
|
||||
"/value", "w")
|
||||
f.write(str(value))
|
||||
|
@ -1,7 +0,0 @@
|
||||
#!/usr/bin/python2
|
||||
from gpio import gpio
|
||||
|
||||
g13 = gpio(13)
|
||||
g13.export()
|
||||
g13.direction("out")
|
||||
|
@ -1,12 +0,0 @@
|
||||
import smbus
|
||||
|
||||
class light_sensor:
|
||||
def __init__(self, bus = 1, addr = 0x23):
|
||||
self.bus = bus
|
||||
self.addr = addr
|
||||
self.bus = smbus.SMBus(self.bus)
|
||||
|
||||
def read(self):
|
||||
data = self.bus.read_i2c_block_data(self.addr, 0x10)
|
||||
lx = int(round((data[0] * 256 + data[1]) / 1.2, 0))
|
||||
return lx
|
@ -1,19 +0,0 @@
|
||||
#!/usr/bin/python2
|
||||
|
||||
from light_sensor import light_sensor
|
||||
from time import sleep, time
|
||||
from datetime import datetime
|
||||
|
||||
ls = light_sensor(1, 0x23)
|
||||
|
||||
while True:
|
||||
now = datetime.now()
|
||||
now_str = str(now.year) + "-" + str(now.month) + "-" + str(now.day) + " " + str(now.hour) + ":" + str(now.minute) + ":" + str(now.second)
|
||||
line = str(int(time())) + " " + now_str + " light: " + str(ls.read()) + " lx."
|
||||
print line
|
||||
|
||||
log = open("light.log", "a")
|
||||
log.write(line)
|
||||
log.write("\n")
|
||||
log.close()
|
||||
sleep(30)
|
34
mqtt_test.py
34
mqtt_test.py
@ -1,34 +0,0 @@
|
||||
#!/usr/bin/python2
|
||||
|
||||
from time import sleep, time
|
||||
import paho.mqtt.client as mqtt
|
||||
from wifi_fieldstrength import wifi_fieldstrength
|
||||
from light_sensor import light_sensor
|
||||
|
||||
def on_connect(client, data, flags, result):
|
||||
print "Connected with " + str(result)
|
||||
|
||||
def main():
|
||||
cl = mqtt.Client()
|
||||
cl.on_connect = on_connect
|
||||
cl.connect("gitlab", 1883, 60)
|
||||
cl.loop_start()
|
||||
|
||||
wf = wifi_fieldstrength("wlan0")
|
||||
ls = light_sensor(1, 0x23)
|
||||
light = 0
|
||||
for i in range(0, 5):
|
||||
light += ls.read()
|
||||
sleep(1)
|
||||
light /= 5
|
||||
tm = str(int(time()))
|
||||
cl.publish("outdoor/chickenhouse/fieldstr", tm + " " + \
|
||||
str(wf.name()) + ": " + str(wf.read()) + " dBm", qos = 2, \
|
||||
retain = True)
|
||||
cl.publish("outdoor/chickenhouse/light", tm + " " + \
|
||||
str(light) + " lx", qos = 2, retain = True)
|
||||
cl.loop_stop()
|
||||
cl.disconnect()
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -1,38 +0,0 @@
|
||||
import smbus
|
||||
|
||||
class power_sensor:
|
||||
def __init__(self, bus = 1, addr = 0x40):
|
||||
self.bus = smbus.SMBus(bus)
|
||||
self.addr = addr
|
||||
|
||||
bytes = [(0x1000 >> 8) & 0xFF, 0x1000 & 0xFF]
|
||||
self.bus.write_i2c_block_data(self.addr, 0x05, bytes)
|
||||
config = 0x2000 | 0x1800 | 0x0400 | 0x0018 | 0x0007
|
||||
bytes = [(config >> 8) & 0xFF, config & 0xFF]
|
||||
self.bus.write_i2c_block_data(self.addr, 0x00, bytes)
|
||||
|
||||
def shunt_voltage_mv(self):
|
||||
data = self.bus.read_i2c_block_data(self.addr, 0x01)
|
||||
voltage = data[0] * 256 + data[1]
|
||||
return voltage * 0.01
|
||||
|
||||
def current_ma(self):
|
||||
data = self.bus.read_i2c_block_data(self.addr, 0x04)
|
||||
if(data[0] >> 7 == 1):
|
||||
current = data[0] * 256 + data[1]
|
||||
if(current & (1 << 15)):
|
||||
current = current - (1 << 16)
|
||||
else:
|
||||
current = (data[0] << 8) | (data[1])
|
||||
return current / 10
|
||||
|
||||
def power_mw(self):
|
||||
data = self.bus.read_i2c_block_data(self.addr, 0x03)
|
||||
if(data[0] >> 7 == 1):
|
||||
power = data[0] * 256 + data[1]
|
||||
if(power & (1 << 15)):
|
||||
power = power - (1 << 16)
|
||||
else:
|
||||
power = (data[0] << 8) | (data[1])
|
||||
return power / 2
|
||||
|
@ -1,14 +0,0 @@
|
||||
#!/usr/bin/python2
|
||||
from power_sensor import power_sensor
|
||||
from time import sleep
|
||||
|
||||
ps = power_sensor(1, 0x40)
|
||||
while True:
|
||||
voltage = ps.shunt_voltage_mv()
|
||||
print str(voltage) + " mV"
|
||||
current = ps.current_ma()
|
||||
print str(current) + " mA"
|
||||
power = ps.power_mw()
|
||||
print str(power) + " mW"
|
||||
sleep(1)
|
||||
|
77
powerlog.py
77
powerlog.py
@ -1,77 +0,0 @@
|
||||
#!/usr/bin/python
|
||||
#import matplotlib
|
||||
#matplotlib.use("Agg")
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import getopt
|
||||
import sys
|
||||
|
||||
def get_time_power(file_name):
|
||||
offset = 1000000
|
||||
loop_offset = 0
|
||||
last_time = 0
|
||||
time_log = np.array([])
|
||||
power_log = np.array([])
|
||||
log = open(file_name, "r")
|
||||
for line in log:
|
||||
line = line.strip().split(" ")
|
||||
time = int(line[0])
|
||||
if time < last_time:
|
||||
loop_offset += offset
|
||||
last_time = time
|
||||
time_log = np.append(time_log, time + loop_offset)
|
||||
power_log = np.append(power_log, int(line[2]))
|
||||
if len(time_log) != len(power_log):
|
||||
return None
|
||||
return time_log, power_log
|
||||
|
||||
def time_us_to_ms(time_us):
|
||||
time_ms = np.array([])
|
||||
for value in time_us:
|
||||
time_ms = np.append(time_ms, value / 1000)
|
||||
return time_ms
|
||||
|
||||
def get_basename(file_name):
|
||||
basename = file_name.strip().split(".")
|
||||
if len(basename) < 1:
|
||||
return None
|
||||
return basename[0]
|
||||
|
||||
def print_power_diagram(time_ms, power_mw, time_slope, power_slope, basename, image_format="svg"):
|
||||
plt.clf()
|
||||
plt.plot(time_ms, power_mw, "r-", time_slope, power_slope, "b-")
|
||||
plt.show()
|
||||
return None
|
||||
|
||||
def get_slope(x, y, interval):
|
||||
y_sum = 0
|
||||
x_start = x[0]
|
||||
div = 0
|
||||
x_slope = np.array([])
|
||||
y_slope = np.array([])
|
||||
for i in range(0, len(x)):
|
||||
if x[i] > (x_start + interval):
|
||||
if div == 0:
|
||||
div = 1
|
||||
x_slope = np.append(x_slope, x[i])
|
||||
y_slope = np.append(y_slope, y_sum / div)
|
||||
div = 0
|
||||
y_sum = 0
|
||||
x_start = x[i]
|
||||
else:
|
||||
y_sum += y[i]
|
||||
div += 1
|
||||
return x_slope, y_slope
|
||||
|
||||
def main(args):
|
||||
opts, _ = getopt.getopt(args, "f:", ["file="])
|
||||
for opt, arg in opts:
|
||||
if opt in ("-f", "--file"):
|
||||
time_us, power_mw = get_time_power(arg)
|
||||
time_ms = time_us_to_ms(time_us)
|
||||
time_slope, power_slope = get_slope(time_ms, power_mw, 1000)
|
||||
print_power_diagram(time_ms, power_mw, time_slope, power_slope, get_basename(arg))
|
||||
return None
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main(sys.argv[1:]))
|
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
0
src/communiate/__init__.py
Normal file
0
src/communiate/__init__.py
Normal file
39
src/communiate/protocol.py
Normal file
39
src/communiate/protocol.py
Normal file
@ -0,0 +1,39 @@
|
||||
import paho.mqtt.client as mqtt_client
|
||||
|
||||
class mqtt:
|
||||
def __init__(self, hostname, port=1883, keepalive=60, qos=2, retain=True):
|
||||
self.__hostname = hostname
|
||||
self.__port = port
|
||||
self.__keepalive = keepalive
|
||||
self.__client = mqtt_client.Client()
|
||||
self.__is_connected = False
|
||||
self.__qos = qos
|
||||
self.__retain = retain
|
||||
|
||||
def __on_connect(self, client, data, flags, result):
|
||||
print "Connected: " + str(result)
|
||||
|
||||
def connect(self):
|
||||
if not self.__is_connected:
|
||||
self.__client.on_connect = self.__on_connect
|
||||
result = self.__client.connect(self.__hostname, self.__port, self.__keepalive)
|
||||
result |= self.__client.loop_start()
|
||||
if 0 == result:
|
||||
self.__is_connected = True
|
||||
return True
|
||||
return False
|
||||
|
||||
def disconnect(self):
|
||||
if self.__is_connected:
|
||||
self.__client.loop_stop()
|
||||
self.__client.disconnect()
|
||||
return True
|
||||
return False
|
||||
|
||||
def transmit(self, topic, payload):
|
||||
if not self.__is_connected:
|
||||
return False
|
||||
result = self.__client.publish(topic, payload, self.__qos, self.__retain)
|
||||
if result == 0:
|
||||
return True
|
||||
return False
|
0
src/engine/__init__.py
Normal file
0
src/engine/__init__.py
Normal file
24
src/engine/engine.py
Normal file
24
src/engine/engine.py
Normal file
@ -0,0 +1,24 @@
|
||||
from gpio import gpio.gpio
|
||||
|
||||
class engine:
|
||||
def __init__(self, gpio_1 = 13, gpio_2 = 19):
|
||||
self.gpio_1 = gpio(gpio_1)
|
||||
self.gpio_2 = gpio(gpio_2)
|
||||
|
||||
self.gpio_1.export()
|
||||
self.gpio_1.direction("out")
|
||||
self.gpio_2.export()
|
||||
self.gpio_2.direction("out")
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
self.gpio_1.set(0)
|
||||
self.gpio_2.set(0)
|
||||
|
||||
def run_up(self):
|
||||
self.gpio_1.set(1)
|
||||
self.gpio_2.set(0)
|
||||
|
||||
def run_down(self):
|
||||
self.gpio_1.set(0)
|
||||
self.gpio_2.set(1)
|
0
src/gate/__init__.py
Normal file
0
src/gate/__init__.py
Normal file
19
src/gate/gate_state.py
Normal file
19
src/gate/gate_state.py
Normal file
@ -0,0 +1,19 @@
|
||||
'''
|
||||
Created on Dec 19, 2016
|
||||
|
||||
@author: klaehn
|
||||
'''
|
||||
class gate_state:
|
||||
def __init__(self):
|
||||
self.possible_states = ["unknown", "open", "close", "opening", "closing"]
|
||||
self.state = "unknown"
|
||||
return
|
||||
|
||||
def set_state(self, new_state):
|
||||
if new_state in self.possible_states:
|
||||
self.state = new_state
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_state(self):
|
||||
return self.state
|
66
src/gate_guard.py
Normal file
66
src/gate_guard.py
Normal file
@ -0,0 +1,66 @@
|
||||
from communiate.protocol import mqtt
|
||||
from gate.gate_state import gate_state
|
||||
from light_data.light_data import light_data
|
||||
from light_sensor.light_sensor import light_sensor
|
||||
|
||||
import sys
|
||||
import time
|
||||
|
||||
class gate_guard:
|
||||
def __init__(self):
|
||||
self.__light_read_delay_s = 6
|
||||
self.__consequtive_light_reads = 10
|
||||
self.__light_lx_close = 0
|
||||
self.__light_lx_open = 0
|
||||
|
||||
self.__light_sensor = light_sensor(1, 0x23)
|
||||
self.__light_data = light_data(self.__consequtive_light_reads)
|
||||
self.__comserver = mqtt("gitlab")
|
||||
self.__gate_state = gate_state()
|
||||
|
||||
def close_gate(self):
|
||||
topic = "outdoor/chickenhouse/gate"
|
||||
payload = str(time.time()) + " closing"
|
||||
self.__comserver.connect()
|
||||
self.__comserver.transmit(topic, payload)
|
||||
self.__comserver.disconnect()
|
||||
return None
|
||||
|
||||
def open_gate(self):
|
||||
topic = "outdoor/chickenhouse/gate"
|
||||
payload = str(time.time()) + " opening"
|
||||
self.__comserver.connect()
|
||||
self.__comserver.transmit(topic, payload)
|
||||
self.__comserver.disconnect()
|
||||
return None
|
||||
|
||||
def handle_gate_state(self, light_avg=0):
|
||||
if self.__gate_state.get_state() == "open":
|
||||
if light_avg <= self.__light_lx_close:
|
||||
self.close_gate()
|
||||
elif self.__gate_state.get_state() == "close":
|
||||
if light_avg >= self.__light_lx_open:
|
||||
self.open_gate()
|
||||
elif self.__gate_state.get_state() == "unknown":
|
||||
'''TODO: bring gate in a defined position'''
|
||||
|
||||
def run(self):
|
||||
|
||||
if self.__gate_state() == "unknown":
|
||||
self.handle_gate_state()
|
||||
|
||||
try:
|
||||
while True:
|
||||
light_data.push(light_sensor.read())
|
||||
light_avg = light_data.average()
|
||||
if light_avg:
|
||||
print "light average: " + str(light_avg)
|
||||
self.handle_gate_state(light_avg)
|
||||
time.sleep(self.__light_read_delay_s)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
return None
|
||||
|
||||
if __name__ == "__main__":
|
||||
gate_guard = gate_guard()
|
||||
sys.exit(gate_guard.run())
|
0
src/gpio/__init__.py
Normal file
0
src/gpio/__init__.py
Normal file
25
src/gpio/gpio.py
Normal file
25
src/gpio/gpio.py
Normal file
@ -0,0 +1,25 @@
|
||||
from os.path import islink, isfile
|
||||
|
||||
class gpio:
|
||||
def __init__(self, pin):
|
||||
self.pin = pin
|
||||
|
||||
def export(self):
|
||||
if not islink("/sys/class/gpio/gpio" + str(self.pin)):
|
||||
f = open("/sys/class/gpio/export", "w")
|
||||
f.write(str(self.pin))
|
||||
|
||||
def unexport(self):
|
||||
if islink("/sys/class/gpio/gpio" + str(self.pin)):
|
||||
f = open("/sys/class/gpio/unexport", "w")
|
||||
f.write(str(self.pin))
|
||||
|
||||
def direction(self, direction = "out"):
|
||||
if isfile("/sys/class/gpio/gpio" + str(self.pin) + "/direction"):
|
||||
f = open("/sys/class/gpio/gpio" + str(self.pin) + "/direction", "w")
|
||||
f.write(direction)
|
||||
|
||||
def set(self, value = 0):
|
||||
if isfile("/sys/class/gpio/gpio" + str(self.pin) + "/value"):
|
||||
f = open("/sys/class/gpio/gpio" + str(self.pin) + "/value", "w")
|
||||
f.write(str(value))
|
0
src/light_data/__init__.py
Normal file
0
src/light_data/__init__.py
Normal file
20
src/light_data/light_data.py
Normal file
20
src/light_data/light_data.py
Normal file
@ -0,0 +1,20 @@
|
||||
class light_data:
|
||||
def __init__(self, length=10):
|
||||
self.max = length
|
||||
self.data = []
|
||||
|
||||
def push(self, element):
|
||||
if self.max == 0:
|
||||
return False
|
||||
if len(self.data) == self.max:
|
||||
_ = self.data.pop(0)
|
||||
self.data.append(element)
|
||||
return True
|
||||
|
||||
def average(self):
|
||||
if len(self.data) != self.max:
|
||||
return None
|
||||
return sum(self.data) / self.max
|
||||
|
||||
def length(self):
|
||||
return len(self.data)
|
0
src/light_data/test/__init__.py
Normal file
0
src/light_data/test/__init__.py
Normal file
64
src/light_data/test/test_light_data.py
Normal file
64
src/light_data/test/test_light_data.py
Normal file
@ -0,0 +1,64 @@
|
||||
'''
|
||||
Created on Dec 19, 2016
|
||||
|
||||
@author: klaehn
|
||||
'''
|
||||
import unittest
|
||||
import random
|
||||
from light_data.light_data import light_data
|
||||
|
||||
|
||||
class Test(unittest.TestCase):
|
||||
|
||||
|
||||
def test_zero_space(self):
|
||||
dut = light_data(0)
|
||||
result = dut.push(10)
|
||||
self.assertFalse(result, "test_zero_space - exp: False res: True")
|
||||
|
||||
def test_max_data(self):
|
||||
buffer_size = 10
|
||||
dut = light_data(buffer_size)
|
||||
for _ in range(11):
|
||||
dut.push(random.random())
|
||||
result = dut.length()
|
||||
self.assertEqual(result, buffer_size, "test_max_data - exp: " + \
|
||||
str(buffer_size) + " res: " + str(result))
|
||||
|
||||
def test_average_min(self):
|
||||
dut = light_data(10)
|
||||
rnd = random.random()
|
||||
dut.push(rnd)
|
||||
result = dut.average()
|
||||
self.assertEqual(result, None, "test_average_min - exp: None" + \
|
||||
" res: " + str(result))
|
||||
|
||||
def test_average_mid(self):
|
||||
buffer_size = 10
|
||||
res_buffer = []
|
||||
dut = light_data(buffer_size)
|
||||
for _ in range(buffer_size / 2):
|
||||
rnd = random.random()
|
||||
dut.push(rnd)
|
||||
res_buffer.append(rnd)
|
||||
result = dut.average()
|
||||
self.assertEqual(result, None, "test_average_mid - exp: None" + \
|
||||
" res: " + str(result))
|
||||
|
||||
def test_average_max(self):
|
||||
buffer_size = 10
|
||||
res_buffer = []
|
||||
dut = light_data(buffer_size)
|
||||
for _ in range(buffer_size):
|
||||
rnd = random.random()
|
||||
dut.push(rnd)
|
||||
res_buffer.append(rnd)
|
||||
result = dut.average()
|
||||
self.assertEqual(result, sum(res_buffer) / buffer_size, \
|
||||
"test_average_mid - exp: " + \
|
||||
str(sum(res_buffer) / buffer_size) + \
|
||||
" res: " + str(result))
|
||||
|
||||
if __name__ == "__main__":
|
||||
#import sys;sys.argv = ['', 'Test.testName']
|
||||
unittest.main()
|
0
src/light_sensor/__init__.py
Normal file
0
src/light_sensor/__init__.py
Normal file
11
src/light_sensor/light_sensor.py
Normal file
11
src/light_sensor/light_sensor.py
Normal file
@ -0,0 +1,11 @@
|
||||
import smbus
|
||||
|
||||
class light_sensor:
|
||||
def __init__(self, bus = 1, addr = 0x23):
|
||||
self.bus = bus
|
||||
self.addr = addr
|
||||
self.bus = smbus.SMBus(self.bus)
|
||||
|
||||
def read(self):
|
||||
data = self.bus.read_i2c_block_data(self.addr, 0x10)
|
||||
return int(round((data[0] * 256 + data[1]) / 1.2, 0))
|
0
src/power_sensor/__init__.py
Normal file
0
src/power_sensor/__init__.py
Normal file
37
src/power_sensor/power_sensor.py
Executable file
37
src/power_sensor/power_sensor.py
Executable file
@ -0,0 +1,37 @@
|
||||
import smbus
|
||||
|
||||
class power_sensor:
|
||||
def __init__(self, bus = 1, addr = 0x40):
|
||||
self.bus = smbus.SMBus(bus)
|
||||
self.addr = addr
|
||||
|
||||
bytes = [(0x1000 >> 8) & 0xFF, 0x1000 & 0xFF]
|
||||
self.bus.write_i2c_block_data(self.addr, 0x05, bytes)
|
||||
config = 0x2000 | 0x1800 | 0x0400 | 0x0018 | 0x0007
|
||||
bytes = [(config >> 8) & 0xFF, config & 0xFF]
|
||||
self.bus.write_i2c_block_data(self.addr, 0x00, bytes)
|
||||
|
||||
def shunt_voltage_mv(self):
|
||||
data = self.bus.read_i2c_block_data(self.addr, 0x01)
|
||||
voltage = data[0] * 256 + data[1]
|
||||
return voltage * 0.01
|
||||
|
||||
def current_ma(self):
|
||||
data = self.bus.read_i2c_block_data(self.addr, 0x04)
|
||||
if(data[0] >> 7 == 1):
|
||||
current = data[0] * 256 + data[1]
|
||||
if(current & (1 << 15)):
|
||||
current = current - (1 << 16)
|
||||
else:
|
||||
current = (data[0] << 8) | (data[1])
|
||||
return current / 10
|
||||
|
||||
def power_mw(self):
|
||||
data = self.bus.read_i2c_block_data(self.addr, 0x03)
|
||||
if(data[0] >> 7 == 1):
|
||||
power = data[0] * 256 + data[1]
|
||||
if(power & (1 << 15)):
|
||||
power = power - (1 << 16)
|
||||
else:
|
||||
power = (data[0] << 8) | (data[1])
|
||||
return power / 2
|
0
src/wifi_fieldstrength/__init__.py
Normal file
0
src/wifi_fieldstrength/__init__.py
Normal file
23
src/wifi_fieldstrength/wifi_fieldstrength.py
Normal file
23
src/wifi_fieldstrength/wifi_fieldstrength.py
Normal file
@ -0,0 +1,23 @@
|
||||
|
||||
from re import match, sub
|
||||
|
||||
class wifi_fieldstrength:
|
||||
def __init__(self, name = "wlan0"):
|
||||
self._name = name
|
||||
self.wf_name = "/proc/net/wireless"
|
||||
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
def read(self):
|
||||
ret = False
|
||||
f = open(self.wf_name, "r")
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
mstr = "^" + self._name
|
||||
if match(mstr, line):
|
||||
line = sub("\s+", " ", line)
|
||||
tmp = line.split(" ")
|
||||
tmp[3] = sub("\.", "", tmp[3])
|
||||
ret = int(tmp[3])
|
||||
return ret
|
@ -1,23 +0,0 @@
|
||||
|
||||
from re import match, sub
|
||||
|
||||
class wifi_fieldstrength:
|
||||
def __init__(self, name = "wlan0"):
|
||||
self._name = name
|
||||
self.wf_name = "/proc/net/wireless"
|
||||
|
||||
def name(self):
|
||||
return self._name
|
||||
|
||||
def read(self):
|
||||
ret = False
|
||||
f = open(self.wf_name, "r")
|
||||
for line in f:
|
||||
line = line.strip()
|
||||
mstr = "^" + self._name
|
||||
if match(mstr, line):
|
||||
line = sub("\s+", " ", line)
|
||||
tmp = line.split(" ")
|
||||
tmp[3] = sub("\.", "", tmp[3])
|
||||
ret = int(tmp[3])
|
||||
return ret
|
@ -1,10 +0,0 @@
|
||||
#!/usr/bin/python2
|
||||
|
||||
from wifi_fieldstrength import wifi_fieldstrength
|
||||
from time import sleep
|
||||
|
||||
wf = wifi_fieldstrength("wlan0")
|
||||
|
||||
while(True):
|
||||
print "Field strength: " + str(wf.read()) + " dBm"
|
||||
sleep(1)
|
Loading…
Reference in New Issue
Block a user