Source code for msl.equipment

"""
Manage and connect to equipment in the laboratory.
"""
from __future__ import annotations

import re
from collections import namedtuple
from typing import ValuesView

from . import resources
from .config import Config
from .constants import Backend
from .exceptions import MSLConnectionError
from .exceptions import MSLTimeoutError
from .record_types import CalibrationRecord
from .record_types import ConnectionRecord
from .record_types import EquipmentRecord
from .record_types import MaintenanceRecord
from .record_types import MeasurandRecord

__author__ = 'Measurement Standards Laboratory of New Zealand'
__copyright__ = f'\xa9 2017 - 2023, {__author__}'
__version__ = '0.2.0.dev0'

# Break the version string into its three numeric components plus whatever
# release-level suffix follows an optional '.' or '-' separator.
_v = re.search(r'(\d+)\.(\d+)\.(\d+)[.-]?(.*)', __version__).groups()

# The first three captured groups are converted to integers; the release
# level is kept as the (possibly empty) string that was captured.
version_info = namedtuple(
    'version_info',
    ['major', 'minor', 'micro', 'releaselevel'],
)(*(int(part) for part in _v[:3]), _v[3])
""":obj:`~collections.namedtuple`: Contains the version information as a (major, minor, micro, releaselevel) tuple."""


def find_equipment(
        *,
        ip: list[str] | None = None,
        timeout: float = 2,
        gpib_library: str = '',
        include_sad: bool = True) -> ValuesView:
    """Search for equipment that is currently available.

    Network devices are searched for in background threads while the
    serial ports and the GPIB listeners are queried in the calling thread.

    :param ip: The IP address(es) on the local computer to use to search
        for network devices. If not specified, uses all network interfaces.
    :param timeout: The maximum number of seconds to wait for a reply from
        a network device.
    :param gpib_library: The path to a GPIB library file. The default file
        that is used is platform dependent. If a GPIB library cannot be
        found, GPIB devices will not be searched for.
    :param include_sad: Whether to scan all secondary GPIB addresses.
    :return: The information about the devices that were found. Every item
        is a :class:`dict` with the keys ``type``, ``addresses`` and
        ``description``; network devices may also have a ``webserver`` key.
    """
    from threading import Thread

    from msl.equipment.connection_gpib import find_listeners
    from msl.equipment.connection_prologix import find_prologix
    from msl.equipment.dns_service_discovery import find_lxi
    from msl.equipment.utils import logger
    from msl.equipment.vxi11 import find_vxi11
    from serial.tools.list_ports import comports

    class NetworkThread(Thread):

        def __init__(self, target):
            """Allows for capturing the return value from the target function."""
            self.devices = {}

            def capture():
                # Keep the result on the thread so that it can be read after join()
                self.devices = target(ip=ip, timeout=timeout)

            super().__init__(target=capture)

    logger.debug('start finding devices')

    found = {}
    count = 0

    # Start the (slow) network searches first so that they run while the
    # serial and GPIB interfaces are being queried below.
    workers = [NetworkThread(target=finder) for finder in (find_lxi, find_vxi11, find_prologix)]
    for worker in workers:
        worker.start()

    logger.debug('find ASRL ports')
    for port, desc, _ in sorted(comports()):
        if port.startswith('COM'):
            addresses = [port]
        elif port.startswith('/dev/'):
            addresses = [f'ASRL{port}']
        else:
            # A port name with neither prefix is still reported, without an address
            addresses = []
        found[port] = {
            'type': 'Serial',
            'addresses': addresses,
            'description': desc,
        }
        count += 1

    Config.GPIB_LIBRARY = gpib_library
    listeners = find_listeners(include_sad=include_sad)
    if listeners:
        # All GPIB listeners are grouped into a single entry
        found['gpib'] = {
            'type': 'GPIB',
            'addresses': listeners,
            'description': '',
        }
        count += len(listeners)

    for worker in workers:
        worker.join()

    # Merge the results of the network searches; the same IP address may
    # have been found by more than one search.
    for worker in workers:
        for ipv4, device in worker.devices.items():
            description = device.get('description', 'Unknown device')

            if ipv4 in found:
                entry = found[ipv4]
                # Prefer a meaningful description over the placeholder
                if entry['description'] == 'Unknown device' and description != 'Unknown device':
                    entry['description'] = description
                for address in device['addresses']:
                    if address not in entry['addresses']:
                        entry['addresses'].append(address)
                continue

            count += 1
            entry = found[ipv4] = {
                'type': 'Network',
                'addresses': device['addresses'],
                'description': description,
            }
            if not description.startswith('Prologix'):
                # Prologix ENET-GPIB does not have a webserver
                entry['webserver'] = device['webserver']

    logger.debug('found %d devices', count)
    return found.values()
