# Source code for msl.equipment.vxi11

"""
Implementation of the VXI-11_ protocol.

VXI-11_ is a client-service model to send messages through a network.
The messages are formatted using the Remote Procedure Call protocol [RFC-1057_]
and are encoded/decoded using the eXternal Data Representation standard [RFC-1014_].

References
----------
* VXI-11_ -- *TCP/IP Instrument Protocol Specification (Revision 1.0)*, **VXIbus Consortium**, July 1995.
* RFC-1057_ -- *RPC: Remote Procedure Call Protocol Specification (Version 2)*, **Sun Microsystems**, June 1988.
* RFC-1014_ -- *XDR: External Data Representation Standard*, **Sun Microsystems**, June 1987.

.. _VXI-11: http://www.vxibus.org/specifications.html
.. _RFC-1057: https://www.rfc-editor.org/rfc/rfc1057
.. _RFC-1014: https://www.rfc-editor.org/rfc/rfc1014
"""
from __future__ import annotations

import os
import random
import socket
from enum import IntEnum
from struct import pack
from struct import unpack

from .utils import parse_lxi_webserver

# VXI-11 program numbers (decimal 395183, 395184 and 395185)
DEVICE_CORE = 0x0607AF
DEVICE_ASYNC = 0x0607B0
DEVICE_INTR = 0x0607B1

# VXI-11 version numbers, one per program
DEVICE_CORE_VERSION = 1
DEVICE_ASYNC_VERSION = 1
DEVICE_INTR_VERSION = 1

# VXI-11 procedure numbers
DEVICE_ABORT = 1  # the only procedure sent with the DEVICE_ASYNC program in this module
CREATE_LINK = 10
DEVICE_WRITE = 11
DEVICE_READ = 12
DEVICE_READSTB = 13
DEVICE_TRIGGER = 14
DEVICE_CLEAR = 15
DEVICE_REMOTE = 16
DEVICE_LOCAL = 17
DEVICE_LOCK = 18
DEVICE_UNLOCK = 19
DEVICE_ENABLE_SRQ = 20
DEVICE_DOCMD = 22
DESTROY_LINK = 23
CREATE_INTR_CHAN = 25
DESTROY_INTR_CHAN = 26
DEVICE_INTR_SRQ = 30

# VXI-11 constants
# NOTE(review): these look like the bit masks of the "reason" value that a
# device_read reply carries -- confirm against the VXI-11 document
RX_REQCNT = 1 << 0
RX_CHR = 1 << 1
RX_END = 1 << 2


# VXI-11 Operation Flags, see Section B.5.3 in the document
class OperationFlag(IntEnum):
    """`VXI-11:` Flags that qualify how a request is to be carried out."""

    NULL = 0            # no flag set
    WAITLOCK = 1 << 0   # 0x01
    END = 1 << 3        # 0x08
    TERMCHRSET = 1 << 7  # 0x80, makes the termination character of a read valid
# VXI-11 error codes, see Table B.2 in the document
VXI_ERROR_CODES = {
    0: 'No error',
    1: 'Syntax error',
    3: 'Device not accessible',
    4: 'Invalid link identifier',
    5: 'Parameter error',
    6: 'Channel not established',
    8: 'Operation not supported',
    9: 'Out of resources',
    11: 'Device locked by another link',
    12: 'No lock held by this link',
    15: 'I/O timeout',
    17: 'I/O error',
    21: 'Invalid address',
    23: 'Abort',
    29: 'Channel already established',
}

# RPC program numbers
PMAP_PROG = 100000

# RPC version numbers
RPC_VERS = 2
PMAP_VERS = 2

# RPC procedure numbers (Port Mapper program)
PMAPPROC_NULL, PMAPPROC_SET, PMAPPROC_UNSET = 0, 1, 2
PMAPPROC_GETPORT, PMAPPROC_DUMP, PMAPPROC_CALLIT = 3, 4, 5

# RPC constants
# A different Port Mapper port is selected when the MSL_EQUIPMENT_TESTING
# environment variable is set to a non-empty value.
PMAP_PORT = 11111 if os.getenv('MSL_EQUIPMENT_TESTING') else 111

# RPC enums
class MessageType(IntEnum):
    """`RPC:` Whether a message is a call or a reply."""

    CALL = 0x0   # placed in the header of every outgoing request
    REPLY = 0x1  # the type that a received message is expected to have
