Source code for multicast.hear

#! /usr/bin/env python3
# -*- coding: utf-8 -*-

# Multicast Python Module
# ..................................
# Copyright (c) 2017-2025, Mr. Walls
# ..................................
# Licensed under MIT (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# ..........................................
# https://github.com/reactive-firewall-org/multicast/tree/HEAD/LICENSE.md
# ..........................................
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Provides multicast HEAR Features.

Provides server-side functionality for listening to multicast messages. It implements a
UDP server that can receive and process multicast messages continuously.

Classes:
	McastServer: UDP server implementation for multicast communication.
	HearUDPHandler: Request handler for processing multicast messages.
	McastHEAR: Main tool class for HEAR operations.

Caution: See details regarding dynamic imports [documented](../__init__.py) in this module.

Minimal Acceptance Testing:

	First set up test fixtures by importing multicast.

	Testcase 0: Multicast should be importable.

		>>> import multicast
		>>> multicast.hear is not None
		True
		>>> multicast.__doc__ is not None
		True
		>>>

	Testcase 1: Recv should be automatically imported.
		A: Test that the multicast component is initialized.
		B: Test that the hear component is initialized.
		C: Test that the hear component has __doc__

		>>> multicast is not None
		True
		>>> multicast.hear is not None
		True
		>>> multicast.hear.__doc__ is not None
		True
		>>> type(multicast.hear.__doc__) == type(str(''''''))
		True
		>>>

	Testcase 2: Recv should be detailed with some metadata.
		A: Test that the __MAGIC__ variables are initialized.
		B: Test that the __MAGIC__ variables are strings.

		>>> multicast.hear is not None
		True
		>>> multicast.hear.__module__ is not None
		True
		>>> multicast.hear.__package__ is not None
		True
		>>> type(multicast.hear.__doc__) == type(multicast.recv.__module__)
		True
		>>>

	Testcase 3: main should return an int.
		A: Test that the multicast component is initialized.
		B: Test that the hear component is initialized.
		C: Test that the main(HEAR) function-flow is initialized.
		D: Test that the main(HEAR) function-flow returns an int 0-256.

		>>> multicast.__main__ is not None
		True
		>>> multicast.__main__.main is not None
		True
		>>> tst_fxtr_args = ['''HEAR''', '''--port=1234''']
		>>> (test_fixture, ignored_value) = multicast.__main__.main(tst_fxtr_args)
		>>> test_fixture is not None
		True
		>>> type(test_fixture) #doctest: -DONT_ACCEPT_BLANKLINE, +ELLIPSIS
		<...int...>
		>>> int(test_fixture) >= int(0)
		True
		>>> type(test_fixture) is type(0)
		True
		>>> int(test_fixture) < int(256)
		True
		>>> (int(test_fixture) >= int(0)) and (int(test_fixture) < int(256))
		True
		>>>

	Testcase 3: Class McastHEAR should be stable.
		A: Test that the hear component is initialized.
		B: Tests that McastHEAR instantiates without error, as in trivial use-case.

		>>> multicast.hear is not None
		True
		>>> hear = multicast.hear.McastHEAR()
		>>> isinstance(hear, multicast.hear.McastHEAR)
		True
		>>>

"""

__package__ = "multicast"  # skipcq: PYL-W0622
"""Names the package of this program.

	Minimal Acceptance Testing:

	First set up test fixtures by importing multicast.

	Testcase 0: Multicast should be importable.

		>>> import multicast
		>>>

	Testcase 1: Hear should be automatically imported.

		>>> multicast.hear.__package__ is not None
		True
		>>>
		>>> multicast.hear.__package__ == multicast.__package__
		True
		>>>


"""

__module__ = "multicast"
"""Names the module of this program.

	Minimal Acceptance Testing:

	First set up test fixtures by importing multicast.

	Testcase 0: Multicast should be importable.

		>>> import multicast
		>>>

	Testcase 1: Hear should be automatically imported.

		>>> multicast.hear.__module__ is not None
		True
		>>>


