aboutsummaryrefslogtreecommitdiffstats
path: root/apps/grgsm_decode
blob: dbac386250308035f17f84979baec35ce2060188 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @file
# @author (C) 2015 by Roman Khassraf <rkhassraf@gmail.com>
# @section LICENSE
#
# Gr-gsm is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3, or (at your option)
# any later version.
#
# Gr-gsm is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with gr-gsm; see the file COPYING.  If not, write to
# the Free Software Foundation, Inc., 51 Franklin Street,
# Boston, MA 02110-1301, USA.
#
#

from gnuradio import blocks
from gnuradio import eng_notation
from gnuradio import gr
from gnuradio.eng_option import eng_option
from optparse import OptionParser, OptionGroup
import collections
import grgsm
import pmt
import socket


class grgsm_decoder(gr.top_block):
    """Flowgraph that decodes one GSM timeslot from recorded input.

    The input is either a burst file (``burst_file``) or a complex sample
    capture (``cfile``); when both are given the burst file wins, and when
    neither is given no source block is created at all.

    Bursts pass through a dummy-burst filter and a timeslot filter, then
    through the demapper/decoder chain selected by ``chan_mode``.  Decoded
    messages are sent to a UDP client socket on 127.0.0.1:4729 and, when
    ``verbose`` is set, also to a message printer.

    Parameters:
        timeslot: timeslot number handed to the timeslot filter and demapper.
        subslot: optional subslot; only used for the 'BCCH_SDCCH4' and
            'SDCCH8' modes, where it inserts a subslot filter.
        chan_mode: 'BCCH', 'BCCH_SDCCH4', 'SDCCH8' or 'TCHF'.  Any other
            value builds the filters but connects no demapper or decoder.
        burst_file: path of a burst file to read.
        cfile: path of a complex sample capture to read.
        fc: capture frequency; when not None a clock offset control block is
            added and fed from the receiver measurements.
        samp_rate: sample rate of the cfile capture.
        a5: A5 version passed to the decryption blocks.
        a5_kc: session key as a sequence of at least 8 byte values.  None, a
            shorter sequence, or the all-zero key disables decryption.
        speech_file: output path for decoded speech ('TCHF' mode only).
        speech_codec: codec selector passed to the TCH/F decoder.
        enable_voice_boundary_detection: flag passed to the TCH/F decoder.
        verbose: print decoded messages.
        print_bursts: print the bursts leaving the timeslot filter.
        ppm: frequency offset correction passed to the input adapter.
    """

    # The all-zero key doubles as the "no key supplied" marker.
    _NULL_KC = [0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]

    def __init__(self, timeslot=0, subslot=None, chan_mode='BCCH',
                 burst_file=None,
                 cfile=None, fc=None, samp_rate=2e6,
                 a5=1, a5_kc=None,
                 speech_file=None, speech_codec=None,
                 enable_voice_boundary_detection=False,
                 verbose=False,
                 print_bursts=False, ppm=0):

        gr.top_block.__init__(self, "Gr-gsm Decode")

        ##################################################
        # Parameters
        ##################################################
        self.timeslot = timeslot
        self.subslot = subslot
        self.chan_mode = chan_mode
        self.burst_file = burst_file
        self.cfile = cfile
        self.fc = fc
        self.samp_rate = samp_rate
        self.a5 = a5
        self.kc = a5_kc
        # a5_kc defaults to None, so it must be checked before len() is taken.
        if a5_kc is None or len(a5_kc) < 8:
            self.kc = list(self._NULL_KC)
        self.speech_file = speech_file
        self.speech_codec = speech_codec
        self.verbose = verbose
        self.print_bursts = print_bursts
        self.enable_voice_boundary_detection = enable_voice_boundary_detection

        # Decryption blocks are only built and wired for a non-zero key.
        decrypt = self.kc != self._NULL_KC

        ##################################################
        # Blocks
        ##################################################

        if self.burst_file:
            self.burst_file_source = grgsm.burst_file_source(burst_file)
        elif self.cfile:
            self.file_source = blocks.file_source(gr.sizeof_gr_complex*1, self.cfile, False)
            self.receiver = grgsm.receiver(4, ([0]), ([]))
            if self.fc is not None:
                self.input_adapter = grgsm.gsm_input(ppm=ppm, osr=4, fc=self.fc, samp_rate_in=samp_rate)
                self.offset_control = grgsm.clock_offset_control(self.fc, self.samp_rate)
            else:
                self.input_adapter = grgsm.gsm_input(ppm=ppm, osr=4, samp_rate_in=samp_rate)

        self.dummy_burst_filter = grgsm.dummy_burst_filter()
        self.timeslot_filter = grgsm.burst_timeslot_filter(self.timeslot)

        self.subslot_filter = None
        if self.chan_mode == 'BCCH_SDCCH4' and self.subslot is not None:
            self.subslot_filter = grgsm.burst_sdcch_subslot_filter(grgsm.SS_FILTER_SDCCH4, self.subslot)
        elif self.chan_mode == 'SDCCH8' and self.subslot is not None:
            self.subslot_filter = grgsm.burst_sdcch_subslot_filter(grgsm.SS_FILTER_SDCCH8, self.subslot)

        if self.chan_mode == 'BCCH':
            self.bcch_demapper = grgsm.gsm_bcch_ccch_demapper(self.timeslot)
        elif self.chan_mode == 'BCCH_SDCCH4':
            self.bcch_sdcch4_demapper = grgsm.gsm_bcch_ccch_sdcch4_demapper(self.timeslot)
        elif self.chan_mode == 'SDCCH8':
            self.sdcch8_demapper = grgsm.gsm_sdcch8_demapper(self.timeslot)
        elif self.chan_mode == 'TCHF':
            self.tch_f_demapper = grgsm.tch_f_chans_demapper(self.timeslot)
            self.tch_f_decoder = grgsm.tch_f_decoder(speech_codec, enable_voice_boundary_detection)
            self.tch_f_pdu_to_tagged_stream = blocks.pdu_to_tagged_stream(blocks.byte_t, "packet_len")
            self.tch_f_file_sink = blocks.file_sink(gr.sizeof_char*1, speech_file, False)

        if decrypt:
            self.decryption = grgsm.decryption(self.kc, self.a5)
            self.cch_decoder_decrypted = grgsm.control_channels_decoder()
            if self.chan_mode == 'TCHF':
                self.decryption_tch_sacch = grgsm.decryption(self.kc, self.a5)

        self.cch_decoder = grgsm.control_channels_decoder()

        self.socket_pdu_server = blocks.socket_pdu("UDP_SERVER", "127.0.0.1", "4729", 10000) #added in order to avoid generating ICMP messages
        self.socket_pdu = blocks.socket_pdu("UDP_CLIENT", "127.0.0.1", "4729", 10000)

        if self.verbose:
            self.message_printer = grgsm.message_printer(pmt.intern(""), True, True, False)

        if self.print_bursts:
            self.bursts_printer = grgsm.bursts_printer(pmt.intern(""), True, True, True, True)

        ##################################################
        # Asynch Message Connections
        ##################################################

        if self.burst_file:
            self.msg_connect(self.burst_file_source, "out", self.dummy_burst_filter, "in")
        elif self.cfile:
            self.connect((self.file_source, 0), (self.input_adapter, 0))
            self.connect((self.input_adapter, 0), (self.receiver, 0))
            if self.fc is not None:
                self.msg_connect(self.offset_control, "ctrl", self.input_adapter, "ctrl_in")
                self.msg_connect(self.receiver, "measurements", self.offset_control, "measurements")
            self.msg_connect(self.receiver, "C0", self.dummy_burst_filter, "in")

        self.msg_connect(self.dummy_burst_filter, "out", self.timeslot_filter, "in")
        if self.print_bursts:
            self.msg_connect(self.timeslot_filter, "out", self.bursts_printer, 'bursts')

        # A subslot filter only exists for the SDCCH modes with a subslot
        # given; when present it sits between the timeslot filter and the
        # demapper, otherwise the demapper reads the timeslot filter directly.
        burst_source = self.timeslot_filter
        if self.subslot_filter is not None:
            self.msg_connect(self.timeslot_filter, "out", self.subslot_filter, "in")
            burst_source = self.subslot_filter

        if self.chan_mode == 'BCCH':
            # No decryption branch is wired in this mode, even with a key.
            self._wire_control_chain(burst_source, self.bcch_demapper, False)

        elif self.chan_mode == 'BCCH_SDCCH4':
            self._wire_control_chain(burst_source, self.bcch_sdcch4_demapper, decrypt)

        elif self.chan_mode == 'SDCCH8':
            self._wire_control_chain(burst_source, self.sdcch8_demapper, decrypt)

        elif self.chan_mode == 'TCHF':
            self.msg_connect(self.timeslot_filter, "out", self.tch_f_demapper, "bursts")
            if decrypt:
                # Traffic and associated control bursts each get their own
                # decryption block in front of the respective decoder.
                self.msg_connect(self.tch_f_demapper, "acch_bursts", self.decryption_tch_sacch, "bursts")
                self.msg_connect(self.tch_f_demapper, "tch_bursts", self.decryption, "bursts")

                self.msg_connect(self.decryption_tch_sacch, "bursts", self.cch_decoder, "bursts")
                self.msg_connect(self.decryption, "bursts", self.tch_f_decoder, "bursts")
            else:
                self.msg_connect(self.tch_f_demapper, "acch_bursts", self.cch_decoder, "bursts")
                self.msg_connect(self.tch_f_demapper, "tch_bursts", self.tch_f_decoder, "bursts")

            self._route_decoded_msgs(self.tch_f_decoder)
            self._route_decoded_msgs(self.cch_decoder)
            # Decoded voice frames are written to the speech output file.
            self.msg_connect(self.tch_f_decoder, "voice", self.tch_f_pdu_to_tagged_stream, "pdus")
            self.connect((self.tch_f_pdu_to_tagged_stream, 0), (self.tch_f_file_sink, 0))

    def _route_decoded_msgs(self, decoder):
        """Send a decoder's "msgs" output to the UDP socket and, if verbose, the printer."""
        self.msg_connect(decoder, "msgs", self.socket_pdu, "pdus")
        if self.verbose:
            self.msg_connect(decoder, "msgs", self.message_printer, "msgs")

    def _wire_control_chain(self, burst_source, demapper, decrypt):
        """Connect burst_source -> demapper -> control channel decoder(s).

        With ``decrypt`` set, the demapper output is additionally fed through
        the decryption block into a second decoder; the plain decoder stays
        connected in parallel in either case.
        """
        self.msg_connect(burst_source, "out", demapper, "bursts")

        if decrypt:
            self.msg_connect(demapper, "bursts", self.decryption, "bursts")
            self.msg_connect(self.decryption, "bursts", self.cch_decoder_decrypted, "bursts")
            self._route_decoded_msgs(self.cch_decoder_decrypted)

        self.msg_connect(demapper, "bursts", self.cch_decoder, "bursts")
        self._route_decoded_msgs(self.cch_decoder)
            