class ReplyStatus(IntEnum):
    """`RPC:` Whether the call message was accepted or denied."""

    MSG_ACCEPTED = 0x0
    MSG_DENIED = 0x1
class AcceptStatus(IntEnum):
    """`RPC:` Outcome of a call message that was accepted."""

    SUCCESS = 0x0
    PROG_UNAVAIL = 0x1
    PROG_MISMATCH = 0x2  # the reply also carries the low/high supported versions
    PROC_UNAVAIL = 0x3
    GARBAGE_ARGS = 0x4
class RejectStatus(IntEnum):
    """`RPC:` Reason that a call message was rejected."""

    RPC_MISMATCH = 0x0  # the reply also carries the low/high supported versions
    AUTH_ERROR = 0x1
class AuthStatus(IntEnum):
    """`RPC:` Reason that the authorization failed."""

    AUTH_BADCRED = 0x1
    AUTH_REJECTEDCRED = 0x2
    AUTH_BADVERF = 0x3
    AUTH_REJECTEDVERF = 0x4
    AUTH_TOOWEAK = 0x5
class RPCClient(object):

    def __init__(self, host):
        """Remote Procedure Call implementation for a client.

        Parameters
        ----------
        host : :class:`str`
            The hostname or IP address of the remote device.
        """
        super().__init__()
        self._host = host
        self._sock = None
        self._xid = 0  # transaction identifier
        self._buffer = bytearray()
        self._chunk_size = 4096

    def append(self, data):
        """Append data to the body of the current RPC message.

        Parameters
        ----------
        data : :class:`bytes` or :class:`memoryview`
            The data to append.
        """
        self._buffer.extend(data)

    def append_opaque(self, text):
        """Append a variable-length string to the body of the current RPC message.

        The data is written as a 4-byte big-endian length, followed by the
        data itself, followed by zero bytes that pad the data to a multiple
        of 4 bytes. Empty data (or :data:`None`) is written as a length of 0,
        since the length is a mandatory part of the encoding.

        Parameters
        ----------
        text : :class:`memoryview`, :class:`bytes`, :class:`bytearray` or :class:`str`
            The data to append. A :class:`str` must only contain ASCII characters.
        """
        # mimic the builtin xdrlib.Packer class
        # don't use xdrlib since it became deprecated in Python 3.11
        if not text:
            text = b''

        if isinstance(text, memoryview):
            encoded = text
        elif isinstance(text, (bytes, bytearray)):
            encoded = memoryview(text)
        else:
            encoded = memoryview(text.encode('ascii'))  # must be an ASCII message

        # nbytes (not len) so that the length matches the number of bytes
        # that bytearray.extend() copies out of the buffer
        n = encoded.nbytes
        self.append(pack('>L', n))
        self.append(encoded)
        self.append((-n % 4) * b'\0')  # pad to the next multiple of 4 bytes

    @property
    def chunk_size(self):
        """:class:`int`: The maximum number of bytes to receive at a time from the socket."""
        return self._chunk_size

    @chunk_size.setter
    def chunk_size(self, size):
        self._chunk_size = int(size)

    def close(self):
        """Close the RPC socket, if one is open."""
        if self._sock:
            self._sock.close()
            self._sock = None

    def connect(self, port, timeout=10):
        """Connect to a specific port on the device.

        Any socket that is already open is closed first. If the connection
        cannot be established the new socket is closed before the exception
        is re-raised, so no half-open socket is kept.

        Parameters
        ----------
        port : :class:`int`
            The port number to connect to.
        timeout : :class:`float` or :data:`None`, optional
            The maximum number of seconds to wait for the connection to be established.
        """
        self.close()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(timeout)
            sock.connect((self._host, port))
        except BaseException:
            sock.close()
            raise
        self._sock = sock

    def get_buffer(self):
        """Get the data in the buffer.

        Returns
        -------
        :class:`bytearray`
            The data in the current RPC message.
        """
        return self._buffer

    def get_port(self, prog, vers, prot, timeout=10):
        """Call the Port Mapper procedure to determine which port to use for a program.

        This method will automatically open and close the socket connection.

        Parameters
        ----------
        prog : :class:`int`
            The program number to get the port number of.
        vers : :class:`int`
            The version number of `prog`.
        prot : :class:`int`
            The socket protocol family type to use when sending requests
            to `prog` (``IPPROTO_TCP`` or ``IPPROTO_UDP``).
        timeout : :class:`float` or :data:`None`, optional
            The maximum number of seconds to wait to get the port value.

        Returns
        -------
        :class:`int`
            The port number that corresponds to `prog`.

        Raises
        ------
        RuntimeError
            If the Port Mapper replied with a port number of 0.
        """
        try:
            self.connect(PMAP_PORT, timeout=timeout)
            self.init(PMAP_PROG, PMAP_VERS, PMAPPROC_GETPORT)
            self.append(pack('>4I', prog, vers, prot, 0))
            self.write()
            port, = unpack('>L', self.read())
        finally:
            self.close()

        if port == 0:
            raise RuntimeError(
                'Could not determine the port from the Port Mapper procedure')
        return port

    def init(self, prog, vers, proc):
        """Construct a new RPC message.

        The buffer is replaced by the header of a new call message and the
        transaction identifier is incremented.

        Parameters
        ----------
        prog : :class:`int`
            The program number.
        vers : :class:`int`
            The version number of program.
        proc : :class:`int`
            The procedure number within the program to be called.
        """
        # increment and wrap to 0 on uint32 overflow
        xid = (self._xid + 1) & 0xFFFFFFFF

        # The VXI-11 document, Section B.4.5 (Security Control), states
        # that authentication is not used. That is where the two 0's come from
        self._buffer = bytearray(pack('>6I2Q', xid, MessageType.CALL, RPC_VERS,
                                      prog, vers, proc, 0, 0))
        self._xid = xid

    def interrupt_handler(self):
        """Override this method to be notified of a service interrupt.

        This method gets called if an interrupt is received during a :meth:`.read`.
        It does not continuously poll the device.
        """
        return

    def _recv_exactly(self, size, eof_message):
        """Receive exactly `size` bytes from the socket.

        Raises :exc:`EOFError`, with `eof_message`, if the connection is
        closed before `size` bytes have been received.
        """
        data = bytearray(size)  # preallocate
        view = memoryview(data)  # avoids unnecessarily copying of slices
        recv_into = self._sock.recv_into
        chunk_size = self._chunk_size
        while len(view) > 0:
            received = recv_into(view, min(chunk_size, len(view)))
            if received == 0:
                # recv_into() returns 0 when the peer closed the connection,
                # without this check the loop would never end
                raise EOFError(eof_message)
            view = view[received:]
        return data

    def _read_message(self):
        """Receive all fragments of one RPC message and return them joined together."""
        # RFC-1057, Section 10 describes that RPC messages are sent in fragments
        message = bytearray()
        last_fragment = False
        while not last_fragment:
            header = self._recv_exactly(4, 'The RPC reply header is < 4 bytes')
            h, = unpack('>I', header)
            last_fragment = (h & 0x80000000) != 0
            fragment_size = h & 0x7FFFFFFF
            message.extend(self._recv_exactly(
                fragment_size,
                'The connection was closed before the RPC reply was fully received'))
        return message

    def read(self):
        """Read an RPC message, check for errors, and return the procedure-specific data.

        A message with an unexpected transaction id is handed to
        :meth:`.interrupt_handler` and the next message is read.

        Returns
        -------
        :class:`memoryview`
            The procedure-specific data.

        Raises
        ------
        EOFError
            If the connection is closed before a complete message is received.
        """
        while True:
            reply = self.check_reply(memoryview(self._read_message()))
            if reply is not None:
                return reply

            # Unexpected transaction id (xid), most likely from reading an interrupt.
            # Keep reading from the device until the correct xid is received.
            self.interrupt_handler()

    def set_timeout(self, timeout):
        """Set the socket timeout value.

        Parameters
        ----------
        timeout : :class:`float`
            The timeout, in seconds, to use for the socket.
        """
        self._sock.settimeout(timeout)

    @property
    def socket(self):
        """:class:`~socket.socket`: The reference to the socket."""
        return self._sock

    @staticmethod
    def unpack_opaque(data):
        """Unpack and return a variable-length string.

        Parameters
        ----------
        data : :class:`bytes`, :class:`bytearray` or :class:`memoryview`
            The data to unpack.

        Returns
        -------
        :class:`bytes`, :class:`bytearray` or :class:`memoryview`
            The unpacked data.
        """
        # mimic the builtin xdrlib.Unpacker class
        # don't use xdrlib since it became deprecated in Python 3.11
        if not data:
            return b''
        n, = unpack('>L', data[:4])
        return data[4:4+n]

    def write(self):
        """Write the RPC message that is in the buffer.

        Nothing is sent if the buffer is empty.
        """
        # RFC-1057, Section 10 describes that RPC messages are sent in fragments
        max_fragment_size = 0x7FFFFFFF  # (2**31) - 1
        view = memoryview(self._buffer)
        sendall = self._sock.sendall
        while len(view) > 0:
            fragment = view[:max_fragment_size]
            view = view[max_fragment_size:]
            header = len(fragment)
            if len(view) == 0:
                header |= 0x80000000  # the highest bit marks the last fragment
            data = bytearray(pack('>I', header))
            data.extend(fragment)
            sendall(data)

    @staticmethod
    def _as_enum(enum_cls, value):
        """Convert `value` to `enum_cls`, or return `value` itself if it is not a member."""
        try:
            return enum_cls(value)
        except ValueError:
            return value

    def check_reply(self, message):
        """Checks the message for errors and returns the procedure-specific data.

        Parameters
        ----------
        message : :class:`memoryview`
            The reply from an RPC message.

        Returns
        -------
        :class:`memoryview` or :data:`None`
            The reply or :data:`None` if the transaction id does not match the
            value that was used in the corresponding :meth:`.write` call.

        Raises
        ------
        RuntimeError
            If the message is not a reply or if the call was not successful.
        AssertionError
            If the reply contains a non-null verifier.
        """
        xid, mtype, status = unpack('>3I', message[:12])
        if xid != self._xid:  # data in read buffer is due to an interrupt?
            return None

        if mtype != MessageType.REPLY:
            raise RuntimeError('RPC message type is not {!r}, got {}'.format(
                MessageType.REPLY, mtype))

        if status == ReplyStatus.MSG_ACCEPTED:
            verf, status = unpack('>QI', message[12:24])
            if verf != 0:
                # VXI-11 does not use authentication. The offsets used below
                # are only valid for an empty verifier, so the check must not
                # be an assert statement (which is removed by python -O).
                # AssertionError is kept as the exception type.
                raise AssertionError('RPC reply contains a non-null verifier')
            if status == AcceptStatus.SUCCESS:
                return message[24:]  # procedure-specific data
            if status == AcceptStatus.PROG_MISMATCH:
                low, high = unpack('>2I', message[24:32])
                raise RuntimeError('RPC call failed: {!r}: low={}, high={}'.format(
                    AcceptStatus.PROG_MISMATCH, low, high))
            # cases include PROG_UNAVAIL, PROC_UNAVAIL, and GARBAGE_ARGS
            raise RuntimeError('RPC call failed: {!r}'.format(
                self._as_enum(AcceptStatus, status)))

        if status == ReplyStatus.MSG_DENIED:
            status, = unpack('>I', message[12:16])
            if status == RejectStatus.RPC_MISMATCH:
                low, high = unpack('>2I', message[16:24])
                raise RuntimeError('RPC call failed: {!r}: low={}, high={}'.format(
                    RejectStatus(status), low, high))
            if status == RejectStatus.AUTH_ERROR:
                # the authorization status follows the reject status
                auth, = unpack('>I', message[16:20])
                raise RuntimeError('RPC authentication failed: {!r}'.format(
                    self._as_enum(AuthStatus, auth)))
            raise RuntimeError('RPC MSG_DENIED reply status is not '
                               'RPC_MISMATCH nor AUTH_ERROR')

        raise RuntimeError('RPC reply is not MSG_ACCEPTED nor MSG_DENIED')
