summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorVadim Yanitskiy <axilirator@gmail.com>2018-01-27 21:47:31 +0700
committerVadim Yanitskiy <axilirator@gmail.com>2018-01-29 04:19:01 +0700
commitd273ebf611bfafa6234961b13979f21d0223caec (patch)
tree3b154a2d9ad2fe4455b17954ebc9b6315662b3fb
parent263ccef268c45a122d9d35b217babd772c65ccb2 (diff)
fake_trx: use DATAMSG classes for DATA messages
The DATAMSG API, that was introduced and extended a few commits before, provides all required methods to create, validate, generate and parse DATA messages. Let's use it now. Change-Id: Ibc99126dc05d873c1ba538a5f4e74866de563f56
-rwxr-xr-xsrc/target/fake_trx/burst_gen.py64
-rwxr-xr-xsrc/target/fake_trx/burst_send.py65
-rw-r--r--src/target/fake_trx/data_if.py85
-rwxr-xr-xsrc/target/fake_trx/trx_sniff.py61
4 files changed, 116 insertions, 159 deletions
diff --git a/src/target/fake_trx/burst_gen.py b/src/target/fake_trx/burst_gen.py
index 04b7f477..ced2de3d 100755
--- a/src/target/fake_trx/burst_gen.py
+++ b/src/target/fake_trx/burst_gen.py
@@ -4,7 +4,7 @@
# Auxiliary tool to generate and send random bursts via TRX DATA
# interface, which may be useful for fuzzing and testing
#
-# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@@ -22,7 +22,6 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import random
import signal
import getopt
import sys
@@ -30,6 +29,7 @@ import sys
from rand_burst_gen import RandBurstGen
from data_if import DATAInterface
from gsm_shared import *
+from data_msg import *
COPYRIGHT = \
"Copyright (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
@@ -70,38 +70,56 @@ class Application:
# Init random burst generator
burst_gen = RandBurstGen()
+ # Init an empty DATA message
+ if self.conn_mode == "TRX":
+ msg = DATAMSG_L12TRX()
+ elif self.conn_mode == "L1":
+ msg = DATAMSG_TRX2L1()
+
# Generate a random frame number or use provided one
- if self.fn is None:
- fn = random.randint(0, GSM_HYPERFRAME)
- else:
- fn = self.fn
+ fn_init = msg.rand_fn() if self.fn is None else self.fn
# Send as much bursts as required
for i in range(self.burst_count):
+ # Randomize the message header
+ msg.rand_hdr()
+
+ # Increase and set frame number
+ msg.fn = (fn_init + i) % GSM_HYPERFRAME
+
+ # Set timeslot number
+ if self.tn is not None:
+ msg.tn = self.tn
+
+ # Set transmit power level
+ if self.pwr is not None:
+ msg.pwr = self.pwr
+
+ # TODO: also set TRX2L1 specific fields
+
# Generate a random burst
if self.burst_type == "NB":
- buf = burst_gen.gen_nb()
+ burst = burst_gen.gen_nb()
elif self.burst_type == "FB":
- buf = burst_gen.gen_fb()
+ burst = burst_gen.gen_fb()
elif self.burst_type == "SB":
- buf = burst_gen.gen_sb()
+ burst = burst_gen.gen_sb()
elif self.burst_type == "AB":
- buf = burst_gen.gen_ab()
+ burst = burst_gen.gen_ab()
+
+ # Convert to soft-bits in case of TRX -> L1 message
+ if self.conn_mode == "L1":
+ burst = msg.ubit2sbit(burst)
- print("[i] Sending %d/%d %s burst (fn=%u) to %s..."
+ # Set burst
+ msg.burst = burst
+
+ print("[i] Sending %d/%d %s burst %s to %s..."
% (i + 1, self.burst_count, self.burst_type,
- fn, self.conn_mode))
-
- # Send to TRX or L1
- if self.conn_mode == "TRX":
- self.data_if.send_trx_msg(buf,
- self.tn, fn, self.pwr)
- elif self.conn_mode == "L1":
- self.data_if.send_l1_msg(buf,
- self.tn, fn, self.pwr)
-
- # Increase frame number (for count > 1)
- fn = (fn + 1) % GSM_HYPERFRAME
+ msg.desc_hdr(), self.conn_mode))
+
+ # Send message
+ self.data_if.send_msg(msg)
self.shutdown()
diff --git a/src/target/fake_trx/burst_send.py b/src/target/fake_trx/burst_send.py
index ee8e51ab..4dbe9846 100755
--- a/src/target/fake_trx/burst_send.py
+++ b/src/target/fake_trx/burst_send.py
@@ -3,7 +3,7 @@
# Auxiliary tool to send existing bursts via TRX DATA interface
#
-# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@@ -21,13 +21,13 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import random
import signal
import getopt
import sys
from data_if import DATAInterface
from gsm_shared import *
+from data_msg import *
COPYRIGHT = \
"Copyright (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
@@ -74,40 +74,61 @@ class Application:
print("[i] Reading bursts from stdin...")
src = sys.stdin
+ # Init an empty DATA message
+ if self.conn_mode == "TRX":
+ msg = DATAMSG_L12TRX()
+ elif self.conn_mode == "L1":
+ msg = DATAMSG_TRX2L1()
+
# Generate a random frame number or use provided one
- if self.fn is None:
- fn = random.randint(0, GSM_HYPERFRAME)
- else:
- fn = self.fn
+ fn = msg.rand_fn() if self.fn is None else self.fn
# Read the burst source line-by-line
for line in src:
# Strip spaces
- burst = line.strip()
- buf = []
+ burst_str = line.strip()
+ burst = []
# Check length
- if len(burst) != 148:
+ if len(burst_str) != 148:
print("[!] Dropping burst due to length != 148")
continue
- print("[i] Sending a burst (fn=%u) to %s..."
- % (fn, self.conn_mode))
+ # Randomize the message header
+ msg.rand_hdr()
+
+ # Set frame number
+ msg.fn = fn
+
+ # Set timeslot number
+ if self.tn is not None:
+ msg.tn = self.tn
+
+ # Set transmit power level
+ if self.pwr is not None:
+ msg.pwr = self.pwr
+
+ # TODO: also set TRX2L1 specific fields
# Parse a string
- for bit in burst:
+ for bit in burst_str:
if bit == "1":
- buf.append(1)
+ burst.append(1)
else:
- buf.append(0)
-
- # Send to TRX or L1
- if self.conn_mode == "TRX":
- self.data_if.send_trx_msg(buf,
- self.tn, fn, self.pwr)
- elif self.conn_mode == "L1":
- self.data_if.send_l1_msg(buf,
- self.tn, fn, self.pwr)
+ burst.append(0)
+
+ # Convert to soft-bits in case of TRX -> L1 message
+ if self.conn_mode == "L1":
+ burst = msg.ubit2sbit(burst)
+
+ # Set burst
+ msg.burst = burst
+
+ print("[i] Sending a burst %s to %s..."
+ % (msg.desc_hdr(), self.conn_mode))
+
+ # Send message
+ self.data_if.send_msg(msg)
# Increase frame number (for count > 1)
fn = (fn + 1) % GSM_HYPERFRAME
diff --git a/src/target/fake_trx/data_if.py b/src/target/fake_trx/data_if.py
index 0f373ab5..8b0cd8e5 100644
--- a/src/target/fake_trx/data_if.py
+++ b/src/target/fake_trx/data_if.py
@@ -4,7 +4,7 @@
# Virtual Um-interface (fake transceiver)
# DATA interface implementation
#
-# (C) 2017 by Vadim Yanitskiy <axilirator@gmail.com>
+# (C) 2017-2018 by Vadim Yanitskiy <axilirator@gmail.com>
#
# All Rights Reserved
#
@@ -22,85 +22,18 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
-import random
-
from udp_link import UDPLink
-from gsm_shared import *
+from data_msg import *
class DATAInterface(UDPLink):
- def send_l1_msg(self, burst,
- tn = None, fn = None, rssi = None):
- # Generate random timeslot index if not preset
- if tn is None:
- tn = random.randint(0, 7)
-
- # Generate random frame number if not preset
- if fn is None:
- fn = random.randint(0, GSM_HYPERFRAME)
-
- # Generate random RSSI if not preset
- if rssi is None:
- rssi = -random.randint(-75, -50)
-
- # Prepare a buffer for header and burst
- buf = []
-
- # Put timeslot index
- buf.append(tn)
-
- # Put frame number
- buf.append((fn >> 24) & 0xff)
- buf.append((fn >> 16) & 0xff)
- buf.append((fn >> 8) & 0xff)
- buf.append((fn >> 0) & 0xff)
-
- # Put RSSI
- buf.append(rssi)
-
- # HACK: put fake TOA value
- buf += [0x00] * 2
-
- # Put burst
- buf += burst
-
- # Put two unused bytes
- buf += [0x00] * 2
-
- # Send message
- self.send(bytearray(buf))
-
- def send_trx_msg(self, burst,
- tn = None, fn = None, pwr = None):
- # Generate random timeslot index if not preset
- if tn is None:
- tn = random.randint(0, 7)
-
- # Generate random frame number if not preset
- if fn is None:
- fn = random.randint(0, GSM_HYPERFRAME)
-
- # Generate random power level if not preset
- if pwr is None:
- pwr = random.randint(0, 34)
-
- # Prepare a buffer for header and burst
- buf = []
-
- # Put timeslot index
- buf.append(tn)
-
- # Put frame number
- buf.append((fn >> 24) & 0xff)
- buf.append((fn >> 16) & 0xff)
- buf.append((fn >> 8) & 0xff)
- buf.append((fn >> 0) & 0xff)
-
- # Put transmit power level
- buf.append(pwr)
+ def send_msg(self, msg):
+ # Validate a message
+ if not msg.validate():
+ raise ValueError("Message incomplete or incorrect")
- # Put burst
- buf += burst
+ # Generate TRX message
+ payload = msg.gen_msg()
# Send message
- self.send(bytearray(buf))
+ self.send(payload)
diff --git a/src/target/fake_trx/trx_sniff.py b/src/target/fake_trx/trx_sniff.py
index cf3ab9ce..91c2c4ad 100755
--- a/src/target/fake_trx/trx_sniff.py
+++ b/src/target/fake_trx/trx_sniff.py
@@ -27,6 +27,8 @@ import sys
import scapy.all
+from data_msg import *
+
COPYRIGHT = \
"Copyright (C) 2018 by Vadim Yanitskiy <axilirator@gmail.com>\n" \
"License GPLv2+: GNU GPL version 2 or later " \
@@ -98,32 +100,39 @@ class Application:
trx = udp.payload
# Convert to bytearray
- trx = bytearray(str(trx))
-
- # Parse GSM TDMA specific data
- fn = (trx[1] << 24) | (trx[2] << 16) | (trx[3] << 8) | trx[4]
- tn = trx[0]
+ msg_raw = bytearray(str(trx))
# Determine a burst direction (L1 <-> TRX)
l12trx = udp.sport > udp.dport
+ # Create an empty DATA message
+ msg = DATAMSG_L12TRX() if l12trx else DATAMSG_TRX2L1()
+
+ # Attempt to parse the payload as a DATA message
+ try:
+ msg.parse_msg(msg_raw)
+ except:
+ print("[!] Failed to parse message, dropping...")
+ self.cnt_burst_dropped_num += 1
+ return
+
# Poke burst pass filter
- rc = self.burst_pass_filter(l12trx, fn, tn)
+ rc = self.burst_pass_filter(l12trx, msg.fn, msg.tn)
if rc is False:
self.cnt_burst_dropped_num += 1
return
# Debug print
- print("[i] %s burst: fn=%u, tn=%d" \
- % ("L1 -> TRX" if l12trx else "TRX -> L1", fn, tn))
+ print("[i] %s burst: %s" \
+ % ("L1 -> TRX" if l12trx else "TRX -> L1", msg.desc_hdr()))
# Poke burst handler
- rc = self.burst_handle(trx, l12trx, fn, tn)
+ rc = self.burst_handle(l12trx, msg_raw, msg)
if rc is False:
self.shutdown()
# Poke burst counter
- rc = self.burst_count(fn, tn)
+ rc = self.burst_count(msg.fn, msg.tn)
if rc is True:
self.shutdown()
@@ -149,47 +158,23 @@ class Application:
# Burst passed ;)
return True
- def burst_handle(self, trx_burst, l12trx, fn, tn):
+ def burst_handle(self, l12trx, msg_raw, msg):
if self.print_bursts:
- self.burst_dump_bits(sys.stdout, trx_burst, l12trx)
- sys.stdout.flush()
+ print(msg.burst)
if self.output_file is not None:
# TLV: tag defines burst direction (one byte, BE)
self.output_file.write('\x01' if l12trx else '\x02')
# TLV: length of value (one byte, BE)
- length = len(trx_burst)
+ length = len(msg_raw)
self.output_file.write(chr(length))
# TLV: raw value
- self.output_file.write(trx_burst)
+ self.output_file.write(msg_raw)
return True
- def burst_dump_bits(self, dst, trx_burst, l12trx):
- # Split out burst header
- if l12trx:
- burst = trx_burst[6:]
- else:
- burst = trx_burst[8:]
-
- # Print normal bits: 0 or 1
- for i in range(0, 148):
- # Convert bits to chars
- if l12trx:
- # Convert bits as is
- bit = '1' if burst[i] else '0'
- else:
- # Convert trx bits {254..0} to sbits {-127..127}
- bit = -127 if burst[i] == 255 else 127 - burst[i]
- # Convert sbits {-127..127} to ubits {0..1}
- bit = '1' if bit < 0 else '0'
-
- # Write a normal bit
- dst.write(bit)
- dst.write("\n")
-
def burst_count(self, fn, tn):
# Update frame counter
if self.cnt_frame_last is None: