# Source code for msl.equipment

"""
Manage and connect to equipment in the laboratory.
"""
import re
from collections import namedtuple

from msl.equipment import resources
from msl.equipment.config import Config
from msl.equipment.constants import Backend
from msl.equipment.exceptions import MSLConnectionError
from msl.equipment.exceptions import MSLTimeoutError
from msl.equipment.record_types import CalibrationRecord
from msl.equipment.record_types import ConnectionRecord
from msl.equipment.record_types import EquipmentRecord
from msl.equipment.record_types import MaintenanceRecord
from msl.equipment.record_types import MeasurandRecord

__author__ = 'Measurement Standards Laboratory of New Zealand'
__copyright__ = '\xa9 2017 - 2023, ' + __author__
__version__ = '0.2.0.dev0'

# Break the version string into its three numeric fields plus whatever text
# follows an optional "." or "-" separator (the release level, e.g. "dev0").
_v = re.search(r'(\d+)\.(\d+)\.(\d+)[.-]?(.*)', __version__).groups()

version_info = namedtuple('version_info', ['major', 'minor', 'micro', 'releaselevel'])(
    major=int(_v[0]),
    minor=int(_v[1]),
    micro=int(_v[2]),
    releaselevel=_v[3],
)
""":obj:`~collections.namedtuple`: Contains the version information as a (major, minor, micro, releaselevel) tuple."""


def list_resources(hosts=None, timeout=2):
    """Returns a dictionary of all equipment that are available to connect to.

    Network devices are searched for concurrently (LXI, VXI-11 and Prologix
    discovery each run in their own thread) and then the serial ports that
    are reported by pyserial are appended.

    Parameters
    ----------
    hosts : :class:`list` of :class:`str`, optional
        The IP address(es) on the computer to use to find network devices.
        If not specified, then find devices on all network interfaces.
    timeout : :class:`float`, optional
        The maximum number of seconds to wait for a reply from a network device.

    Returns
    -------
    :class:`dict`
        The information about the devices that were found. The keys are
        the identifiers under which the devices were discovered (the key
        used by the network-discovery functions, or the serial-port name)
        and each value is a :class:`dict` with the keys ``'addresses'``
        (a :class:`set` of :class:`str`) and ``'description'``. Network
        devices whose description does not start with ``'Prologix'`` also
        contain a ``'webserver'`` key.
    """
    from threading import Thread
    from msl.equipment.connection_prologix import find_prologix
    from msl.equipment.dns_service_discovery import find_lxi
    from msl.equipment.vxi11 import find_vxi11
    from serial.tools.list_ports import comports

    class NetworkThread(Thread):

        def __init__(self, target):
            """Allows for capturing the return value from the target function."""
            self.devices = {}

            def function():
                self.devices = target(hosts=hosts, timeout=timeout)

            super(NetworkThread, self).__init__(target=function)

    threads = [
        NetworkThread(target=find_lxi),
        NetworkThread(target=find_vxi11),
        NetworkThread(target=find_prologix),
    ]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    devices = {}

    # Merge the results of every discovery thread; the same key may be
    # reported by more than one discovery protocol.
    for thread in threads:
        for ipv4, device in thread.devices.items():
            description = device.get('description', 'Unknown device')
            if ipv4 not in devices:
                devices[ipv4] = {}
                devices[ipv4]['addresses'] = set(device['addresses'])
                devices[ipv4]['description'] = description
                if not description.startswith('Prologix'):
                    # Prologix ENET-GPIB does not have a webserver
                    devices[ipv4]['webserver'] = device['webserver']
            else:
                # Already known: the latest description wins and the
                # addresses from both discoveries are combined.
                devices[ipv4]['description'] = description
                for address in device['addresses']:
                    devices[ipv4]['addresses'].add(address)

    # Serial ports are keyed by their port name.
    for port, desc, _ in sorted(comports()):
        addresses = set()
        if port.startswith('COM'):
            addresses.add(port)
            match = re.search(r'(\d+)', port)
            if match:
                addresses.add('ASRL{}::INSTR'.format(match.group(1)))
        elif port.startswith('/dev/'):
            addresses.add('ASRL{}'.format(port))
            addresses.add('ASRL{}::INSTR'.format(port))
        devices[port] = {}
        devices[port]['addresses'] = addresses
        devices[port]['description'] = desc

    # Return the mapping itself (not ``devices.values()``) so that callers
    # receive the documented dict, including the keys.
    return devices