From d273ebf611bfafa6234961b13979f21d0223caec Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Sat, 27 Jan 2018 21:47:31 +0700 Subject: fake_trx: use DATAMSG classes for DATA messages The DATAMSG API, that was introduced and extended a few commits before, provides all required methods to create, validate, generate and parse DATA messages. Let's use it now. Change-Id: Ibc99126dc05d873c1ba538a5f4e74866de563f56 --- src/target/fake_trx/burst_gen.py | 64 ++++++++++++++++++----------- src/target/fake_trx/burst_send.py | 65 ++++++++++++++++++++---------- src/target/fake_trx/data_if.py | 85 +++++---------------------------------- src/target/fake_trx/trx_sniff.py | 61 +++++++++++----------------- 4 files changed, 116 insertions(+), 159 deletions(-) diff --git a/src/target/fake_trx/burst_gen.py b/src/target/fake_trx/burst_gen.py index 04b7f477..ced2de3d 100755 --- a/src/target/fake_trx/burst_gen.py +++ b/src/target/fake_trx/burst_gen.py @@ -4,7 +4,7 @@ # Auxiliary tool to generate and send random bursts via TRX DATA # interface, which may be useful for fuzzing and testing # -# (C) 2017 by Vadim Yanitskiy +# (C) 2017-2018 by Vadim Yanitskiy # # All Rights Reserved # @@ -22,7 +22,6 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import random import signal import getopt import sys @@ -30,6 +29,7 @@ import sys from rand_burst_gen import RandBurstGen from data_if import DATAInterface from gsm_shared import * +from data_msg import * COPYRIGHT = \ "Copyright (C) 2017 by Vadim Yanitskiy \n" \ @@ -70,38 +70,56 @@ class Application: # Init random burst generator burst_gen = RandBurstGen() + # Init an empty DATA message + if self.conn_mode == "TRX": + msg = DATAMSG_L12TRX() + elif self.conn_mode == "L1": + msg = DATAMSG_TRX2L1() + # Generate a random frame number or use provided one - if self.fn is None: - fn = random.randint(0, GSM_HYPERFRAME) - else: - fn = self.fn + fn_init = msg.rand_fn() if self.fn is None else self.fn # Send as much bursts as required for i in range(self.burst_count): + # Randomize the message header + msg.rand_hdr() + + # Increase and set frame number + msg.fn = (fn_init + i) % GSM_HYPERFRAME + + # Set timeslot number + if self.tn is not None: + msg.tn = self.tn + + # Set transmit power level + if self.pwr is not None: + msg.pwr = self.pwr + + # TODO: also set TRX2L1 specific fields + # Generate a random burst if self.burst_type == "NB": - buf = burst_gen.gen_nb() + burst = burst_gen.gen_nb() elif self.burst_type == "FB": - buf = burst_gen.gen_fb() + burst = burst_gen.gen_fb() elif self.burst_type == "SB": - buf = burst_gen.gen_sb() + burst = burst_gen.gen_sb() elif self.burst_type == "AB": - buf = burst_gen.gen_ab() + burst = burst_gen.gen_ab() + + # Convert to soft-bits in case of TRX -> L1 message + if self.conn_mode == "L1": + burst = msg.ubit2sbit(burst) - print("[i] Sending %d/%d %s burst (fn=%u) to %s..." + # Set burst + msg.burst = burst + + print("[i] Sending %d/%d %s burst %s to %s..." % (i + 1, self.burst_count, self.burst_type, - fn, self.conn_mode)) - - # Send to TRX or L1 - if self.conn_mode == "TRX": - self.data_if.send_trx_msg(buf, - self.tn, fn, self.pwr) - elif self.conn_mode == "L1": - self.data_if.send_l1_msg(buf, - self.tn, fn, self.pwr) - - # Increase frame number (for count > 1) - fn = (fn + 1) % GSM_HYPERFRAME + msg.desc_hdr(), self.conn_mode)) + + # Send message + self.data_if.send_msg(msg) self.shutdown() diff --git a/src/target/fake_trx/burst_send.py b/src/target/fake_trx/burst_send.py index ee8e51ab..4dbe9846 100755 --- a/src/target/fake_trx/burst_send.py +++ b/src/target/fake_trx/burst_send.py @@ -3,7 +3,7 @@ # Auxiliary tool to send existing bursts via TRX DATA interface # -# (C) 2017 by Vadim Yanitskiy +# (C) 2017-2018 by Vadim Yanitskiy # # All Rights Reserved # @@ -21,13 +21,13 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import random import signal import getopt import sys from data_if import DATAInterface from gsm_shared import * +from data_msg import * COPYRIGHT = \ "Copyright (C) 2017 by Vadim Yanitskiy \n" \ @@ -74,40 +74,61 @@ class Application: print("[i] Reading bursts from stdin...") src = sys.stdin + # Init an empty DATA message + if self.conn_mode == "TRX": + msg = DATAMSG_L12TRX() + elif self.conn_mode == "L1": + msg = DATAMSG_TRX2L1() + # Generate a random frame number or use provided one - if self.fn is None: - fn = random.randint(0, GSM_HYPERFRAME) - else: - fn = self.fn + fn = msg.rand_fn() if self.fn is None else self.fn # Read the burst source line-by-line for line in src: # Strip spaces - burst = line.strip() - buf = [] + burst_str = line.strip() + burst = [] # Check length - if len(burst) != 148: + if len(burst_str) != 148: print("[!] Dropping burst due to length != 148") continue - print("[i] Sending a burst (fn=%u) to %s..." - % (fn, self.conn_mode)) + # Randomize the message header + msg.rand_hdr() + + # Set frame number + msg.fn = fn + + # Set timeslot number + if self.tn is not None: + msg.tn = self.tn + + # Set transmit power level + if self.pwr is not None: + msg.pwr = self.pwr + + # TODO: also set TRX2L1 specific fields # Parse a string - for bit in burst: + for bit in burst_str: if bit == "1": - buf.append(1) + burst.append(1) else: - buf.append(0) - - # Send to TRX or L1 - if self.conn_mode == "TRX": - self.data_if.send_trx_msg(buf, - self.tn, fn, self.pwr) - elif self.conn_mode == "L1": - self.data_if.send_l1_msg(buf, - self.tn, fn, self.pwr) + burst.append(0) + + # Convert to soft-bits in case of TRX -> L1 message + if self.conn_mode == "L1": + burst = msg.ubit2sbit(burst) + + # Set burst + msg.burst = burst + + print("[i] Sending a burst %s to %s..." + % (msg.desc_hdr(), self.conn_mode)) + + # Send message + self.data_if.send_msg(msg) # Increase frame number (for count > 1) fn = (fn + 1) % GSM_HYPERFRAME diff --git a/src/target/fake_trx/data_if.py b/src/target/fake_trx/data_if.py index 0f373ab5..8b0cd8e5 100644 --- a/src/target/fake_trx/data_if.py +++ b/src/target/fake_trx/data_if.py @@ -4,7 +4,7 @@ # Virtual Um-interface (fake transceiver) # DATA interface implementation # -# (C) 2017 by Vadim Yanitskiy +# (C) 2017-2018 by Vadim Yanitskiy # # All Rights Reserved # @@ -22,85 +22,18 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import random - from udp_link import UDPLink -from gsm_shared import * +from data_msg import * class DATAInterface(UDPLink): - def send_l1_msg(self, burst, - tn = None, fn = None, rssi = None): - # Generate random timeslot index if not preset - if tn is None: - tn = random.randint(0, 7) - - # Generate random frame number if not preset - if fn is None: - fn = random.randint(0, GSM_HYPERFRAME) - - # Generate random RSSI if not preset - if rssi is None: - rssi = -random.randint(-75, -50) - - # Prepare a buffer for header and burst - buf = [] - - # Put timeslot index - buf.append(tn) - - # Put frame number - buf.append((fn >> 24) & 0xff) - buf.append((fn >> 16) & 0xff) - buf.append((fn >> 8) & 0xff) - buf.append((fn >> 0) & 0xff) - - # Put RSSI - buf.append(rssi) - - # HACK: put fake TOA value - buf += [0x00] * 2 - - # Put burst - buf += burst - - # Put two unused bytes - buf += [0x00] * 2 - - # Send message - self.send(bytearray(buf)) - - def send_trx_msg(self, burst, - tn = None, fn = None, pwr = None): - # Generate random timeslot index if not preset - if tn is None: - tn = random.randint(0, 7) - - # Generate random frame number if not preset - if fn is None: - fn = random.randint(0, GSM_HYPERFRAME) - - # Generate random power level if not preset - if pwr is None: - pwr = random.randint(0, 34) - - # Prepare a buffer for header and burst - buf = [] - - # Put timeslot index - buf.append(tn) - - # Put frame number - buf.append((fn >> 24) & 0xff) - buf.append((fn >> 16) & 0xff) - buf.append((fn >> 8) & 0xff) - buf.append((fn >> 0) & 0xff) - - # Put transmit power level - buf.append(pwr) + def send_msg(self, msg): + # Validate a message + if not msg.validate(): + raise ValueError("Message incomplete or incorrect") - # Put burst - buf += burst + # Generate TRX message + payload = msg.gen_msg() # Send message - self.send(bytearray(buf)) + self.send(payload) diff --git a/src/target/fake_trx/trx_sniff.py b/src/target/fake_trx/trx_sniff.py index cf3ab9ce..91c2c4ad 100755 --- a/src/target/fake_trx/trx_sniff.py +++ b/src/target/fake_trx/trx_sniff.py @@ -27,6 +27,8 @@ import sys import scapy.all +from data_msg import * + COPYRIGHT = \ "Copyright (C) 2018 by Vadim Yanitskiy \n" \ "License GPLv2+: GNU GPL version 2 or later " \ @@ -98,32 +100,39 @@ class Application: trx = udp.payload # Convert to bytearray - trx = bytearray(str(trx)) - - # Parse GSM TDMA specific data - fn = (trx[1] << 24) | (trx[2] << 16) | (trx[3] << 8) | trx[4] - tn = trx[0] + msg_raw = bytearray(str(trx)) # Determine a burst direction (L1 <-> TRX) l12trx = udp.sport > udp.dport + # Create an empty DATA message + msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1() + + # Attempt to parse the payload as a DATA message + try: + msg.parse_msg(msg_raw) + except: + print("[!] Failed to parse message, dropping...") + self.cnt_burst_dropped_num += 1 + return + # Poke burst pass filter - rc = self.burst_pass_filter(l12trx, fn, tn) + rc = self.burst_pass_filter(l12trx, msg.fn, msg.tn) if rc is False: self.cnt_burst_dropped_num += 1 return # Debug print - print("[i] %s burst: fn=%u, tn=%d" \ - % ("L1 -> TRX" if l12trx else "TRX -> L1", fn, tn)) + print("[i] %s burst: %s" \ + % ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr())) # Poke burst handler - rc = self.burst_handle(trx, l12trx, fn, tn) + rc = self.burst_handle(l12trx, msg_raw, msg) if rc is False: self.shutdown() # Poke burst counter - rc = self.burst_count(fn, tn) + rc = self.burst_count(msg.fn, msg.tn) if rc is True: self.shutdown() @@ -149,47 +158,23 @@ class Application: # Burst passed ;) return True - def burst_handle(self, trx_burst, l12trx, fn, tn): + def burst_handle(self, l12trx, msg_raw, msg): if self.print_bursts: - self.burst_dump_bits(sys.stdout, trx_burst, l12trx) - sys.stdout.flush() + print(msg.burst) if self.output_file is not None: # TLV: tag defines burst direction (one byte, BE) self.output_file.write('\x01' if l12trx else '\x02') # TLV: length of value (one byte, BE) - length = len(trx_burst) + length = len(msg_raw) self.output_file.write(chr(length)) # TLV: raw value - self.output_file.write(trx_burst) + self.output_file.write(msg_raw) return True - def burst_dump_bits(self, dst, trx_burst, l12trx): - # Split out burst header - if l12trx: - burst = trx_burst[6:] - else: - burst = trx_burst[8:] - - # Print normal bits: 0 or 1 - for i in range(0, 148): - # Convert bits to chars - if l12trx: - # Convert bits as is - bit = '1' if burst[i] else '0' - else: - # Convert trx bits {254..0} to sbits {-127..127} - bit = -127 if burst[i] == 255 else 127 - burst[i] - # Convert sbits {-127..127} to ubits {0..1} - bit = '1' if bit < 0 else '0' - - # Write a normal bit - dst.write(bit) - dst.write("\n") - def burst_count(self, fn, tn): # Update frame counter if self.cnt_frame_last is None: -- cgit v1.2.3