From 1b7b4ec7a41946849eadcd8ae4b276a42fedfa8e Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Wed, 28 Apr 2021 02:05:15 +0200 Subject: trx_toolkit: use RxMsg/TxMsg instead of TRX2L1/L12TRX I intentionally do not use 'Downlink' and 'Uplink' terms in this project because both MS and BTS transmit and receive on the opposite directions. A burst coming from demodulator may be a Downlink or an Uplink burst depending on the context, so we definitely need more precise terms. Back then when I started to work on TRX toolkit, I decided to use the 'TRX2L1' and 'L12TRX' for receive and transmit directions respectively. Now I find them hard to read, so let's replace them with 'Rx' and 'Tx'. Change-Id: I688f24a3c09dd7e1cc00b5530ec26c8e8cfd8f7c Related: OS#4006, SYS#4895 --- src/target/trx_toolkit/burst_fwd.py | 8 +-- src/target/trx_toolkit/burst_gen.py | 6 +-- src/target/trx_toolkit/burst_send.py | 9 ++-- src/target/trx_toolkit/ctrl_if_trx.py | 4 +- src/target/trx_toolkit/data_dump.py | 22 ++++---- src/target/trx_toolkit/data_if.py | 24 ++++----- src/target/trx_toolkit/data_msg.py | 34 ++++++------- src/target/trx_toolkit/fake_trx.py | 2 +- src/target/trx_toolkit/test_data_dump.py | 28 +++++------ src/target/trx_toolkit/test_data_msg.py | 86 ++++++++++++++++---------------- src/target/trx_toolkit/transceiver.py | 2 +- src/target/trx_toolkit/trx_sniff.py | 13 +++-- 12 files changed, 117 insertions(+), 121 deletions(-) (limited to 'src') diff --git a/src/target/trx_toolkit/burst_fwd.py b/src/target/trx_toolkit/burst_fwd.py index 0b69fe2a..2e9e97b5 100644 --- a/src/target/trx_toolkit/burst_fwd.py +++ b/src/target/trx_toolkit/burst_fwd.py @@ -37,8 +37,8 @@ class BurstForwarder(TRXList): - actual RX / TX frequencies, - list of active timeslots. - Each to be distributed L12TRX message is being transformed - into a TRX2L1 message, and then forwarded to transceivers + Each to be distributed 'TxMsg' message is being transformed + into a 'RxMsg' message, and then forwarded to transceivers with partially initialized header. All uninitialized header fields (such as rssi and toa256) shall be set by each transceiver individually before sending towards the L1. @@ -69,6 +69,6 @@ class BurstForwarder(TRXList): if trx.get_rx_freq(rx_msg.fn) != tx_freq: continue - # Transform from L12TRX to TRX2L1 and forward - tx_msg = rx_msg.gen_trx2l1(ver = trx.data_if._hdr_ver) + # Transform from TxMsg to RxMsg and forward + tx_msg = rx_msg.trans(ver = trx.data_if._hdr_ver) trx.handle_data_msg(src_trx, rx_msg, tx_msg) diff --git a/src/target/trx_toolkit/burst_gen.py b/src/target/trx_toolkit/burst_gen.py index 7f154b33..f93d8685 100755 --- a/src/target/trx_toolkit/burst_gen.py +++ b/src/target/trx_toolkit/burst_gen.py @@ -68,9 +68,9 @@ class Application(ApplicationBase): # Init an empty DATA message if self.argv.conn_mode == "TRX": - msg = DATAMSG_L12TRX(ver = self.argv.hdr_ver) + msg = TxMsg(ver = self.argv.hdr_ver) elif self.argv.conn_mode == "L1": - msg = DATAMSG_TRX2L1(ver = self.argv.hdr_ver) + msg = RxMsg(ver = self.argv.hdr_ver) # Generate a random frame number or use provided one fn_init = msg.rand_fn() if self.argv.tdma_fn is None \ @@ -176,7 +176,7 @@ class Application(ApplicationBase): help = "How many bursts to send (default %(default)s)") bg_group.add_argument("-v", "--hdr-version", metavar = "VER", dest = "hdr_ver", type = int, - default = 0, choices = DATAMSG.KNOWN_VERSIONS, + default = 0, choices = Msg.KNOWN_VERSIONS, help = "TRXD header version (default %(default)s)") bg_group.add_argument("-f", "--frame-number", metavar = "FN", dest = "tdma_fn", type = int, diff --git a/src/target/trx_toolkit/burst_send.py b/src/target/trx_toolkit/burst_send.py index 717414db..4e165711 100755 --- a/src/target/trx_toolkit/burst_send.py +++ b/src/target/trx_toolkit/burst_send.py @@ -79,11 +79,10 @@ class Application(ApplicationBase): def msg_pass_filter(self, msg): # Direction filter - l12trx = self.argv.conn_mode == "TRX" - if isinstance(msg, DATAMSG_L12TRX) and not l12trx: - return False - elif isinstance(msg, DATAMSG_TRX2L1) and l12trx: - return False + if isinstance(msg, RxMsg) and self.argv.conn_mode == "TRX": + return False # cannot send RxMsg to TRX + if isinstance(msg, TxMsg) and self.argv.conn_mode == "L1": + return False # cannot send TxMsg to L1 # Timeslot filter if self.argv.pf_tn is not None: diff --git a/src/target/trx_toolkit/ctrl_if_trx.py b/src/target/trx_toolkit/ctrl_if_trx.py index 22d4b4e8..c03dcc60 100644 --- a/src/target/trx_toolkit/ctrl_if_trx.py +++ b/src/target/trx_toolkit/ctrl_if_trx.py @@ -25,7 +25,7 @@ import logging as log from ctrl_if import CTRLInterface -from data_msg import DATAMSG +from data_msg import Msg class CTRLInterfaceTRX(CTRLInterface): """ CTRL interface handler for common transceiver management commands. @@ -222,7 +222,7 @@ class CTRLInterfaceTRX(CTRLInterface): # ... and store current for logging ver_cur = self.trx.data_if._hdr_ver - if ver_req < 0 or ver_req > DATAMSG.CHDR_VERSION_MAX: + if ver_req < 0 or ver_req > Msg.CHDR_VERSION_MAX: log.error("(%s) Incorrect TRXD header version %u" % (self.trx, ver_req)) return -1 diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py index 9a685563..8510e2d0 100644 --- a/src/target/trx_toolkit/data_dump.py +++ b/src/target/trx_toolkit/data_dump.py @@ -28,18 +28,18 @@ from data_msg import * class DATADump: # Constants - TAG_L12TRX = b'\x01' - TAG_TRX2L1 = b'\x02' + TAG_TxMsg = b'\x01' + TAG_RxMsg = b'\x02' HDR_LENGTH = 3 # Generates raw bytes from a DATA message # Return value: raw message bytes def dump_msg(self, msg): # Determine a message type - if isinstance(msg, DATAMSG_L12TRX): - tag = self.TAG_L12TRX - elif isinstance(msg, DATAMSG_TRX2L1): - tag = self.TAG_TRX2L1 + if isinstance(msg, TxMsg): + tag = self.TAG_TxMsg + elif isinstance(msg, RxMsg): + tag = self.TAG_RxMsg else: raise ValueError("Unknown message type") @@ -61,12 +61,10 @@ class DATADump: tag = hdr[:1] # Check if tag is known - if tag == self.TAG_L12TRX: - # L1 -> TRX - msg = DATAMSG_L12TRX() - elif tag == self.TAG_TRX2L1: - # TRX -> L1 - msg = DATAMSG_TRX2L1() + if tag == self.TAG_TxMsg: + msg = TxMsg() + elif tag == self.TAG_RxMsg: + msg = RxMsg() else: # Unknown tag return False diff --git a/src/target/trx_toolkit/data_if.py b/src/target/trx_toolkit/data_if.py index 07f3d323..1cded9bb 100644 --- a/src/target/trx_toolkit/data_if.py +++ b/src/target/trx_toolkit/data_if.py @@ -35,7 +35,7 @@ class DATAInterface(UDPLink): log.debug("Init TRXD interface (%s)" % self.desc_link()) def set_hdr_ver(self, ver): - if not ver in DATAMSG.KNOWN_VERSIONS: + if not ver in Msg.KNOWN_VERSIONS: return False self._hdr_ver = ver @@ -43,7 +43,7 @@ class DATAInterface(UDPLink): def pick_hdr_ver(self, ver_req): # Pick a version that is lower or equal to ver_req - for ver in DATAMSG.KNOWN_VERSIONS[::-1]: + for ver in Msg.KNOWN_VERSIONS[::-1]: if ver <= ver_req: return ver @@ -64,16 +64,16 @@ class DATAInterface(UDPLink): data, _ = self.sock.recvfrom(512) return data - def recv_l12trx_msg(self): + def recv_tx_msg(self): # Read raw data from socket data = self.recv_raw_data() - # Attempt to parse as a L12TRX message + # Attempt to parse a TRXD Tx message try: - msg = DATAMSG_L12TRX() + msg = TxMsg() msg.parse_msg(bytearray(data)) except: - log.error("Failed to parse a L12TRX message " + log.error("Failed to parse a TRXD Tx message " "from R:%s:%u" % (self.remote_addr, self.remote_port)) return None @@ -84,16 +84,16 @@ class DATAInterface(UDPLink): return msg - def recv_trx2l1_msg(self): + def recv_rx_msg(self): # Read raw data from socket data = self.recv_raw_data() - # Attempt to parse as a L12TRX message + # Attempt to parse a TRXD Rx message try: - msg = DATAMSG_TRX2L1() + msg = RxMsg() msg.parse_msg(bytearray(data)) except: - log.error("Failed to parse a TRX2L1 message " + log.error("Failed to parse a TRXD Rx message " "from R:%s:%u" % (self.remote_addr, self.remote_port)) return None @@ -106,10 +106,10 @@ class DATAInterface(UDPLink): def send_msg(self, msg, legacy = False): try: - # Validate and encode TRXD message + # Validate and encode a TRXD message payload = msg.gen_msg(legacy) except ValueError as e: - log.error("Failed to encode a TRX2L1 message ('%s') " + log.error("Failed to encode a TRXD message ('%s') " "due to error: %s" % (msg.desc_hdr(), e)) # TODO: we may want to send a NOPE.ind here return diff --git a/src/target/trx_toolkit/data_msg.py b/src/target/trx_toolkit/data_msg.py index a1410fc5..72e248da 100644 --- a/src/target/trx_toolkit/data_msg.py +++ b/src/target/trx_toolkit/data_msg.py @@ -59,7 +59,7 @@ class Modulation(Enum): return mod return None -class DATAMSG(abc.ABC): +class Msg(abc.ABC): ''' TRXD (DATA) message coding API (common part). ''' # NOTE: up to 16 versions can be encoded @@ -228,8 +228,8 @@ class DATAMSG(abc.ABC): else: self.burst = None -class DATAMSG_L12TRX(DATAMSG): - ''' L12TRX (L1 -> TRX) message coding API. ''' +class TxMsg(Msg): + ''' Tx (L1 -> TRX) message coding API. ''' # Constants PWR_MIN = 0x00 @@ -257,7 +257,7 @@ class DATAMSG_L12TRX(DATAMSG): ''' Validate the message fields (throws ValueError). ''' # Validate common fields - DATAMSG.validate(self) + Msg.validate(self) if self.pwr is None: raise ValueError("Tx Attenuation level is not set") @@ -287,14 +287,14 @@ class DATAMSG_L12TRX(DATAMSG): def rand_hdr(self): ''' Randomize message specific header. ''' - DATAMSG.rand_hdr(self) + Msg.rand_hdr(self) self.pwr = self.rand_pwr() def desc_hdr(self): ''' Generate human-readable header description. ''' # Describe the common part - result = DATAMSG.desc_hdr(self) + result = Msg.desc_hdr(self) if self.pwr is not None: result += ("pwr=%u " % self.pwr) @@ -340,11 +340,11 @@ class DATAMSG_L12TRX(DATAMSG): ''' Generate a random message specific burst. ''' self.burst = [random.randint(0, 1) for _ in range(length)] - def gen_trx2l1(self, ver = None): - ''' Transform this message to TRX2L1 message. ''' + def trans(self, ver = None): + ''' Transform this message into RxMsg. ''' # Allocate a new message - msg = DATAMSG_TRX2L1(fn = self.fn, tn = self.tn, + msg = RxMsg(fn = self.fn, tn = self.tn, ver = self.ver if ver is None else ver) # Convert burst bits @@ -355,8 +355,8 @@ class DATAMSG_L12TRX(DATAMSG): return msg -class DATAMSG_TRX2L1(DATAMSG): - ''' TRX2L1 (TRX -> L1) message coding API. ''' +class RxMsg(Msg): + ''' Rx (TRX -> L1) message coding API. ''' # rxlev2dbm(0..63) gives us [-110..-47], plus -10 dbm for noise RSSI_MIN = -120 @@ -442,7 +442,7 @@ class DATAMSG_TRX2L1(DATAMSG): ''' Validate the message header fields (throws ValueError). ''' # Validate common fields - DATAMSG.validate(self) + Msg.validate(self) if self.rssi is None: raise ValueError("RSSI is not set") @@ -512,7 +512,7 @@ class DATAMSG_TRX2L1(DATAMSG): def rand_hdr(self): ''' Randomize message specific header. ''' - DATAMSG.rand_hdr(self) + Msg.rand_hdr(self) self.rssi = self.rand_rssi() self.toa256 = self.rand_toa256() @@ -531,7 +531,7 @@ class DATAMSG_TRX2L1(DATAMSG): ''' Generate human-readable header description. ''' # Describe the common part - result = DATAMSG.desc_hdr(self) + result = Msg.desc_hdr(self) if self.rssi is not None: result += ("rssi=%d " % self.rssi) @@ -679,11 +679,11 @@ class DATAMSG_TRX2L1(DATAMSG): self.burst = [random.randint(-127, 127) for _ in range(length)] - def gen_l12trx(self, ver = None): - ''' Transform this message to L12TRX message. ''' + def trans(self, ver = None): + ''' Transform this message to TxMsg. ''' # Allocate a new message - msg = DATAMSG_L12TRX(fn = self.fn, tn = self.tn, + msg = TxMsg(fn = self.fn, tn = self.tn, ver = self.ver if ver is None else ver) # Convert burst bits diff --git a/src/target/trx_toolkit/fake_trx.py b/src/target/trx_toolkit/fake_trx.py index 88417f7a..d519a9ae 100755 --- a/src/target/trx_toolkit/fake_trx.py +++ b/src/target/trx_toolkit/fake_trx.py @@ -213,7 +213,7 @@ class FakeTRX(Transceiver): msg.tsc_set = 0 msg.tsc = 0 - # Takes (partially initialized) TRX2L1 message, + # Takes (partially initialized) TRXD Rx message, # simulates RF path parameters (such as RSSI), # and sends towards the L1 def handle_data_msg(self, src_trx, src_msg, msg): diff --git a/src/target/trx_toolkit/test_data_dump.py b/src/target/trx_toolkit/test_data_dump.py index 9cd9b792..f7b4fde4 100644 --- a/src/target/trx_toolkit/test_data_dump.py +++ b/src/target/trx_toolkit/test_data_dump.py @@ -50,12 +50,12 @@ class DATADump_Test(unittest.TestCase): # Burst bits (if present) self.assertEqual(a.burst, b.burst) - # TRX2L1 specific fields - if isinstance(a, DATAMSG_L12TRX): + # TxMsg specific fields + if isinstance(a, TxMsg): self.assertEqual(a.pwr, b.pwr) - # L12TRX specific fields - if isinstance(a, DATAMSG_TRX2L1): + # RxMsg specific fields + if isinstance(a, RxMsg): # Version independent fields self.assertEqual(a.toa256, b.toa256) self.assertEqual(a.rssi, b.rssi) @@ -88,8 +88,8 @@ class DATADump_Test(unittest.TestCase): # Generate a mixed list of random messages def _gen_rand_message_mix(self, count, ver = 1): msg_list = [] - msg_list += self._gen_rand_messages(DATAMSG_TRX2L1, count) - msg_list += self._gen_rand_messages(DATAMSG_L12TRX, count) + msg_list += self._gen_rand_messages(RxMsg, count) + msg_list += self._gen_rand_messages(TxMsg, count) random.shuffle(msg_list) return msg_list @@ -100,15 +100,15 @@ class DATADump_Test(unittest.TestCase): msg = self._ddf.parse_msg(0) self._compare_msg(msg, msg_ref) - # Store one TRX2L1 message in a file, read it back and compare - def test_store_and_parse_trx2l1(self): - self._test_store_and_parse(DATAMSG_TRX2L1) + # Store one Rx message in a file, read it back and compare + def test_store_and_parse_rx_msg(self): + self._test_store_and_parse(RxMsg) - # Store one L12TRX message in a file, read it back and compare - def test_store_and_parse_l12trx(self): - self._test_store_and_parse(DATAMSG_L12TRX) + # Store one Tx message in a file, read it back and compare + def test_store_and_parse_tx_msg(self): + self._test_store_and_parse(TxMsg) - # Store multiple TRX2L1/L12TRX messages in a file, read them back and compare + # Store multiple Rx/Tx messages in a file, read them back and compare def test_store_and_parse_all(self): # Store a mixed list of random messages (19 + 19) msg_list_ref = self._gen_rand_message_mix(19) @@ -143,7 +143,7 @@ class DATADump_Test(unittest.TestCase): def test_parse_len_overflow(self): # Write a malformed message directly - self._tf.write(DATADump.TAG_L12TRX) + self._tf.write(DATADump.TAG_TxMsg) self._tf.write(b'\x00\x63') # 99 self._tf.write(b'\xff' * 90) diff --git a/src/target/trx_toolkit/test_data_msg.py b/src/target/trx_toolkit/test_data_msg.py index 991dd25a..24fda673 100644 --- a/src/target/trx_toolkit/test_data_msg.py +++ b/src/target/trx_toolkit/test_data_msg.py @@ -24,9 +24,9 @@ import unittest -from data_msg import DATAMSG, DATAMSG_L12TRX, DATAMSG_TRX2L1 +from data_msg import Msg, TxMsg, RxMsg -class DATAMSG_Test(unittest.TestCase): +class Msg_Test(unittest.TestCase): # Compare message a with message b def _compare_msg(self, a, b): # Make sure we're comparing messages of the same type @@ -40,12 +40,12 @@ class DATAMSG_Test(unittest.TestCase): # Burst bits (if present) self.assertEqual(a.burst, b.burst) - # TRX2L1 specific fields - if isinstance(a, DATAMSG_L12TRX): + # TxMsg specific fields + if isinstance(a, TxMsg): self.assertEqual(a.pwr, b.pwr) - # L12TRX specific fields - if isinstance(a, DATAMSG_TRX2L1): + # RxMsg specific fields + if isinstance(a, RxMsg): # Version independent fields self.assertEqual(a.toa256, b.toa256) self.assertEqual(a.rssi, b.rssi) @@ -62,38 +62,38 @@ class DATAMSG_Test(unittest.TestCase): def test_validate(self): # Unknown version with self.assertRaises(ValueError): - msg = DATAMSG_TRX2L1(fn = 0, tn = 0, ver = 100) + msg = RxMsg(fn = 0, tn = 0, ver = 100) msg.validate() # Uninitialized field with self.assertRaises(ValueError): - msg = DATAMSG_TRX2L1() + msg = RxMsg() msg.validate() with self.assertRaises(ValueError): - msg = DATAMSG_TRX2L1(fn = None, tn = 0) + msg = RxMsg(fn = None, tn = 0) msg.validate() # Out-of-range value(s) with self.assertRaises(ValueError): - msg = DATAMSG_TRX2L1(fn = -1, tn = 0) + msg = RxMsg(fn = -1, tn = 0) msg.validate() with self.assertRaises(ValueError): - msg = DATAMSG_TRX2L1(fn = 0, tn = 10) + msg = RxMsg(fn = 0, tn = 10) msg.validate() # Validate header and burst randomization def test_rand_hdr_burst(self): - msg_l12trx = DATAMSG_L12TRX() - msg_trx2l1 = DATAMSG_TRX2L1() + tx_msg = TxMsg() + rx_msg = RxMsg() for i in range(100): - msg_l12trx.rand_burst() - msg_trx2l1.rand_burst() - msg_l12trx.rand_hdr() - msg_trx2l1.rand_hdr() + tx_msg.rand_burst() + rx_msg.rand_burst() + tx_msg.rand_hdr() + rx_msg.rand_hdr() - msg_l12trx.validate() - msg_trx2l1.validate() + tx_msg.validate() + rx_msg.validate() def _test_enc_dec(self, msg, legacy = False, nope_ind = False): # Prepare a given message (randomize) @@ -120,22 +120,22 @@ class DATAMSG_Test(unittest.TestCase): # Validate encoding and decoding def test_enc_dec(self): - for ver in DATAMSG.KNOWN_VERSIONS: - with self.subTest("L1 -> TRX message", ver = ver): - msg = DATAMSG_L12TRX(ver = ver) + for ver in Msg.KNOWN_VERSIONS: + with self.subTest("TxMsg", ver = ver): + msg = TxMsg(ver = ver) self._test_enc_dec(msg) - with self.subTest("TRX -> L1 message", ver = ver): - msg = DATAMSG_TRX2L1(ver = ver) + with self.subTest("RxMsg", ver = ver): + msg = RxMsg(ver = ver) self._test_enc_dec(msg) if ver >= 1: - with self.subTest("TRX -> L1 NOPE.ind", ver = ver): - msg = DATAMSG_TRX2L1(ver = ver) + with self.subTest("RxMsg NOPE.ind", ver = ver): + msg = RxMsg(ver = ver) self._test_enc_dec(msg, nope_ind = True) - with self.subTest("TRX -> L1 message (legacy)"): - msg = DATAMSG_TRX2L1(ver = 0) + with self.subTest("RxMsg (legacy transceiver)"): + msg = RxMsg(ver = 0) self._test_enc_dec(msg, legacy = True) # Validate bit conversations @@ -144,16 +144,16 @@ class DATAMSG_Test(unittest.TestCase): sbits_ref = list(range(-127, 128)) # Test both usbit2sbit() and sbit2usbit() - sbits = DATAMSG.usbit2sbit(usbits_ref) - usbits = DATAMSG.sbit2usbit(sbits) + sbits = Msg.usbit2sbit(usbits_ref) + usbits = Msg.sbit2usbit(sbits) self.assertEqual(usbits[:255], usbits_ref[:255]) self.assertEqual(usbits[255], 254) # Test both sbit2ubit() and ubit2sbit() - ubits = DATAMSG.sbit2ubit(sbits_ref) + ubits = Msg.sbit2ubit(sbits_ref) self.assertEqual(ubits, ([1] * 127 + [0] * 128)) - sbits = DATAMSG.ubit2sbit(ubits) + sbits = Msg.ubit2sbit(ubits) self.assertEqual(sbits, ([-127] * 127 + [127] * 128)) def _test_transform(self, msg): @@ -162,31 +162,31 @@ class DATAMSG_Test(unittest.TestCase): msg.rand_burst() # Perform message transformation - if isinstance(msg, DATAMSG_L12TRX): - msg_trans = msg.gen_trx2l1() + if isinstance(msg, TxMsg): + msg_trans = msg.trans() else: - msg_trans = msg.gen_l12trx() + msg_trans = msg.trans() self.assertEqual(msg_trans.ver, msg.ver) self.assertEqual(msg_trans.fn, msg.fn) self.assertEqual(msg_trans.tn, msg.tn) - if isinstance(msg, DATAMSG_TRX2L1): - burst = DATAMSG.sbit2ubit(msg.burst) + if isinstance(msg, RxMsg): + burst = Msg.sbit2ubit(msg.burst) self.assertEqual(msg_trans.burst, burst) else: - burst = DATAMSG.ubit2sbit(msg.burst) + burst = Msg.ubit2sbit(msg.burst) self.assertEqual(msg_trans.burst, burst) # Validate message transformation def test_transform(self): - for ver in DATAMSG.KNOWN_VERSIONS: - with self.subTest("L1 -> TRX message", ver = ver): - msg = DATAMSG_L12TRX(ver = ver) + for ver in Msg.KNOWN_VERSIONS: + with self.subTest("TxMsg", ver = ver): + msg = TxMsg(ver = ver) self._test_transform(msg) - with self.subTest("TRX -> L1 message", ver = ver): - msg = DATAMSG_TRX2L1(ver = ver) + with self.subTest("RxMsg", ver = ver): + msg = RxMsg(ver = ver) self._test_transform(msg) if __name__ == '__main__': diff --git a/src/target/trx_toolkit/transceiver.py b/src/target/trx_toolkit/transceiver.py index 19b998e9..d0410707 100644 --- a/src/target/trx_toolkit/transceiver.py +++ b/src/target/trx_toolkit/transceiver.py @@ -258,7 +258,7 @@ class Transceiver: def recv_data_msg(self): # Read and parse data from socket - msg = self.data_if.recv_l12trx_msg() + msg = self.data_if.recv_tx_msg() if not msg: return None diff --git a/src/target/trx_toolkit/trx_sniff.py b/src/target/trx_toolkit/trx_sniff.py index 0cb62d36..8b6f80c7 100755 --- a/src/target/trx_toolkit/trx_sniff.py +++ b/src/target/trx_toolkit/trx_sniff.py @@ -101,10 +101,10 @@ class Application(ApplicationBase): msg_raw = bytearray(trx.load) # Determine a burst direction (L1 <-> TRX) - l12trx = udp.sport > udp.dport + tx_dir = udp.sport > udp.dport # Create an empty DATA message - msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1() + msg = TxMsg() if tx_dir else RxMsg() # Attempt to parse the payload as a DATA message try: @@ -124,8 +124,7 @@ class Application(ApplicationBase): return # Debug print - log.debug("%s burst: %s" \ - % ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr())) + log.debug("%s burst: %s", "L1 -> TRX" if tx_dir else "TRX -> L1", msg.desc_hdr()) # Poke message handler self.msg_handle(msg) @@ -139,10 +138,10 @@ class Application(ApplicationBase): # Direction filter if self.argv.direction is not None: if self.argv.direction == "TRX": # L1 -> TRX - if not isinstance(msg, DATAMSG_L12TRX): + if not isinstance(msg, TxMsg): return False elif self.argv.direction == "L1": # TRX -> L1 - if not isinstance(msg, DATAMSG_TRX2L1): + if not isinstance(msg, RxMsg): return False # Timeslot filter @@ -159,7 +158,7 @@ class Application(ApplicationBase): return False # Message type specific filtering - if isinstance(msg, DATAMSG_TRX2L1): + if isinstance(msg, RxMsg): # NOPE.ind filter if not self.argv.pf_nope_ind and msg.nope_ind: return False -- cgit v1.2.3