ipa_proto.erl: Implement TCP/IPA reassembly

Related: OS#6377
Change-Id: I322e6ab9368ad66be66a9e8d113575f51f9c91c3
This commit is contained in:
Pau Espin 2024-02-27 21:15:36 +01:00
parent 414a618804
commit 2286c1b873
1 changed files with 32 additions and 23 deletions

View File

@ -129,12 +129,18 @@ request({ipa_unblock, Socket}, CcmOptions) ->
Ret = inet:setopts(Socket, [{active, once}]),
io:format("Unblocking socket ~p:~p~n", [Socket, Ret]).
% split an incoming IPA message and split it into Length/StreamID/Payload
% split an incoming IPA message and split it into StreamID/Payload/Trailer
split_ipa_msg(IPALen, _StreamID, DataRemainBin) when byte_size(DataRemainBin) < IPALen ->
need_more_data;
split_ipa_msg(IPALen, StreamID, DataRemainBin) ->
<<Payload:IPALen/binary, Trailer/binary>> = DataRemainBin,
io:format("Stream ~p, ~p bytes~n", [StreamID, IPALen]),
{ok, StreamID, Payload, Trailer}.
split_ipa_msg(DataBin) when byte_size(DataBin) < 3 ->
need_more_data;
split_ipa_msg(DataBin) ->
% FIXME: This will throw an exception if DataBin doesn't contain all payload
<<Length:16/big-unsigned-integer, StreamID:8, Payload:Length/binary, Trailer/binary>> = DataBin,
io:format("Stream ~p, ~p bytes~n", [StreamID, Length]),
{StreamID, Payload, Trailer}.
<<IPALen:16/big-unsigned-integer, StreamID:8, DataRemainBin/binary>> = DataBin,
split_ipa_msg(IPALen, StreamID, DataRemainBin).
% deliver an incoming message to the process that is registered for the socket/stream_id
deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
@ -177,19 +183,22 @@ try_encode(StreamID, Data) ->
% process (split + deliver) an incoming IPA message
process_rx_ipa_msg(_S, _StreamMap, _, <<>>) ->
ok;
{ok, <<>>};
process_rx_ipa_msg(S, StreamMap, CcmOptions, Data) ->
{StreamID, PayloadBin, Trailer} = split_ipa_msg(Data),
case StreamID of
?IPAC_PROTO_CCM ->
process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
?IPAC_PROTO_OSMO ->
<<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
_ ->
deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
end,
process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer).
case split_ipa_msg(Data) of
need_more_data -> {ok, Data};
{ok, StreamID, PayloadBin, Trailer} ->
case StreamID of
?IPAC_PROTO_CCM ->
process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
?IPAC_PROTO_OSMO ->
<<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
_ ->
deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
end,
process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer)
end.
send_close_signal([]) ->
ok;
@ -246,9 +255,9 @@ init_sock(Socket, CallingPid) ->
StreamMap = ets:new(stream_map, [set]),
ets:insert(ipa_sockets, #ipa_socket{socket=Socket, ipaPid=self(), streamTbl=StreamMap}),
CallingPid ! {ipa_init_sock_done, Socket},
loop(Socket, StreamMap, #ipa_ccm_options{}).
loop(Socket, StreamMap, #ipa_ccm_options{}, <<>>).
loop(S, StreamMap, CcmOptions) ->
loop(S, StreamMap, CcmOptions, RxPendingData) ->
receive
{request, From, Request} ->
case ipa_proto:request(Request, CcmOptions) of
@ -260,15 +269,15 @@ loop(S, StreamMap, CcmOptions) ->
Reply = EmbeddedReply
end,
ipa_proto:reply(From, Reply),
ipa_proto:loop(S, StreamMap, NextCcmOptions);
ipa_proto:loop(S, StreamMap, NextCcmOptions, RxPendingData);
{ipa_send, S, StreamId, Data} ->
send(S, StreamId, Data),
ipa_proto:loop(S, StreamMap, CcmOptions);
ipa_proto:loop(S, StreamMap, CcmOptions, RxPendingData);
{tcp, S, Data} ->
% process incoming IPA message and mark socket active once more
ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, Data),
{ok, NewRxPendingData} = ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, <<RxPendingData/binary, Data/binary>>),
inet:setopts(S, [{active, once}]),
ipa_proto:loop(S, StreamMap, CcmOptions);
ipa_proto:loop(S, StreamMap, CcmOptions, NewRxPendingData);
{tcp_closed, S} ->
io:format("Socket ~w closed [~w]~n", [S,self()]),
ipa_proto:process_tcp_closed(S, StreamMap),