summaryrefslogtreecommitdiffstats
path: root/src/osmo_gsm_tester/osmo_ctrl.py
blob: be27b7517d99efd3bacdf0f7219f89c09ad1fcc2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
# osmo_gsm_tester: specifics for talking to an Osmo CTRL interface
#
# Copyright (C) 2016-2017 by sysmocom - s.f.m.c. GmbH
#
# Author: Neels Hofmeyr <neels@hofmeyr.de>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import socket
import struct

from . import log

class CtrlInterfaceExn(Exception):
    """Raised when a received CTRL message cannot be parsed."""

class OsmoCtrl(log.Origin):
    """TCP client that sends GET/SET commands wrapped in a 4-byte header.

    The header, as packed and unpacked by this class, is big-endian:
    a 16-bit length, one byte 0xee and one byte 0.  The length field counts
    the payload plus the last header byte, i.e. len(payload) + 1.

    Usable as a context manager: entering connects, leaving disconnects.
    """

    def __init__(self, host, port):
        """Remember the peer address; no connection is opened yet."""
        super().__init__(log.C_BUS, 'Ctrl', host=host, port=port)
        self.host = host
        self.port = port
        self.sck = None

    def prefix_ipa_ctrl_header(self, data):
        """Return `data` as bytes with the 4-byte header prepended.

        A str argument is encoded as UTF-8 first; bytes are used as-is.
        """
        if isinstance(data, str):
            data = data.encode('utf-8')
        # Length covers the payload plus the trailing header byte.
        s = struct.pack(">HBB", len(data)+1, 0xee, 0)
        return s + data

    def remove_ipa_ctrl_header(self, data):
        """Strip the header from the first message contained in `data`.

        Returns a tuple (payload, rest) where `rest` holds whatever bytes
        follow the first message.

        Raises CtrlInterfaceExn if `data` is shorter than a header or if the
        two protocol bytes are not 0xee and 0.  A length field that exceeds
        the available data is only logged as an error; the (then truncated)
        payload is still returned.
        """
        if (len(data) < 4):
            raise CtrlInterfaceExn("Answer too short!")
        (plen, ipa_proto, osmo_proto) = struct.unpack(">HBB", data[:4])
        # plen counts from the 4th header byte on, so the message ends at
        # offset plen + 3.
        if (plen + 3 > len(data)):
            self.err('Warning: Wrong payload length', expected=plen, got=len(data)-3)
        if (ipa_proto != 0xee or osmo_proto != 0):
            raise CtrlInterfaceExn("Wrong protocol in answer!")
        return data[4:plen+3], data[plen+3:]

    def connect(self):
        """Open a blocking TCP connection to (host, port).

        If connecting fails, the socket is closed again before the exception
        propagates, and self.sck is left unchanged.
        """
        self.dbg('Connecting')
        sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sck.connect((self.host, self.port))
            sck.setblocking(1)
        except Exception:
            # Do not leak the file descriptor of a socket that never connected.
            sck.close()
            raise
        self.sck = sck

    def disconnect(self):
        """Close the socket if one was opened."""
        self.dbg('Disconnecting')
        if self.sck is not None:
            self.sck.close()

    def _send(self, data):
        """Prefix `data` with the header and transmit the whole message."""
        self.dbg('Sending', data=data)
        data = self.prefix_ipa_ctrl_header(data)
        # sendall() keeps writing until every byte is out; plain send() may
        # stop after a partial write and would silently truncate the message.
        self.sck.sendall(data)

    def receive(self, length = 1024):
        """Read up to `length` bytes from the socket and return them raw.

        The header is not removed; pass the result to
        remove_ipa_ctrl_header() to extract the payload.
        """
        data = self.sck.recv(length)
        self.dbg('Receiving', data=data)
        return data

    def do_set(self, var, value, id=0):
        """Send 'SET <id> <var> <value>'; the reply is not read here."""
        setmsg = "SET %s %s %s" %(id, var, value)
        self._send(setmsg)

    def do_get(self, var, id=0):
        """Send 'GET <id> <var>'; the reply is not read here."""
        getmsg = "GET %s %s" %(id, var)
        self._send(getmsg)

    def __enter__(self):
        """Connect and return self for use in a `with` statement."""
        self.connect()
        return self

    def __exit__(self, *exc_info):
        """Disconnect; exceptions from the `with` body are not suppressed."""
        self.disconnect()

# vim: expandtab tabstop=4 shiftwidth=4