attenuation-control: Add scripts to control the attenuation

Xmlrpc client/server application to control the digital attenuators
installed in the lava testbed.

Implements AR_VUL-64.

Signed-off-by: Thomas Klaehn <thomas.klaehn@u-blox.com>
This commit is contained in:
Thomas Klaehn
2018-04-17 10:23:19 +02:00
parent 3757cb3efd
commit b1e66c2af1
13 changed files with 638 additions and 0 deletions

0
source/__init__.py Normal file
View File

View File

@@ -0,0 +1,65 @@
#!/usr/bin/python3
import argparse
import logging
import sys
from xmlrpc.client import ServerProxy
DEFAULT_PORT = "8231"
logging.basicConfig(level=logging.INFO, format='%(levelname)s %(message)s')
LOG = logging.getLogger("att_ctrl_client")
def parse_args():
"""Command line argument parser"""
parser = argparse.ArgumentParser()
parser.add_argument("host", help="Hostname or ip address the server is \
serving on. (e.g. 'attctrl:8231 or 192.168.1.16'")
parser.add_argument('attenuator', help="Attenuator to be set [0..7]")
parser.add_argument('attenuation', help="Attenuation to be set [0.0 .. 31.5]")
return parser.parse_args()
def host_port(data):
'''Determine hostname/address and port.
Args:
data (str): Hostname/address with optional port (e.g. 'localhost:1234'
or '192.168.0.1').
Returns:
str: Hostname/address
str: Port
'''
tmp = data.split(":")
port = DEFAULT_PORT
host = tmp[0]
if len(tmp) > 1:
port = tmp[1]
return host, port
def main():
args = parse_args()
host, port = host_port(args.host)
client = ServerProxy('http://{}:{}'.format(host, port))
try:
dev = int(args.attenuator)
except ValueError:
LOG.error("attenuator needs to be convertible into int.")
return 1
try:
val = float(args.attenuation)
except ValueError:
LOG.error("attenuation needs to be convertible into float.")
return 1
ret = client.set_attenuation(dev, val)
LOG.info("xmlrpcclient: set_attenuation result: {}.".format(ret))
return ret
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,145 @@
#!/usr/bin/python3
import argparse
import logging
import sys
from xmlrpc.server import SimpleXMLRPCServer
import attenuator
DEFAULT_PORT = "8231"
logging.basicConfig(level=logging.INFO, format='%(levelname)s %(message)s')
LOG = logging.getLogger("att_ctrl_srv")
# Array of attenuator devices and their i2c bus and gpio pin numbers.
attenuators = [
attenuator.Attenuator(
attenuator.Pin(0x20, 1),
attenuator.Pin(0x20, 2),
attenuator.Pin(0x20, 3),
attenuator.Pin(0x20, 4),
attenuator.Pin(0x20, 5),
attenuator.Pin(0x20, 6)
),
attenuator.Attenuator(
attenuator.Pin(0x20, 7),
attenuator.Pin(0x20, 8),
attenuator.Pin(0x20, 9),
attenuator.Pin(0x20, 10),
attenuator.Pin(0x20, 11),
attenuator.Pin(0x20, 12)
),
attenuator.Attenuator(
attenuator.Pin(0x20, 13),
attenuator.Pin(0x20, 14),
attenuator.Pin(0x20, 15),
attenuator.Pin(0x20, 16),
attenuator.Pin(0x21, 1),
attenuator.Pin(0x21, 2)
),
attenuator.Attenuator(
attenuator.Pin(0x21, 3),
attenuator.Pin(0x21, 4),
attenuator.Pin(0x21, 5),
attenuator.Pin(0x21, 6),
attenuator.Pin(0x21, 7),
attenuator.Pin(0x21, 8)
),
attenuator.Attenuator(
attenuator.Pin(0x21, 9),
attenuator.Pin(0x21, 10),
attenuator.Pin(0x21, 11),
attenuator.Pin(0x21, 12),
attenuator.Pin(0x21, 13),
attenuator.Pin(0x21, 14)
),
attenuator.Attenuator(
attenuator.Pin(0x21, 15),
attenuator.Pin(0x21, 16),
attenuator.Pin(0x22, 1),
attenuator.Pin(0x22, 2),
attenuator.Pin(0x22, 3),
attenuator.Pin(0x22, 4)
),
attenuator.Attenuator(
attenuator.Pin(0x22, 5),
attenuator.Pin(0x22, 6),
attenuator.Pin(0x22, 7),
attenuator.Pin(0x22, 8),
attenuator.Pin(0x22, 9),
attenuator.Pin(0x22, 10)
),
attenuator.Attenuator(
attenuator.Pin(0x22, 11),
attenuator.Pin(0x22, 12),
attenuator.Pin(0x22, 13),
attenuator.Pin(0x22, 14),
attenuator.Pin(0x22, 15),
attenuator.Pin(0x22, 16)
),
]
def parse_args():
"""Command line argument parser"""
parser = argparse.ArgumentParser()
parser.add_argument("host", help="Hostname or ip address the server is \
serving on. (e.g. 'attctrl:8231 or 192.168.1.16'")
return parser.parse_args()
def host_port(data):
'''Determine hostname/address and port.
Args:
data (str): Hostname/address with optional port (e.g. 'localhost:1234'
or '192.168.0.1').
Returns:
str: Hostname/address
str: Port
'''
tmp = data.split(":")
port = DEFAULT_PORT
host = tmp[0]
if len(tmp) > 1:
port = tmp[1]
return host, port
def set_attenuation(attenuator, attenuation):
"""Set the attenuation of the choosen attenuator
Args:
attenuator (int): Index of the attenuator device [0..8]
attenuation (float): Value of attenuation to be set [0.0..31.5]
"""
try:
dev = int(attenuator)
except ValueError:
LOG.error("attenuator needs to be convertible into int.")
return 1
try:
val = float(attenuation)
except ValueError:
LOG.error("attenuation needs to be convertible into float.")
return 1
attenuators[dev].set_attenuation(val)
LOG.info("xmlrpcserver: set attenuation of attenuator {} to {}".format(attenuator, attenuation))
return 0
def main():
args = parse_args()
host, port = host_port(args.host)
server = SimpleXMLRPCServer((host, int(port)))
server.register_function(set_attenuation)
LOG.info("xmlrpcserver: registering 'set_attenuation'.")
try:
LOG.info("xmlrpcserver: start serving on %s:%s.", str(host), str(port))
server.serve_forever()
except KeyboardInterrupt:
LOG.info("xmlrpcserver: Keyboard interrupt catched - shutting down.")
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,115 @@
import os
import sys
try:
from IOPi import IOPi
except ImportError:
sys.path.append('mock')
try:
from IOPi import IOPi
except ImportError:
raise ImportError("IOPi not found")
class Pin(object):
"""Abstraction of pin object. Every pin object contains a gpio number and a i2c bus address."""
bus_address = None
pin_number = None
bus = None
def __init__(self, bus_address, pin_number):
"""Constructor.
Args:
bus_address (int): i2c bus address [0x20..0x23].
pin_number (int): Gpio pin number.
"""
self.bus_address = bus_address
self.pin_number = pin_number
class Attenuator(object):
"""Abstraction of ANALOG DEVICES HMC425A digital attenuator"""
pin = None
def __init__(self,pin_0_5db, pin_1_0db, pin_2_0db, pin_4_0db, pin_8_0db, pin_16_0db):
"""Constructor.
Args:
pin_0_5db (Pin): Pin object to control the 0.5 db gpio
pin_1_0db (Pin): Pin object to control the 1.0 db gpio
pin_2_0db (Pin): Pin object to control the 2.0 db gpio
pin_4_0db (Pin): Pin object to control the 4.0 db gpio
pin_8_0db (Pin): Pin object to control the 8.0 db gpio
pin_16_0db (Pin): Pin object to control the 16.0 db gpio
"""
if type(pin_0_5db) is not Pin:
raise TypeError("Wrong type for pin_0_5db")
if type(pin_1_0db) is not Pin:
raise TypeError("Wrong type for pin_1_0db")
if type(pin_2_0db) is not Pin:
raise TypeError("Wrong type for pin_2_0db")
if type(pin_4_0db) is not Pin:
raise TypeError("Wrong type for pin_4_0db")
if type(pin_8_0db) is not Pin:
raise TypeError("Wrong type for pin_8_0db")
if type(pin_16_0db) is not Pin:
raise TypeError("Wrong type for pin_16_0db")
if pin_0_5db.bus_address not in range(0x20, 0x24):
raise IndexError('pin_0_0db i2c bus address index out of range')
if pin_1_0db.bus_address not in range(0x20, 0x24):
raise IndexError('pin_1_0db i2c bus address index out of range')
if pin_2_0db.bus_address not in range(0x20, 0x24):
raise IndexError('pin_2_0db i2c bus address index out of range')
if pin_4_0db.bus_address not in range(0x20, 0x24):
raise IndexError('pin_4_0db i2c bus address index out of range')
if pin_8_0db.bus_address not in range(0x20, 0x24):
raise IndexError('pin_8_0db i2c bus address index out of range')
if pin_16_0db.bus_address not in range(0x20, 0x24):
raise IndexError('pin_16_0db i2c bus address index out of range')
if pin_0_5db.pin_number not in range(1, 17):
raise IndexError('pin_0_5db pin number index out of range')
if pin_1_0db.pin_number not in range(1, 17):
raise IndexError('pin_1_0db pin number index out of range')
if pin_2_0db.pin_number not in range(1, 17):
raise IndexError('pin_2_0db pin number index out of range')
if pin_4_0db.pin_number not in range(1, 17):
raise IndexError('pin_4_0db pin number index out of range')
if pin_8_0db.pin_number not in range(1, 17):
raise IndexError('pin_8_0db pin number index out of range')
if pin_16_0db.pin_number not in range(1, 17):
raise IndexError('pin_16_0db pin number index out of range')
self.pin = [pin_0_5db, pin_1_0db, pin_2_0db, pin_4_0db, pin_8_0db, pin_16_0db]
for pin in self.pin:
pin.bus = IOPi(pin.bus_address)
pin.bus.set_pin_direction(pin.pin_number, 0)
pin.bus.write_pin(pin.pin_number, 0)
def set_attenuation(self, attenuation):
"""Set attenuation.
Args:
attenuation (float): Value of attenuation to be set [0..31.5]
"""
if attenuation < 0:
attenuation = 0
elif attenuation > 31.5:
attenuation = 31.5
max_single_value = 16 * 2
attenuation *= 2
switch_pin = [1 for i in range(6)]
index = 5
while index >= 0:
if attenuation - max_single_value >= 0:
attenuation -= max_single_value
switch_pin[index] = 0
max_single_value /= 2
index -= 1
for i in range(0, len(switch_pin)):
bus = IOPi(self.pin[i].bus_address)
bus.write_pin(self.pin[i].pin_number, switch_pin[i])