ipa_proto: Add notion of 'codecs'

A codec for a given Stream Identifier can be registered with encode
and decode functions.  This codec transcodes from the binary payload
of messages within that stream identifier and some abstract
representation.  Any received messages will be passed through decode,
while any to-be-transmitted messages will be passed through encode.

Change-Id: I8eaf888402545a1a871df9ae3dfbce690729dd03
This commit is contained in:
Harald Welte 2019-08-11 20:31:41 +02:00 committed by Harald Welte
parent d0f391ef60
commit 3b95a7cfc9
1 changed files with 44 additions and 13 deletions

View File

@ -37,11 +37,16 @@
-export([register_socket/1, register_stream/3, unregister_stream/2,
send/3, connect/3, connect/4, listen_accept_handle/2,
start_listen/3, controlling_process/3]).
start_listen/3, controlling_process/3, register_codec/3]).
-type stream_id() :: integer() | {osmo, integer()}.
-record(ipa_socket, {socket, ipaPid, streamTbl, listenType}).
-record(ipa_codec, {streamId :: stream_id(),
encodeFn :: fun(),
decodeFn :: fun()
}).
% register a TCP socket with this IPA protocol implementation
register_socket(Socket) ->
@ -124,15 +129,43 @@ split_ipa_msg(DataBin) ->
% deliver an incoming message to the process that is registered for the socket/stream_id
deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
DataDec = try_decode(StreamID, DataBin),
case ets:lookup(StreamMap, {Socket, StreamID}) of
[{_,{process_id, Pid}}] ->
Pid ! {ipa, Socket, StreamID, DataBin};
Pid ! {ipa, Socket, StreamID, DataDec};
[{_,{callback_fn, Fn, Args}}] ->
Fn(Socket, StreamID, DataBin, Args);
Fn(Socket, StreamID, DataDec, Args);
[] ->
io:format("No Pid registered for Socket ~p Stream ~p~n", [Socket, StreamID])
end.
% register a Codec with this IPA protocol implementation
-spec register_codec(stream_id(), fun(), fun()) -> boolean().
register_codec(StreamID, EncodeFn, DecodeFn) ->
ets:insert(ipa_codecs, #ipa_codec{streamId=StreamID, encodeFn=EncodeFn, decodeFn=DecodeFn}).
-spec try_decode(stream_id(), binary()) -> any().
try_decode(StreamID, Data) ->
case ets:lookup(ipa_codecs, StreamID) of
[IpaCodec] ->
Fun = IpaCodec#ipa_codec.decodeFn,
Fun(Data);
[] ->
Data
end.
-spec try_encode(stream_id(), any()) -> binary().
try_encode(_StreamID, Data) when is_binary(Data) ->
Data;
try_encode(StreamID, Data) ->
case ets:lookup(ipa_codecs, StreamID) of
[IpaCodec] ->
Fun = IpaCodec#ipa_codec.encodeFn,
Fun(Data);
[] ->
Data
end.
% process (split + deliver) an incoming IPA message
process_rx_ipa_msg(_S, _StreamMap, <<>>) ->
ok;
@ -173,10 +206,12 @@ process_tcp_closed(S, StreamMap) ->
% send a binary message through a given Socket / StreamID
send(Socket, {osmo, StreamIdExt}, DataBin) ->
send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataBin]);
DataEnc = try_encode({osmo, StreamIdExt}, DataBin),
send(Socket, ?IPAC_PROTO_OSMO, [StreamIdExt, DataEnc]);
send(Socket, StreamID, DataBin) ->
Size = iolist_size(DataBin),
gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataBin])).
DataEnc = try_encode(StreamID, DataBin),
Size = iolist_size(DataEnc),
gen_tcp:send(Socket, iolist_to_binary([<<Size:2/big-unsigned-integer-unit:8>>, StreamID, DataEnc])).
call_sync(Pid, Request) ->
@ -194,12 +229,8 @@ reply({From, Ref}, Reply) ->
% global module initialization
init() ->
case ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]) of
ipa_sockets ->
ok;
_ ->
{error, ets_new_ipa_sockets}
end.
ipa_sockets = ets:new(ipa_sockets, [named_table, set, public, {keypos, #ipa_socket.socket}]),
ipa_codecs = ets:new(ipa_codecs, [named_table, set, public, {keypos, #ipa_codec.streamId}]).
% initialize a signle socket, create its handle process
init_sock(Socket, CallingPid) ->