aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/osmo_gsm_tester/bts_octphy.py64
-rw-r--r--src/osmo_gsm_tester/bts_osmo.py146
-rw-r--r--src/osmo_gsm_tester/bts_osmotrx.py63
-rw-r--r--src/osmo_gsm_tester/bts_sysmo.py57
4 files changed, 157 insertions, 173 deletions
diff --git a/src/osmo_gsm_tester/bts_octphy.py b/src/osmo_gsm_tester/bts_octphy.py
index f25823c..fd8d078 100644
--- a/src/osmo_gsm_tester/bts_octphy.py
+++ b/src/osmo_gsm_tester/bts_octphy.py
@@ -20,56 +20,21 @@
import os
import pprint
import tempfile
-from . import log, config, util, template, process, event_loop, pcu_osmo
+from . import log, config, util, template, process, event_loop, pcu_osmo, bts_osmo
-class OsmoBtsOctphy(log.Origin):
- suite_run = None
- bsc = None
- sgsn = None
+class OsmoBtsOctphy(bts_osmo.OsmoBtsMainUnit):
run_dir = None
inst = None
env = None
- pcu_sk_tmp_dir = None
- values = None
- lac = None
- rac = None
- cellid = None
- bvci = None
- proc_bts = None
- _pcu = None
BIN_BTS_OCTPHY = 'osmo-bts-octphy'
CONF_BTS_OCTPHY = 'osmo-bts-octphy.cfg'
def __init__(self, suite_run, conf):
- super().__init__(log.C_RUN, OsmoBtsOctphy.BIN_BTS_OCTPHY)
- self.suite_run = suite_run
- self.conf = conf
+ super().__init__(suite_run, conf, OsmoBtsOctphy.BIN_BTS_OCTPHY)
self.env = {}
self.values = {}
- self.pcu_sk_tmp_dir = tempfile.mkdtemp('', 'ogtpcusk')
- if len(self.pcu_socket_path().encode()) > 107:
- raise log.Error('Path for pcu socket is longer than max allowed len for unix socket path (107):', self.pcu_socket_path())
-
- def cleanup(self):
- if self.pcu_sk_tmp_dir:
- try:
- os.remove(self.pcu_socket_path())
- except OSError:
- pass
- os.rmdir(self.pcu_sk_tmp_dir)
-
- def pcu(self):
- if self._pcu is None:
- self._pcu = pcu_osmo.OsmoPcu(self.suite_run, self, self.conf)
- return self._pcu
-
- def pcu_socket_path(self):
- return os.path.join(self.pcu_sk_tmp_dir, 'pcu_bts')
-
- def remote_addr(self):
- return self.conf.get('addr')
def start(self):
if self.bsc is None:
@@ -185,27 +150,4 @@ class OsmoBtsOctphy(log.Origin):
self.dbg(conf=values)
return values
- def ready_for_pcu(self):
- if not self.proc_bts or not self.proc_bts.is_running:
- return False
- return 'BTS is up' in (self.proc_bts.get_stderr() or '')
-
- def set_bsc(self, bsc):
- self.bsc = bsc
-
- def set_sgsn(self, sgsn):
- self.sgsn = sgsn
-
- def set_lac(self, lac):
- self.lac = lac
-
- def set_rac(self, rac):
- self.rac = rac
-
- def set_cellid(self, cellid):
- self.cellid = cellid
-
- def set_bvci(self, bvci):
- self.bvci = bvci
-
# vim: expandtab tabstop=4 shiftwidth=4
diff --git a/src/osmo_gsm_tester/bts_osmo.py b/src/osmo_gsm_tester/bts_osmo.py
new file mode 100644
index 0000000..16e79cf
--- /dev/null
+++ b/src/osmo_gsm_tester/bts_osmo.py
@@ -0,0 +1,146 @@
+# osmo_gsm_tester: base classes to share code among BTS subclasses.
+#
+# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
+#
+# Author: Pau Espin Pedrol <pespin@sysmocom.de>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import os
+import pprint
+import tempfile
+from abc import ABCMeta, abstractmethod
+from . import log, config, util, template, process, event_loop, pcu_osmo
+
+class OsmoBts(log.Origin, metaclass=ABCMeta):
+ suite_run = None
+ proc_bts = None
+ bsc = None
+ sgsn = None
+ lac = None
+ rac = None
+ cellid = None
+ bvci = None
+ _pcu = None
+
+##############
+# PROTECTED
+##############
+ def __init__(self, suite_run, conf, name):
+ super().__init__(log.C_RUN, name)
+ self.suite_run = suite_run
+ self.conf = conf
+ if len(self.pcu_socket_path().encode()) > 107:
+ raise log.Error('Path for pcu socket is longer than max allowed len for unix socket path (107):', self.pcu_socket_path())
+
+########################
+# PUBLIC - INTERNAL API
+########################
+ @abstractmethod
+ def conf_for_bsc(self):
+ 'Used by bsc objects to get path to socket.'
+ pass
+
+ @abstractmethod
+ def pcu_socket_path(self):
+ 'Used by pcu objects to get path to socket.'
+ pass
+
+ @abstractmethod
+ def create_pcu(self):
+ 'Used by base class. Subclass can create different pcu implementations.'
+ pass
+
+ def remote_addr(self):
+ return self.conf.get('addr')
+
+ def ready_for_pcu(self):
+ if not self.proc_bts or not self.proc_bts.is_running:
+ return False
+ return 'BTS is up' in (self.proc_bts.get_stderr() or '')
+
+ def cleanup(self):
+ 'Nothing to do by default. Subclass can override if required.'
+ pass
+
+###################
+# PUBLIC (test API included)
+###################
+ @abstractmethod
+ def start(self):
+ 'Starts BTS proccess and sets self.proc_bts with an object of Process interface'
+ pass
+
+ def pcu(self):
+ if self._pcu is None:
+ self._pcu = self.create_pcu(self.suite_run, self, self.conf)
+ return self._pcu
+
+ def set_bsc(self, bsc):
+ self.bsc = bsc
+
+ def set_sgsn(self, sgsn):
+ self.sgsn = sgsn
+
+ def set_lac(self, lac):
+ self.lac = lac
+
+ def set_rac(self, rac):
+ self.rac = rac
+
+ def set_cellid(self, cellid):
+ self.cellid = cellid
+
+ def set_bvci(self, bvci):
+ self.bvci = bvci
+
+
+class OsmoBtsMainUnit(OsmoBts, metaclass=ABCMeta):
+##############
+# PROTECTED
+##############
+ pcu_sk_tmp_dir = None
+
+ def __init__(self, suite_run, conf, name):
+ super().__init__(suite_run, conf, name)
+
+########################
+# PUBLIC - INTERNAL API
+########################
+ @abstractmethod
+ def conf_for_bsc(self):
+ pass
+
+ def cleanup(self):
+ if self.pcu_sk_tmp_dir:
+ try:
+ os.remove(self.pcu_socket_path())
+ except OSError:
+ pass
+ os.rmdir(self.pcu_sk_tmp_dir)
+
+ def create_pcu(self):
+ return pcu_osmo.OsmoPcu(self.suite_run, self, self.conf)
+
+ def pcu_socket_path(self):
+ if self.pcu_sk_tmp_dir is None:
+ self.pcu_sk_tmp_dir = tempfile.mkdtemp('', 'ogtpcusk')
+ return os.path.join(self.pcu_sk_tmp_dir, 'pcu_bts')
+
+###################
+# PUBLIC (test API included)
+###################
+ @abstractmethod
+ def start(self):
+ pass
diff --git a/src/osmo_gsm_tester/bts_osmotrx.py b/src/osmo_gsm_tester/bts_osmotrx.py
index 9b1f077..fb13545 100644
--- a/src/osmo_gsm_tester/bts_osmotrx.py
+++ b/src/osmo_gsm_tester/bts_osmotrx.py
@@ -20,23 +20,13 @@
import os
import pprint
import tempfile
-from . import log, config, util, template, process, event_loop, pcu_osmo
+from . import log, config, util, template, process, event_loop, pcu_osmo, bts_osmo
-class OsmoBtsTrx(log.Origin):
- suite_run = None
- bsc = None
- sgsn = None
+class OsmoBtsTrx(bts_osmo.OsmoBtsMainUnit):
run_dir = None
inst = None
env = None
trx = None
- pcu_sk_tmp_dir = None
- lac = None
- rac = None
- cellid = None
- bvci = None
- proc_bts = None
- _pcu = None
BIN_BTS_TRX = 'osmo-bts-trx'
BIN_PCU = 'osmo-pcu'
@@ -44,32 +34,8 @@ class OsmoBtsTrx(log.Origin):
CONF_BTS_TRX = 'osmo-bts-trx.cfg'
def __init__(self, suite_run, conf):
- super().__init__(log.C_RUN, OsmoBtsTrx.BIN_BTS_TRX)
- self.suite_run = suite_run
- self.conf = conf
+ super().__init__(suite_run, conf, OsmoBtsTrx.BIN_BTS_TRX)
self.env = {}
- self.pcu_sk_tmp_dir = tempfile.mkdtemp('', 'ogtpcusk')
- if len(self.pcu_socket_path().encode()) > 107:
- raise log.Error('Path for pcu socket is longer than max allowed len for unix socket path (107):', self.pcu_socket_path())
-
- def cleanup(self):
- if self.pcu_sk_tmp_dir:
- try:
- os.remove(self.pcu_socket_path())
- except OSError:
- pass
- os.rmdir(self.pcu_sk_tmp_dir)
-
- def pcu(self):
- if self._pcu is None:
- self._pcu = pcu_osmo.OsmoPcu(self.suite_run, self, self.conf)
- return self._pcu
-
- def pcu_socket_path(self):
- return os.path.join(self.pcu_sk_tmp_dir, 'pcu_bts')
-
- def remote_addr(self):
- return self.conf.get('addr')
def trx_remote_ip(self):
conf_ip = self.conf.get('trx_remote_ip', None)
@@ -163,29 +129,6 @@ class OsmoBtsTrx(log.Origin):
self.dbg(conf=values)
return values
- def ready_for_pcu(self):
- if not self.proc_bts or not self.proc_bts.is_running:
- return False
- return 'BTS is up' in (self.proc_bts.get_stderr() or '')
-
- def set_bsc(self, bsc):
- self.bsc = bsc
-
- def set_sgsn(self, sgsn):
- self.sgsn = sgsn
-
- def set_lac(self, lac):
- self.lac = lac
-
- def set_rac(self, rac):
- self.rac = rac
-
- def set_cellid(self, cellid):
- self.cellid = cellid
-
- def set_bvci(self, bvci):
- self.bvci = bvci
-
class OsmoTrx(log.Origin):
suite_run = None
run_dir = None
diff --git a/src/osmo_gsm_tester/bts_sysmo.py b/src/osmo_gsm_tester/bts_sysmo.py
index 1d2dbf6..d286d02 100644
--- a/src/osmo_gsm_tester/bts_sysmo.py
+++ b/src/osmo_gsm_tester/bts_sysmo.py
@@ -19,33 +19,20 @@
import os
import pprint
-from . import log, config, util, template, process, pcu_sysmo
+from . import log, config, util, template, process, pcu_sysmo, bts_osmo
-class SysmoBts(log.Origin):
- suite_run = None
- bsc = None
- sgsn = None
+class SysmoBts(bts_osmo.OsmoBts):
run_dir = None
inst = None
remote_inst = None
- remote_env = None
remote_dir = None
- lac = None
- rac = None
- cellid = None
- bvci = None
- proc_bts = None
- _pcu = None
REMOTE_DIR = '/osmo-gsm-tester-bts'
BTS_SYSMO_BIN = 'osmo-bts-sysmo'
BTS_SYSMO_CFG = 'osmo-bts-sysmo.cfg'
def __init__(self, suite_run, conf):
- super().__init__(log.C_RUN, self.BTS_SYSMO_BIN)
- self.suite_run = suite_run
- self.conf = conf
- self.remote_env = {}
+ super().__init__(suite_run, conf, SysmoBts.BTS_SYSMO_BIN)
self.remote_user = 'root'
def start(self):
@@ -91,15 +78,9 @@ class SysmoBts(log.Origin):
self.proc_bts = self.launch_remote('osmo-bts-sysmo', args, remote_cwd=remote_run_dir)
- def cleanup(self):
- pass
-
def _direct_pcu_enabled(self):
return util.str2bool(self.conf.get('direct_pcu'))
- def pcu_socket_path(self):
- return os.path.join(SysmoBts.REMOTE_DIR, 'pcu_bts')
-
def _process_remote(self, name, popen_args, remote_cwd=None):
run_dir = self.run_dir.new_dir(name)
return process.RemoteProcess(name, run_dir, self.remote_user, self.remote_addr(), remote_cwd,
@@ -128,13 +109,8 @@ class SysmoBts(log.Origin):
log.ctx(proc)
raise log.Error('Exited in error')
- def pcu(self):
- if self._pcu is None:
- self._pcu = pcu_sysmo.OsmoPcuSysmo(self.suite_run, self, self.conf)
- return self._pcu
-
- def remote_addr(self):
- return self.conf.get('addr')
+ def create_pcu(self):
+ return pcu_sysmo.OsmoPcuSysmo(self.suite_run, self, self.conf)
def pcu_socket_path(self):
return os.path.join(SysmoBts.REMOTE_DIR, 'pcu_bts')
@@ -182,27 +158,4 @@ class SysmoBts(log.Origin):
self.dbg(conf=values)
return values
- def ready_for_pcu(self):
- if not self.proc_bts or not self.proc_bts.is_running:
- return False
- return 'BTS is up' in (self.proc_bts.get_stderr() or '')
-
- def set_bsc(self, bsc):
- self.bsc = bsc
-
- def set_sgsn(self, sgsn):
- self.sgsn = sgsn
-
- def set_lac(self, lac):
- self.lac = lac
-
- def set_rac(self, rac):
- self.rac = rac
-
- def set_cellid(self, cellid):
- self.cellid = cellid
-
- def set_bvci(self, bvci):
- self.bvci = bvci
-
# vim: expandtab tabstop=4 shiftwidth=4