From 7b75b218c03b00c14a1d3cf2116052c675d0fb93 Mon Sep 17 00:00:00 2001 From: Neels Hofmeyr Date: Tue, 3 Dec 2019 15:38:31 +0100 Subject: add utils/osmo-gsm-shark Change-Id: Ifdfd261f776d9bf2bbfb0a584deac3e9a11bfe47 --- debian/libosmocore-utils.install | 1 + utils/Makefile.am | 1 + utils/osmo-gsm-shark | 2174 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 2176 insertions(+) create mode 100755 utils/osmo-gsm-shark diff --git a/debian/libosmocore-utils.install b/debian/libosmocore-utils.install index d23cc73a..83f4677f 100644 --- a/debian/libosmocore-utils.install +++ b/debian/libosmocore-utils.install @@ -1,3 +1,4 @@ usr/bin/osmo-arfcn usr/bin/osmo-auc-gen usr/bin/osmo-config-merge +usr/bin/osmo-gsm-shark diff --git a/utils/Makefile.am b/utils/Makefile.am index 653b7190..0533aab2 100644 --- a/utils/Makefile.am +++ b/utils/Makefile.am @@ -6,6 +6,7 @@ LDADD = $(top_builddir)/src/libosmocore.la $(top_builddir)/src/gsm/libosmogsm.la EXTRA_DIST = conv_gen.py conv_codes_gsm.py bin_PROGRAMS = osmo-arfcn osmo-auc-gen osmo-config-merge +bin_SCRIPTS = osmo-gsm-shark osmo_arfcn_SOURCES = osmo-arfcn.c diff --git a/utils/osmo-gsm-shark b/utils/osmo-gsm-shark new file mode 100755 index 00000000..55d59d9c --- /dev/null +++ b/utils/osmo-gsm-shark @@ -0,0 +1,2174 @@ +#!/usr/bin/env python3 + +# osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber. +# Copyright (C) 2019 by Neels Hofmeyr +# +# All Rights Reserved +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. 
+# +# You should have received a copy of the GNU General Public License along +# with this program; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. + +'''osmo-gsm-shark: produce a ladder diagram from and/or filter a GSM network pcap by subscriber. + +Copyright (C) 2019 by Neels Hofmeyr +SPDX-License-Identifier: GPL-2.0+ + +This tool uses tshark (pyshark) to analyze a pcap file or a live network capture to: + +- Associate almost all messages with a subscriber. It is possible to filter by subscriber. +- Separate the different network elements (BSC, MSC, hNodeB, ...). +- Output a ladder diagram. +- Combine repetitive messages. +- Combine/abstract messages into short activity summary. + +Examples: + + osmo-gsm-shark -f trace.pcapng + osmo-gsm-shark -l any + + osmo-gsm-shark -l any --filter-imsi 901701234567123 +''' + +import collections +import pyshark +import re +import sys +import types +import time + +SHOW_ALL_LAYERS = False +SCCP_COLLAPSE_STP = True +IUH_COLLAPSE_HNBGW = True + +re_msgtype_label = re.compile('^[01 =.-]*message.type *:* *', re.I) +re_msgtype_brace = re.compile(' *\([^)]*\) *') + +class Color: + codes = ( + ('red', '\033[1;31m'), + ('green', '\033[1;32m'), + ('yellow', '\033[1;33m'), + ('blue', '\033[1;34m'), + ('purple', '\033[1;35m'), + ('cyan', '\033[1;36m'), + ('darkred', '\033[31m'), + ('darkgreen', '\033[32m'), + ('darkyellow', '\033[33m'), + ('darkblue', '\033[34m'), + ('darkpurple', '\033[35m'), + ('darkcyan', '\033[36m'), + ('darkgrey', '\033[1;30m'), + ('grey', '\033[37m'), + ('brightwhite', '\033[1;37m'), + ) + codes_dict = dict(codes) + end = '\033[0;m' + + def colored(code, text): + if type(code) is int: + code = Color.codes[code % len(Color.codes)][1] + else: + code = Color.codes_dict[code] + return f'{code}{text}{Color.end}' + + +def _self_init(s, **kwargs): + for k,v in kwargs.items(): + setattr(s, k, v) + +def set_instance_vars_from_args(**ignore): + f = 
sys._getframe(1) + _self_init(**f.f_locals) + +def same_nonempty(a, b): + if isinstance(a, types.GeneratorType): + return list(a) == list(b) + return a and a == b + +def str_drop(a_str, drop_str): + if a_str and a_str.startswith(drop_str): + return a_str[len(drop_str):] + return a_str + +def sane_msgtype(msgtype): + if not msgtype: + return msgtype + msgtype = re_msgtype_label.sub('', msgtype) + msgtype = re_msgtype_brace.sub('', msgtype) + return msgtype.strip().replace(' ','-') + +def dir_vals(elem): + strs = [] + for name in dir(elem): + if name.startswith('_'): + continue + strs.append('%r=%r' % (name, getattr(elem, name))) + return '\n' + '\n'.join(strs) + +def out_text(*args, **kwargs): + print(*args, **kwargs) + + +g_current_msg = None + +def out_error(*args, **kwargs): + out_text(Color.colored('red', '*** ERROR:'), *args, **kwargs) + if g_current_msg: + out_text(Color.colored('red', '*** ERROR: while processing msg'), g_current_msg.str(traits=True, conversations=True)) + +# a dict containing a list for each key; l.add(name, item) adds item to the list at key=name. +class listdict(dict): + def _have(ld, name): + l = ld.get(name) + if not l: + l = [] + ld[name] = l + return l + + def add(ld, name, item): + l = ld._have(name) + l.append(item) + return ld + + def add_dict(ld, d): + for k,v in d.items(): + ld.add(k, v) + + def update(ld, other_ld): + for name, items in other_ld.items(): + ld.extend(name, items) + return ld + + def extend(ld, name, vals): + l = ld._have(name) + l.extend(vals) + return ld + +class Packet: + def __init__(s, idx, cap_p): + set_instance_vars_from_args() + # sanitize impossible attr with dot in its name, + # seen gsm_a.bssmap and gsm_a.dtap + for name in dir(s.cap_p): + if '.' 
in name: + new_name = name.replace('.', '_') + elif not name: + new_name = 'unnamed' + else: + continue + setattr(s.cap_p, new_name, getattr(s.cap_p, name)) + + @classmethod + def pget(cls, cap_p, tokens, ifnone=None): + if cap_p is None or len(tokens) < 1: + return ifnone + p_field = getattr(cap_p, tokens[0], None) + if p_field is None: + return ifnone + if len(tokens) > 1: + return Packet.pget(p_field, tokens[1:], ifnone=ifnone) + return p_field + + def get(s, field, ifnone=None): + return Packet.pget(s.cap_p, field.split('.'), ifnone=ifnone) + + def str(s, elem_name=None): + strs = [] + if elem_name: + elem = s.get(elem_name) + else: + elem = s.cap_p + return dir_vals(elem); + + def field_names(s, elem_name=None, elem=None): + strs = ['', f'=== {elem_name} ==='] + if elem is None: + elem = s.get(elem_name) + if not elem: + strs.append('None') + else: + for f in elem._get_all_fields_with_alternates(): + for n in dir(f): + if n.startswith('_'): + continue + strs.append('%r=%r' % (n, getattr(f, n))) + return '\n'.join(strs) + + def all_str(s, elem_name=None, elem=None, depth=1000): + strs = [] + if elem is None: + if elem_name: + elem = s.get(elem_name) + else: + elem = s.cap_p + elem_name = '/' + strs.append('%s:' % elem_name) + for name in dir(elem): + if name.startswith('_') or name.endswith('_value') or name in [ + 'sort', 'reverse', 'remove', 'pop', 'insert', 'index', 'extend', 'count', + 'copy', 'clear', 'append', 'zfill', 'max', 'min', 'resolution', + ]: + continue + try: + full_name = '%s.%s' % (elem_name, name) + val = getattr(elem, name) + if callable(val) or name in ['base16_value']: + continue + strs.append('%r=%r' % (full_name, val)) + if depth and type(val) not in [int, str]: + strs.append(s.all_str(full_name, val, depth-1)) + except: + pass + if hasattr(elem, '_get_all_fields_with_alternates'): + for f in elem._get_all_fields_with_alternates(): + for n in dir(f): + if n.startswith('_'): + continue + full_name = '%r[%r]' % (elem_name, n) + val = 
getattr(f, n) + if callable(val): + continue + strs.append('%s=%r' % (full_name, val)) + if depth: + strs.append(s.all_str(full_name, val, depth-1)) + return '\n'.join(strs) + +class IpPort: + def __init__(s, ip:str=None, port:str=None): + set_instance_vars_from_args() + + def __repr__(s): + return '%s:%s' % (s.ip, s.port) + + def __str__(s): + return repr(s) + + def __eq__(s, other): + return str(s) == str(other) + + @classmethod + def from_sdp(p:Packet): + ip = p.get('sdp.connection_info_address') + port = p.get('sdp.media_port') + return IpPort(ip, port) + + @classmethod + def from_udp_source(cls, p:Packet): + ip = p.get('ip.src') + port = p.get('udp.srcport') + return IpPort(ip, port) + + @classmethod + def from_udp_dest(cls, p:Packet): + ip = p.get('ip.dst') + port = p.get('udp.dstport') + return IpPort(ip, port) + + @classmethod + def from_tcp_source(cls, p:Packet): + ip = p.get('ip.src') + port = p.get('tcp.srcport') + return IpPort(ip, port) + + @classmethod + def from_tcp_dest(cls, p:Packet): + ip = p.get('ip.dst') + port = p.get('tcp.dstport') + return IpPort(ip, port) + + @classmethod + def from_sctp_source(cls, p:Packet): + ip = p.get('ip.src') + port = p.get('sctp.srcport') + return IpPort(ip, port) + + @classmethod + def from_sctp_dest(cls, p:Packet): + ip = p.get('ip.dst') + port = p.get('sctp.dstport') + return IpPort(ip, port) + +class Message: + pass + +class Trait: + def __init__(s, **kwargs): + if len(kwargs) > 1: + raise Exception('only one trait allowed per Trait(): %r' % kwargs) + for k, v in kwargs.items(): + s.name = k + s.val = v + + def __repr__(s): + return '%r=%r' % (s.name, s.val) + +class Traits(collections.OrderedDict): + def __init__(s, *args, **kwargs): + for arg in args: + s.add(arg) + s.set_vals(**kwargs) + + def add(s, trait:Trait): + s[trait.name] = trait + + def set(s, name, val): + if val is not None: + s.add(Trait(**{name: val})) + + def set_vals(s, **kwargs): + for k,v in kwargs.items(): + s.set(k, v) + + def __repr__(s): + 
strs = [] + for k,trait in s.items(): + assert k == trait.name + strs.append(repr(trait)) + return '{%s}' % ', '.join(strs) + +def find_same_trait(m:Message, messages:list, my_idx:int, proto:str, name:str, max_t=1, operator=any): + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if prev_msg.finalized: + return None + if m.timestamp - prev_msg.timestamp > max_t: + return None + if m.same_traits(prev_msg, proto, name, operator=operator): + yield prev_msg + return None + +class Layer: + handler_classes = {} + traits = None + + def __init__(s, m:Message, proto:str, msgtype:str, traits:Traits, minor=False, hidden=False): + set_instance_vars_from_args() + if proto in m.layers: + raise Exception(f'duplicate proto {proto} for message') + m.layers[proto] = s + s.msgtype = sane_msgtype(s.msgtype) + + def label(s): + if s.msgtype: + return f'{s.proto}.{s.msgtype}' + else: + return s.proto + + def identify_entities(s, m:Message, messages, my_idx): + '''return a list of Message.EntityIdent instances describing source and/or destination entity that message m identifies.''' + return None + + def collapse(s, messages, my_idx): + '''return the message itself if it remains in messages, if another absorbed it return the other message, + or if if it is dropped completely return None''' + return messages[my_idx] + + def associate(s, messages, my_idx): + '''return the message itself if it remains in messages, if another absorbed it return the other message, + or if if it is dropped completely return None''' + return messages[my_idx] + + @classmethod + def parse(cls, m:Message): + if not Layer.handler_classes: + for cls in Layer.__subclasses__(): + name = cls.__name__ + if not name.startswith('Layer_'): + continue + proto_name = name[len('Layer_'):] + Layer.handler_classes[proto_name] = cls + + for proto_name,child_class in Layer.handler_classes.items(): + if not m.p.get(proto_name): + continue + child_class(m) + + def __getattr__(s, name): + if 
name.startswith('_') or name in dir(s): + return s.__getattribute__(name) + traits = s.__getattribute__('traits') + trait = traits.get(name, None) + if trait is None: + raise Exception('No such trait: %s.%r' % (s.proto, name)) + return trait.val + + +class Message: + + def __init__(s, p:Packet, finalized=False): + set_instance_vars_from_args() + s.layers = collections.OrderedDict() + s.conversation = None + s.count = 1 + s.count_back = 0 + s.timestamp = float(p.cap_p.sniff_timestamp) + s.hide = False + s.src_entity = None + s.dst_entity = None + s.absorbed = [] + s.request = None + s.response = None + + def is_minor(s): + return all(l.minor for l in s.layers.values()) + + def is_response(s): + return s.response is s + + def is_request(s): + return s.request is s + + def get_trait(s, proto:str, name:str, ifnone=None): + # allow alternative lists for proto, like s.get_trait(('tcp', 'udp'), 'src') + if type(proto) is not str: + for proto_ in proto: + val = s.get_trait(proto_, name, None) + if val is not None: + return val + return ifnone + # allow alternative lists for name, like s.get_trait('tcp', ('src', 'dst)) + if type(name) is not str: + for name_ in name: + val = s.get_trait(proto, name_, None) + if val is not None: + return val + return ifnone + + layer = s.layers.get(proto, None) + if not layer: + return ifnone + if name == 'msgtype': + return layer.msgtype + trait = layer.traits.get(name, None) + if trait is None: + return ifnone + if trait.val is None: + return ifnone + return trait.val + + def get_traits(s, proto=None, names=None, proto_and_names=None): + pn = [] + if proto or names: + pn = [(proto, names)] + if proto_and_names: + pn.extend(proto_and_names) + for proto, names in pn: + if names is None: + l = s.layers.get(proto, None) + if not l: + continue + names = l.traits.keys() + if type(names) is str: + names = [names] + for name in names: + result = s.get_trait(proto, name, ifnone=None) + if result is not None: + yield (proto, name, result) + + def 
get_all_traits(s, proto:str): + layer = s.layers.get(proto) + if not layer: + return {} + return layer.traits + + def same_traits(s, other_msg, proto:str, name:str, allow_unset=False, operator=all): + if type(proto) is not str: + return operator( + s.same_traits(other_msg, proto_, name, allow_unset=allow_unset) + for proto_ in proto + ) + + if name is None: + my_traits = s.get_all_traits(proto) + other_traits = other_msg.get_all_traits(proto) + names = set(my_traits.keys()) + names.update(other_traits.keys()) + name = list(names) + + if type(name) is not str: + return operator( + s.same_traits(other_msg, proto, name_, allow_unset=allow_unset) + for name_ in name + ) + + val = s.get_trait(proto, name) + other_val = other_msg.get_trait(proto, name) + if not allow_unset: + if val is None or other_val is None: + return False + return val == other_val + + def set_trait(s, proto, name, val): + layer = s.layers.get(proto, None) + if layer is None: + layer = Layer(s, proto, None, Traits(Trait(name, val))) + else: + layer.traits.set(name, val) + + def collapse(s, messages, my_idx): + '''iterate backwards over recent messages and see if messages can be combined''' + orig_msg = messages[my_idx] + for layer in s.layers.values(): + msg = layer.collapse(messages, my_idx) + if orig_msg is not msg: + break + return msg + + def associate(s, messages, my_idx): + '''iterate backwards over recent messages and figure out which belong to the same conversation.''' + orig_msg = messages[my_idx] + for layer in s.layers.values(): + msg = layer.associate(messages, my_idx) + if orig_msg is not msg: + break + return msg + + class EntityIdent: + def __init__(s, proto=None, src_kind=None, src_entity=None, dst_kind=None, dst_entity=None): + set_instance_vars_from_args() + + def identify_entities(s, messages, my_idx): + '''From protocol and message discriminators, see if we can identify the src and dst port of the message + to be of a specific core network entity.''' + for layer in 
s.layers.values(): + identifieds = layer.identify_entities(s, messages, my_idx) + if identifieds is None: + continue + if isinstance(identifieds, Message.EntityIdent): + identifieds = [identifieds] + + for ident in identifieds: + s.src_entity = Entity.find_or_create(ident.proto, ident.src_kind, s.src(), + ident.src_entity, from_msg=s) or s.src_entity + s.dst_entity = Entity.find_or_create(ident.proto, ident.dst_kind, s.dst(), + ident.dst_entity, from_msg=s) or s.dst_entity + + + + def src(s): + return s.get_trait(('tcp','udp','sctp'), 'src') + + def dst(s): + return s.get_trait(('tcp','udp','sctp'), 'dst') + + def entity(s, kind): + if s.src_entity and s.src_entity.kind == kind: + return m.src_entity + if s.dst_entity and s.dst_entity.kind == kind: + return m.dst_entity + + def src_entity_is(s, kind): + return s.src_entity and s.src_entity.kind == kind + + def dst_entity_is(s, kind): + return s.dst_entity and s.dst_entity.kind == kind + + def entity_port(s, kind): + if s.src_entity and s.src_entity.kind == kind: + return s.src() + if s.dst_entity and s.dst_entity.kind == kind: + return s.dst() + + def same_src_dst(s, other, forward=None, reverse=None): + # assume forward and reverse if neither are set. + # if one of them is set to True, assume the other as False. 
+ if forward is None and reverse is None: + forward = True + reverse = True + a = (str(s.src()), str(s.dst())) + b = (str(other.src()), str(other.dst())) + if forward and reverse: + return sorted(a) == sorted(b) + elif forward: + return a == b + elif reverse: + return a == tuple(reversed(b)) + else: + return False + + def conv_layer(s, label): + if not s.conversation: + return None + return s.conversation.layers.get(label) + + @classmethod + def parse(cls, p:Packet): + m = Message(p) + Layer.parse(m) + return m + + def label(s): + label = [] + for l in s.layers.values(): + if not SHOW_ALL_LAYERS: + if l.minor: + continue + if l.hidden and not all((ll.minor or ll.hidden) for ll in s.layers.values()): + continue + label.insert(0, l.label()) + return '/'.join(label) + + def subscriber(s): + if not s.conversation: + return None + return s.conversation.subscriber + + def __repr__(s): + return s.__str__() + + def __str__(s): + return s.str() + + def str(s, ladder=False, one_column_per_kind=False, traits=False, conversations=False): + name = s.label() + subscr = s.subscriber() + if subscr: + name = subscr.label() + ' ' + name + src = str(s.src()) + dst = str(s.dst()) + + if s.src_entity is not None: + src_str = s.src_entity.label() + else: + src_str = src + + if dst == src: + dst_str = '(self)' + elif s.dst_entity is not None: + dst_str = s.dst_entity.label() + else: + dst_str = dst + + src_pos = 0 + dst_pos = 0 + if s.src_entity: + src_pos = s.src_entity.textcolumn(one_column_per_kind) + if s.dst_entity: + dst_pos = s.dst_entity.textcolumn(one_column_per_kind) + + if not ladder: + if src > dst: + src_pos = 1 + dst_pos = 0 + else: + src_pos = 0 + dst_pos = 1 + + if src_pos <= dst_pos: + left_pos = src_pos + right_pos = dst_pos + left_label = src_str + right_label = dst_str + to_left_count = s.count_back + to_right_count = s.count + else: + left_pos = dst_pos + right_pos = src_pos + left_label = dst_str + right_label = src_str + to_left_count = s.count + to_right_count = 
s.count_back + + left_strs = [] + left_strs.append(left_label) + if to_left_count: + left_strs.append('<') + if to_left_count > 1: + left_strs.append(f'{to_left_count}') + + right_strs = [] + if to_right_count: + if to_right_count > 1: + right_strs.append(f'{to_right_count}') + right_strs.append('>') + right_strs.append(right_label) + + real_left_pos = max(0, left_pos - (len(left_label)/2)) + real_right_pos = right_pos + len(right_label) - (len(right_label)/2) + + left_str = ''.join(left_strs) + right_str = ''.join(right_strs) + + mid_gap = real_right_pos - real_left_pos - len(right_str) - len(left_str) + mid_gap = max(1, mid_gap) + + if not ladder: + mid_name_margin = 6 + else: + mid_name_margin = mid_gap - len(name) + if mid_name_margin > 5: + mid_gap_strs = ['-' * int(mid_name_margin / 2), + name, + '-' * int(mid_name_margin - (mid_name_margin/2))] + name = '' + else: + mid_gap_strs = ['-' * int(mid_gap)] + + strs = [' ' * int(real_left_pos), + left_str,] + strs.extend(mid_gap_strs) + strs.append(right_str) + + if ladder: + strs = [''.join(strs)] + right_end = len(strs[0]) + label_pos = Entity.textcolumn_for_label + diff = label_pos - right_end + if diff > 0: + strs.append(' ' * int(diff)) + + name_items = [] + if name: + name_items.append(name) + + if traits: + if isinstance(traits, str): + traits = [traits] + for proto,l in s.layers.items(): + if not l.traits: + continue + if (traits is not True) and (proto not in traits): + continue + name_items.append('%s%s' % (proto, l.traits)) + + if conversations and s.conversation: + name_items.append(s.conversation.str(conversations)) + + idxs = [s.p.idx] + [a.p.idx for a in s.absorbed] + if len(idxs) <= 3: + name_items.append('+'.join(str(i) for i in sorted(idxs))) + else: + name_items.append(f'{min(idxs)}-{max(idxs)}') + strs.append(' ') + strs.append(' '.join(name_items)) + return ''.join(strs) + + + def absorb_msg(s, other_msg): + if other_msg and other_msg is not s: + s.absorbed.append(other_msg) + + if 
s.conversation: + s.conversation = s.conversation.absorb_conversation(other_msg.conversation) + for conv_layer in s.conversation.layers.values(): + conv_layer.refresh() + +class Conversation: + next_id = 0 + + def __init__(s, conv_layer=None): + s.subscriber = None + s.messages = [] + s.layers = {} + s.id = Conversation.next_id + Conversation.next_id += 1 + if conv_layer: + s.layers[conv_layer.label] = conv_layer + conv_layer.conversation = s + + def set_subscriber(s, subscriber): + if s.subscriber is subscriber: + return + was = s.subscriber + if not subscriber: + s.subscriber = None + elif s.subscriber: + s.subscriber = s.subscriber.absorb_subscriber(subscriber) + else: + s.subscriber = subscriber + if was is not s.subscriber: + if was: + was.rm_conversation(s) + if s.subscriber: + s.subscriber.add_conversation(s) + + def absorb_conversation(s, other): + if not other or other is s: + return s + if s.id > other.id: + return other.absorb_conversation(s) + + if other.subscriber is not None: + s.set_subscriber(other.subscriber.absorb_subscriber(s.subscriber)) + other.set_subscriber(None) + + for m in other.messages: + m.conversation = s + if m not in s.messages: + s.messages.append(m) + m.conversation = s + + for l in other.layers.values(): + l = l.absorb_conv_layer(s.layers.get(l.label)) + l.conversation = s + s.layers[l.label] = l + other.layers = {} + + for l in s.layers.values(): + l.refresh() + return s + + def rm_conv_layer(s, conv_layer): + my_l = s.layers.get(conv_layer.label) + if my_l is conv_layer: + s.layers.pop(conv_layer.label) + conv_layer.conversation = None + + def discard_if_empty(s): + if s.layers: + return + for m in s.messages: + m.conversation = None + del s + + def add_message(s, msg): + if msg is None: + return + msg.conversation = s + if msg not in s.messages: + s.messages.append(msg) + + def str(s, layers:list=True): + strs = [f'c{s.id}'] + if layers is not False: + strs.append('[') + items = [] + for conv_layer in s.layers.values(): + if 
(layers is not True) and (conv_layer.label not in layers): + continue + items.append(f'{conv_layer}') + strs.append(','.join(items)) + strs.append(']') + return ''.join(strs) + + def __repr__(s): + return s.str() + def __str__(s): + return repr(s) + + +class ConversationLayer: + label = 'unnamed_conversation_layer' + next_id = {} + + def __init__(s): + s._match_strs = None + s.id = ConversationLayer.get_next_id(s.label) + s.conversation = None + + @classmethod + def get_next_id(cls, label): + n = ConversationLayer.next_id.get(label, 0) + ConversationLayer.next_id[label] = n + 1 + return n + + def ensure_conversation(s): + if not s.conversation: + Conversation(s) + assert s.conversation + return s.conversation + + def refresh(s): + s._match_strs = None + + def absorb_conv_layer(s, other): + if other is None: + return s + + # the default way of ConversationLayer doesn't need to take along anything. + # The match_strs will simply be regenerated from available messages. + # Stateful ConversationLayer impls could overload this to adopt state. + + # If another conversation is adding messages with the same layer, + # make sure to update the match strings. 
+ keep = s + clean = other + if other.id < s.id: + keep = other + clean = s + keep.refresh() + clean.conversation = None + return keep + + def __repr__(s): + return f'{s.label}{s.id}{s.match_strs()}'# + ( + #f'@{s.conversation.str(layers=False)}' if s.conversation else '@no-conversation') + def __str__(s): + return Color.colored(s.id, repr(s)) + + def matches_msg(s, msg:Message): + if not s.messages(): + return True + if not msg: + return False + msa = s.match_strs() + msb = s.msg_match_strs(msg) + if type(msa) is str: + msa = [msa] + if type(msb) is str: + msb = [msb] + r = any(msa_ in msb for msa_ in msa) + return r + + def matches_conv_layer(s, other_conv_layer): + if not s.messages(): + return True + if not other_conv_layer: + return False + if not other_conv_layer.messages(): + return True + match_strs = s.match_strs() + other_match_strs = other_conv_layer.match_strs() + r = any(match_str in other_match_strs for match_str in match_strs) + return r + + def merge_conversation_with_msg(s, other_msg): + if other_msg.conversation: + result = other_msg.conversation.absorb_conversation(s.conversation) + conv_layer = result.layers.get(s.label) + if conv_layer is None: + result.layers[s.label] = s + s.conversation = result + conv_layer = s + return conv_layer + else: + s.ensure_conversation().add_message(other_msg) + s.refresh() + return s + + @classmethod + def impl_associate(cls, conv_layer, messages, my_idx): + msg = messages[my_idx] + if conv_layer.association_start(msg): + return conv_layer, msg + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if prev_msg is None: + continue + if prev_msg.finalized: + break + prev_c = prev_msg.conv_layer(conv_layer.label) + if not conv_layer.matches_msg(prev_msg) and not conv_layer.matches_conv_layer(prev_c): + continue + conv_layer = conv_layer.merge_conversation_with_msg(prev_msg) + if conv_layer.association_start(prev_msg): + break + + return conv_layer, msg + + @classmethod + def associate(cls, messages, my_idx): 
+ msg = messages[my_idx] + if msg is None: + return msg + conv_layer = cls().merge_conversation_with_msg(msg) + + conv_layer, msg = cls.impl_associate(conv_layer, messages, my_idx) + c = conv_layer.conversation + conv_layer.discard_if_empty() + return msg + + def discard_if_empty(s): + if s.conversation is None: + return + if s.match_strs(): + return + c = s.conversation + c.rm_conv_layer(s) + c.discard_if_empty() + + def association_start(s, m:Message): + '''If the associations of this ConversationLayer are limited to a period of time, for example if a + channel is assigned only from the time of the channel activation, and the channel belongs to a different + conversation before that: return True as soon as the given message is such a start condition.''' + return False + + def impl_msg_match_strs(s, m:Message): + return None + + def msg_match_strs(s, m:Message): + imms = s.impl_msg_match_strs(m) + if type(imms) is str: + imms = [imms] + elif imms is None: + imms = [] + return list(sorted(str(match_str) for match_str in imms if match_str)) + + def regenerate_match_strs(s): + messages = s.messages() + if not messages: + return [] + match_tokens = set() + for m in messages: + tokens = s.msg_match_strs(m) + if not tokens: + continue + match_tokens.update(tokens) + return list(sorted(t for t in match_tokens if t)) + + def match_strs(s): + if s._match_strs is None: + s._match_strs = s.regenerate_match_strs() + return s._match_strs + + def messages(s): + if not s.conversation: + return [] + return s.conversation.messages + + +class Entity: + '''A core network program like BSC, MSC, ...''' + KINDS_SORTING = ('MS', 'BTS', 'hNodeB', 'BSC', 'MGW@BSC', 'HNBGW', 'STP', 'MSC', 'MGW@MSC', 'MGW', 'SGSN', 'HLR', 'SIPcon', 'PBX', 'GGSN') + KINDS_SORTING_EXIST = () + entities = listdict() + state_version = 1 # whether to update cached text columns + spacing = 5 + label_spacing = 2 + textcolumn_for_label = 0 + + # proxy / forwarding addresses to ignore, like the STP port + blacklist 
= [] + + def __init__(s, kind:str, conversations:list=[]): + set_instance_vars_from_args() + s.idx = None + s.state_version = 0 + s._textcolumn = None + s._kind_textcolumn = None + s.ports = listdict() + Entity.add(s) + + @classmethod + def add(cls, entity): + Entity.entities.add(entity.kind, entity) + entity.idx = entity.idx_in_kind() + if entity.kind not in Entity.KINDS_SORTING_EXIST: + # a new kind has come up, refresh Entity.KINDS_SORTING_EXIST + exist = [] + for k in Entity.KINDS_SORTING: + if k in Entity.entities.keys(): + exist.append(k) + for k in Entity.entities.keys(): + if k not in exist: + exist.append(k) + Entity.KINDS_SORTING_EXIST = tuple(exist) + + Entity.textcolumn_for_label = Entity.calculate_textcolumn(None) + Entity.state_version += 1 + + @classmethod + def count_entities(cls, kind): + l = Entity.entities.get(kind) + return len(l) + + @classmethod + def calculate_kind_textcolumn(cls, kind): + '''In text rendering of a ladder diagram, return the text column for this entity kind, + if rendering all entities of the same kind in one ladder diagram column.''' + col = 0 + for k in Entity.KINDS_SORTING_EXIST: + if k == kind: + break + col += len(k) + Entity.spacing + col += len(kind) / 2 + return int(col) + + @classmethod + def add_to_blacklist(cls, port:IpPort): + if port in cls.blacklist: + return + cls.blacklist.append(port); + + @classmethod + def set_known(cls, m:Message): + for entities in Entity.entities.values(): + for e in entities: + if e.has_port(m.src()): + m.src_entity = e + if e.has_port(m.dst()): + m.dst_entity = e + + @classmethod + def find_or_create(cls, proto, kind, port, matched_entity=None, from_msg=None): + if port in Entity.blacklist: + return None + if matched_entity: + matched_entity.add_port(proto, port, from_msg=from_msg) + return matched_entity + if kind is None: + for lkind, l in Entity.entities.items(): + for e in l: + if e.has_port(port): + return e + return None + else: + l = Entity.entities.get(kind) + if l: + for e in 
l: + if e.has_port(port): + return e + e = Entity(kind) + e.add_port(proto, port, from_msg=from_msg) + return e + + def label(s): + idx = '' + if s.idx: + idx = str(s.idx + 1) + return f'{s.kind}{idx}' + + def __repr__(s): + return s.label() + def __str__(s): + return repr(s) + + def kind_idx(s): + '''this entity kind's position in the currently known entity kinds: + For 'BSC', if we've seen BTS, BSC and MSC, return 1.''' + return Entity.KINDS_SORTING_EXIST.index(s.kind) + + def idx_in_all(s): + '''this entity kind's position in all currently known entities: + For the second 'BSC', if we've seen 2 BTS, 3 BSC and 1 MSC, return 2 (BTS) + 1 (second BSC) = 3.''' + idx = 0 + for k in Entity.KINDS_SORTING_EXIST: + if k == s.kind: + idx += Entity.entities.get(s.kind).index(s) + return idx + idx += Entity.count_entities(k) + return idx + + def idx_in_kind(s): + '''this entity kind's position in the list of entities of the same kind''' + return Entity.entities.get(s.kind).index(s) + + def check_update_state(s): + if s.state_version == Entity.state_version: + return + s._kind_textcolumn = Entity.calculate_kind_textcolumn(s.kind) + s._textcolumn = Entity.calculate_textcolumn(s) + s.state_version = Entity.state_version + + def textcolumn(s, one_column_per_kind=False, mid=True): + s.check_update_state() + if one_column_per_kind: + midcol = s._kind_textcolumn + else: + midcol = s._textcolumn + if mid: + return midcol + return int(midcol - (len(s.label()) / 2)) + + @classmethod + def calculate_textcolumn(cls, s): + '''In text rendering of a ladder diagram, return the text column for this entity, + if rendering each entity in its own column (not sharing one column per entity kind)''' + col = 0 + for k in Entity.KINDS_SORTING_EXIST: + l = Entity.entities.get(k) + if s is not None and k == s.kind: + for e in l: + if e is s: + col += len(s.kind) / 2 + return int(col) + col += len(e.label()) + Entity.spacing + for e in l: + col += len(e.label()) + Entity.spacing + if s: + raise 
Exception('entity doesnt exist') + return int(col - Entity.spacing + Entity.label_spacing) + + def has_port(s, port, proto=None): + if proto: + if port in s.ports.get(proto, []): + return proto + return None + for proto,l in s.ports.items(): + if port in l: + return proto + return None + + def add_port(s, proto, port, from_msg=None): + if s.has_port(port, proto=proto): + return + s.ports.add(proto, port) + out_text(' ' * s.textcolumn(mid=False) + 'New:', s.label(), proto, port) + + +class Subscriber: + next_ident = 1 + imsis = {} + tmsis = {} + + def __init__(s, imsi:str=None, tmsis=[], msisdn=None): + s.conversations = [] + set_instance_vars_from_args() + s.ident = Subscriber.next_ident + Subscriber.next_ident += 1 + s.imsi = None + s.set_imsi(imsi) + s.tmsis = [] + s.add_tmsis(*tmsis) + s.conversations = [] + + def label(s): + if s.imsi: + l = [s.imsi] + else: + l = ['subscr', str(s.ident)] + if s.tmsis: + l.append(':') + l.append(s.tmsis[-1]) + if s.msisdn: + l.append(':') + l.append(s.msisdn) + return Color.colored(s.ident, ''.join(l)) + + def add_conversation(s, c): + if c not in s.conversations: + s.conversations.append(c) + + def rm_conversation(s, c): + if c in s.conversations: + s.conversations.remove(c) + + def __repr__(s): + return s.label() + def __str__(s): + return repr(s) + + def set_imsi(s, imsi): + if not imsi or s.imsi == imsi: + return + if imsi in Subscriber.imsis: + out_error('duplicate Subscriber for IMSI', imsi) + raise Exception('duplicate Subscriber for IMSI', imsi) + s.imsi = imsi + Subscriber.imsis[imsi] = s + + def add_tmsis(s, *tmsis): + if not tmsis: + return + s.tmsis.extend(tmsis) + for tmsi in tmsis: + Subscriber.tmsis[tmsi] = s + + @classmethod + def identify(cls, msg:Message, messages:list, my_idx:int): + subscr = msg.subscriber() + + imsi = msg.get_trait('dtap', 'imsi') + tmsi = msg.get_trait('dtap', 'tmsi') + + if imsi: + imsi_subscr = Subscriber.by_imsi(imsi) + if subscr: + subscr = imsi_subscr.absorb_subscriber(subscr) + else: 
+ subscr = imsi_subscr + if tmsi: + tmsi_subscr = Subscriber.by_tmsi(tmsi) + if subscr: + subscr = tmsi_subscr.absorb_subscriber(subscr) + else: + subscr = tmsi_subscr + + if subscr: + subscr.add_message(msg) + + @classmethod + def by_imsi(cls, imsi): + subscr = Subscriber.imsis.get(imsi) + if not subscr: + subscr = Subscriber(imsi=imsi) + return subscr + + @classmethod + def by_tmsi(cls, tmsi): + subscr = Subscriber.tmsis.get(tmsi) + if not subscr: + subscr = Subscriber(tmsis=[tmsi]) + return subscr + + def absorb_subscriber(s, other): + '''It is important to use the return value! + s.subscriber = s.subscriber.absorb_subscriber(other_subscriber) + Because this freely decides which Subscriber instance should survive.''' + if other is None or s is other: + return s + if not s.imsi and s.ident > other.ident: + return other.absorb_subscriber(s) + + if s.imsi and other.imsi and s.imsi != other.imsi: + out_error(f'subscriber changes IMSI: {other.imsi} -> {s.imsi}') + raise Exception(f'subscriber changes IMSI: {other.imsi} -> {s.imsi}') + s.set_imsi(other.imsi or s.imsi) + s.add_tmsis(*other.tmsis) + + for c in other.conversations: + s.add_conversation(c) + return s + + def add_conversation(s, conversation): + if not conversation: + return + s.conversations.append(conversation) + conversation.subscriber = s + + def add_message(s, message): + if not message: + return + if not message.conversation: + message.conversation = Conversation() + s.add_conversation(message.conversation) + +class Layer_tcp(Layer): + def __init__(s, m:Message): + p = m.p + traits = Traits( + src=IpPort.from_tcp_source(p), + dst=IpPort.from_tcp_dest(p), + ) + super().__init__(m=m, proto='tcp', msgtype=None, traits=traits, minor=True) + +class Layer_udp(Layer): + def __init__(s, m:Message): + p = m.p + traits = Traits( + src=IpPort.from_udp_source(p), + dst=IpPort.from_udp_dest(p), + ) + super().__init__(m=m, proto='udp', msgtype=None, traits=traits, minor=True) + +class Layer_sctp(Layer): + def 
__init__(s, m:Message): + p = m.p + traits = Traits( + src=IpPort.from_sctp_source(p), + dst=IpPort.from_sctp_dest(p), + stream_id = p.get('sctp.data_sid'), + stream_seq = p.get('sctp.data_ssn'), + ) + super().__init__(m=m, proto='sctp', msgtype=None, traits=traits, minor=True) + +class ConversationRtpPort(ConversationLayer): + label = 'rtp_port' + + def impl_msg_match_strs(s, m:Message): + rtp_ports = [] + if 'rtp' in m.layers: + rtp_ports.append(m.src()) + rtp_ports.append(m.dst()) + rtp_ports.append(m.get_trait('rsl', 'rtp_port')) + rtp_ports.append(m.get_trait('mgcp', 'rtp_port')) + if not any(rtp_ports): + return None + return rtp_ports + +class Layer_rtp(Layer): + def __init__(s, m:Message): + pt = m.p.get('rtp.p_type') + traits = Traits( + pt=pt, + ) + super().__init__(m=m, proto='rtp', msgtype=pt, traits=traits) + + def collapse(s, messages, my_idx): + pt = s.m.get_trait('rtp', 'pt') + src = s.m.src() + dst = s.m.dst() + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if prev_msg.finalized: + break + if not 'rtp' in prev_msg.layers: + if prev_msg.is_minor(): + continue + else: + break + if prev_msg.get_trait('rtp', 'pt') != pt: + continue + if s.m.same_src_dst(prev_msg, forward=True): + # found a recent RTP similar RTP packet, combine + prev_msg.count += 1 + messages[my_idx] = None + prev_msg.absorb_msg(s.m) + return prev_msg + if s.m.same_src_dst(prev_msg, reverse=True): + # same but backwards + prev_msg.count_back += 1 + messages[my_idx] = None + prev_msg.absorb_msg(s.m) + return prev_msg + return s.m + + def associate(s, messages, my_idx): + ConversationRtpPort.associate(messages, my_idx) + return s.m + +class Layer_mgcp(Layer): + def __init__(s, m:Message): + p = m.p + verb = p.get('mgcp.req_verb') + rsp = p.get('mgcp.rsp_rspstring') + msgtype = verb or rsp or '?' 
+ tid = p.get('mgcp.transid', '') + + rtp_port = None + sdp_rtp_ip = p.get('sdp.connection_info_address') + sdp_rtp_port = p.get('sdp.media_port') + if sdp_rtp_ip and sdp_rtp_port: + rtp_port = IpPort(sdp_rtp_ip, sdp_rtp_port) + + traits = Traits( + tid=tid, + endp=p.get('mgcp.req_endpoint'), + ci=p.get('mgcp.param_connectionid'), + verb=verb, + rsp=rsp, + rtp_port=rtp_port, + ) + + super().__init__(m=m, proto='mgcp', msgtype=msgtype, traits=traits) + + def label(s): + return f'mgcp{s.tid}.{s.msgtype}' + + def identify_entities(s, m:Message, messages, my_idx): + if m.get_trait('mgcp', 'verb') == 'CRCX': + return Message.EntityIdent(proto='mgcp', dst_kind='MGW') + elif m.is_response() and m.src_entity and m.src_entity.kind == 'MGW': + rtp = m.get_trait('mgcp', 'rtp_port') + if rtp: + m.src_entity.add_port('rtp', rtp) + return None + + def associate(s, messages, my_idx): + msg = messages[my_idx] + if msg.get_trait('mgcp', 'rsp'): + t = msg.timestamp + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if t - prev_msg.timestamp > 1: + break + if msg.same_traits(prev_msg, 'mgcp', 'tid') and msg.same_src_dst(prev_msg, reverse=True): + msg.request = prev_msg + msg.response = msg + prev_msg.request = prev_msg + prev_msg.response = msg + break + + Layer_mgcp.ConvMgcp.associate(messages, my_idx) + ConversationRtpPort.associate(messages, my_idx) + return s.m + + class ConvMgcp(ConversationLayer): + label = 'mgcp' + + def impl_msg_match_strs(s, m:Message): + mgw = m.entity_port('MGW') + if not mgw: + return None + + return [f'{mgw}:{name}={val}' for proto, name, val in m.get_traits('mgcp', ('endp', 'ci', 'tid'))] + + +class Layer_sccp(Layer): + def __init__(s, m:Message): + p = m.p + msgtype = p.get('sccp.message_type.showname') + traits = Traits( + src_lref=p.get('sccp.slr'), + dst_lref=p.get('sccp.dlr'), + ) + super().__init__(m=m, proto='sccp', msgtype=msgtype, traits=traits, hidden=True) + + def collapse(s, messages, my_idx): + msg 
= s.m + + # cut out STP hop + if SCCP_COLLAPSE_STP: + src = msg.get_trait('sctp', 'src') + t = msg.timestamp + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if t - prev_msg.timestamp > 1: + break + prev_sccp = prev_msg.layers.get(s.proto, None) + if prev_sccp is None: + continue + if src != prev_msg.get_trait('sctp', 'dst'): + continue + if s.msgtype != prev_sccp.msgtype: + continue + if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True): + continue + if not msg.same_traits(prev_msg, 'sctp', 'stream_id'): + continue + if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')): + continue + + prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst')) + prev_msg.absorb_msg(msg) + messages[i] = None + messages[my_idx] = prev_msg + Entity.add_to_blacklist(src) + return prev_msg + return msg + + def associate(s, messages, my_idx): + msg = s.m + + Layer_sccp.ConvSccp.associate(messages, my_idx) + return msg + + class ConvSccp(ConversationLayer): + label = 'sccp' + + @classmethod + def sccp_ref_str(cls, m:Message, src_or_dest:bool): + if src_or_dest: + addr = m.src() + refname = 'src_lref' + else: + addr = m.dst() + refname = 'dst_lref' + ref = m.get_trait('sccp', refname) + if not ref: + return None + return '%s#%s' % (addr, ref) + + def impl_msg_match_strs(s, m:Message): + return [s.sccp_ref_str(m, src_or_dest=True), s.sccp_ref_str(m, src_or_dest=False)] + +class Layer_m3ua(Layer): + def __init__(s, m:Message): + traits = Traits( + opc = m.p.get('m3ua.protocol_data_opc'), + dpc = m.p.get('m3ua.protocol_data_dpc'), + ) + super().__init__(m=m, proto='m3ua', msgtype=None, traits=traits, minor=True) + +# wireshark commonly falsely classifies a BSSMAP Ciphering Mode Command as RNSAP PDU +class Layer_rnsap(Layer): + def __init__(s, m:Message): + p = m.p + traits = Traits() + msgtype = 'Cipher Mode Command' + super().__init__(m=m, proto='bssmap', msgtype=msgtype, traits=traits) + +class 
Layer_bssap(Layer): + def __init__(s, m:Message): + msgtype = m.p.get('bssap.msgtype.showname') + traits = Traits( + msgtype_nr=int(m.p.get('bssap.pdu_type'), 16), + ) + + super().__init__(m=m, proto='bssap', msgtype=msgtype, traits=traits, minor=True) + +class Layer_hnbap(Layer): + def __init__(s, m:Message): + def strip_till_dash(dashstr): + if not dashstr or not '-' in dashstr: + return dashstr + dash = dashstr.rindex('-') + return dashstr[dash+1:] + + msgtype = strip_till_dash(m.p.get('hnbap.procedurecode.showname')) + pdutype = strip_till_dash(sane_msgtype(m.p.get('hnbap.hnbap_pdu.showname'))) + pdutype_nr = m.p.get('hnbap.hnbap_pdu') + traits = Traits( + msgtype_nr=int(m.p.get('hnbap.procedurecode')), + pdutype=pdutype, + pdutype_nr=int(pdutype_nr), + ) + super().__init__(m=m, proto='hnbap', msgtype=msgtype, traits=traits) + + def identify_entities(s, m:Message, messages, my_idx): + if m.get_trait('hnbap', 'msgtype_nr') == 1 and m.get_trait('hnbap', 'pdutype_nr') == 0: + # HNBRegister + return Message.EntityIdent(proto='Iuh', src_kind='hNodeB', dst_kind='HNBGW') + return None + +class Layer_rua(Layer): + def __init__(s, m:Message): + def strip_till_dash(dashstr): + if not dashstr or not '-' in dashstr: + return dashstr + dash = dashstr.rindex('-') + return dashstr[dash+1:] + + msgtype = strip_till_dash(m.p.get('rua.procedurecode.showname')) + pdutype = strip_till_dash(sane_msgtype(m.p.get('rua.rua_pdu.showname'))) + pdutype_nr = m.p.get('rua.rua_pdu') + traits = Traits( + msgtype_nr=int(m.p.get('rua.procedurecode')), + pdutype=pdutype, + pdutype_nr=int(pdutype_nr), + ) + super().__init__(m=m, proto='iuh', msgtype=msgtype, traits=traits) + + +class Layer_ranap(Layer): + def __init__(s, m:Message): + traits = Traits() + super().__init__(m=m, proto='ranap', msgtype=None, traits=traits) + + def collapse(s, messages, my_idx): + msg = s.m + + # cut out HNBGW hop + if IUH_COLLAPSE_HNBGW: + src = msg.src() + t = msg.timestamp + for i in reversed(range(my_idx)): + 
prev_msg = messages[i] + if not prev_msg: + continue + if t - prev_msg.timestamp > 1: + break + if src != prev_msg.dst(): + continue + if msg.src_entity is not prev_msg.dst_entity: + continue + # DOESNT WORK + if not msg.same_traits(prev_msg, 'ranap', None): + continue + if not msg.same_traits(prev_msg, 'sccp', ('src_lref', 'dst_lref'), allow_unset=True): + continue + if not msg.same_traits(prev_msg, 'sctp', 'stream_id'): + continue + if not msg.same_traits(prev_msg, 'm3ua', ('opc', 'dpc')): + continue + + prev_msg.set_trait('sctp', 'dst', msg.get_trait('sctp', 'dst')) + prev_msg.absorb_msg(msg) + messages[i] = None + messages[my_idx] = prev_msg + Entity.add_to_blacklist(src) + return prev_msg + return msg + + def identify_entities(s, m:Message, messages, my_idx): + if m.get_trait('dtap', 'msgtype_nr') == 8 and m.dst_entity_is('MSC'): + # Location Updating Request coming in over RANAP (IuCS) + + # find a HNBGW that has recently received the same LU, + # associate IuCS port + src_entity = None + for match in find_same_trait(m, messages, my_idx, 'dtap', None): + if 'iuh' not in match.layers: + continue + if not match.dst_entity or match.dst_entity.kind != 'HNBGW': + continue + src_entity = match.dst_entity + if src_entity: + break + + return Message.EntityIdent(proto='IuCS', src_kind='HNBGW', src_entity=src_entity) + return None + + + +class Layer_gsm_a_bssmap(Layer): + def __init__(s, m:Message): + p = m.p + msgtype = p.get('gsm_a_bssmap.msgtype.showname') + traits = Traits( + msgtype_nr=int(p.get('gsm_a_bssmap.msgtype')), + ) + super().__init__(m=m, proto='bssmap', msgtype=msgtype, traits=traits) + + def identify_entities(s, m:Message, messages, my_idx): + if m.get_trait('bssmap', 'msgtype_nr') == 0x57: + # Complete Layer 3 Information + + # associate BSC BSSMAP port with BSC RSL port + src_entity = None + for match in find_same_trait(m, messages, my_idx, 'dtap', ('tmsi', 'imsi')): + if 'rsl' not in match.layers: + continue + if not match.dst_entity or 
match.dst_entity.kind != 'BSC': + continue + src_entity = match.dst_entity + if src_entity: + break + + return Message.EntityIdent(proto='bssmap', src_kind='BSC', dst_kind='MSC', src_entity = src_entity) + return None + +class Layer_gsm_abis_rsl(Layer): + def __init__(s, m:Message): + p = m.p + msgtype = p.get('gsm_abis_rsl.msg_type.showname') + msgtype_nr = p.get('gsm_abis_rsl.msg_type') + msgtype_nr = int(msgtype_nr) if msgtype_nr else None + + sdcch = None + tch = None + + # For Immediate Assignment, the assigned TS/chan is more interesting + ts = p.get('gsm_a_ccch.gsm_a_rr_timeslot') + if ts is not None: + cbits = p.get('gsm_a_ccch.gsm_a_rr_sdcch4_sdcchc4_cbch') + else: + # normal RSL messages on a given TS/chan + ts = p.get('gsm_abis_rsl.ch_no_tn') + cbits = p.get('gsm_abis_rsl.ch_no_cbits') + if ts is not None and cbits is not None: + sdcch = f'{ts}.{cbits}' + if ts != '0': + tch = sdcch + sdcch = None + + tch_ts = p.get('gsm_a_dtap.gsm_a_rr_timeslot') + tch_ss = p.get('gsm_a_dtap.gsm_a_rr_tch_facch_sacchf') + if tch_ts and tch_ss: + tch = f'{tch_ts}.{tch_ss}' + + rtp_port = None + ipacc_rtp_ip = p.get('gsm_abis_rsl.ipacc_local_ip') + ipacc_rtp_port = p.get('gsm_abis_rsl.ipacc_local_port') + if ipacc_rtp_ip and ipacc_rtp_port: + rtp_port = IpPort(ipacc_rtp_ip, ipacc_rtp_port) + + traits = Traits( + msgtype_nr=msgtype_nr, + sdcch=sdcch, + tch=tch, + chan_type=p.get('gsm_abis_rsl.ch_type'), + rtp_port=rtp_port, + ) + super().__init__(m=m, proto='rsl', msgtype=msgtype, traits=traits) + # ignore CCCH Load INDication + #if msgtype_nr == 18: + # m.hide = True + + def identify_entities(s, m:Message, messages, my_idx): + ids = [] + if m.get_trait('rsl', 'msgtype') == 'RF-RESource-INDication': + # RSL RESource INDication from BTS to BSC + ids.append(Message.EntityIdent(proto='rsl', src_kind='BTS', dst_kind='BSC')) + if (m.get_trait('rsl', 'rtp_port') and m.src_entity_is('BTS') + and m.get_trait('rsl', 'msgtype_nr') in ( + 113, # ip.access-CRCX-ACK + 116, # 
ip.access-MDCX-ACK + )): + ids.append(Message.EntityIdent(proto='rtp', src_kind='BTS', src_entity=m.src_entity)) + return ids + + def collapse(s, messages, my_idx): + # combine duplicates like rsl.CCCH-LOAD-INDication + for i in reversed(range(my_idx)): + prev_msg = messages[i] + if not prev_msg: + continue + if prev_msg.finalized: + break + # stop combining at any non-rsl (and non-minor) message + if not 'rsl' in prev_msg.layers: + if all(l.minor for l in prev_msg.layers.values()): + continue + else: + break + if not same_nonempty(prev_msg.get_traits('rsl'), s.m.get_traits('rsl')): + continue + if s.m.same_src_dst(prev_msg, forward=True): + # found a recent similar packet, combine + prev_msg.count += 1 + messages[my_idx] = None + prev_msg.absorb_msg(s.m) + return prev_msg + return s.m + + def associate(s, messages, my_idx): + Layer_gsm_abis_rsl.ConvRslSdcch.associate(messages, my_idx) + Layer_gsm_abis_rsl.ConvRslTch.associate(messages, my_idx) + ConversationRtpPort.associate(messages, my_idx) + return s.m + + class ConvRslSdcch(ConversationLayer): + label = 'rsl_sdcch' + + def association_start(s, m:Message): + msgt = m.get_trait('rsl', 'msgtype_nr') + r = msgt in (#0x16, # Immediate Assignment + 33, # Channel Activation + ) + return r and s.matches_msg(m) + + def impl_msg_match_strs(s, m:Message): + sdcch = m.get_trait('rsl', 'sdcch') + if sdcch is None: + return None + bts = m.entity_port('BTS') + if bts is None: + return None + add_addr = str(bts) + ':' + return [add_addr + sdcch] + + class ConvRslTch(ConversationLayer): + label = 'rsl_tch' + + def association_start(s, m:Message): + msgt = m.get_trait('rsl', 'msgtype_nr') + r = msgt in (1, #Assignment-Command + ) + return r and s.matches_msg(m) + + def impl_msg_match_strs(s, m:Message): + tch = m.get_trait('rsl', 'tch') + if tch is None: + return None + bts = m.entity_port('BTS') + if bts is None: + return None + add_addr = str(bts) + ':' + return [add_addr + tch] + + +class Layer_gsm_a_dtap(Layer): + def 
__init__(s, m:Message): + dtap = m.p.get('gsm_a_dtap') + assert dtap is not None + + msgtype = None + msgtype_nr = None + for f in dtap._get_all_fields_with_alternates(): + if f.name.startswith('gsm_a.dtap.msg_') and f.name.endswith('_type'): + msgtype = f.showname_value + try: + msgtype_nr = int(f.raw_value) + except: + pass + + traits = Traits( + msgtype_nr=msgtype_nr, + imsi=m.p.get('gsm_a_dtap.e212_imsi') or m.p.get('gsm_a_dtap.gsm_a_imsi'), + tmsi=m.p.get('gsm_a_dtap.gsm_a_tmsi'), + ) + super().__init__(m=m, proto='dtap', msgtype=msgtype, traits=traits) + + + +class Results: + def __init__(s, opts, flush_seconds=5): + set_instance_vars_from_args() + s.messages = [] + s.finalized_idx = -1 + s.show_traits = None + if opts.show_traits: + if opts.show_traits == 'all': + s.show_traits = True + else: + s.show_traits = opts.show_traits.split(',') + s.show_conversations = None + if opts.show_conversations: + if opts.show_conversations == 'all': + s.show_conversations = True + else: + s.show_conversations = opts.show_conversations.split(',') + + def out_msg(s, msg): + out_text(msg.str(ladder=True, traits=s.show_traits, conversations=s.show_conversations)) + + def flush_msg(s, msg): + msg.finalized = True + if all(l.minor for l in msg.layers.values()): + return + if msg.hide: + return + if s.opts.filter_imsi: + subscr = msg.subscriber() + if not subscr or subscr.imsi != s.opts.filter_imsi: + return + s.out_msg(msg) + + def flush(s, timestamp_now=0, flush_seconds=0): + flush_t = timestamp_now - flush_seconds + for i in range(s.finalized_idx+1, len(s.messages)): + msg = s.messages[i] + if not msg: + continue + if timestamp_now and msg.timestamp > flush_t: + break + s.finalized_idx = i + s.flush_msg(msg) + + def add_msg(s, msg): + global g_current_msg + s.flush(msg.timestamp, s.flush_seconds) + try: + g_current_msg = msg + if not msg.layers: + return + s.messages.append(msg) + idx = len(s.messages) - 1 + changed_msg = msg.collapse(s.messages, idx) + # if the received 
message was absorbed by another, continue to identify the modified message using the + # new index + if changed_msg is not None and changed_msg is not msg: + msg = changed_msg + idx = s.messages.index(msg) + Entity.set_known(msg) + changed_msg = msg.associate(s.messages, idx) + if changed_msg is not None and changed_msg is not msg: + msg = changed_msg + idx = s.messages.index(msg) + msg.identify_entities(s.messages, idx) + Subscriber.identify(msg, s.messages, idx) + except: + s.flush() + + out_error('Exception while processing message:') + s.out_msg(msg) + raise + + def process_cap(s, cap): + p_idx = 0 + start_t = time.time() + p_min_t = None + p_max_t = None + warn_t = start_t + warn_p_t = None + for cap_p in cap: + p_idx += 1 + if p_idx < opts.packet_start: + continue + if opts.packet_count and (p_idx - opts.packet_start) > opts.packet_count: + break + if opts.packet_end and p_idx > opts.packet_end: + break + msg = Message.parse(Packet(p_idx, cap_p)) + p_min_t = msg.timestamp if p_min_t is None else min(p_min_t, msg.timestamp) + p_max_t = msg.timestamp if p_max_t is None else max(p_max_t, msg.timestamp) + s.add_msg(msg) + + now = time.time() + if warn_p_t is None or now > warn_t + 3: + if warn_p_t: + packet_time = p_max_t - warn_p_t + real_time = now - warn_t + if packet_time < real_time: + out_text(f'! 
taking longer to calculate than packets arrive by {100.*(real_time - packet_time)/packet_time:.1f}%') + warn_t = now + warn_p_t = p_max_t + s.flush() + end_t = time.time() + out_text(f'packet time: {p_max_t - p_min_t:.1f} in real time: {end_t - start_t:.1f}') + + def process_file(s, path): + cap = pyshark.FileCapture(path) + s.process_cap(cap) + + def process_live(s, iface): + cap = pyshark.LiveCapture(iface) + p_idx = 0 + try: + for cap_p in cap.sniff_continuously(): + p_idx += 1 + msg = Message.parse(Packet(p_idx, cap_p)) + s.add_msg(msg) + except KeyboardInterrupt: + pass + s.flush() + + def __str__(s): + return '\n'.join(str(m) for m in s.messages) + + +def run_tests(): + def out_test(*args, **kwargs): + print(*args, **kwargs) + + def dump_conv(label, c): + if not c: + out_test(label, 'conversation=None') + return + out_test(label, c, '{' if c.messages else 'empty') + for m in c.messages: + out_test(' ' * len(label), ' -', m.str(traits=True)) + if c.messages: + out_test(' ' * len(label), ' }') + for i in range(len(c.messages)): + m = c.messages[i] + if m.conversation is not c: + out_test(f'ERROR: c->messages[{i}].conversation != c for') + out_test(m) + assert False + + class FakeConvLayer(ConversationLayer): + label = 'fake' + + class fake_obj: + def __init__(s, sniff_timestamp=12345, **kwargs): + set_instance_vars_from_args() + + cap_p1 = fake_obj(foo='foo', bar='bar') + p1 = Packet(1, cap_p1) + + cap_p2 = fake_obj(foo='moo', bar='mar') + p2 = Packet(2, cap_p2) + + cap_p3 = fake_obj(foo='goo', bar='gar') + p3 = Packet(3, cap_p3) + + + + out_test('Test absorb_conversation') + + def test_conv_absorb_conv(a, b): + dump_conv('\na', a) + dump_conv('b', b) + a = a.absorb_conversation(b) + dump_conv('=', a) + return a + + c = test_conv_absorb_conv(Conversation(), Conversation()) + assert not c.messages + + m1 = Message(p1) + cl1 = FakeConvLayer().merge_conversation_with_msg(m1) + c = test_conv_absorb_conv(Conversation(), cl1.conversation) + assert m1 in c.messages + 
+ m1 = Message(p1) + cl1 = FakeConvLayer().merge_conversation_with_msg(m1) + c = test_conv_absorb_conv(cl1.conversation, Conversation()) + assert m1 in c.messages + + m1 = Message(p1) + cl1 = FakeConvLayer().merge_conversation_with_msg(m1) + m2 = Message(p2) + cl2 = FakeConvLayer().merge_conversation_with_msg(m2) + c = test_conv_absorb_conv(m1.conversation, m2.conversation) + assert m1 in c.messages + assert m2 in c.messages + + out_test('\nTest Conversation.add_message') + def test_conv_add_msg(conv_layer, msg): + conv = conv_layer.ensure_conversation() + dump_conv('\n', conv) + out_test('+', msg.str(conversations=True,traits=True)) + conv.add_message(msg) + dump_conv('=', conv) + return conv + + cl1 = FakeConvLayer() + m2 = Message(p2) + c = test_conv_add_msg(cl1, m2) + assert m2 in c.messages + + m1 = Message(p1) + cl1 = FakeConvLayer().merge_conversation_with_msg(m1) + m2 = Message(p2) + c = test_conv_add_msg(cl1, m2) + assert m1 in c.messages + assert m2 in c.messages + + m1 = Message(p1) + cl1 = FakeConvLayer() + cl1 = cl1.merge_conversation_with_msg(m1) + c = test_conv_add_msg(cl1, m1) + assert m1 in c.messages + + m1 = Message(p1) + cl1 = FakeConvLayer().merge_conversation_with_msg(m1) + m2 = Message(p2) + cl2 = FakeConvLayer().merge_conversation_with_msg(m2) + c = test_conv_add_msg(cl1, m2) + assert m1 in c.messages + assert m2 in c.messages + assert c.layers.get(FakeConvLayer.label) + + out_test('\nTest ConversationLayer.merge_conversation_with_msg') + def test_conv_layer_add_msg(conv_layer, msg): + dump_conv('\n', conv_layer.conversation) + out_test('+', msg.str(conversations=True,traits=True)) + if msg.conversation: + dump_conv(' with', msg.conversation) + conv_layer = conv_layer.merge_conversation_with_msg(msg) + dump_conv('=', conv_layer.conversation) + + m1 = Message(p1) + cl1 = FakeConvLayer().merge_conversation_with_msg(m1) + m2 = Message(p2) + test_conv_layer_add_msg(cl1, m2) + assert m1 in cl1.conversation.messages + assert m2 in 
def _run_tests_tail():
    # NOTE(review): this is the preserved tail of run_tests(); the head of
    # that function lies outside this span in the garbled patch text. The
    # statements are kept verbatim, wrapped in a helper so they remain
    # importable. The first line completes an 'assert m2 in ...' statement
    # cut off on the previous (garbled) line.
    cl1.conversation.messages
    assert cl1.conversation.layers.get(FakeConvLayer.label)

    m1 = Message(p1)
    cl1 = FakeConvLayer().merge_conversation_with_msg(m1)
    m2 = Message(p2)
    cl2 = FakeConvLayer().merge_conversation_with_msg(m2)
    test_conv_layer_add_msg(cl1, m2)
    assert m1 in cl1.conversation.messages
    assert m2 in cl1.conversation.messages
    assert cl1.conversation.layers.get(FakeConvLayer.label)
    assert cl2.conversation is None

    m1 = Message(p1)
    cl1 = FakeConvLayer().merge_conversation_with_msg(m1)
    m3 = Message(p3)
    cl1 = cl1.merge_conversation_with_msg(m3)
    m2 = Message(p2)
    cl2 = FakeConvLayer().merge_conversation_with_msg(m2)
    test_conv_layer_add_msg(cl2, m1)
    dump_conv('m1', m1.conversation)
    dump_conv('m2', m2.conversation)
    assert m1 in m2.conversation.messages
    assert m2 in m1.conversation.messages
    assert cl1.conversation.layers.get(FakeConvLayer.label)
    assert cl2.conversation is None


