aboutsummaryrefslogtreecommitdiffstats
path: root/src/osmo_gsm_tester/core/test.py
blob: 45dfd41f460d54c88dd293afe5d15c088a431885 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# osmo_gsm_tester: test class
#
# Copyright (C) 2017 by sysmocom - s.f.m.c. GmbH
#
# Author: Pau Espin Pedrol <pespin@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import os
import sys
import time
import traceback

from . import log
from . import  util
from . import resource
from .event_loop import MainLoop

from .. import testenv

class Test(log.Origin):
    '''A single test script belonging to a suite run.

    Keeps the bookkeeping of one test: where its script lives, its current
    status (one of the constants below), when it was started, how long it
    took and, on failure, the failure type, message and traceback.
    '''
    # Possible values of self.status:
    UNKNOWN = 'UNKNOWN' # matches junit 'error'; initial status, also set when run() starts
    SKIP = 'skip' # assigned by set_skip()
    PASS = 'pass' # assigned by set_pass()
    FAIL = 'FAIL' # assigned by set_fail()

    def __init__(self, suite_run, test_basename, test_specific_config):
        '''Create a test that has not been run yet.

        suite_run -- the suite run this test is part of; the test script is
                     looked up in suite_run.definition.suite_dir.
        test_basename -- file name of the test script within the suite dir.
        test_specific_config -- opaque configuration, handed back unchanged
                                by config_test_specific().
        '''
        self.basename = test_basename
        super().__init__(log.C_TST, self.basename)
        self._run_dir = None  # created lazily by get_run_dir()
        self.suite_run = suite_run
        self._config_test_specific = test_specific_config
        self.path = os.path.join(self.suite_run.definition.suite_dir, self.basename)
        self.status = Test.UNKNOWN
        self.start_timestamp = 0
        self.duration = 0
        self.fail_type = None
        self.fail_message = None
        # Initialise alongside fail_type/fail_message so that reading fail_tb
        # on a test that never failed yields None instead of AttributeError.
        self.fail_tb = None
        self.log_targets = []
        self._report_stdout = None

    def module_name(self):
        'Return test name without trailing .py'
        assert self.basename.endswith('.py')
        return self.basename[:-3]

    def get_run_dir(self):
        if self._run_dir is None:
            self._run_dir = util.Dir(self.suite_run.get_run_dir().new_dir(self._name))
        return self._run_dir

    def run(self):
        '''Run the test script and record its outcome on this object.

        Sets up two file log targets in the test's run dir (a full debug log
        and a brief log), sets up the test environment and executes the test
        script at self.path with stdout redirected.

        If the script finishes without having changed self.status, the test
        is marked as passed. Any Exception is turned into a test failure via
        set_fail() and is not propagated. Any other BaseException (e.g. the
        program being aborted by a signal like Ctrl-C) is logged and
        re-raised so that the caller can abort everything.

        In all cases the test environment is stopped (if it was set up) and
        the log targets are removed before returning.
        '''
        testenv_obj = None
        try:
            self.log_targets = [log.FileLogTarget(self.get_run_dir().new_child(log.FILE_LOG)).set_all_levels(log.L_DBG).style_change(trace=True),
                                log.FileLogTarget(self.get_run_dir().new_child(log.FILE_LOG_BRIEF)).style_change(src=False, all_origins_on_levels=(log.L_ERR, log.L_TRACEBACK))]
            log.large_separator(self.suite_run.trial().name(), self.suite_run.name(), self.name(), sublevel=3)
            self.status = Test.UNKNOWN
            self.start_timestamp = time.time()
            testenv_obj = testenv.setup(self.suite_run, self)
            # redirect_stdout() is not defined in this class; presumably
            # inherited from log.Origin.
            with self.redirect_stdout():
                util.run_python_file('%s.%s' % (self.suite_run.definition.name(), self.basename),
                                     self.path)
            # The test script may have set the status itself (e.g. by calling
            # set_fail() or set_skip()); only an untouched status means pass.
            if self.status == Test.UNKNOWN:
                self.set_pass()
        except Exception as e:
            if hasattr(e, 'msg'):
                msg = e.msg
            else:
                msg = str(e)
            if isinstance(e, AssertionError):
                # AssertionError lacks further information on what was
                # asserted. Find the line where the code asserted:
                msg += log.get_src_from_exc_info(sys.exc_info())
            # add source file information to failure report
            if hasattr(e, 'origins'):
                msg += ' [%s]' % e.origins
            tb_str = traceback.format_exc()
            if isinstance(e, resource.NoResourceExn):
                # A missing resource is easier to diagnose with the current
                # resource status appended to the traceback.
                tb_str += self.suite_run.resource_status_str()
            self.set_fail(type(e).__name__, msg, tb_str, log.get_src_from_exc_info())
        except BaseException as e:
            # when the program is aborted by a signal (like Ctrl-C), escalate to abort all.
            self.err('TEST RUN ABORTED: %s' % type(e).__name__)
            raise
        finally:
            try:
                if testenv_obj:
                    testenv_obj.stop()
            finally:
                # Always remove this test's log targets, even when stopping
                # the test environment raised; the exception from stop()
                # still propagates afterwards.
                for log_tgt in self.log_targets:
                    log_tgt.remove()

    def name(self):
        '''Return the test's name, suffixed with ':<line>' when available.

        The line is whatever log.get_line_for_src() reports for the test
        script's path (presumably the script line currently executing --
        verify against log.get_line_for_src). Without a line the parent
        class' name() is returned.
        '''
        src_line = log.get_line_for_src(self.path)
        if src_line is None:
            return super().name()
        return '%s:%s' % (self._name, src_line)

    def elapsed_time(self):
        'time elapsed since test was started'
        return time.time() - self.start_timestamp

    def set_fail(self, fail_type, fail_message, tb_str=None, src=4):
        self.status = Test.FAIL
        self.duration = self.elapsed_time()
        self.fail_type = fail_type
        self.fail_message = fail_message

        if tb_str is None:
            # populate an exception-less call to set_fail() with traceback info
            tb_str = ''.join(traceback.format_stack()[:-1])

        self.fail_tb = tb_str
        self.err('%s: %s' % (self.fail_type, self.fail_message), _src=src)
        if self.fail_tb:
            self.log(self.fail_tb, _level=log.L_TRACEBACK)
        self.log('Test FAILED (%.1f sec)' % self.duration)

    def set_pass(self):
        self.status = Test.PASS
        self.duration = self.elapsed_time()
        self.log('Test passed (%.1f sec)' % self.duration)

    def set_skip(self):
        self.status = Test.SKIP
        self.duration = 0

    def config_test_specific(self):
        return self._config_test_specific

    def set_report_stdout(self, text):
        'Overwrite stdout text stored in report from inside a test'
        self._report_stdout = text

    def report_stdout(self):
        # If test overwrote the text, provide it:
        if self._report_stdout is not None:
            return self._report_stdout
        # Otherwise vy default provide the entire test brief log:
        if len(self.log_targets) == 2 and self.log_targets[1].log_file_path() is not None:
            with open(self.log_targets[1].log_file_path(), 'r') as myfile:
                return myfile.read()
        else:
            return 'test log file not available'

# vim: expandtab tabstop=4 shiftwidth=4