Source code for msl.equipment.connection_serial

"""
Base class for equipment that is connected through a serial port.
"""
from __future__ import annotations

import time

import serial

from .connection_message_based import ConnectionMessageBased
from .constants import DataBits
from .constants import Parity
from .constants import REGEX_PROLOGIX
from .constants import REGEX_SERIAL
from .constants import StopBits


class ConnectionSerial(ConnectionMessageBased):

    def __init__(self, record):
        """Base class for equipment that is connected through a serial port.

        The :attr:`~msl.equipment.record_types.ConnectionRecord.properties`
        for a serial connection supports the following key-value pairs in the
        :ref:`connections-database` (see also :class:`serial.Serial` for more
        details about each parameter)::

            'baud_rate': int, the baud rate [default: 9600]
            'data_bits': int, the number of data bits, e.g. 5, 6, 7, 8 [default: 8]
            'dsr_dtr': bool, enable hardware (DSR/DTR) flow control [default: False]
            'encoding': str, the encoding to use [default: 'utf-8']
            'encoding_errors': str, encoding error handling scheme, e.g. 'strict', 'ignore' [default: 'strict']
            'inter_byte_timeout': float or None, the inter-character timeout [default: None]
            'max_read_size': int, the maximum number of bytes that can be read [default: 1 MB]
            'parity': str or None, parity checking, e.g. 'even', 'odd' [default: None]
            'read_termination': str or None, read until this termination sequence is found [default: '\\n']
            'rstrip': bool, whether to remove trailing whitespace from "read" messages [default: False]
            'rts_cts': bool, enable hardware (RTS/CTS) flow control [default: False]
            'stop_bits': int or float, the number of stop bits, e.g. 1, 1.5, 2 [default: 1]
            'termination': shortcut for setting both 'read_termination' and 'write_termination' to this value
            'timeout': float or None, the timeout (in seconds) for read and write operations [default: None]
            'write_termination': str or None, termination sequence appended to write messages [default: '\\r\\n']
            'xon_xoff': bool, enable software flow control [default: False]

        The PySerial spellings ``baudrate``, ``bytesize``, ``stopbits``,
        ``xonxoff``, ``rtscts`` and ``dsrdtr`` are also accepted as fallback
        keys when the corresponding key listed above is not present.

        The :data:`~msl.equipment.record_types.ConnectionRecord.backend`
        value must be equal to :data:`~msl.equipment.constants.Backend.MSL`
        to use this class for the communication system. This is achieved by
        setting the value in the **Backend** field for a connection record
        in the :ref:`connections-database` to be ``MSL``.

        Do not instantiate this class directly. Use the
        :meth:`~.EquipmentRecord.connect` method to connect to the equipment.

        Parameters
        ----------
        record : :class:`~.EquipmentRecord`
            A record from an :ref:`equipment-database`.

        Raises
        ------
        ~msl.equipment.exceptions.MSLConnectionError
            If the serial port cannot be opened.
        """
        # The (unopened) Serial object is created before the base class is
        # initialized.
        # NOTE(review): presumably the base-class constructor calls
        # _set_backend_timeout(), which needs self._serial -- confirm.
        self._serial = serial.Serial()
        super().__init__(record)

        info = ConnectionSerial.parse_address(record.connection.address)
        if info is None:
            self.raise_exception(f'Invalid address {record.connection.address!r}')
        self._serial.port = info['port']

        props = record.connection.properties

        def lookup(key, alias, default):
            # Prefer the documented key, then the PySerial-style spelling of
            # the same setting, then the default value.
            try:
                return props[key]
            except KeyError:
                return props.get(alias, default)

        # The parity, data-bits and stop-bits values are used as enum members
        # (only their ``.value`` is handed to PySerial).
        # NOTE(review): presumably the connection record has already converted
        # the database values to these enums -- confirm.
        self._serial.parity = props.get('parity', Parity.NONE).value
        self._serial.inter_byte_timeout = props.get('inter_byte_timeout', None)
        self._serial.baudrate = lookup('baud_rate', 'baudrate', 9600)
        self._serial.bytesize = lookup('data_bits', 'bytesize', DataBits.EIGHT).value
        self._serial.stopbits = lookup('stop_bits', 'stopbits', StopBits.ONE).value
        self._serial.xonxoff = lookup('xon_xoff', 'xonxoff', False)
        self._serial.rtscts = lookup('rts_cts', 'rtscts', False)
        self._serial.dsrdtr = lookup('dsr_dtr', 'dsrdtr', False)

        # Don't raise SerialException; self.raise_exception is called after
        # the except block instead. A None sentinel is used (rather than the
        # truthiness of the message) so that a failure whose message is an
        # empty string is still reported.
        error_msg = None
        try:
            self._serial.open()
        except serial.serialutil.SerialException as err:
            error_msg = str(err) or repr(err)

        if error_msg is not None:
            self.raise_exception(error_msg)

        self.log_debug('Connected to %s', record.connection)

    @property
    def serial(self):
        """:class:`serial.Serial`: The reference to the serial object."""
        return self._serial

    @property
    def baud_rate(self):
        """:class:`int`: The baud rate setting."""
        return self._serial.baudrate

    @property
    def data_bits(self):
        """:class:`~.constants.DataBits`: The number of data bits."""
        return DataBits(self._serial.bytesize)

    @property
    def stop_bits(self):
        """:class:`~.constants.StopBits`: The stop bit setting."""
        return StopBits(self._serial.stopbits)

    @property
    def parity(self):
        """:class:`~.constants.Parity`: The parity setting."""
        return Parity(self._serial.parity)

    def _set_backend_timeout(self):
        """Apply the current timeout to both reading and writing."""
        self._serial.timeout = self._timeout
        self._serial.write_timeout = self._timeout

    def disconnect(self):
        """Close the serial port."""
        try:
            # if the subclass raised an error in the constructor before
            # this class is initialized then self._serial won't exist
            if self._serial.is_open:
                self._serial.close()
                self.log_debug('Disconnected from %s', self.equipment_record.connection)
        except AttributeError:
            pass

    def _write(self, message):
        """Overrides method in ConnectionMessageBased."""
        return self._serial.write(message)

    def _read(self, size):
        """Overrides method in ConnectionMessageBased.

        If `size` is not :data:`None`, a single read of `size` bytes is
        requested from the serial port. Otherwise bytes are read one at a time
        until the message ends with the read-termination sequence, the
        message becomes longer than the maximum read size (a
        :exc:`RuntimeError` is raised) or the timeout elapses.
        """
        if size is not None:
            return self._serial.read(size)

        # bind to local names, these are used on every iteration
        now = time.time
        read = self._serial.read
        r_term = self._read_termination
        timeout = self._timeout
        max_read_size = self._max_read_size

        msg = bytearray()
        t0 = now()
        while True:
            msg.extend(read(1))
            if r_term and msg.endswith(r_term):
                return msg

            if len(msg) > max_read_size:
                raise RuntimeError(
                    f'len(message) [{len(msg)}] > max_read_size [{max_read_size}]'
                )

            # a timeout of None (or 0) disables this check
            if timeout and now() - t0 > timeout:
                self.raise_timeout()

    @staticmethod
    def parse_address(address):
        """Get the serial port from an address.

        Parameters
        ----------
        address : :class:`str`
            The address of a :class:`~msl.equipment.record_types.ConnectionRecord`

        Returns
        -------
        :class:`dict` or :data:`None`
            The serial port in a format that is valid for PySerial
            (i.e., ``ASRL3`` becomes ``COM3``) or :data:`None` if the port
            cannot be determined from `address`.
        """
        match = REGEX_SERIAL.match(address)
        if match:
            if match['number']:
                return {'port': f"COM{match['number']}"}
            if match['dev']:
                return {'port': match['dev']}

        match = REGEX_PROLOGIX.match(address)
        if match:
            # Parse the name captured from the Prologix address as a serial
            # address; a name that starts with '/' gets the 'ASRL' prefix
            # before it is parsed.
            prefix = 'ASRL' if match['name'].startswith('/') else ''
            return ConnectionSerial.parse_address(prefix + match['name'])

        return None