"""

__file__ = "multicast/hear.py"
"""Names the file of this component."""

__name__ = "multicast.hear"  # skipcq: PYL-W0622
"""Names this component.

	Minimal Acceptance Testing:

	First set up test fixtures by importing multicast.

	Testcase 0: Multicast should be importable.

		>>> import multicast
		>>>

	Testcase 1: Hear should be automatically imported.

		>>> multicast.hear.__name__ is not None
		True
		>>>


"""

try:
	import sys
	if 'multicast' not in sys.modules:
		# skipcq
		from . import multicast as multicast  # pylint: disable=cyclic-import - skipcq: PYL-C0414
	else:  # pragma: no branch
		multicast = sys.modules["multicast"]
	_BLANK = multicast._BLANK  # skipcq: PYL-W0212 - module ok
	# skipcq
	from . import recv as recv  # pylint: disable=useless-import-alias  -  skipcq: PYL-C0414
	# skipcq
	from . import send as send  # pylint: disable=useless-import-alias  -  skipcq: PYL-C0414
except Exception as _cause:
	del _cause  # skipcq - cleanup any error leaks early
	# skipcq
	import multicast as multicast  # pylint: disable=cyclic-import - skipcq: PYL-R0401, PYL-C0414

try:
	import logging
	import threading
	import socketserver
	import warnings
	from multicast import argparse as _argparse
	from multicast import unicodedata as _unicodedata
	from multicast import socket as _socket
	from multicast import struct as _struct
	depends = [_unicodedata, _socket, _struct, _argparse]
	for unit in depends:
		try:
			if unit.__name__ is None:  # pragma: no branch
				raise ModuleNotFoundError(
					f"[CWE-440] module failed to import {str(unit)}."
				) from None
		except Exception as _root_cause:  # pragma: no branch
			raise ModuleNotFoundError("[CWE-758] Module failed completely.") from _root_cause
except Exception as _cause:  # pragma: no branch
	raise ImportError(_cause) from _cause


module_logger = logging.getLogger(__name__)
module_logger.debug(
	"Loading %s",  # lazy formatting to avoid PYL-W1203
	__name__,
)


[docs] class McastServer(socketserver.UDPServer): """ Generic Subclasses socketserver.UDPServer for handling '--daemon' function. Basically simplifies testing by allowing a trivial echo back (case-insensitive) of string data, after printing the sender's ip out. Minimal Acceptance Testing: First set up test fixtures by importing multicast. Testcase 0: Multicast should be importable. >>> import multicast >>> multicast.hear is not None True >>> from multicast.hear import McastServer as McastServer >>> Testcase 1: McastServer should be automatically imported. >>> McastServer.__name__ is not None True >>> """ __log_handle__ = """multicast.hear.McastServer""" # skipcq: PYL-W0622 """Names this server's Logger. Basically just the prefix of the logger's name. Subclasses should override. Minimal Acceptance Testing: First set up test fixtures by importing multicast. Testcase 0: Multicast should be importable. >>> import multicast >>> Testcase 1: McastServer should be automatically imported. >>> multicast.hear.McastServer.__name__ is not None True >>> multicast.hear.McastServer.__log_handle__ is not None True >>> """ def __init__( self, server_address: tuple, RequestHandlerClass: type, bind_and_activate: bool = True ) -> None: """ Initialize a new instance of the McastServer. Creates a new UDP server for multicast communication and sets up an appropriate logger based on the server address provided. May be extended, do not override. Returns: None Minimal Acceptance Testing: First set up test fixtures by importing multicast. Testcase 0: Multicast should be importable. >>> import socketserver >>> import multicast >>> Testcase 0: Basic initialization of McastServer. A: Test that McastServer can be initialized with minimal arguments. B: Test that the resulting instance is of the correct type. >>> server = multicast.hear.McastServer(('224.0.0.1', 12345), None) >>> isinstance(server, multicast.hear.McastServer) True >>> isinstance(server, socketserver.UDPServer) True >>> server.server_close() # Clean up >>> Testcase 1: Server initialization with logger name extraction. A: Test that the server extracts the logger name from server_address. B: Test that the logger is properly initialized. >>> test_addr = ('239.0.0.9', 23456) >>> server = multicast.hear.McastServer(test_addr, None) >>> server.logger is not None True >>> server.logger.name.endswith('239.0.0.9') True >>> server.server_close() # Clean up >>> """ logger_name = server_address[0] if server_address and len(server_address) > 0 else None if logger_name: # pragma: no branch self.__logger = logging.getLogger(f"{self.__log_handle__}.{logger_name}") else: self.__logger = logging.getLogger(f"{self.__log_handle__}") super().__init__(server_address, RequestHandlerClass, bind_and_activate)
[docs] def _sync_logger(self) -> None: """Synchronize the logger instance with the bound socket address. This internal method updates the instance's logger attribute based on the current socket address. It extracts the address component from the socket's bound address and uses it to create a hierarchical logger name in the format 'multicast.hear.McastServer.[address]'. If no valid address is found, it falls back to the base McastServer logger. This method is typically called after server_bind() to ensure the logger reflects the actual bound socket address. Note: This is an internal method and should not be called directly from outside the class. Args: None Returns: None Minimal Acceptance Testing: First set up test fixtures by importing multicast. Testcase 0: Multicast should be importable. >>> import types >>> import logging >>> import multicast >>> Testcase 1: Method exists and takes expected arguments. A: Test method exists in McastServer class. B: Test method signature does not accept arguments. >>> from multicast.hear import McastServer >>> hasattr(McastServer, '_sync_logger') True >>> import inspect >>> len(inspect.signature(McastServer._sync_logger).parameters) - 1 # Remove 'self' 0 >>> Testcase 2: Method handles case where socket has a valid address. A: Use a mock socket with a valid address. B: Verify logger name is properly formatted. >>> # Setup a server instance with mock socket >>> server = multicast.hear.McastServer(('239.0.0.9', 51234), None) >>> # Create mock socket with valid address >>> mock_socket = types.SimpleNamespace() >>> mock_socket.getsockname = lambda: ('239.0.0.9', 51234) >>> server.socket = mock_socket >>> # Call method and verify logger name >>> server._sync_logger() >>> server.logger.name 'multicast.hear.McastServer.239.0.0.9' >>> Testcase 3: Method handles case where socket address name component is None. A: Use a mock socket with None as the address component. B: Verify logger falls back to base logger name. >>> # Setup a server instance >>> server = multicast.hear.McastServer(('239.0.0.9', 51234), None) >>> # Create mock socket with None as the address >>> mock_socket = types.SimpleNamespace() >>> mock_socket.getsockname = lambda: (None, 5678) >>> server.socket = mock_socket >>> # Call method and verify logger name >>> server._sync_logger() >>> server.logger.name 'multicast.hear.McastServer' >>> Testcase 4: Method handles case where socket address has special formatting. A: Use a mock socket with an IPv6 address. B: Verify logger name incorporates the address correctly. >>> import multicast >>> from multicast.hear import McastServer >>> import types >>> import logging >>> # Setup a server instance >>> server = McastServer(('239.0.0.9', 51234), None) >>> # Create mock socket with IPv6 address format >>> mock_socket = types.SimpleNamespace() >>> mock_socket.getsockname = lambda: ('2001:db8::1', 5678) >>> server.socket = mock_socket >>> # Call method and verify logger name >>> server._sync_logger() >>> server.logger.name 'multicast.hear.McastServer.2001:db8::1' >>> """ (logger_name, _) = self.socket.getsockname() if logger_name: self.__logger = logging.getLogger(f"{self.__log_handle__}.{logger_name}") else: # pragma: no branch self.__logger = logging.getLogger(f"{self.__log_handle__}")
@property def logger(self) -> logging.Logger: """Getter for the logger attribute of McastServer. This property provides access to the server's internal logger instance. The logger name is determined during initialization and may be updated by calling _sync_logger when the server's socket address changes. Returns: logging.Logger -- The logger instance associated with this server. Minimal Acceptance Testing: First set up test fixtures by importing multicast. >>> import multicast >>> import logging >>> Testcase 0: Verify logger accessibility. A: Test that the logger property exists. B: Test that it's accessible from an McastServer instance. >>> server = multicast.hear.McastServer(("239.0.0.9", 0), multicast.hear.HearUDPHandler) >>> hasattr(server, 'logger') True >>> server.server_close() # Clean up >>> Testcase 1: Verify logger type. A: Test that the logger property returns a logging.Logger instance. >>> server = multicast.hear.McastServer(("239.0.0.9", 0), multicast.hear.HearUDPHandler) >>> isinstance(server.logger, logging.Logger) True >>> server.server_close() # Clean up >>> Testcase 2: Verify logger name. A: Test that the logger name includes the server class name. B: Test that it's properly formatted. >>> server = multicast.hear.McastServer(("239.0.0.9", 0), multicast.hear.HearUDPHandler) >>> server.logger.name.startswith('multicast.hear.McastServer') True >>> server.server_close() # Clean up >>> """ return self.__logger
[docs] def server_activate(self): """ Activate the server to begin handling requests. Overrides the base class method to set up the server after binding. Returns: None """ self.logger.info("server_activate") with warnings.catch_warnings(): warnings.simplefilter("ignore", category=ResourceWarning) self.open_for_request() super(McastServer, self).server_activate()
[docs] def open_for_request(self): """ Prepare the server to accept requests. Overrides the base class method to set up a new listening UDP socket before the server starts processing requests. UDP Sockets are considered ephemeral. Sequentially, the old socket is recycled, or replaced, yielding a fungable socket, with the same port and bound ip, which is then used to join the same multicast group(s), at which point the new socket has transparently replaced the old socket. Returns: None """ self.logger.info("open_request") # enter critical section old_socket = self.socket (tmp_addr, tmp_prt) = old_socket.getsockname() multicast.endSocket(old_socket) self.socket = recv.joinstep([tmp_addr], tmp_prt, None, tmp_addr, multicast.genSocket()) old_socket = None # release for GC
# exit critical section
[docs] def server_bind(self): """ Bind the server to the specified address. Overrides the base class method to handle multicast group binding. Returns: None """ self.logger.info("server_bind") super(McastServer, self).server_bind() self._sync_logger() # enter critical section self.logger.info( "bound on: %s", # lazy formatting to avoid PYL-W1203 str(self.socket.getsockname()), )
# exit critical section
[docs] def close_request(self, request): """ Clean up after handling a request. Overrides the base class method to call open_for_request to close and regenerate the UDP socket, in addition to closing the request as normal. Args: request: The request object to close. Returns: None """ self.logger.info("close_request") with warnings.catch_warnings(): warnings.simplefilter("ignore", category=ResourceWarning) self.open_for_request() super(McastServer, self).close_request(request)
[docs] def handle_error(self, request, client_address): """ Handle errors that occur during request processing. Overrides the base class method to handle requests with STOP in them, resulting in a graceful server shutdown. Otherwise forwards the call to super. Args: request: The request being handled when the error occurred. client_address: The client address associated with the request. Returns: None """ self.logger.info("handle_error") if request is not None and request[0] is not None and "STOP" in str(request[0]): def kill_func(a_server): """ Terminate the server. Args: a_server: The server instance to terminate. Returns: None """ if a_server is not None: a_server.shutdown() end_thread = threading.Thread(name="Kill_Thread", target=kill_func, args=[self]) end_thread.start() else: super(McastServer, self).handle_error(request, client_address)
[docs] class HearUDPHandler(socketserver.BaseRequestHandler): """ Subclass of socketserver.BaseRequestHandler for handling the HEAR function. Basically simplifies testing by allowing a simple HEAR back (case-insensitive) of string data, after printing the sender's ip out. Minimal Acceptance Testing: First set up test fixtures by importing multicast. Testcase 0: Multicast should be importable. >>> import multicast >>> multicast.hear is not None True >>> from multicast.hear import HearUDPHandler as HearUDPHandler >>> Testcase 1: HearUDPHandler should be automatically imported. >>> HearUDPHandler.__name__ is not None True >>> """ __name__ = "multicast.hear.HearUDPHandler" # skipcq: PYL-W0622 """Names this handler type. Minimal Acceptance Testing: First set up test fixtures by importing multicast. Testcase 0: Multicast should be importable. >>> import multicast >>> Testcase 1: HearUDPHandler should be automatically imported. >>> multicast.hear.HearUDPHandler.__name__ is not None True >>> """
[docs] def handle(self) -> None: """ Handles incoming UDP requests in the HEAR functionality. Overrides the base class method to define how incoming data is handled. By default: Processes the incoming data from the client, logs the messages, and sends a response back. If the data contains the keyword "STOP", it raises a `ShutdownCommandReceived` to initiate server shutdown. Silently ignores any UnicodeDecodeError when decoding data. Returns early if data or socket is None. Args: None Returns: None Raises: multicast.exceptions.ShutdownCommandReceived: When "STOP" is detected in incoming data. Minimal Acceptance Testing: First set up test fixtures by importing multicast. >>> import multicast >>> Testcase 0: Ensure `HearUDPHandler` can be imported. >>> import multicast >>> from multicast.hear import HearUDPHandler >>> HearUDPHandler.__name__ is not None True >>> Testcase 1: Verify the `handle` method exists. >>> handler = HearUDPHandler( ... request=('Test data', None), client_address=('192.0.2.1', 51111), server=None ... ) >>> hasattr(handler, 'handle') True >>> Testcase 2: `handle` requires valid requests or ignores input. >>> handler.request = ("No-Op", None) >>> handler.client_address = ("192.0.2.2", 51234) >>> handler.handle() is None True >>> Testcase 3: `handle` requires valid requests or ignores input. >>> tst_fixture_sock = multicast.genSocket() >>> handler.request = ("The Test", tst_fixture_sock) >>> handler.client_address = ("224.0.1.2", 51234) >>> handler.handle() is None True >>> >>> multicast.endSocket(tst_fixture_sock) >>> Testcase 4: `handle` raises on valid STOP requests. >>> tst_fixture_sock = multicast.genSocket() >>> handler.request = ("The Test is STOP", tst_fixture_sock) >>> handler.client_address = ("224.0.1.3", 54321) >>> try: ... handler.handle() ... except multicast.exceptions.ShutdownCommandReceived: ... print("ShutdownCommandReceived raised") ShutdownCommandReceived raised >>> >>> multicast.endSocket(tst_fixture_sock) >>> """ (data, sock) = self.request if data is None or not sock: # pragma: no branch return # nothing to do -- fail fast. # skipcq: PYL-R1705 -- otherwise can try to decode try: data = data.decode('utf8') if isinstance(data, bytes) else str(data) except UnicodeDecodeError: # pragma: no cover -- defensive code branch if __debug__: module_logger.debug( "Received invalid UTF-8 data from %s", # lazy formatting to avoid PYL-W1203 self.client_address[0], ) return # silently ignore invalid UTF-8 data -- fail quickly. _logger = logging.getLogger(f"{type(self).__module__}.{type(self).__qualname__}") if __debug__: _logger.info( "%s SAYS: %s to ALL", # lazy formatting to avoid PYL-W1203 str(self.client_address[0]), data.strip(), ) me = str(sock.getsockname()[0]) _sender: multicast.send.McastSAY = None _sender = send.McastSAY() if __debug__: # pragma: no cover -- defensive code branch _what = data.strip().replace("""\r""", str()).replace("""%""", """%%""") _logger.info( "%s HEAR: [%s SAID %s]", # lazy formatting to avoid PYL-W1203 str(me), str(self.client_address), str(_what), ) _logger.info( "%s SAYS [ HEAR [ {%s SAID %s ] from %s ]", # lazy formatting to avoid PYL-W1203 str(me), str(_what), str(self.client_address), str(me), ) _sender._sayStep( # skipcq: PYL-W0212 - module ok self.client_address[0], self.client_address[1], f"HEAR [ {data.upper()} SAID {self.client_address} ] from {me}" # noqa ) if "STOP" in str(data): raise multicast.exceptions.ShutdownCommandReceived("SHUTDOWN") from None
[docs] class McastHEAR(multicast.mtool): """ Provides the HEAR tooling by subclassing multicast.mtool. This class sets up a multicast server that listens for messages and processes them accordingly. Testing: Testcase 0: First set up test fixtures by importing multicast. >>> import multicast >>> multicast.hear is not None True >>> multicast._MCAST_DEFAULT_PORT is not None True >>> multicast._MCAST_DEFAULT_GROUP is not None True >>> multicast._MCAST_DEFAULT_TTL is not None True >>> multicast.hear.McastHEAR is not None True >>> Testcase 2: Recv should be detailed with some metadata. A: Test that the __MAGIC__ variables are initialized. B: Test that the __MAGIC__ variables are strings. >>> multicast.hear is not None True >>> multicast.hear.McastHEAR is not None True >>> multicast.hear.McastHEAR.__module__ is not None True >>> multicast.hear.McastHEAR.__proc__ is not None True >>> multicast.hear.McastHEAR.__epilogue__ is not None True >>> multicast.hear.McastHEAR.__prologue__ is not None True >>> """ __module__ = "multicast.hear" __name__ = "multicast.hear.McastHEAR" __proc__ = "HEAR" __epilogue__ = """Generally speaking you want to bind to one of the groups you joined in this module/instance, but it is also possible to bind to group which is added by some other programs (like another python program instance of this) """ __prologue__ = "Python Multicast Server for multicast input."
[docs] @classmethod def setupArgs(cls, parser): """ Ignored for this subclass of mtool. See multicast.__main__.McastRecvHearDispatch.setupArgs instead. Args: parser (argparse.ArgumentParser): ignored. Returns: None: This method does not return a value. Note: This is trivial implementation make this an optional abstract method. """ pass # skipcq - Optional abstract method
[docs] def doStep(self, *args, **kwargs): """ Execute the HEAR operation for multicast communication. Overrides the `doStep` method from `mtool` to set up a server that listens for multicast messages and processes them accordingly. Args: *args: Variable length argument list containing command-line arguments. **kwargs: Arbitrary keyword arguments. Returns: tuple: A tuple containing a status indicator and an optional result message. """ _logger = logging.getLogger(f"{type(self).__module__}.{type(self).__qualname__}") _logger.debug(McastHEAR.__proc__) HOST = kwargs.get("group", multicast._MCAST_DEFAULT_GROUP) # skipcq: PYL-W0212 - module ok PORT = kwargs.get("port", multicast._MCAST_DEFAULT_PORT) # skipcq: PYL-W0212 - module ok server_initialized = False server = None try: _logger.debug( "Initializing server on port %d as %s.", # lazy formatting to avoid PYL-W1203 PORT, HOST, ) with McastServer((HOST, PORT), HearUDPHandler) as server: server_initialized = True server.serve_forever() except KeyboardInterrupt as _cause: try: if server and server.socket: # pragma: no cover old_sock = server.socket multicast.endSocket(old_sock) finally: raise KeyboardInterrupt( f"HEAR has stopped due to interruption signal, was previously listening on ({HOST}, {PORT}).", ) from _cause finally: _logger.debug( "Finalizing server with port %d from %s.", # lazy formatting to avoid PYL-W1203 PORT, HOST, ) if server: # pragma: no cover # deadlocks if not called by other thread end_it = threading.Thread(name="Kill_Thread", target=server.shutdown, args=[]) end_it.start() end_it.join(1) if __debug__: if server_initialized: module_logger.debug( "HEAR result was %s. Reporting success.", # lazy formatting to avoid PYL-W1203 server_initialized, ) else: module_logger.debug( "HEAR result was %s. Reporting failure.", # lazy formatting to avoid PYL-W1203 server_initialized, ) return (server_initialized, None)