class VXIClient(RPCClient):

    def __init__(self, host):
        """Base class for a VXI-11 program.

        Parameters
        ----------
        host : :class:`str`
            The hostname or IP address of the remote device.
        """
        super().__init__(host)

    def read_reply(self):
        """Read an RPC message and strip the VXI-11 error code from it.

        The first 4 bytes of the procedure-specific data are the error code.

        Returns
        -------
        :class:`memoryview`
            The data that follows the error code.

        Raises
        ------
        RuntimeError
            If the error code is not 0.
        """
        payload = self.read()
        (code,) = unpack('>L', payload[:4])
        if code != 0:
            description = VXI_ERROR_CODES.get(code, 'Undefined error')
            raise RuntimeError('{} [error={}]'.format(description, code))
        return payload[4:]
class CoreClient(VXIClient):

    def __init__(self, host):
        """Communicate with the `Device Core` program on the remote device.

        Parameters
        ----------
        host : :class:`str`
            The hostname or IP address of the remote device.
        """
        super().__init__(host)

    def _call_generic(self, proc, lid, flags, lock_timeout, io_timeout):
        """Call a procedure that takes (lid, flags, lock_timeout, io_timeout) and return the reply data."""
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, proc)
        self.append(pack('>4l', lid, flags, lock_timeout, io_timeout))
        self.write()
        return self.read_reply()

    def create_link(self, device, lock_device, lock_timeout):
        """Create a link to a device.

        Parameters
        ----------
        device : :class:`bytes` or :class:`str`
            Name of the device to link with.
        lock_device : :class:`bool`
            Whether to attempt to lock the device.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.

        Returns
        -------
        :class:`int`
            The link id.
        :class:`int`
            The port number to use for the `Device Async` program.
        :class:`int`
            The maximum data size, in bytes, that the device will accept on a write.
        """
        # NOTE(review): the request/reply layout follows the Create_LinkParms
        # and Create_LinkResp structures of the VXI-11 document -- confirm.
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, CREATE_LINK)
        # the first value is a client identifier, the value is arbitrary
        self.append(pack('>3l', random.getrandbits(31), lock_device, lock_timeout))
        self.append_opaque(device)
        self.write()
        reply = self.read_reply()
        lid, abort_port, max_recv_size = unpack('>l2L', reply[:12])
        return lid, abort_port, max_recv_size

    def destroy_link(self, lid):
        """Destroy a link that was created by :meth:`.create_link`.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        """
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, DESTROY_LINK)
        self.append(pack('>l', lid))
        self.write()
        self.read_reply()

    def device_write(self, lid, io_timeout, lock_timeout, flags, data):
        """Write data to the specified device.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        io_timeout : :class:`int`
            Time, in milliseconds, to wait for I/O to complete.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        data : :class:`memoryview`, :class:`bytes` or :class:`str`
            The data to write.

        Returns
        -------
        :class:`int`
            The number of bytes written.
        """
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, DEVICE_WRITE)
        self.append(pack('>4l', lid, io_timeout, lock_timeout, flags))
        self.append_opaque(data)
        self.write()
        size, = unpack('>L', self.read_reply())
        return size

    def device_read(self, lid, request_size, io_timeout, lock_timeout, flags, term_char):
        """Read data from the device.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        request_size : :class:`int`
            The number of bytes requested.
        io_timeout : :class:`int`
            Time, in milliseconds, to wait for I/O to complete.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        term_char : :class:`int`
            The termination character. Valid only if `flags`
            is :attr:`~OperationFlag.TERMCHRSET`.

        Returns
        -------
        :class:`int`
            The reason(s) the read completed.
        :class:`memoryview`
            A view of the data (the RPC header is removed).
        """
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, DEVICE_READ)
        self.append(pack('>6l', lid, request_size, io_timeout, lock_timeout, flags, term_char))
        self.write()
        reply = self.read_reply()
        reason, = unpack('>L', reply[:4])
        return reason, self.unpack_opaque(reply[4:])

    def device_readstb(self, lid, flags, lock_timeout, io_timeout):
        """Read the status byte from the device.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        io_timeout : :class:`int`
            Time, in milliseconds, to wait for I/O to complete.

        Returns
        -------
        :class:`int`
            The status byte.
        """
        reply = self._call_generic(DEVICE_READSTB, lid, flags, lock_timeout, io_timeout)
        stb, = unpack('>L', reply)
        return stb

    def device_trigger(self, lid, flags, lock_timeout, io_timeout):
        """Send a trigger to the device.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        io_timeout : :class:`int`
            Time, in milliseconds, to wait for I/O to complete.
        """
        self._call_generic(DEVICE_TRIGGER, lid, flags, lock_timeout, io_timeout)

    def device_clear(self, lid, flags, lock_timeout, io_timeout):
        """Send the `clear` command to the device.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        io_timeout : :class:`int`
            Time, in milliseconds, to wait for I/O to complete.
        """
        self._call_generic(DEVICE_CLEAR, lid, flags, lock_timeout, io_timeout)

    def device_remote(self, lid, flags, lock_timeout, io_timeout):
        """Place the device in a remote state wherein all programmable local controls are disabled.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        io_timeout : :class:`int`
            Time, in milliseconds, to wait for I/O to complete.
        """
        self._call_generic(DEVICE_REMOTE, lid, flags, lock_timeout, io_timeout)

    def device_local(self, lid, flags, lock_timeout, io_timeout):
        """Place the device in a local state wherein all programmable local controls are enabled.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        io_timeout : :class:`int`
            Time, in milliseconds, to wait for I/O to complete.
        """
        self._call_generic(DEVICE_LOCAL, lid, flags, lock_timeout, io_timeout)

    def device_lock(self, lid, flags, lock_timeout):
        """Acquire a device's lock.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        """
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, DEVICE_LOCK)
        self.append(pack('>3l', lid, flags, lock_timeout))
        self.write()
        self.read_reply()

    def device_unlock(self, lid):
        """Release a lock acquired by :meth:`.device_lock`.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        """
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, DEVICE_UNLOCK)
        self.append(pack('>l', lid))
        self.write()
        self.read_reply()

    def device_enable_srq(self, lid, enable, handle):
        """Enable or disable the sending of `device_intr_srq` RPCs by the network instrument server.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        enable : :class:`bool`
            Whether to enable or disable interrupts.
        handle : :class:`bytes`
            Host specific data (maximum length is 40 characters).

        Raises
        ------
        ValueError
            If `handle` is longer than 40 characters.
        """
        if len(handle) > 40:
            raise ValueError('The handle must be <= 40 characters')
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, DEVICE_ENABLE_SRQ)
        self.append(pack('>2l', lid, enable))
        self.append_opaque(handle)
        self.write()
        self.read_reply()

    def device_docmd(self, lid, flags, io_timeout, lock_timeout, cmd, network_order, datasize, data_in):
        """Allows for a variety of operations to be executed.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`.create_link`.
        flags : :class:`int` or :class:`OperationFlag`
            Operation flags to use.
        io_timeout : :class:`int`
            Time, in milliseconds, to wait for I/O to complete.
        lock_timeout : :class:`int`
            Time, in milliseconds, to wait on a lock.
        cmd : :class:`int`
            Which command to execute.
        network_order : :class:`bool`
            Client's byte order.
        datasize : :class:`int`
            Size of individual data elements.
        data_in : :class:`bytes` or :class:`str`
            Data input parameters.

        Returns
        -------
        :class:`bytes`
            The results defined by `cmd`.
        """
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, DEVICE_DOCMD)
        self.append(pack('>7l', lid, flags, io_timeout, lock_timeout, cmd, network_order, datasize))
        self.append_opaque(data_in)
        self.write()
        return bytes(self.unpack_opaque(self.read_reply()))

    def create_intr_chan(self, host_addr, host_port, prog_num, prog_vers, prog_family):
        """Inform the network instrument server to establish an interrupt channel.

        Parameters
        ----------
        host_addr : :class:`int`
            Host servicing the interrupt.
        host_port : :class:`int`
            Valid port number on the client.
        prog_num : :class:`int`
            Program number.
        prog_vers : :class:`int`
            Program version number.
        prog_family : :class:`int`
            The underlying socket protocol family type (``IPPROTO_TCP`` or ``IPPROTO_UDP``).
        """
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, CREATE_INTR_CHAN)
        self.append(pack('>5L', host_addr, host_port, prog_num, prog_vers, prog_family))
        self.write()
        self.read_reply()

    def destroy_intr_chan(self):
        """Inform the network instrument server to close its interrupt channel."""
        self.init(DEVICE_CORE, DEVICE_CORE_VERSION, DESTROY_INTR_CHAN)
        self.write()
        self.read_reply()
