From afd110a3b5d8a565bffb251f9e031b56605558ac Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Tue, 20 Feb 2018 17:36:09 +0700 Subject: fake_trx: implement classes for DATA capture menagement This change introduces the following classes: - DATADump - basic class, which contains methods to generate and parse the a message header, and some constants. - DATADumpFile - a child class, which contains methods to write and parse DATA messages from capture files. Usage example: # Open a capture file ddf = DATADumpFile("capture.bin") # Parse the 10th message msg = ddf.parse_msg(10) msg.fn = 100 msg.tn = 0 # Append one to the end of the capture ddf.append_msg(msg) Change-Id: I1b31183bd7bcca94de089847ee0b2f4ec88a7f1d --- src/target/fake_trx/data_dump.py | 379 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 379 insertions(+) create mode 100644 src/target/fake_trx/data_dump.py diff --git a/src/target/fake_trx/data_dump.py b/src/target/fake_trx/data_dump.py new file mode 100644 index 00000000..b9047365 --- /dev/null +++ b/src/target/fake_trx/data_dump.py @@ -0,0 +1,379 @@ +#!/usr/bin/env python2 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Helpers for DATA capture management +# +# (C) 2018 by Vadim Yanitskiy +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import struct + +from data_msg import * + +class DATADump: + # Constants + TAG_L12TRX = '\x01' + TAG_TRX2L1 = '\x02' + HDR_LENGTH = 2 + + # Generates raw bytes from a DATA message + # Return value: raw message bytes + def dump_msg(self, msg): + # Determine a message type + if isinstance(msg, DATAMSG_L12TRX): + tag = self.TAG_L12TRX + elif isinstance(msg, DATAMSG_TRX2L1): + tag = self.TAG_TRX2L1 + else: + raise ValueError("Unknown message type") + + # Generate a message payload + msg_raw = msg.gen_msg() + + # Calculate the length + msg_len = len(msg_raw) + + # Concatenate a message with header + return bytearray([tag, msg_len]) + msg_raw + + def parse_hdr(self, hdr): + # Extract the header info + msg_len = struct.unpack(" TRX + msg = DATAMSG_L12TRX() + elif tag == self.TAG_TRX2L1: + # TRX -> L1 + msg = DATAMSG_TRX2L1() + else: + # Unknown tag + return False + + return (msg, msg_len) + +class DATADumpFile(DATADump): + def __init__(self, capture): + # Check if capture file is already opened + if type(capture) is file: + self.f = capture + else: + print("[i] Opening capture file '%s'..." % capture) + self.f = open(capture, "a+b") + + def __del__(self): + print("[i] Closing the capture file") + self.f.close() + + # Moves the file descriptor before a specified message + # Return value: + # True in case of success, + # or False in case of EOF or header parsing error. + def _seek2msg(self, idx): + # Seek to the begining of the capture + self.f.seek(0) + + # Read the capture in loop... + for i in range(idx): + # Attempt to read a message header + hdr_raw = self.f.read(self.HDR_LENGTH) + if len(hdr_raw) != self.HDR_LENGTH: + return False + + # Attempt to parse it + rc = self.parse_hdr(hdr_raw) + if rc is False: + print("[!] Couldn't parse a message header") + return False + + # Expand the header + (_, msg_len) = rc + + # Skip a message + self.f.seek(msg_len, 1) + + return True + + # Parses a single message at the current descriptor position + # Return value: + # a parsed message in case of success, + # or None in case of EOF or header parsing error, + # or False in case of message parsing error. + def _parse_msg(self): + # Attempt to read a message header + hdr_raw = self.f.read(self.HDR_LENGTH) + if len(hdr_raw) != self.HDR_LENGTH: + return None + + # Attempt to parse it + rc = self.parse_hdr(hdr_raw) + if rc is False: + print("[!] Couldn't parse a message header") + return None + + # Expand the header + (msg, msg_len) = rc + + # Attempt to read a message + msg_raw = self.f.read(msg_len) + if len(msg_raw) != msg_len: + print("[!] Message length mismatch") + return None + + # Attempt to parse a message + try: + msg_raw = bytearray(msg_raw) + msg.parse_msg(msg_raw) + except: + print("[!] Couldn't parse a message, skipping...") + return False + + # Success + return msg + + # Parses a particular message defined by index idx + # Return value: + # a parsed message in case of success, + # or None in case of EOF or header parsing error, + # or False in case of message parsing error or out of range. + def parse_msg(self, idx): + # Move descriptor to the begining of requested message + rc = self._seek2msg(idx) + if not rc: + print("[!] Couldn't find requested message") + return False + + # Attempt to parse a message + return self._parse_msg() + + # Parses all messages from a given file + # Return value: + # list of parsed messages, + # or False in case of range error. + def parse_all(self, skip = None, count = None): + result = [] + + # Should we skip some messages? + if skip is None: + # Seek to the begining of the capture + self.f.seek(0) + else: + rc = self._seek2msg(skip) + if not rc: + print("[!] Couldn't find requested message") + return False + + # Read the capture in loop... + while True: + # Attempt to parse a message + msg = self._parse_msg() + + # EOF or broken header + if msg is None: + break + + # Skip unparsed messages + if msg is False: + continue + + # Success, append a message + result.append(msg) + + # Count limitation + if count is not None: + if len(result) == count: + break + + return result + + # Writes a new message at the end of the capture + def append_msg(self, msg): + # Generate raw bytes and write + msg_raw = self.dump_msg(msg) + self.f.write(msg_raw) + + # Writes a list of messages at the end of the capture + def append_all(self, msgs): + for msg in msgs: + self.append_msg(msg) + +# Regression tests +if __name__ == '__main__': + from tempfile import TemporaryFile + from gsm_shared import * + import random + + # Create a temporary file + tf = TemporaryFile() + + # Create an instance of DATA dump manager + ddf = DATADumpFile(tf) + + # Generate two random bursts + burst_l12trx = [] + burst_trx2l1 = [] + + for i in range(0, GSM_BURST_LEN): + ubit = random.randint(0, 1) + burst_l12trx.append(ubit) + + sbit = random.randint(-127, 127) + burst_trx2l1.append(sbit) + + # Generate a basic list of random messages + print("[i] Generating the reference messages") + messages_ref = [] + + for i in range(100): + # Create a message + if i % 2: + msg = DATAMSG_L12TRX() + msg.burst = burst_l12trx + else: + msg = DATAMSG_TRX2L1() + msg.burst = burst_trx2l1 + + # Randomize the header + msg.rand_hdr() + + # HACK: as ToA parsing is not implemented yet, + # we have to use a fixed 0.00 value for now... + if isinstance(msg, DATAMSG_TRX2L1): + msg.toa = 0.00 + + # Append + messages_ref.append(msg) + + print("[i] Adding the following messages to the capture:") + for msg in messages_ref[:3]: + print(" %s: burst_len=%d" + % (msg.desc_hdr(), len(msg.burst))) + + # Check single message appending + ddf.append_msg(messages_ref[0]) + ddf.append_msg(messages_ref[1]) + ddf.append_msg(messages_ref[2]) + + # Read the written messages back + messages_check = ddf.parse_all() + + print("[i] Read the following messages back:") + for msg in messages_check: + print(" %s: burst_len=%d" + % (msg.desc_hdr(), len(msg.burst))) + + # Expecting three messages + assert(len(messages_check) == 3) + + # Check the messages + for i in range(3): + # Compare common header parts and bursts + assert(messages_check[i].burst == messages_ref[i].burst) + assert(messages_check[i].fn == messages_ref[i].fn) + assert(messages_check[i].tn == messages_ref[i].tn) + + # HACK: as ToA parsing is not implemented yet, + # we have to use a fixed 0.00 value for now... + messages_check[i].toa = 0.00 + + # Validate a message + assert(messages_check[i].validate()) + + print("[?] Check append_msg(): OK") + + + # Append the pending reference messages + ddf.append_all(messages_ref[3:]) + + # Read the written messages back + messages_check = ddf.parse_all() + + # Check the final amount + assert(len(messages_check) == len(messages_ref)) + + # Check the messages + for i in range(len(messages_check)): + # Compare common header parts and bursts + assert(messages_check[i].burst == messages_ref[i].burst) + assert(messages_check[i].fn == messages_ref[i].fn) + assert(messages_check[i].tn == messages_ref[i].tn) + + # HACK: as ToA parsing is not implemented yet, + # we have to use a fixed 0.00 value for now... + messages_check[i].toa = 0.00 + + # Validate a message + assert(messages_check[i].validate()) + + print("[?] Check append_all(): OK") + + + # Check parse_msg() + msg0 = ddf.parse_msg(0) + msg10 = ddf.parse_msg(10) + + # Make sure parsing was successful + assert(msg0 and msg10) + + # Compare common header parts and bursts + assert(msg0.burst == messages_ref[0].burst) + assert(msg0.fn == messages_ref[0].fn) + assert(msg0.tn == messages_ref[0].tn) + + assert(msg10.burst == messages_ref[10].burst) + assert(msg10.fn == messages_ref[10].fn) + assert(msg10.tn == messages_ref[10].tn) + + # HACK: as ToA parsing is not implemented yet, + # we have to use a fixed 0.00 value for now... + msg0.toa = 0.00 + msg10.toa = 0.00 + + # Validate both messages + assert(msg0.validate()) + assert(msg10.validate()) + + print("[?] Check parse_msg(): OK") + + + # Check parse_all() with range + messages_check = ddf.parse_all(skip = 10, count = 20) + + # Make sure parsing was successful + assert(messages_check) + + # Check the amount + assert(len(messages_check) == 20) + + for i in range(20): + # Compare common header parts and bursts + assert(messages_check[i].burst == messages_ref[i + 10].burst) + assert(messages_check[i].fn == messages_ref[i + 10].fn) + assert(messages_check[i].tn == messages_ref[i + 10].tn) + + # HACK: as ToA parsing is not implemented yet, + # we have to use a fixed 0.00 value for now... + messages_check[i].toa = 0.00 + + # Validate a message + assert(messages_check[i].validate()) + + print("[?] Check parse_all(): OK") -- cgit v1.2.3