#! /usr/bin/env python3
# -*- coding: utf-8 -*-
# Python Multicast Repo
# ..................................
# Copyright (c) 2017-2025, Mr. Walls
# ..................................
# Licensed under MIT (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# ..........................................
# http://www.github.com/reactive-firewall/python-repo/LICENSE.md
# ..........................................
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Provides multicast HEAR Features.
Provides functionality to listen to and process multicast messages.
Caution: See details regarding dynamic imports [documented](../__init__.py) in this module.
Minimal Acceptance Testing:
First set up test fixtures by importing multicast.
Testcase 0: Multicast should be importable.
>>> import multicast
>>> multicast.hear is not None
True
>>> multicast.__doc__ is not None
True
>>>
Testcase 1: Recv should be automatically imported.
A: Test that the multicast component is initialized.
B: Test that the hear component is initialized.
C: Test that the hear component has __doc__
>>> multicast is not None
True
>>> multicast.hear is not None
True
>>> multicast.hear.__doc__ is not None
True
>>> type(multicast.hear.__doc__) == type(str(''''''))
True
>>>
Testcase 2: Recv should be detailed with some metadata.
A: Test that the __MAGIC__ variables are initialized.
B: Test that the __MAGIC__ variables are strings.
>>> multicast.hear is not None
True
>>> multicast.hear.__module__ is not None
True
>>> multicast.hear.__package__ is not None
True
>>> type(multicast.hear.__doc__) == type(multicast.recv.__module__)
True
>>>
Testcase 3: main should return an int.
A: Test that the multicast component is initialized.
B: Test that the hear component is initialized.
C: Test that the main(HEAR) function-flow is initialized.
D: Test that the main(HEAR) function-flow returns an int 0-256.
>>> multicast.__main__ is not None
True
>>> multicast.__main__.main is not None
True
>>> tst_fxtr_args = ['''HEAR''', '''--port=1234''']
>>> (test_fixture, ignored_value) = multicast.__main__.main(tst_fxtr_args)
>>> test_fixture is not None
True
>>> type(test_fixture) #doctest: -DONT_ACCEPT_BLANKLINE, +ELLIPSIS
<...int...>
>>> int(test_fixture) >= int(0)
True
>>> type(test_fixture) is type(0)
True
>>> int(test_fixture) < int(256)
True
>>> (int(test_fixture) >= int(0)) and (int(test_fixture) < int(256))
True
>>>
"""
__package__ = """multicast""" # skipcq: PYL-W0622
"""Names the package of this program.
Minimal Acceptance Testing:
First set up test fixtures by importing multicast.
Testcase 0: Multicast should be importable.
>>> import multicast
>>>
Testcase 1: Hear should be automatically imported.
>>> multicast.hear.__package__ is not None
True
>>>
>>> multicast.hear.__package__ == multicast.__package__
True
>>>
"""
__module__ = """multicast"""
"""Names the module of this program.
Minimal Acceptance Testing:
First set up test fixtures by importing multicast.
Testcase 0: Multicast should be importable.
>>> import multicast
>>>
Testcase 1: Hear should be automatically imported.
>>> multicast.hear.__module__ is not None
True
>>>
"""
__file__ = """multicast/hear.py"""
"""Names the file of this component."""
__name__ = """multicast.hear""" # skipcq: PYL-W0622
"""Names this component.
Minimal Acceptance Testing:
First set up test fixtures by importing multicast.
Testcase 0: Multicast should be importable.
>>> import multicast
>>>
Testcase 1: Hear should be automatically imported.
>>> multicast.hear.__name__ is not None
True
>>>
"""
try:
import sys as _sys
if 'multicast' not in _sys.modules:
from . import multicast as multicast # pylint: disable=cyclic-import - skipcq: PYL-C0414
else: # pragma: no branch
multicast = _sys.modules["""multicast"""]
_BLANK = multicast._BLANK # skipcq: PYL-W0212 - module ok
# skipcq
from . import recv as recv # pylint: disable=useless-import-alias - skipcq: PYL-C0414
# skipcq
from . import send as send # pylint: disable=useless-import-alias - skipcq: PYL-C0414
except Exception as importErr:
del importErr # skipcq - cleanup any error leaks early
# skipcq
import multicast as multicast # pylint: disable=cyclic-import - skipcq: PYL-R0401, PYL-C0414
try:
import threading
import socketserver
import warnings
from multicast import argparse as _argparse
from multicast import unicodedata as _unicodedata
from multicast import socket as _socket
from multicast import struct as _struct
depends = [
_unicodedata, _socket, _struct, _argparse
]
for unit in depends:
try:
if unit.__name__ is None: # pragma: no branch
raise ModuleNotFoundError(
str("[CWE-440] module failed to import {}.").format(str(unit))
) from None
except Exception: # pragma: no branch
raise ModuleNotFoundError(str("[CWE-758] Module failed completely.")) from None
except Exception as err:
raise ImportError(err) from err
[docs]
class McastServer(socketserver.UDPServer):
"""
Generic Subclasses socketserver.UDPServer for handling '--daemon' function.
Basically simplifies testing by allowing a trivial echo back (case-insensitive) of string
data, after printing the sender's ip out.
Minimal Acceptance Testing:
First set up test fixtures by importing multicast.
Testcase 0: Multicast should be importable.
>>> import multicast
>>> multicast.hear is not None
True
>>> from multicast.hear import McastServer as McastServer
>>>
Testcase 1: McastServer should be automatically imported.
>>> McastServer.__name__ is not None
True
>>>
"""
[docs]
def server_activate(self):
"""
Activate the server to begin handling requests.
Overrides the base class method to set up the server after binding.
Returns:
None
"""
print(str("server_activate"))
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ResourceWarning)
self.open_for_request()
super(McastServer, self).server_activate()
[docs]
def open_for_request(self):
"""
Prepare the server to accept requests.
Overrides the base class method to set up a new listening UDP socket before the
server starts processing requests.
UDP Sockets are considered ephemeral.
Sequentially, the old socket is recycled, or replaced, yielding a fungable socket, with the
same port and bound ip, which is then used to join the same multicast group(s), at which
point the new socket has transparently replaced the old socket.
Returns:
None
"""
print(str("open_request"))
# enter critical section
old_socket = self.socket
(tmp_addr, tmp_prt) = old_socket.getsockname()
multicast.endSocket(old_socket)
self.socket = recv.joinstep([tmp_addr], tmp_prt, None, tmp_addr, multicast.genSocket())
old_socket = None # release for GC
# exit critical section
[docs]
def server_bind(self):
"""
Bind the server to the specified address.
Overrides the base class method to handle multicast group binding.
Returns:
None
"""
print(str("server_bind"))
super(McastServer, self).server_bind()
# enter critical section
print(str("bound on: {}").format(str(self.socket.getsockname())))
# exit critical section
[docs]
def close_request(self, request):
"""
Clean up after handling a request.
Overrides the base class method to call open_for_request to close and regenerate the
UDP socket, in addition to closing the request as normal.
Args:
request: The request object to close.
Returns:
None
"""
print(str("close_request"))
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=ResourceWarning)
self.open_for_request()
super(McastServer, self).close_request(request)
[docs]
def handle_error(self, request, client_address):
"""
Handle errors that occur during request processing.
Overrides the base class method to handle requests with STOP in them,
resulting in a graceful server shutdown. Otherwise forwards the call to super.
Args:
request: The request being handled when the error occurred.
client_address: The client address associated with the request.
Returns:
None
"""
print(str("handle_error"))
if request is not None and request[0] is not None and """STOP""" in str(request[0]):
def kill_func(a_server):
"""
Terminate the server.
Args:
a_server: The server instance to terminate.
Returns:
None
"""
if a_server is not None:
a_server.shutdown()
end_thread = threading.Thread(name="Kill_Thread", target=kill_func, args=[self])
end_thread.start()
else:
super(McastServer, self).handle_error(request, client_address)
[docs]
class HearUDPHandler(socketserver.BaseRequestHandler):
"""
Subclass of socketserver.BaseRequestHandler for handling the HEAR function.
Basically simplifies testing by allowing a simple HEAR back (case-insensitive) of string
data, after printing the sender's ip out.
Minimal Acceptance Testing:
First set up test fixtures by importing multicast.
Testcase 0: Multicast should be importable.
>>> import multicast
>>> multicast.hear is not None
True
>>> from multicast.hear import HearUDPHandler as HearUDPHandler
>>>
Testcase 1: HearUDPHandler should be automatically imported.
>>> HearUDPHandler.__name__ is not None
True
>>>
"""
[docs]
def handle(self):
"""
Handles incoming UDP requests in the HEAR functionality.
Overrides the base class method to define how incoming data is handled.
By default:
Processes the incoming data from the client, logs the messages,
and sends a response back. If the data contains the
keyword "STOP", it raises a `RuntimeError` to
initiate server shutdown.
Silently ignores any UnicodeDecodeError when decoding data.
Returns early if data or socket is None.
Minimal Acceptance Testing:
First set up test fixtures by importing multicast.
>>> import multicast
>>>
Testcase 0: Ensure `HearUDPHandler` can be imported.
>>> import multicast
>>> from multicast.hear import HearUDPHandler
>>> HearUDPHandler.__name__ is not None
True
>>>
Testcase 1: Verify the `handle` method exists.
>>> handler = HearUDPHandler(
... request=('Test data', None), client_address=('192.0.2.1', 51111), server=None
... )
>>> hasattr(handler, 'handle')
True
>>>
Testcase 2: `handle` requires valid requests or ignores input.
>>> handler.request = ("No-Op", None)
>>> handler.client_address = ("192.0.2.2", 51234)
>>> handler.handle() is None
True
>>>
Testcase 3: `handle` requires valid requests or ignores input.
>>> handler.request = ("The Test", multicast.genSocket())
>>> handler.client_address = ("224.0.1.2", 51234)
>>> handler.handle() is None
True
>>>
"""
(data, sock) = self.request
if data is None or not sock:
return # nothing to do -- fail fast.
else:
try:
data = data.decode('utf8') if isinstance(data, bytes) else str(data)
except UnicodeDecodeError: # pragma: no cover
return # silently ignore invalid UTF-8 data -- fail quickly.
if (_sys.stdout.isatty()): # pragma: no cover
print(f"{self.client_address[0]} SAYS: {data.strip()} to ALL")
if data is not None:
myID = str(sock.getsockname()[0])
if (_sys.stdout.isatty()): # pragma: no cover
_sim_data_str = data.strip().replace("""\r""", str()).replace("""%""", """%%""")
print(str("{me} HEAR: [{you} SAID {what}]").format(
me=myID, you=self.client_address, what=str(_sim_data_str)
))
print(str("{me} SAYS [ HEAR [ {what} SAID {you} ] from {me} ]").format(
me=myID, you=self.client_address, what=str(_sim_data_str)
))
send.McastSAY()._sayStep( # skipcq: PYL-W0212 - module ok
self.client_address[0], self.client_address[1],
str("HEAR [ {what} SAID {you} ] from {me}").format(
me=myID, you=self.client_address, what=data.upper()
)
)
if """STOP""" in str(data):
raise multicast.exceptions.ShutdownCommandReceived("SHUTDOWN") from None
[docs]
class McastHEAR(multicast.mtool):
"""
Provides the HEAR tooling by subclassing multicast.mtool.
This class sets up a multicast server that listens for messages and processes them accordingly.
Testing:
Testcase 0: First set up test fixtures by importing multicast.
>>> import multicast
>>> multicast.hear is not None
True
>>> multicast._MCAST_DEFAULT_PORT is not None
True
>>> multicast._MCAST_DEFAULT_GROUP is not None
True
>>> multicast._MCAST_DEFAULT_TTL is not None
True
>>> multicast.hear.McastHEAR is not None
True
>>>
Testcase 2: Recv should be detailed with some metadata.
A: Test that the __MAGIC__ variables are initialized.
B: Test that the __MAGIC__ variables are strings.
>>> multicast.hear is not None
True
>>> multicast.hear.McastHEAR is not None
True
>>> multicast.hear.McastHEAR.__module__ is not None
True
>>> multicast.hear.McastHEAR.__proc__ is not None
True
>>> multicast.hear.McastHEAR.__epilogue__ is not None
True
>>> multicast.hear.McastHEAR.__prologue__ is not None
True
>>>
"""
__module__ = """multicast.hear"""
__name__ = """multicast.hear.McastHEAR"""
__proc__ = """HEAR"""
__epilogue__ = """Generally speaking you want to bind to one of the groups you joined in
this module/instance, but it is also possible to bind to group which
is added by some other programs (like another python program instance of this)
"""
__prologue__ = """Python Multicast Server for multicast input."""
[docs]
@classmethod
def setupArgs(cls, parser):
pass # skipcq - Optional abstract method
[docs]
def doStep(self, *args, **kwargs):
"""
Execute the HEAR operation for multicast communication.
Overrides the `doStep` method from `mtool` to set up a server that listens for
multicast messages and processes them accordingly.
Args:
*args: Variable length argument list containing command-line arguments.
**kwargs: Arbitrary keyword arguments.
Returns:
tuple: A tuple containing a status indicator and an optional result message.
"""
HOST = kwargs.get("group", multicast._MCAST_DEFAULT_GROUP) # skipcq: PYL-W0212 - module ok
PORT = kwargs.get("port", multicast._MCAST_DEFAULT_PORT) # skipcq: PYL-W0212 - module ok
server_initialized = False
server = None
try:
with McastServer((HOST, PORT), HearUDPHandler) as server:
server_initialized = True
server.serve_forever()
except KeyboardInterrupt as userInterrupt:
try:
if server and server.socket: # pragma: no cover
old_sock = server.socket
multicast.endSocket(old_sock)
finally:
raise KeyboardInterrupt(
f"HEAR has stopped due to interruption signal (was previously listening on ({HOST}, {PORT}))."
) from userInterrupt
finally:
if server: # pragma: no cover
server.shutdown()
return (server_initialized, None)