def parse_kc(value):
    """Parse a textual A5 session key Kc into a list of eight byte values.

    Two formats are accepted:
      * comma separated hex bytes: '0x12,0x34,0x56,0x78,0x90,0xAB,0xCD,0xEF'
      * sixteen hex digits:        '1234567890ABCDEF'

    Returns:
        A new list of 8 integers, each in the range 0-255.

    Raises:
        ValueError: if the format is not recognised, a field is not a hex
            number in the range 0-255, or the key is not exactly 8 bytes.
    """
    if ',' in value:
        fields = value.split(',')
    elif len(value) == 16:
        fields = [value[pos: pos + 2] for pos in range(0, 16, 2)]
    else:
        raise ValueError("Invalid Kc format")

    key = []
    for field in fields:
        try:
            byte = int(field, 16)
        except ValueError:
            # Not hex at all: report it like any other bad byte instead of
            # letting the int() traceback escape.
            raise ValueError("Invalid Kc % s\n" % field)
        if byte < 0 or byte > 255:
            raise ValueError("Invalid Kc % s\n" % field)
        key.append(byte)

    if len(key) != 8:
        raise ValueError("Invalid Kc length")
    return key


if __name__ == '__main__':

    # List of channel configurations
    channel_modes = ['BCCH', 'BCCH_SDCCH4', 'SDCCH8', 'TCHF']

    # mapping options to grgsm's enums
    tch_codecs = collections.OrderedDict([
        ('FR', grgsm.TCH_FS),
        ('EFR', grgsm.TCH_EFR),
        ('AMR12.2', grgsm.TCH_AFS12_2),
        ('AMR10.2', grgsm.TCH_AFS10_2),
        ('AMR7.95', grgsm.TCH_AFS7_95),
        ('AMR7.4', grgsm.TCH_AFS7_4),
        ('AMR6.7', grgsm.TCH_AFS6_7),
        ('AMR5.9', grgsm.TCH_AFS5_9),
        ('AMR5.15', grgsm.TCH_AFS5_15),
        ('AMR4.75', grgsm.TCH_AFS4_75)
    ])

    # Filled in by kc_callback; stays empty when no -k/--kc option is given.
    kc = []

    parser = OptionParser(option_class=eng_option, usage="%prog: [options]")

    def kc_callback(option, opt_str, value, parser):

        """ Callback function that parses Kc """

        try:
            parsed = parse_kc(value)
        except ValueError as err:
            parser.error(str(err))
        else:
            # Replace rather than extend, so a repeated -k option cannot
            # leave a key that is longer than 8 bytes.
            kc[:] = parsed

    # define options
    parser.add_option("-m", "--mode", dest="chan_mode", default='BCCH',
                      type='choice', choices=channel_modes,
                      help="Channel mode. Valid options are 'BCCH' (Non-combined C0), "
                           "'BCCH_SDCCH4'(Combined C0), 'SDCCH8' (Stand-alone control channel) "
                           "and 'TCHF' (Traffic Channel, Full rate) ")
    parser.add_option("-t", "--timeslot", dest="timeslot", type="intx", default=0,
                      help="Timeslot to decode [default=%default]")
    parser.add_option("-u", "--subslot", dest="subslot", type="intx",
                      help="Subslot to decode. Use in combination with channel type BCCH_SDCCH4 and SDCCH8")
    parser.add_option("-b", "--burst-file", dest="burst_file", help="Input file (bursts)")
    parser.add_option("-c", "--cfile", dest="cfile", help="Input file (cfile)")
    parser.add_option("-v", "--verbose", action="store_true",
                      help="If set, the decoded messages (with frame number and count) are printed to stdout")
    parser.add_option("-p", "--print-bursts", action="store_true",
                      help="If set, the raw bursts (with frame number and count) are printed to stdout")

    # group cfile options together
    cfile_options = OptionGroup(
        parser, 'Cfile Options', 'Options for decoding cfile input.',
    )
    cfile_options.add_option("-f", "--fc", dest="fc", type="eng_float",
                             help="Frequency of cfile capture")
    cfile_options.add_option("-a", "--arfcn", dest="arfcn", type="intx",
                             help="Set ARFCN instead of frequency. "
                                  "In some cases you may have to provide the GSM band also")
    cfile_options.add_option("--band", dest="band",
                             help="Specify the GSM band for the frequency.\nAvailable bands are: "
                                  + ", ".join(grgsm.arfcn.get_bands()) + "."
                                  + "If no band is specified, it will be determined automatically, defaulting to 0.")
    cfile_options.add_option("-s", "--samp-rate", dest="samp_rate", type="eng_float",
                             default=eng_notation.num_to_str(1e6),
                             help="Sample rate of cfile capture [default=%default]")
    cfile_options.add_option("--ppm", dest="ppm", type="float", default=0,
                             help="Set frequency offset correction [default=%default]")

    parser.add_option_group(cfile_options)

    # group decryption options
    decryption_options = OptionGroup(
        parser, 'Decryption Options', 'Options for setting the A5 decryption parameters.',
    )
    decryption_options.add_option("-e", "--a5", dest="a5", type="intx", default=1,
                                  help="A5 version [default=%default]. A5 versions 1 - 3 supported")
    decryption_options.add_option("-k", "--kc", action="callback", callback=kc_callback, type="string",
                                  help="A5 session key Kc. Valid formats are "
                                       "'0x12,0x34,0x56,0x78,0x90,0xAB,0xCD,0xEF' and '1234567890ABCDEF'")
    parser.add_option_group(decryption_options)

    # group TCH options
    tch_options = OptionGroup(
        parser, 'TCH Options', 'Options for setting Traffic channel decoding parameters.',
    )
    # optparse requires `choices` to be a real list or tuple; a dict keys
    # view (Python 3) is rejected when the option is created.
    tch_options.add_option("-d", "--speech-codec", dest="speech_codec", default='FR',
                           type='choice', choices=list(tch_codecs),
                           help="TCH-F speech codec [default=%default]. "
                                "Valid options are " + ", ".join(tch_codecs))
    tch_options.add_option("-o", "--output-tch", dest="speech_output_file", default="/tmp/speech.au.gsm",
                           help="TCH/F speech output file [default=%default].")
    tch_options.add_option("--voice-boundary", dest="enable_voice_boundary_detection", action="store_true", default=False,
                           help="Enable voice boundary detection for traffic channels. This can help reduce noice in the output.")
    parser.add_option_group(tch_options)

    # parse
    (options, args) = parser.parse_args()

    # some verifications
    if (options.cfile is None and options.burst_file is None) or \
            (options.cfile is not None and options.burst_file is not None):
        parser.error("Please provide a cfile or a burst file (but not both) as input\n")

    if options.timeslot < 0 or options.timeslot > 7:
        parser.error("Invalid timeslot. Must be a in range 0-7\n")

    if options.subslot is not None and (options.subslot < 0 or options.subslot > 7):
        parser.error("Invalid subslot. Must be a in range 0-7\n")

    if options.a5 < 0 or options.a5 > 3:
        parser.error("Invalid A5 version\n")

    if options.cfile and (options.fc is None and options.arfcn is None):
        print("You haven't provided a frequency or an ARFCN - working without automatic frequency offset correction.\n")

    # handle frequency / arfcn input
    arfcn = None
    fc = None
    # Compare against None so that ARFCN 0 counts as "supplied", matching
    # the `options.arfcn is None` check above.
    if options.arfcn is not None:
        if options.band:
            if options.band not in grgsm.arfcn.get_bands():
                parser.error("Invalid GSM band\n")
            elif not grgsm.arfcn.is_valid_arfcn(options.arfcn):
                parser.error("ARFCN is not valid\n")
            else:
                arfcn = options.arfcn
                fc = grgsm.arfcn.arfcn2downlink(arfcn)
        else:
            arfcn = options.arfcn
            # The converted frequency must be kept; otherwise fc stays None
            # and the decoder runs without frequency offset correction.
            fc = grgsm.arfcn.arfcn2downlink(arfcn)
    elif options.fc:
        fc = options.fc
        if options.band:
            if options.band not in grgsm.arfcn.get_bands():
                parser.error("Invalid GSM band\n")
            elif not grgsm.arfcn.is_valid_downlink(options.fc, options.band):
                parser.error("Frequency is not valid in the specified band\n")
            else:
                arfcn = grgsm.arfcn.downlink2arfcn(options.fc, options.band)
        else:
            # No band given: take the first band in which the frequency is
            # a valid downlink frequency.
            for band in grgsm.arfcn.get_bands():
                if grgsm.arfcn.is_valid_downlink(options.fc, band):
                    arfcn = grgsm.arfcn.downlink2arfcn(options.fc, band)
                    break

    # instanciate decoder
    tb = grgsm_decoder(timeslot=options.timeslot, subslot=options.subslot, chan_mode=options.chan_mode,
                          burst_file=options.burst_file,
                          cfile=options.cfile, fc=fc, samp_rate=options.samp_rate,
                          a5=options.a5, a5_kc=kc,
                          speech_file=options.speech_output_file, speech_codec=tch_codecs.get(options.speech_codec),
                          enable_voice_boundary_detection=options.enable_voice_boundary_detection,
                          verbose=options.verbose,
                          print_bursts=options.print_bursts, ppm=options.ppm)

    # run
    tb.start()
    tb.wait()