Source code for msl.equipment.factory

"""
Establish a connection to the equipment.
"""
from __future__ import annotations

from .config import Config
from .connection_demo import ConnectionDemo
from .connection_gpib import ConnectionGPIB
from .connection_nidaq import ConnectionNIDAQ
from .connection_prologix import ConnectionPrologix
from .connection_pyvisa import ConnectionPyVISA
from .connection_sdk import ConnectionSDK
from .connection_serial import ConnectionSerial
from .connection_socket import ConnectionSocket
from .connection_tcpip_hislip import ConnectionTCPIPHiSLIP
from .connection_tcpip_vxi11 import ConnectionTCPIPVXI11
from .connection_zeromq import ConnectionZeroMQ
from .constants import Backend
from .constants import Interface
from .exceptions import ResourceClassNotFound
from .resources import find_resource_class
from .utils import logger

_interface_map = {
    Interface.SERIAL: ConnectionSerial,
    Interface.SOCKET: ConnectionSocket,
    Interface.PROLOGIX: ConnectionPrologix,
    Interface.TCPIP_VXI11: ConnectionTCPIPVXI11,
    Interface.TCPIP_HISLIP: ConnectionTCPIPHiSLIP,
    Interface.ZMQ: ConnectionZeroMQ,
    Interface.GPIB: ConnectionGPIB,
}


[docs] def connect(record, demo=None): """Factory function to establish a connection to the equipment. Parameters ---------- record : :class:`~.record_types.EquipmentRecord` A record from an :ref:`equipment-database`. demo : :class:`bool`, optional Whether to simulate a connection to the equipment by opening a connection in demo mode. This allows you to test your code if the equipment is not physically connected to a computer. If :data:`None` then the `demo` value is determined from the :attr:`~.config.Config.DEMO_MODE` attribute. Returns ------- A :class:`~.connection.Connection` subclass. """ def _connect(_record): """Processes a single EquipmentRecord object""" def _raise(name): raise ValueError(f'The connection {name} has not been set for {_record}') conn = _record.connection if conn is None: _raise('object') if not conn.address and conn.backend != Backend.NIDAQ: _raise('address') if conn.backend == Backend.UNKNOWN: _raise('backend') cls = None if conn.backend == Backend.MSL: if conn.interface == Interface.NONE: _raise('interface') cls = find_resource_class(conn) if cls is None: if conn.interface == Interface.SDK: raise ResourceClassNotFound(record) cls = _interface_map.get(conn.interface, None) if cls is None: raise NotImplementedError(f'The {conn.interface.name!r} interface ' f'has not be implemented yet') elif conn.backend == Backend.PyVISA: if demo: cls = ConnectionPyVISA.resource_class(conn) else: cls = ConnectionPyVISA elif conn.backend == Backend.NIDAQ: if demo: raise NotImplementedError('NIDAQ cannot be run in demo mode...') else: cls = ConnectionNIDAQ assert cls is not None, 'The Connection class is None' logger.debug('Connecting to %s using %s', conn, conn.backend.name) if demo: return ConnectionDemo(_record, cls) else: return cls(_record) if demo is None: demo = Config.DEMO_MODE if isinstance(record, dict) and len(record) == 1: key = list(record.keys())[0] return _connect(record[key]) elif isinstance(record, (list, tuple)) and len(record) == 1: return _connect(record[0]) return _connect(record)
[docs] def find_interface(address: str) -> Interface: """Find the interface for `address`. :param address: The address of a :class:`~msl.equipment.record_types.ConnectionRecord`. """ if ConnectionSDK.parse_address(address): return Interface.SDK # this check must come before the SERIAL and SOCKET checks if ConnectionPrologix.parse_address(address): return Interface.PROLOGIX if ConnectionSerial.parse_address(address): return Interface.SERIAL if ConnectionSocket.parse_address(address): return Interface.SOCKET if ConnectionTCPIPVXI11.parse_address(address): return Interface.TCPIP_VXI11 if ConnectionTCPIPHiSLIP.parse_address(address): return Interface.TCPIP_HISLIP if ConnectionGPIB.parse_address(address): return Interface.GPIB if ConnectionZeroMQ.parse_address(address): return Interface.ZMQ raise ValueError(f'Cannot determine the Interface from address {address!r}')