From f35413691db5a78640a05d22586a03a7badcea62 Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Tue, 27 Feb 2018 07:00:45 +0700 Subject: fake_trx/burst_fwd.py: use DATAMSG transformation API As the DATAMSG classes were introduced, let's use them. This approach abstracts one from dealing with raw bytes. Also, now BurstForwarder randomizes both RSSI and ToA values, as this feature is supported from-the-box by the DATAMSG_TRX2L1. Change-Id: Ib15018eab749150e244914dab4b6e433ce0c9209 --- src/target/fake_trx/burst_fwd.py | 83 +++++++++++++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 18 deletions(-) diff --git a/src/target/fake_trx/burst_fwd.py b/src/target/fake_trx/burst_fwd.py index 5989ce9c..663f4996 100644 --- a/src/target/fake_trx/burst_fwd.py +++ b/src/target/fake_trx/burst_fwd.py @@ -22,6 +22,10 @@ # with this program; if not, write to the Free Software Foundation, Inc., # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +import random + +from data_msg import * + class BurstForwarder: # Timeslot filter (drop everything by default) ts_pass = None @@ -30,40 +34,60 @@ class BurstForwarder: bts_freq = None bb_freq = None + # Constants + # TODO: add options to change this + RSSI_RAND_TRESHOLD = 10 + RSSI_RAND_MIN = -90 + RSSI_RAND_MAX = -60 + + # TODO: add options to change this + TOA_RAND_TRESHOLD = 0.3 + TOA_RAND_BASE = 0.00 + def __init__(self, bts_link, bb_link): self.bts_link = bts_link self.bb_link = bb_link + # Generate a random RSSI range + rssi = random.randint(self.RSSI_RAND_MIN, self.RSSI_RAND_MAX) + self.rssi_min = rssi - self.RSSI_RAND_TRESHOLD + self.rssi_max = rssi + self.RSSI_RAND_TRESHOLD + + # Generate a random ToA range + self.toa_min = self.TOA_RAND_BASE - self.TOA_RAND_TRESHOLD + self.toa_max = self.TOA_RAND_BASE + self.TOA_RAND_TRESHOLD + def set_slot(self, ts): if ts > 0 and ts < 8: self.ts_pass = ts else: raise ValueError("Incorrect index for timeslot filter") - def process_payload(self, data): - payload = bytearray(data) - length = len(payload) - - # HACK: set fake RSSI value (-53) - payload[5] = 0x35 - - # HACK: add fake TOA value (6th and 7th bytes) - payload[6:2] = [0x00, 0x00] + # Converts a L12TRX message to TRX2L1 message + def transform_msg(self, msg_raw): + # Attempt to parse a message + try: + msg_l12trx = DATAMSG_L12TRX() + msg_l12trx.parse_msg(bytearray(msg_raw)) + except: + print("[!] Dropping unhandled DL message...") + return None - # Convert ubits to {255..0} - for i in range(8, length): - payload[i] = 255 if payload[i] else 0 + # Compose a new message for L1 + msg_trx2l1 = msg_l12trx.gen_trx2l1() - # WTF: append two unused bytes at the end - payload[length:2] = [0x00, 0x00] + # Randomize both RSSI and ToA values + msg_trx2l1.rssi = msg_trx2l1.rand_rssi( + min = self.rssi_min, max = self.rssi_max) + msg_trx2l1.toa = msg_trx2l1.rand_toa( + min = self.toa_min, max = self.toa_max) - return payload + return msg_trx2l1 # Downlink handler: BTS -> BB def bts2bb(self): # Read data from socket data, addr = self.bts_link.sock.recvfrom(512) - payload = self.process_payload(data) # BB is not connected / tuned if self.bb_freq is None: @@ -73,10 +97,22 @@ class BurstForwarder: if self.bb_freq != self.bts_freq: return None + # Process a message + msg = self.transform_msg(data) + if msg is None: + return None + # Timeslot filter - if payload[0] != self.ts_pass: + if msg.tn != self.ts_pass: return None + # Validate and generate the payload + payload = msg.gen_msg() + + # Append two unused bytes at the end + # in order to keep the compatibility + payload += bytearray(2) + # Send burst to BB self.bb_link.send(payload) @@ -84,7 +120,6 @@ class BurstForwarder: def bb2bts(self): # Read data from socket data, addr = self.bb_link.sock.recvfrom(512) - payload = self.process_payload(data) # BTS is not connected / tuned if self.bts_freq is None: @@ -94,5 +129,17 @@ class BurstForwarder: if self.bb_freq != self.bts_freq: return None + # Process a message + msg = self.transform_msg(data) + if msg is None: + return None + + # Validate and generate the payload + payload = msg.gen_msg() + + # Append two unused bytes at the end + # in order to keep the compatibility + payload += bytearray(2) + # Send burst to BTS self.bts_link.send(payload) -- cgit v1.2.3