# Source code for msl.equipment.database

"""
Load equipment and connection records from :ref:`Databases <database-formats>`.
"""
from __future__ import annotations

import ast
import codecs
import json
import os
import re
from xml.etree import ElementTree

from msl.io import read_table_excel

from . import constants
from .record_types import ConnectionRecord
from .record_types import EquipmentRecord
from .utils import convert_to_enum
from .utils import convert_to_primitive
from .utils import logger


class Database:

    def __init__(self, path):
        """Create :class:`.EquipmentRecord`'s and :class:`.ConnectionRecord`'s
        from :ref:`Databases <database-formats>` that are specified in a
        :ref:`configuration-file`.

        This class should be accessed through the :meth:`~.config.Config.database`
        method after a :class:`~.config.Config` object has been created.

        Parameters
        ----------
        path : :class:`str`
            The path to an XML :ref:`configuration-file`.

        Raises
        ------
        OSError
            If `path` does not exist or if the :ref:`configuration-file` is invalid.
        AttributeError
            If an ``<equipment>`` XML tag is specified in the :ref:`configuration-file`
            and it does not uniquely identify an equipment record in an
            :ref:`equipment-database`.
        ValueError
            If an :attr:`~.EquipmentRecord.alias` has been specified multiple times
            for the same :class:`~.EquipmentRecord` or if the name of the Sheet in
            an Excel spreadsheet is invalid.
        """
        logger.debug('Loading databases from %s', path)

        try:
            root = ElementTree.parse(path).getroot()
        except ElementTree.ParseError as err:
            # want to raise OSError not ParseError
            raise OSError(str(err)) from None

        self._config_path = path

        # The order matters: equipment records look up connection records by
        # key, and the <equipment> tags search the equipment records.
        self._connection_records = {}
        self._load_connection_records(root)

        self._equipment_records = {}
        self._load_equipment_records(root)

        self._equipment_using = {}
        self._load_equipment_tags(root)

    def _load_connection_records(self, root):
        """Fill ``self._connection_records`` from every ``<connection>`` element."""
        easy_names = ('address', 'backend', 'manufacturer', 'model', 'serial')
        for connections in root.iterfind('connections'):
            for element in connections.iterfind('connection'):
                data = self._read(element)
                if isinstance(data, tuple):
                    # then the information is stored as a table of rows and columns
                    # (reuse `data`, the file must not be read a second time)
                    header, rows = data
                    index_map = self._make_index_map(header, ConnectionRecord.__slots__)
                    for row in rows:
                        if not self._is_row_length_okay(row, header):
                            continue
                        key = self._make_key(row, self._connection_records, element,
                                             index_map=index_map)
                        if not key:
                            continue

                        kwargs = {name: row[index_map[name]] for name in easy_names}

                        # the Properties cell is a ';'-separated list of name=value pairs
                        properties = {}
                        props = row[index_map['properties']]
                        if props:
                            for item in props.split(';'):
                                pair = item.split('=')
                                if len(pair) != 2:
                                    continue
                                properties[pair[0].strip()] = convert_to_primitive(pair[1])
                        kwargs['properties'] = properties

                        self._connection_records[key] = ConnectionRecord(**kwargs)
                elif isinstance(data, dict):
                    # loaded a json or xml file
                    for record in data['connection_records']:
                        key = self._make_key(record, self._connection_records, element)
                        if not key:
                            continue
                        self._connection_records[key] = ConnectionRecord(**record)
                else:
                    # explicit raise so that the check survives ``python -O``
                    raise AssertionError('Not a tuple or dict')

    def _link_connection(self, key, kwargs):
        """Add the ConnectionRecord stored under `key` (if any) to `kwargs`.

        An ``alias`` entry in the properties of the connection record is
        removed from the properties and, if not empty, becomes the alias
        of the equipment record.
        """
        connection = self._connection_records.get(key)
        if connection is None:
            return
        kwargs['connection'] = connection
        alias = connection.properties.pop('alias', None)
        if alias:
            kwargs['alias'] = alias

    def _load_equipment_records(self, root):
        """Fill ``self._equipment_records`` from every ``<register>`` element."""
        for registers in root.iterfind('registers'):
            for register in registers.iterfind('register'):
                team = register.attrib.get('team', '')
                data = self._read(register)
                if isinstance(data, tuple):
                    # then the information is stored as a table of rows and columns
                    header, rows = data
                    index_map = self._make_index_map(header, EquipmentRecord.__slots__)

                    # prepare the user_defined list
                    user_defined = []
                    for item in register.attrib.get('user_defined', '').split(','):
                        if not item.strip():
                            continue
                        name = item.strip().lower().replace(' ', '_')
                        if name in EquipmentRecord.__slots__:
                            logger.warning('The "user_defined" parameter %r is already an '
                                           'EquipmentRecord attribute', name)
                        else:
                            user_defined.append(name)

                    index_map_user_defined = {}
                    if user_defined:
                        index_map_user_defined = self._make_index_map(header, user_defined)

                    for row in rows:
                        if not self._is_row_length_okay(row, header):
                            continue
                        key = self._make_key(row, self._equipment_records, register,
                                             index_map=index_map)
                        if not key:
                            continue

                        kwargs = {'team': team}

                        # find the corresponding ConnectionRecord (if it exists)
                        self._link_connection(key, kwargs)

                        # only the attributes that have a column in the table are set
                        for name in EquipmentRecord.__slots__:
                            if name in index_map:
                                kwargs[name] = row[index_map[name]]

                        for name in user_defined:
                            if name in index_map_user_defined:
                                cell = row[index_map_user_defined[name]]
                                kwargs[name] = convert_to_primitive(cell)

                        self._equipment_records[key] = EquipmentRecord(**kwargs)
                elif isinstance(data, dict):
                    # loaded a json or xml file
                    team = data.get('team', team)
                    for record in data['equipment_records']:
                        key = self._make_key(record, self._equipment_records, register)
                        if not key:
                            continue
                        record['team'] = team

                        # find the corresponding ConnectionRecord (if it exists)
                        self._link_connection(key, record)

                        self._equipment_records[key] = EquipmentRecord(**record)
                else:
                    # explicit raise so that the check survives ``python -O``
                    raise AssertionError('Not a tuple or dict')

    def _load_equipment_tags(self, root):
        """Fill ``self._equipment_using`` from every ``<equipment>`` element."""
        for element in root.findall('equipment'):
            # check if an alias attribute was defined in the configuration file;
            # it is removed so that it is not used as a search criterion
            alias = element.attrib.pop('alias', None)

            # search for the equipment in the database
            equipment = self.records(**element.attrib)
            if len(equipment) == 0:
                raise AttributeError('No equipment record found with attributes {}'.format(element.attrib))
            if len(equipment) > 1:
                raise AttributeError('The equipment specified is not unique. There are {} equipment '
                                     'records for {}'.format(len(equipment), element.attrib))
            equip = equipment[0]

            # the following is all about checking/getting the alias that associates with `equip`
            if alias is not None:
                if equip.alias and alias != equip.alias:
                    raise ValueError('Multiple aliases set for {}: {!r} and {!r}'.format(
                        equip, alias, equip.alias))
            elif equip.alias:
                alias = equip.alias
            elif equip.model:
                alias = equip.model
            elif equip.manufacturer:
                alias = equip.manufacturer
            elif equip.connection is not None and equip.connection.backend == constants.Backend.MSL:
                # a record without a connection must fall through to the default alias
                alias = equip.connection.interface.name
            else:
                alias = 'equipment'

            # if this alias already exists as a dictionary key then append a unique number to the alias
            if alias in self._equipment_using:
                n = sum(1 for key in self._equipment_using if key.startswith(alias))
                alias += '({})'.format(n + 1)

            equip.alias = alias
            self._equipment_using[alias] = equip

    @property
    def equipment(self):
        """:class:`dict`: :class:`.EquipmentRecord`\'s that were listed as
        ``<equipment>`` XML tags in the :ref:`configuration-file`.
        """
        return self._equipment_using

    @property
    def path(self):
        """:class:`str`: The path to the :ref:`configuration-file`."""
        return self._config_path

    def connections(self, **kwargs):
        """Search the :ref:`connections-database` to find all
        :class:`.ConnectionRecord`\'s that match the specified criteria.

        Parameters
        ----------
        **kwargs
            The argument names can be any of the :class:`.ConnectionRecord`
            property names or a ``flags`` argument of type :class:`int` for
            performing the search, see :func:`re.search`. For testing regex
            expressions online you can use `this <https://pythex.org/>`_ website.

            If a `kwarg` is ``properties`` then the value must be a
            :class:`dict`. See the examples below.

        Examples
        --------
        * `connections()` :math:`\\rightarrow`
          a list of all ConnectionRecord's
        * `connections(manufacturer='Keysight')` :math:`\\rightarrow`
          a list of all ConnectionRecord's that have Keysight as the manufacturer
        * `connections(manufacturer='Agilent|Keysight')` :math:`\\rightarrow`
          a list of all ConnectionRecord's that are from Agilent or Keysight
        * `connections(manufacturer='H.*P')` :math:`\\rightarrow`
          a list of all ConnectionRecord's that have Hewlett Packard (or HP)
          as the manufacturer
        * `connections(manufacturer='Agilent', model='^34')` :math:`\\rightarrow`
          a list of all ConnectionRecord's that have Agilent as the manufacturer
          and a model number beginning with '34'
        * `connections(interface=Interface.SERIAL)` :math:`\\rightarrow`
          a list of all ConnectionRecord's that use SERIAL for the connection bus
        * `connections(interface='SDK')` :math:`\\rightarrow`
          a list of all ConnectionRecord's that use the manufacturers SDK to
          control the equipment
        * `connections(backend=Backend.PyVISA)` :math:`\\rightarrow`
          a list of all ConnectionRecord's that use PyVISA as the backend
        * `connections(backend='MSL')` :math:`\\rightarrow`
          a list of all ConnectionRecord's that use MSL as the backend
        * `connections(properties={'baud_rate': 115200})` :math:`\\rightarrow`
          a list of all ConnectionRecord's that specify a baud rate equal to
          115200 in the Properties field

        Returns
        -------
        :class:`list` of :class:`.ConnectionRecord`
            The connection records that match the search criteria.

        Raises
        ------
        NameError
            If the name of an input argument is not a :class:`.ConnectionRecord`
            property name or ``flags``.
        """
        flags = int(kwargs.pop('flags', 0))  # used by re.search
        for name in kwargs:
            if name not in ConnectionRecord.__slots__:
                raise NameError('Invalid argument name {!r} for a {}'.format(
                    name, ConnectionRecord.__name__))
        return [r for r in self._connection_records.values() if self._search(r, kwargs, flags)]

    def records(self, **kwargs):
        """Search the :ref:`equipment-database` to find all
        :class:`.EquipmentRecord`\'s that match the specified criteria.

        Parameters
        ----------
        **kwargs
            The argument names can be any of the :class:`.EquipmentRecord`
            property names or a ``flags`` argument of type :class:`int` for
            performing the search, see :func:`re.search`. For testing regex
            expressions online you can use `this <https://pythex.org/>`_ website.

            If a `kwarg` is ``connection`` then the value will be used to test
            which :class:`.EquipmentRecord`\'s have a
            :attr:`~.EquipmentRecord.connection` value that is either
            :data:`None` or :class:`.ConnectionRecord`. See the examples below.

        Examples
        --------
        * `records()` :math:`\\rightarrow`
          a list of all EquipmentRecord's
        * `records(manufacturer='Agilent')` :math:`\\rightarrow`
          a list of all EquipmentRecord's that are from Agilent
        * `records(manufacturer='Agilent|Keysight')` :math:`\\rightarrow`
          a list of all EquipmentRecord's that are from Agilent or Keysight
        * `records(manufacturer='Agilent', model='3458A')` :math:`\\rightarrow`
          a list of all EquipmentRecords that are from Agilent and that have
          the model number 3458A
        * `records(manufacturer='Agilent', model='3458A', serial='MY45046470')` :math:`\\rightarrow`
          a list of only one EquipmentRecord (if the equipment record exists,
          otherwise an empty list)
        * `records(manufacturer=r'H.*P')` :math:`\\rightarrow`
          a list of all EquipmentRecord's that have Hewlett Packard (or HP)
          as the manufacturer
        * `records(description='I-V Converter')` :math:`\\rightarrow`
          a list of all EquipmentRecords that contain 'I-V Converter' in the
          description field
        * `records(connection=True)` :math:`\\rightarrow`
          a list of all EquipmentRecords that can be connected to

        Returns
        -------
        :class:`list` of :class:`.EquipmentRecord`
            The equipment records that match the search criteria.

        Raises
        ------
        NameError
            If the name of an input argument is not an :class:`.EquipmentRecord`
            property name or ``flags``.
        """
        flags = int(kwargs.pop('flags', 0))  # used by re.search
        for name in kwargs:
            if name not in EquipmentRecord.__slots__:
                raise NameError('Invalid argument name {!r} for an {}'.format(
                    name, EquipmentRecord.__name__))
        return [r for r in self._equipment_records.values() if self._search(r, kwargs, flags)]

    def _read(self, element):
        """Read the allowed database file types.

        The file is located from the text of the ``<path>`` sub-element of
        `element`. The path is tried as given and then relative to the
        directory of the configuration file.

        Returns a ``(header, rows)`` :class:`tuple` for a table (Excel, csv,
        txt) or a :class:`dict` for a json or xml file.

        Raises :exc:`OSError` if the ``<path>`` element is missing, if the
        file extension is not supported or if the file cannot be read from
        either location.
        """
        path = element.findtext('path')
        if path is None:
            raise OSError('You must create a <path> </path> element in {!r} '
                          'specifying where to find the database'.format(self._config_path))
        logger.debug('Reading database file %s', path)

        ext = os.path.splitext(path)[1].lower()
        if ext not in ('.xls', '.xlsx', '.csv', '.txt', '.json', '.xml'):
            # checked up front so that this error is never mistaken for a
            # missing file by the retry loop below
            raise OSError('Unsupported equipment-registry database file {!r}'.format(path))

        # also check if the path is a relative path (relative to the XML file path)
        relative_path = os.path.join(os.path.dirname(self._config_path), path)
        for p in (path, relative_path):
            try:
                if ext in ('.xls', '.xlsx'):
                    encoding = element.attrib.get('encoding')
                    dset = read_table_excel(p, sheet=element.findtext('sheet'), encoding=encoding)
                    if dset.ndim == 1:
                        # a single row, wrap it so that the caller always gets a list of rows
                        return dset.metadata.header, [dset.data]
                    return dset.metadata.header, dset.data

                if ext in ('.csv', '.txt'):
                    delimiter = ',' if ext == '.csv' else '\t'
                    encoding = element.attrib.get('encoding', 'utf-8')
                    with codecs.open(p, mode='r', encoding=encoding) as fp:
                        header = fp.readline().split(delimiter)
                        rows = [[val.strip() for val in line.split(delimiter)]
                                for line in fp.readlines() if line.strip()]
                    return header, rows

                if ext == '.json':
                    encoding = element.attrib.get('encoding', 'utf-8')
                    with codecs.open(p, mode='r', encoding=encoding) as fp:
                        return json.load(fp)

                # the only remaining supported extension is .xml
                return self._read_xml(p)
            except (IOError, OSError):
                # could not read from this location, try the next one
                continue

        raise OSError('Cannot find the database {!r}'.format(path))

    def _read_xml(self, path):
        """Read an XML database.

        Returns a :class:`dict` with the keys ``team``, ``equipment_records``
        and ``connection_records``.
        """

        def value(item):
            # the text of a tag that ends with "date" is kept as a string
            if item.tag.endswith('date'):
                return item.text
            try:
                return ast.literal_eval(item.text)
            except Exception:
                # not a Python literal, use the text as it is
                return item.text

        def get(element, tag, default=None):
            # a falsy `tag` means that `element` itself is converted
            e = element.find(tag) if tag else element
            if e is None:
                return default
            if len(e) == 0:
                return default if e.text is None else value(e)
            # the element has children, convert it to a (nested) dict
            return {item.tag: get(item, None) if len(item) > 0 else value(item) for item in e}

        root = ElementTree.parse(path).getroot()

        equipment_records = []
        for er in root.iterfind('.//EquipmentRecord'):
            calibrations = [
                {
                    'report_date': get(cr, 'report_date', default=None),
                    'calibration_date': get(cr, 'calibration_date', default=None),
                    'report_number': get(cr, 'report_number', default=''),
                    'calibration_cycle': get(cr, 'calibration_cycle', default=0),
                    'measurands': [
                        {
                            'type': get(measurand, 'type', default=''),
                            'unit': get(measurand, 'unit', default=''),
                            'conditions': get(measurand, 'conditions'),
                            'calibration': get(measurand, 'calibration'),
                        }
                        for measurand in cr.iterfind('.//MeasurandRecord')
                    ],
                }
                for cr in er.iterfind('.//CalibrationRecord')
            ]

            maintenances = [
                {
                    'date': get(maintenance, 'date', default=None),
                    'comment': get(maintenance, 'comment', default=''),
                }
                for maintenance in er.iterfind('.//MaintenanceRecord')
            ]

            record = {
                'calibrations': calibrations,
                'category': get(er, 'category', default=''),
                'description': get(er, 'description', default=''),
                'is_operable': get(er, 'is_operable', default=False),
                'maintenances': maintenances,
                'manufacturer': get(er, 'manufacturer', default=''),
                'model': get(er, 'model', default=''),
                'serial': get(er, 'serial', default=''),
                'unique_key': get(er, 'unique_key', default=''),
            }

            # add the user_defined kwargs
            for child in er:
                if child.tag not in record:
                    record[child.tag] = value(child)

            equipment_records.append(record)

        connection_records = [
            {child.tag: get(child, None, default='') for child in cr}
            for cr in root.iterfind('.//ConnectionRecord')
        ]

        return {
            'team': get(root, 'team', default=''),
            'equipment_records': equipment_records,
            'connection_records': connection_records,
        }

    def _make_index_map(self, header, field_names):
        """Determine the column index in the header that the field_names are located in.

        Each header label is stripped, lower-cased and has its spaces
        replaced by underscores. A field name matches a label if it is a
        substring of the label, and only the first matching column is used
        for each field name.
        """
        index_map = {}
        labels = [val.strip().lower().replace(' ', '_') for val in header]
        for index, label in enumerate(labels):
            for name in field_names:
                if name not in index_map and name in label:
                    index_map[name] = index
                    break  # a column is assigned to at most one field name
        return index_map

    def _make_key(self, obj, records, element, index_map=None):
        """Make a new Manufacturer|Model|Serial key.

        `obj` is a table row (indexed through `index_map`) or, if
        `index_map` is :data:`None`, a :class:`dict`. An empty string is
        returned (and an error is logged) if the key already exists in
        `records`.
        """
        if index_map is not None:
            manufacturer = obj[index_map['manufacturer']]
            model = obj[index_map['model']]
            serial = obj[index_map['serial']]
        else:
            manufacturer = obj.get('manufacturer', '')
            model = obj.get('model', '')
            serial = obj.get('serial', '')

        key = '{}|{}|{}'.format(manufacturer, model, serial)
        if not self._is_key_unique(key, records, element):
            return ''
        return key

    def _is_key_unique(self, key, dictionary, element):
        """Returns whether the dictionary key is unique (logs an error if it is not)."""
        if key in dictionary:
            logger.error('Manufacturer|Model|Serial is not unique -> %s in %r',
                         key, element.findtext('path'))
            return False
        return True

    def _is_row_length_okay(self, row, header):
        """Check if the row and the header have the same length (logs an error if not)."""
        if len(row) != len(header):
            logger.error('len(row) [%d] != len(header) [%d] -> row: %s',
                         len(row), len(header), row)
            return False
        return True

    def _search(self, record, kwargs, flags):
        """Check if the kwargs match a database record.

        Every item in `kwargs` must match for the record to match.

        Raises :exc:`TypeError` if the value of ``properties`` is not a
        :class:`dict`.
        """
        for key, value in kwargs.items():
            if key == 'backend' or key == 'interface':
                enum = constants.Backend if key == 'backend' else constants.Interface
                current = getattr(record, key)
                if isinstance(value, int):
                    if convert_to_enum(value, enum) != current:
                        return False
                else:
                    # a string may contain several alternatives separated by '|'
                    matches = []
                    for name in value.split('|'):
                        try:
                            matches.append(convert_to_enum(name.strip(), enum) == current)
                        except ValueError:
                            pass  # not a valid enum member name, ignore this alternative
                    if not any(matches):
                        return False
            elif key == 'connection':
                if bool(value):
                    # then want equipment records with a connection
                    if record.connection is None:
                        return False
                elif record.connection is not None:
                    return False
            elif key == 'properties':
                if not isinstance(value, dict):
                    raise TypeError('The "properties" value must be a dict, got {}'.format(type(value)))
                for k, v in value.items():
                    if k not in record.properties:
                        return False
                    if v != record.properties[k]:
                        return False
            elif key == 'is_operable':
                # only reject here, the remaining criteria must still be checked
                if getattr(record, key) is not bool(value):
                    return False
            else:
                if not bool(re.search(value, getattr(record, key), flags)):
                    return False
        return True