def parse_args(argv=None):
    '''Parse command line options.

    argv: list of argument strings to parse, or None to parse sys.argv
          (the default, matching the original zero-argument behavior).
    Returns the argparse.Namespace of parsed options.

    IMPROVEMENT: the optional, backward-compatible argv parameter makes the
    parser testable and callable as a library function without touching
    sys.argv; help texts were added for --help output.'''
    import argparse
    parser = argparse.ArgumentParser(description=__doc__,
            formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--pcap-file', '-f', metavar='file',
            help='read packets from this pcap/pcapng recording')
    parser.add_argument('--live-capture', '-l', metavar='interface',
            help='capture packets live from this network interface')
    parser.add_argument('--packet-start', '-S', default=0, type=int,
            help='skip packets before this packet number')
    parser.add_argument('--packet-count', '-C', default=0, type=int,
            help='process at most this many packets (0 = no limit)')
    parser.add_argument('--packet-end', '-E', default=0, type=int,
            help='stop after this packet number (0 = no limit)')
    parser.add_argument('--filter-imsi', default=None,
            help='show only messages associated with this IMSI')
    parser.add_argument('--show-traits', default=None,
            help="comma separated list of traits to show, or 'all'")
    parser.add_argument('--show-conversations', default=None,
            help="comma separated list of conversations to show, or 'all'")
    parser.add_argument('--test', action='store_true',
            help='run built-in self tests instead of processing packets')
    return parser.parse_args(argv)


if __name__ == '__main__':
    # NOTE: 'opts' is read as a module-level global by Results.process_cap().
    opts = parse_args()

    if opts.test:
        run_tests()
    else:
        r = Results(opts)
        if opts.pcap_file:
            r.process_file(opts.pcap_file)
        if opts.live_capture:
            r.process_live(opts.live_capture)
        r.flush()

# vim: noexpandtab tabstop=8 shiftwidth=8
# (web extraction residue, not part of the file: -- cgit v1.2.3)