From 7a49fc31c10502d409ed0fed1d8f5b34dd6367fa Mon Sep 17 00:00:00 2001 From: Vadim Yanitskiy Date: Sun, 22 Dec 2019 00:20:39 +1000 Subject: trx_toolkit/data_dump.py: rewrite unit tests to use unittest framework Change-Id: I8b934c15ba96d856aa79d10bf296d1446f043dd1 --- src/target/trx_toolkit/data_dump.py | 145 --------------------------- src/target/trx_toolkit/test_data_dump.py | 165 +++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+), 145 deletions(-) create mode 100644 src/target/trx_toolkit/test_data_dump.py diff --git a/src/target/trx_toolkit/data_dump.py b/src/target/trx_toolkit/data_dump.py index b91408f6..7c5452bf 100644 --- a/src/target/trx_toolkit/data_dump.py +++ b/src/target/trx_toolkit/data_dump.py @@ -220,148 +220,3 @@ class DATADumpFile(DATADump): def append_all(self, msgs): for msg in msgs: self.append_msg(msg) - -# Regression tests -if __name__ == '__main__': - from tempfile import TemporaryFile - from gsm_shared import * - import random - - # Configure logging - log.basicConfig(level = log.DEBUG, - format = "[%(levelname)s] %(filename)s:%(lineno)d %(message)s") - - # Create a temporary file - tf = TemporaryFile() - - # Create an instance of DATA dump manager - ddf = DATADumpFile(tf) - - # Generate two random bursts - burst_l12trx = [] - burst_trx2l1 = [] - - for i in range(0, GSM_BURST_LEN): - ubit = random.randint(0, 1) - burst_l12trx.append(ubit) - - sbit = random.randint(-127, 127) - burst_trx2l1.append(sbit) - - # Generate a basic list of random messages - log.info("Generating the reference messages") - messages_ref = [] - - for i in range(100): - # Create a message - if i % 2: - msg = DATAMSG_L12TRX() - msg.burst = burst_l12trx - else: - msg = DATAMSG_TRX2L1() - msg.burst = burst_trx2l1 - - # Randomize the header - msg.rand_hdr() - - # Append - messages_ref.append(msg) - - log.info("Adding the following messages to the capture:") - for msg in messages_ref[:3]: - log.info("%s: burst_len=%d" - % (msg.desc_hdr(), len(msg.burst))) - - # Check single message appending - ddf.append_msg(messages_ref[0]) - ddf.append_msg(messages_ref[1]) - ddf.append_msg(messages_ref[2]) - - # Read the written messages back - messages_check = ddf.parse_all() - - log.info("Read the following messages back:") - for msg in messages_check: - log.info("%s: burst_len=%d" - % (msg.desc_hdr(), len(msg.burst))) - - # Expecting three messages - assert(len(messages_check) == 3) - - # Check the messages - for i in range(3): - # Compare common header parts and bursts - assert(messages_check[i].burst == messages_ref[i].burst) - assert(messages_check[i].fn == messages_ref[i].fn) - assert(messages_check[i].tn == messages_ref[i].tn) - - # Validate a message - messages_check[i].validate() - - log.info("Check append_msg(): OK") - - - # Append the pending reference messages - ddf.append_all(messages_ref[3:]) - - # Read the written messages back - messages_check = ddf.parse_all() - - # Check the final amount - assert(len(messages_check) == len(messages_ref)) - - # Check the messages - for i in range(len(messages_check)): - # Compare common header parts and bursts - assert(messages_check[i].burst == messages_ref[i].burst) - assert(messages_check[i].fn == messages_ref[i].fn) - assert(messages_check[i].tn == messages_ref[i].tn) - - # Validate a message - messages_check[i].validate() - - log.info("Check append_all(): OK") - - - # Check parse_msg() - msg0 = ddf.parse_msg(0) - msg10 = ddf.parse_msg(10) - - # Make sure parsing was successful - assert(msg0 and msg10) - - # Compare common header parts and bursts - assert(msg0.burst == messages_ref[0].burst) - assert(msg0.fn == messages_ref[0].fn) - assert(msg0.tn == messages_ref[0].tn) - - assert(msg10.burst == messages_ref[10].burst) - assert(msg10.fn == messages_ref[10].fn) - assert(msg10.tn == messages_ref[10].tn) - - # Validate both messages - msg0.validate() - msg10.validate() - - log.info("Check parse_msg(): OK") - - - # Check parse_all() with range - messages_check = ddf.parse_all(skip = 10, count = 20) - - # Make sure parsing was successful - assert(messages_check) - - # Check the amount - assert(len(messages_check) == 20) - - for i in range(20): - # Compare common header parts and bursts - assert(messages_check[i].burst == messages_ref[i + 10].burst) - assert(messages_check[i].fn == messages_ref[i + 10].fn) - assert(messages_check[i].tn == messages_ref[i + 10].tn) - - # Validate a message - messages_check[i].validate() - - log.info("Check parse_all(): OK") diff --git a/src/target/trx_toolkit/test_data_dump.py b/src/target/trx_toolkit/test_data_dump.py new file mode 100644 index 00000000..2f7e25a0 --- /dev/null +++ b/src/target/trx_toolkit/test_data_dump.py @@ -0,0 +1,165 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# TRX Toolkit +# Unit tests for DATA capture management +# +# (C) 2019 by Vadim Yanitskiy +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +import unittest +import tempfile +import random + +from gsm_shared import * +from data_dump import * + +class DATADump_Test(unittest.TestCase): + def setUp(self): + # Create a temporary file + self._tf = tempfile.TemporaryFile(mode = 'w+b') + + # Create an instance of DATA dump manager + self._ddf = DATADumpFile(self._tf) + + # Compare message a with message b + def _compare_msg(self, a, b): + # Make sure we're comparing messages of the same type + self.assertEqual(a.__class__, b.__class__) + + # Compare common header fields + self.assertEqual(a.ver, b.ver) + self.assertEqual(a.fn, b.fn) + self.assertEqual(a.tn, b.tn) + + # Burst bits (if present) + self.assertEqual(a.burst, b.burst) + + # TRX2L1 specific fields + if isinstance(a, DATAMSG_L12TRX): + self.assertEqual(a.pwr, b.pwr) + + # L12TRX specific fields + if isinstance(a, DATAMSG_TRX2L1): + # Version independent fields + self.assertEqual(a.toa256, b.toa256) + self.assertEqual(a.rssi, b.rssi) + + # Version specific fields + if a.ver >= 1: + self.assertEqual(a.nope_ind, b.nope_ind) + self.assertEqual(a.mod_type, b.mod_type) + self.assertEqual(a.tsc_set, b.tsc_set) + self.assertEqual(a.tsc, b.tsc) + self.assertEqual(a.ci, b.ci) + + # Generate a random message of a given type / version + def _gen_rand_message(self, cls, ver = 1): + msg = cls(ver = ver) + msg.rand_hdr() + msg.rand_burst() + return msg + + # Generate a list of random messages + def _gen_rand_messages(self, cls, count, ver = 1): + msg_list = [] + + for i in range(count): + msg = self._gen_rand_message(cls, ver) + msg_list.append(msg) + + return msg_list + + # Generate a mixed list of random messages + def _gen_rand_message_mix(self, count, ver = 1): + msg_list = [] + msg_list += self._gen_rand_messages(DATAMSG_TRX2L1, count) + msg_list += self._gen_rand_messages(DATAMSG_L12TRX, count) + random.shuffle(msg_list) + return msg_list + + def _test_store_and_parse(self, cls): + msg_ref = self._gen_rand_message(cls) + self._ddf.append_msg(msg_ref) + + msg = self._ddf.parse_msg(0) + self._compare_msg(msg, msg_ref) + + # Store one TRX2L1 message in a file, read it back and compare + def test_store_and_parse_trx2l1(self): + self._test_store_and_parse(DATAMSG_TRX2L1) + + # Store one L12TRX message in a file, read it back and compare + def test_store_and_parse_l12trx(self): + self._test_store_and_parse(DATAMSG_L12TRX) + + # Store multiple TRX2L1/L12TRX messages in a file, read them back and compare + def test_store_and_parse_all(self): + # Store a mixed list of random messages (19 + 19) + msg_list_ref = self._gen_rand_message_mix(19) + self._ddf.append_all(msg_list_ref) + + # Retrieve and compare stored messages + msg_list = self._ddf.parse_all() + for i in range(len(msg_list_ref)): + self._compare_msg(msg_list[i], msg_list_ref[i]) + + # Verify random access to stored messages + def test_parse_msg_idx(self): + # Store a mixed list of random messages (19 + 19) + msg_list_ref = self._gen_rand_message_mix(19) + self._ddf.append_all(msg_list_ref) + + # Random access + for _ in range(100): + idx = random.randrange(len(msg_list_ref)) + msg = self._ddf.parse_msg(idx) + self._compare_msg(msg, msg_list_ref[idx]) + + def test_parse_empty(self): + with self.assertLogs(level = 'ERROR'): + idx = random.randrange(100) + msg = self._ddf.parse_msg(idx) + self.assertEqual(msg, False) + + def test_parse_all_empty(self): + msg_list = self._ddf.parse_all() + self.assertEqual(msg_list, []) + + def test_parse_len_overflow(self): + # Write a malformed message directly + self._tf.write(DATADump.TAG_L12TRX) + self._tf.write(b'\x00\x63') # 99 + self._tf.write(b'\xff' * 90) + + with self.assertLogs(level = 'ERROR'): + msg = self._ddf.parse_msg(0) + self.assertEqual(msg, None) + + def test_parse_unknown_tag(self): + # Write a malformed message directly + self._tf.write(b'\x33') + self._tf.write(b'\x00\x63') # 99 + self._tf.write(b'\xff' * 90) + + with self.assertLogs(level = 'ERROR'): + msg = self._ddf.parse_msg(0) + self.assertEqual(msg, None) + +if __name__ == '__main__': + unittest.main() -- cgit v1.2.3