def _print_stdout(equipment: ValuesView) -> None:
    """Print a summary of all equipment that are available to connect to.

    The devices are grouped by their ``type`` (in sorted order) and, within
    a group, are listed in order of their ``description``.

    :param equipment: The device information, where every item is a
        :class:`dict` with the keys ``type``, ``addresses`` and
        ``description`` (and optionally ``webserver``).
    """
    devices = sorted(equipment, key=lambda v: v['description'])
    types = sorted({d['type'] for d in devices})
    for typ in types:
        print(f'{typ} Devices')
        for device in devices:
            if device['type'] != typ:
                continue
            if typ == 'GPIB':
                print(' ' + '\n '.join(device['addresses']))
            elif typ == 'Serial':
                addresses = device['addresses']
                if addresses:
                    print(f" {addresses[0]} [{device['description']}]")
                else:
                    # A serial port may be listed without an address; show
                    # its description instead of raising an IndexError
                    print(f" [{device['description']}]")
            elif typ == 'Network':
                print(f" {device['description']}")
                if 'webserver' in device:
                    print(f" webserver {device['webserver']}")
                if device['addresses']:
                    print(' ' + '\n '.join(sorted(device['addresses'])))


def _find_cli() -> None:
    """Console script entry point to find equipment.

    Parses the command-line arguments, calls ``find_equipment`` and prints
    the result either as a JSON string or as a human-readable summary.
    """
    import argparse
    import json
    import sys

    # The built-in help action is disabled so that -h/--help can be
    # re-added below with a custom help message.
    parser = argparse.ArgumentParser(
        add_help=False,
        description='Find equipment that can be connected to.',
        formatter_class=argparse.RawTextHelpFormatter,
    )
    parser.add_argument(
        '-h', '--help',
        action='help',
        help='Show this help message and exit.',
        default=argparse.SUPPRESS,
    )
    parser.add_argument(
        '--ip',
        nargs='*',
        help='The IP address(es) on the local computer to search for network\n'
             'devices. If not specified, uses all network interfaces.'
    )
    parser.add_argument(
        '-t', '--timeout',
        type=float,
        default=2,
        help='Maximum number of seconds to wait for a reply from a network\n'
             'device. Default is 2 seconds.'
    )
    parser.add_argument(
        '-g', '--gpib-library',
        default='',
        help='The path to a GPIB library file. The default file that is used\n'
             'is platform dependent. If a GPIB library cannot be found, GPIB\n'
             'devices will not be searched for.'
    )
    parser.add_argument(
        '--debug',
        action='store_true',
        default=False,
        help='Whether to show DEBUG log messages.'
    )
    parser.add_argument(
        '--ignore-sad',
        action='store_true',
        default=False,
        help='Do not scan for secondary GPIB addresses.'
    )
    parser.add_argument(
        '--json',
        action='store_true',
        default=False,
        help='Print the results as a JSON string.'
    )

    parsed = parser.parse_args(sys.argv[1:])

    if parsed.debug:
        import logging
        logging.basicConfig(
            level=logging.DEBUG,
            format='%(asctime)s.%(msecs)03d [%(levelname)s] %(name)s %(message)s',
            datefmt='%H:%M:%S'
        )

    equipment = find_equipment(
        ip=parsed.ip,
        timeout=parsed.timeout,
        gpib_library=parsed.gpib_library,
        include_sad=not parsed.ignore_sad,
    )

    if parsed.json:
        # A dict view is not JSON serializable, so convert it to a list
        print(json.dumps(list(equipment), indent=2))
    else:
        _print_stdout(equipment)