"""
Base class for equipment that use the ZeroMQ_ communication protocol.
.. _ZeroMQ: https://zeromq.org/
"""
import socket
import zmq
from .connection_message_based import ConnectionMessageBased
from .constants import REGEX_ZMQ
[docs]class ConnectionZeroMQ(ConnectionMessageBased):
def __init__(self, record):
"""Base class for equipment that use the ZeroMQ_ communication protocol.
The :attr:`~msl.equipment.record_types.ConnectionRecord.properties`
for a ZeroMQ connection supports the following key-value pairs in the
:ref:`connections-database`::
'encoding': str, the encoding to use [default: 'utf-8']
'encoding_errors': str, encoding error handling scheme, e.g. 'strict', 'ignore' [default: 'strict']
'max_read_size': int, the maximum number of bytes that can be read [default: 1 MB]
'protocol': str, the ZeroMQ protocol [default: 'tcp']
'rstrip': bool, whether to remove trailing whitespace from "read" messages [default: False]
'socket_type': str, the ZeroMQ socket type [default: 'REQ']
'timeout': float or None, the timeout (in seconds) for read and write operations [default: None]
The :data:`~msl.equipment.record_types.ConnectionRecord.backend`
value must be equal to :data:`~msl.equipment.constants.Backend.MSL`
to use this class for the communication system. This is achieved by
setting the value in the **Backend** field for a connection record
in the :ref:`connections-database` to be ``MSL``.
Do not instantiate this class directly. Use the
:meth:`~.EquipmentRecord.connect` method to connect to the equipment.
Parameters
----------
record : :class:`~.record_types.EquipmentRecord`
A record from an :ref:`equipment-database`.
Raises
------
~msl.equipment.exceptions.MSLConnectionError
If the socket cannot be opened.
"""
# the following must be defined before calling super()
self._context = zmq.Context()
props = record.connection.properties
try:
socket_type = props.get('socket_type', 'REQ').upper()
self._socket_type = getattr(zmq, socket_type)
self._socket = self._context.socket(self._socket_type)
except (AttributeError, zmq.ZMQError):
self._context.destroy()
self._context = None
raise
super(ConnectionZeroMQ, self).__init__(record)
info = self.parse_address(record.connection.address)
if info is None:
self.raise_exception('Invalid address {!r}'.format(record.connection.address))
self._host = info['host']
self._port = info['port']
# ZeroMQ does not use termination characters
self.write_termination = None
self.read_termination = None
self._protocol = props.get('protocol', 'tcp')
self._connect()
self.log_debug('Connected to %s', record.connection)
def _connect(self):
err_msg = None
default_timeout = 10
# Calling zmq.Socket.connect() does not verify if the connection
# can be made, so use the builtin socket module to verify
s = socket.socket()
try:
s.settimeout(self.timeout or default_timeout)
s.connect((self._host, self._port))
# The (host, port) is valid, so now call zmq.Socket.connect()
self._socket.connect('{}://{}:{}'.format(self._protocol, self._host, self._port))
except socket.timeout:
pass
except Exception as e:
err_msg = e.__class__.__name__ + ': ' + str(e)
else:
return
finally:
s.close()
if err_msg is None:
if not self.timeout:
self.timeout = default_timeout
self.raise_timeout()
self.raise_exception('Cannot connect to {}\n{}'.format(self.equipment_record, err_msg))
def _read(self, size):
"""Overrides method in ConnectionMessageBased."""
reply = self._socket.recv(flags=0, copy=True)
if size is None:
return reply
return reply[:size]
def _set_backend_timeout(self):
"""Overrides method in ConnectionMessageBased."""
# ZeroMQ requires the timeout to be an integer with units of milliseconds
if self._timeout is None:
timeout_ms = -1 # infinite
else:
timeout_ms = int(self._timeout * 1000)
self._socket.setsockopt(zmq.RCVTIMEO, timeout_ms)
def _write(self, message):
"""Overrides method in ConnectionMessageBased."""
self._socket.send(message, flags=0, copy=True)
return len(message)
@property
def context(self):
""":class:`~zmq.sugar.context.Context`: Reference to the ZeroMQ context."""
return self._context
[docs] def disconnect(self):
"""Close the connection."""
if self._context is None:
return
# Calling self._context.destroy() caused the interpreter to hang
# if an exception occurred prior to calling disconnect(). Explicitly
# closing the socket and terminating the context seems to be better.
self._socket.close()
self._context.term()
self.log_debug('Disconnected from %s', self.equipment_record.connection)
@property
def host(self):
""":class:`str`: The host (IP address)."""
return self._host
@property
def max_read_size(self):
""":class:`int`: The maximum number of bytes that can be
:meth:`~msl.equipment.connection_message_based.ConnectionMessageBased.read`."""
# Overrides property in ConnectionMessageBased.
return self._max_read_size
@max_read_size.setter
def max_read_size(self, size):
size = int(size)
if size < 1:
raise ValueError('The maximum number of bytes to read must be > 0, got {}'.format(size))
self._max_read_size = size
self._socket.setsockopt(zmq.MAXMSGSIZE, size)
[docs] @staticmethod
def parse_address(address):
"""Parse the address for valid ZeroMQ fields.
Parameters
----------
address : :class:`str`
The address of a :class:`~msl.equipment.record_types.ConnectionRecord`.
Returns
-------
:class:`dict` or :data:`None`
The host and port number of the device or :data:`None` if `address`
is not valid for a ZeroMQ connection.
"""
match = REGEX_ZMQ.match(address)
if not match:
return
d = match.groupdict()
return {'host': d['host'], 'port': int(d['port'])}
@property
def port(self):
""":class:`int`: The port number."""
return self._port
[docs] def reconnect(self, max_attempts=1):
"""Reconnect to the equipment.
Parameters
----------
max_attempts : :class:`int`, optional
The maximum number of attempts to try to reconnect with the
equipment. If < 1 or :data:`None` then keep trying until a
connection is successful. If the maximum number of attempts
has been reached then an exception is raise.
"""
if max_attempts is None:
max_attempts = -1
self._context.destroy()
self._context = zmq.Context()
self._socket = self._context.socket(self._socket_type)
attempt = 0
while True:
attempt += 1
try:
return self._connect()
except:
if 0 < max_attempts <= attempt:
raise
@property
def socket(self):
""":class:`~zmq.sugar.socket.Socket`: Reference to the ZeroMQ socket."""
return self._socket