summaryrefslogtreecommitdiffstats
path: root/src/osmo_gsm_tester/event_loop.py
blob: 6f64ef9e49a4d3f8d9b0301ff799f7919ad3e084 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# osmo_gsm_tester: Event loop
#
# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
#
# Author: Pau Espin Pedrol <pespin@sysmocom.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

# These will be initialized before each test run.
# A test script can thus establish its context by doing:
# from osmo_gsm_tester.test import *
import time
from . import log

# Module-level registry of the callables that poll() invokes; entries are
# added by register_poll_func() and removed by unregister_poll_func().
poll_funcs = []

def register_poll_func(func):
    """Append func to poll_funcs so that poll() will call it.

    func is called without arguments. Registering the same callable more
    than once results in it being stored (and called) more than once.
    """
    # The list is mutated in place, never rebound, so no 'global' is needed.
    poll_funcs.append(func)

def unregister_poll_func(func):
    """Remove the first occurrence of func from poll_funcs.

    Raises ValueError (from list.remove) if func is not currently registered.
    """
    # The list is mutated in place, never rebound, so no 'global' is needed.
    poll_funcs.remove(func)

def poll():
    """Call every registered poll function once, in registration order.

    The registry is snapshotted before iterating, so a poll function may
    register or unregister callables (including itself) without causing a
    sibling to be skipped in this round. A callable that was unregistered
    earlier in the same round is not called; a callable registered during
    the round is first called on the next round.
    """
    for func in list(poll_funcs):
        # Skip entries that an earlier callback removed during this round.
        if func in poll_funcs:
            func()

def wait_no_raise(log_obj, condition, condition_args, condition_kwargs, timeout, timestep):
    """Poll until condition(...) is truthy or timeout seconds have elapsed.

    Each iteration first runs poll(), then evaluates
    condition(*condition_args, **condition_kwargs).

    log_obj: object used as error context when the timeout is invalid.
    condition: callable evaluated once per iteration.
    condition_args: positional arguments passed to condition.
    condition_kwargs: keyword arguments passed to condition.
    timeout: maximum time to wait, in seconds; must be a positive number.
    timestep: pause between iterations, in seconds; values below 0.1 are
        raised to 0.1.

    Returns True as soon as condition is truthy, False once the timeout has
    elapsed without that happening.
    Raises log.Error if timeout is zero, None or negative.
    """
    if not timeout or timeout < 0:
        # NOTE(review): 'self' is not read anywhere in this function;
        # presumably the log module looks up 'self' on the stack to attach
        # log_obj as the error's origin -- confirm before removing.
        self = log_obj
        raise log.Error('wait() *must* time out at some point.', timeout=timeout)
    if timestep < 0.1:
        timestep = 0.1

    # Use the monotonic clock so that system clock adjustments can neither
    # shorten nor prolong the wait.
    started = time.monotonic()
    while True:
        poll()
        if condition(*condition_args, **condition_kwargs):
            return True
        remaining = timeout - (time.monotonic() - started)
        if remaining <= 0:
            return False
        # Never sleep past the deadline: a timestep larger than the time
        # left would otherwise overshoot the timeout by up to one timestep.
        time.sleep(min(timestep, remaining))

def wait(log_obj, condition, *condition_args, timeout=300, timestep=1, **condition_kwargs):
    """Block until condition(*condition_args, **condition_kwargs) is truthy.

    Delegates to wait_no_raise() with the given timeout and timestep (both
    in seconds). Returns None on success.
    Raises log.Error('Wait timeout') if wait_no_raise() reports that the
    condition did not become truthy in time; log.ctx(log_obj) is called
    first (presumably to set the logging context for the error -- confirm
    against the log module).
    """
    fulfilled = wait_no_raise(log_obj, condition, condition_args,
                              condition_kwargs, timeout, timestep)
    if fulfilled:
        return
    log.ctx(log_obj)
    raise log.Error('Wait timeout')

def sleep(log_obj, seconds):
    """Pause for about `seconds` seconds while poll functions keep running.

    Implemented as a wait_no_raise() on a condition that is never true, so
    the call only ends through the timeout. The step between iterations is
    the smaller of `seconds` and one second.

    seconds must be greater than zero (checked with an assert).
    """
    assert seconds > 0.

    def never():
        return False

    step = min(seconds, 1)
    wait_no_raise(log_obj, never, [], {}, timeout=seconds, timestep=step)


# vim: expandtab tabstop=4 shiftwidth=4