Source code for msl.equipment.connection_tcpip_hislip

"""
Base class for equipment that use the HiSLIP communication protocol.
"""
from __future__ import annotations

import socket

from .connection_message_based import ConnectionMessageBased
from .constants import REGEX_TCPIP
from .hislip import AsyncClient
from .hislip import FatalErrorMessage
from .hislip import HiSLIPException
from .hislip import PORT
from .hislip import SyncClient


[docs] class ConnectionTCPIPHiSLIP(ConnectionMessageBased): def __init__(self, record): """Base class for equipment that use the HiSLIP communication protocol. The :attr:`~msl.equipment.record_types.ConnectionRecord.properties` for a HiSLIP connection supports the following key-value pairs in the :ref:`connections-database`:: 'buffer_size': int, the maximum number of bytes to read at a time [default: 4096] 'encoding': str, the encoding to use [default: 'utf-8'] 'encoding_errors': str, encoding error handling scheme, e.g. 'strict', 'ignore' [default: 'strict'] 'lock_timeout': float or None, the timeout (in seconds) to wait for a lock [default: 0] 'max_read_size': int, the maximum number of bytes that can be read [default: 1 MB] 'rstrip': bool, whether to remove trailing whitespace from "read" messages [default: False] 'timeout': float or None, the timeout (in seconds) for read and write operations [default: None] The :data:`~msl.equipment.record_types.ConnectionRecord.backend` value must be equal to :data:`~msl.equipment.constants.Backend.MSL` to use this class for the communication system. This is achieved by setting the value in the **Backend** field for a connection record in the :ref:`connections-database` to be ``MSL``. Do not instantiate this class directly. Use the :meth:`~.EquipmentRecord.connect` method to connect to the equipment. Parameters ---------- record : :class:`~.record_types.EquipmentRecord` A record from an :ref:`equipment-database`. """ # the following must be defined before calling super() self._sync = None self._async = None super(ConnectionTCPIPHiSLIP, self).__init__(record) info = self.parse_address(record.connection.address) if info is None: self.raise_exception('Invalid address {!r}'.format(record.connection.address)) # the board number is currently not used self._host = info['host'] self._name = info['name'] self._port = info['port'] # HiSLIP does not support termination characters self.write_termination = None self.read_termination = None props = record.connection.properties self._buffer_size = props.get('buffer_size', 4096) self.lock_timeout = props.get('lock_timeout', 0) self._maximum_server_message_size = None self._connect() self.log_debug('Connected to %s', record.connection) def _connect(self): # it is useful to make this method because some subclasses needed to "reconnect" err_msg = None try: # IVI-6.1: IVI High-Speed LAN Instrument Protocol (HiSLIP) # 23 April 2020 (Revision 2.0) # Section 6.1: Initialization Transaction self._sync = SyncClient(self._host) self._sync.connect(port=self._port, timeout=self._timeout) status = self._sync.initialize(sub_address=self._name.encode()) if status.encrypted or status.initial_encryption: self.disconnect() raise RuntimeError('The HiSLIP server requires encryption, ' 'this has not been tested yet') self._async = AsyncClient(self._host) self._async.connect(port=self._port, timeout=self._timeout) self._async.async_initialize(status.session_id) r = self._async.async_maximum_message_size(self._max_read_size) self._sync.maximum_server_message_size = r.maximum_message_size self._async.maximum_server_message_size = r.maximum_message_size except socket.timeout: pass except Exception as e: err_msg = e.__class__.__name__ + ': ' + str(e) else: return if err_msg is None: self.raise_timeout() self.raise_exception('Cannot connect to {}\n{}'.format(self.equipment_record, err_msg)) def _set_backend_timeout(self): """Overrides method in ConnectionMessageBased.""" if self._sync is not None: self._sync.set_timeout(self._timeout) if self._async is not None: self._async.set_timeout(self._timeout) @property def host(self): """:class:`str`: The host (IP address).""" return self._host @property def port(self): """:class:`int`: The port number.""" return self._port @property def asynchronous(self): """:class:`~msl.equipment.hislip.AsyncClient`: The reference to the asynchronous client.""" return self._async @property def synchronous(self): """:class:`~msl.equipment.hislip.SyncClient`: The reference to the synchronous client.""" return self._sync
[docs] @staticmethod def parse_address(address): """Parse the address for valid TCPIP HiSLIP fields. Parameters ---------- address : :class:`str` The address of a :class:`~msl.equipment.record_types.ConnectionRecord`. Returns ------- :class:`dict` or :data:`None` The board number, hostname, LAN device name, and HiSLIP port number of the device or :data:`None` if `address` is not valid for a TCPIP HiSLIP connection. """ match = REGEX_TCPIP.match(address) if not match: return d = match.groupdict() if not d['name'] or not d['name'].lower().startswith('hislip'): return if not d['board']: d['board'] = '0' name_split = d['name'].split(',') if len(name_split) > 1: d['name'] = name_split[0] try: d['port'] = int(name_split[1]) except ValueError: return else: d['port'] = PORT return d
@property def max_read_size(self): """:class:`int`: The maximum number of bytes that can be :meth:`~msl.equipment.connection_message_based.ConnectionMessageBased.read`.""" # Overrides property in ConnectionMessageBased. return self._max_read_size @max_read_size.setter def max_read_size(self, size): self._max_read_size = int(size) if self._sync is None or self._async is None: return r = self._async.async_maximum_message_size(self._max_read_size) self._sync.maximum_server_message_size = r.maximum_message_size self._async.maximum_server_message_size = r.maximum_message_size @property def lock_timeout(self): """:class:`float`: The time, in seconds, to wait to acquire a lock.""" return self._lock_timeout @lock_timeout.setter def lock_timeout(self, value): if value is None or value < 0: # use 1 day as equivalent to "wait forever for a lock" self._lock_timeout = 86400.0 else: self._lock_timeout = float(value)
[docs] def disconnect(self): """Close the connection to the HiSLIP server.""" if self._async is not None: self._async.close() self._async = None if self._sync is not None: self._sync.close() self._sync = None self.log_debug('Disconnected from %s', self.equipment_record.connection)
def _read(self, size): """Overrides method in ConnectionMessageBased.""" try: return self._sync.receive(size=size, max_size=self._max_read_size, chunk_size=self._buffer_size) except HiSLIPException as e: # IVI-6.1: IVI High-Speed LAN Instrument Protocol (HiSLIP) # 23 April 2020 (Revision 2.0) # Section 6.2: Fatal Error Detection and Synchronization Recovery # If the error is detected by the client, after sending the FatalError # messages it shall close the HiSLIP connection self._send_fatal_error(e.message) raise except Exception as e: msg = FatalErrorMessage(payload=str(e).encode('ascii')) self._send_fatal_error(msg) raise
[docs] def reconnect(self, max_attempts=1): """Reconnect to the equipment. Parameters ---------- max_attempts : :class:`int`, optional The maximum number of attempts to try to reconnect with the equipment. If < 1 or :data:`None` then keep trying until a connection is successful. If the maximum number of attempts has been reached then an exception is raise. """ if max_attempts is None: max_attempts = -1 attempt = 0 while True: attempt += 1 try: return self._connect() except: if 0 < max_attempts <= attempt: raise
def _send_fatal_error(self, message): # IVI-6.1: IVI High-Speed LAN Instrument Protocol (HiSLIP) # 23 April 2020 (Revision 2.0) # Section 6.2: Fatal Error Detection and Synchronization Recovery # If the error is detected by the client, after sending the FatalError # messages it shall close the HiSLIP connection self._sync.write(message) self._async.write(message) self.disconnect() def _write(self, message): """Overrides method in ConnectionMessageBased.""" try: return self._sync.send(message) except HiSLIPException as e: self._send_fatal_error(e.message) raise except Exception as e: msg = FatalErrorMessage(payload=str(e).encode('ascii')) self._send_fatal_error(msg) raise
[docs] def read_stb(self): """Read the status byte from the device. Returns ------- :class:`int` The status byte. """ reply = self._async.async_status_query(self._sync) return reply.status
[docs] def trigger(self): """Send the trigger message (emulates a GPIB Group Execute Trigger event).""" self._sync.trigger()
[docs] def clear(self): """Send the `clear` command to the device.""" # IVI-6.1: IVI High-Speed LAN Instrument Protocol (HiSLIP) # 23 April 2020 (Revision 2.0) # Section 6.12: Device Clear Transaction # # This Connection class does not use the asynchronous client in an # asynchronous manner, therefore there should not be any pending # requests that need to be waited on to finish acknowledged = self._async.async_device_clear() self._sync.device_clear_complete(acknowledged.feature_bitmap)
[docs] def lock(self, lock_string=''): """Acquire the device's lock. Parameters ---------- lock_string : :class:`str`, optional An ASCII string that identifies this lock. If not specified, then an exclusive lock is requested, otherwise the string indicates an identification of a shared-lock request. Returns ------- :class:`bool` Whether acquiring the lock was successful. """ status = self._async.async_lock_request( timeout=self._lock_timeout, lock_string=lock_string) return status.success
[docs] def unlock(self): """Release the lock acquired by :meth:`.lock`. Returns ------- :class:`bool` Whether releasing the lock was successful. """ status = self._async.async_lock_release(self._sync.message_id) return status.success
[docs] def lock_status(self): """Request the lock status from the HiSLIP server. Returns ------- :class:`bool` Whether the HiSLIP server has an exclusive lock with a client. :class:`int` The number of HiSLIP clients that have a lock with the HiSLIP server. """ reply = self._async.async_lock_info() return reply.is_exclusive, reply.num_locks
[docs] def remote_local_control(self, request): """Send a GPIB-like remote/local control request. Parameters ---------- request : :class:`int` The request to perform. * 0 -- Disable remote, `VI_GPIB_REN_DEASSERT` * 1 -- Enable remote, `VI_GPIB_REN_ASSERT` * 2 -- Disable remote and go to local, `VI_GPIB_REN_DEASSERT_GTL` * 3 -- Enable Remote and go to remote, `VI_GPIB_REN_ASSERT_ADDRESS` * 4 -- Enable remote and lock out local, `VI_GPIB_REN_ASSERT_LLO` * 5 -- Enable remote, go to remote, and set local lockout, `VI_GPIB_REN_ASSERT_ADDRESS_LLO` * 6 -- go to local without changing REN or lockout state, `VI_GPIB_REN_ADDRESS_GTL` """ self._async.async_remote_local_control(request, self._sync.message_id)