class AsyncClient(VXIClient):

    def __init__(self, host):
        """Communicate with the `Device Async` program on the remote device.

        Parameters
        ----------
        host : :class:`str`
            The hostname or IP address of the remote device.
        """
        super().__init__(host)

    def device_abort(self, lid):
        """Abort a call that is in progress.

        Parameters
        ----------
        lid : :class:`int`
            Link id from :meth:`~.CoreClient.create_link`.
        """
        # the only argument of the procedure is the link id
        arguments = pack('>l', lid)
        self.init(DEVICE_ASYNC, DEVICE_ASYNC_VERSION, DEVICE_ABORT)
        self.append(arguments)
        self.write()
        self.read_reply()  # raises an exception if the device replied with an error
def find_vxi11(*, ip: list[str] | None = None, timeout: float = 1) -> dict[str, dict[str, str | list[str]]]:
    """Find all VXI-11 devices that are on the network.

    The RPC port-mapper protocol (RFC-1057_, Appendix A) broadcasts a
    message via UDP to port 111 for VXI-11 device discovery.

    A device is included if it replies from the port-mapper port with a
    non-zero port number for the `Device Core` program. The webserver of
    each device is then queried to get a description and additional
    addresses, if this fails the device is still included.

    :param ip: The IP address(es) on the local computer to use to broadcast the
        discovery message. If not specified, broadcast on all network interfaces.
    :param timeout: The maximum number of seconds to wait for a reply.
    :return: The information about the VXI-11 devices that were found. The keys
        are the IP addresses of the devices (sorted numerically) and each value
        contains the ``description``, ``webserver`` and ``addresses`` of a device.
    """
    import select
    import threading
    import time

    from .utils import logger

    if not ip:
        from .utils import ipv4_addresses
        all_ips = ipv4_addresses()
    else:
        all_ips = ip

    logger.debug('find VXI-11 devices: interfaces:=%s, timeout=%s', all_ips, timeout)

    # construct the broadcast message
    client = RPCClient('')
    client.init(PMAP_PROG, PMAP_VERS, PMAPPROC_GETPORT)
    client.append(pack('>4I', DEVICE_CORE, DEVICE_CORE_VERSION, socket.IPPROTO_TCP, 0))
    broadcast_msg = client.get_buffer()

    # the key is the IP address as a tuple of integers, to sort numerically
    devices = {}

    def describe(ip_address):
        """Build the information about the device at `ip_address`."""
        try:
            lxi = parse_lxi_webserver(ip_address, timeout=timeout)
        except Exception:
            # best effort, the device is reported without the webserver information
            lxi = {}

        addresses = {f'TCPIP::{ip_address}::inst0::INSTR'}
        if 'title' in lxi:
            # The XML document does not exist, the homepage was parsed
            description = lxi['title']
        elif 'Manufacturer' in lxi:
            # The XML document exists
            md = lxi['ManufacturerDescription']
            # only include the items that are not already part of the description
            parts = [lxi[item] for item in ('Manufacturer', 'Model', 'SerialNumber')
                     if lxi[item] not in md]
            parts.append(md)
            description = ', '.join(parts)
            for interface in lxi['Interfaces']:
                if interface['InterfaceType'] != 'LXI':
                    continue
                addresses.update(interface['InstrumentAddressStrings'])
                addresses.add(f"TCPIP::{interface['Hostname']}::inst0::INSTR")
        else:
            description = 'Unknown LXI device'

        return {
            'description': description,
            'webserver': f'http://{ip_address}',
            'addresses': sorted(addresses),
        }

    def broadcast(host):
        """Broadcast the discovery message from `host` and collect the replies."""
        # the context manager closes the socket even if an exception is raised
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) as sock:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
            sock.bind((host, 0))
            sock.sendto(broadcast_msg, ('255.255.255.255', PMAP_PORT))

            select_timeout = min(timeout * 0.1, 0.1)
            t0 = time.monotonic()  # not affected by changes to the system clock
            while True:
                readable, _, _ = select.select([sock], [], [], select_timeout)
                if time.monotonic() - t0 > timeout:
                    break
                if not readable:
                    continue

                reply, (ip_address, port) = sock.recvfrom(1024)

                # the reply must come from the port that the message was sent to
                if port != PMAP_PORT:
                    continue

                try:
                    core_port, = unpack('>L', client.check_reply(memoryview(reply)))
                except Exception:
                    # not a valid reply to the broadcast message
                    continue

                if core_port == 0:
                    # the Device Core program is not registered on this device
                    continue

                key = tuple(int(s) for s in ip_address.split('.'))
                devices[key] = describe(ip_address)

    # one thread per network interface so that the interfaces are searched in parallel
    threads = [threading.Thread(target=broadcast, args=(address,)) for address in all_ips]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()

    return {'.'.join(str(v) for v in key): devices[key] for key in sorted(devices)}