diff --git a/src/ipa_proto.erl b/src/ipa_proto.erl index c96d6bb..d88af7e 100644 --- a/src/ipa_proto.erl +++ b/src/ipa_proto.erl @@ -129,12 +129,18 @@ request({ipa_unblock, Socket}, CcmOptions) -> Ret = inet:setopts(Socket, [{active, once}]), io:format("Unblocking socket ~p:~p~n", [Socket, Ret]). -% split an incoming IPA message and split it into Length/StreamID/Payload +% split an incoming IPA message and split it into StreamID/Payload/Trailer +split_ipa_msg(IPALen, _StreamID, DataRemainBin) when byte_size(DataRemainBin) < IPALen -> + need_more_data; +split_ipa_msg(IPALen, StreamID, DataRemainBin) -> + <> = DataRemainBin, + io:format("Stream ~p, ~p bytes~n", [StreamID, IPALen]), + {ok, StreamID, Payload, Trailer}. +split_ipa_msg(DataBin) when byte_size(DataBin) < 3 -> + need_more_data; split_ipa_msg(DataBin) -> - % FIXME: This will throw an exception if DataBin doesn't contain all payload - <> = DataBin, - io:format("Stream ~p, ~p bytes~n", [StreamID, Length]), - {StreamID, Payload, Trailer}. + <> = DataBin, + split_ipa_msg(IPALen, StreamID, DataRemainBin). % deliver an incoming message to the process that is registered for the socket/stream_id deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) -> @@ -177,19 +183,22 @@ try_encode(StreamID, Data) -> % process (split + deliver) an incoming IPA message process_rx_ipa_msg(_S, _StreamMap, _, <<>>) -> - ok; + {ok, <<>>}; process_rx_ipa_msg(S, StreamMap, CcmOptions, Data) -> - {StreamID, PayloadBin, Trailer} = split_ipa_msg(Data), - case StreamID of - ?IPAC_PROTO_CCM -> - process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin); - ?IPAC_PROTO_OSMO -> - <> = PayloadBin, - deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt); - _ -> - deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin) - end, - process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer). + case split_ipa_msg(Data) of + need_more_data -> {ok, Data}; + {ok, StreamID, PayloadBin, Trailer} -> + case StreamID of + ?IPAC_PROTO_CCM -> + process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin); + ?IPAC_PROTO_OSMO -> + <> = PayloadBin, + deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt); + _ -> + deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin) + end, + process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer) + end. send_close_signal([]) -> ok; @@ -246,9 +255,9 @@ init_sock(Socket, CallingPid) -> StreamMap = ets:new(stream_map, [set]), ets:insert(ipa_sockets, #ipa_socket{socket=Socket, ipaPid=self(), streamTbl=StreamMap}), CallingPid ! {ipa_init_sock_done, Socket}, - loop(Socket, StreamMap, #ipa_ccm_options{}). + loop(Socket, StreamMap, #ipa_ccm_options{}, <<>>). -loop(S, StreamMap, CcmOptions) -> +loop(S, StreamMap, CcmOptions, RxPendingData) -> receive {request, From, Request} -> case ipa_proto:request(Request, CcmOptions) of @@ -260,15 +269,15 @@ loop(S, StreamMap, CcmOptions) -> Reply = EmbeddedReply end, ipa_proto:reply(From, Reply), - ipa_proto:loop(S, StreamMap, NextCcmOptions); + ipa_proto:loop(S, StreamMap, NextCcmOptions, RxPendingData); {ipa_send, S, StreamId, Data} -> send(S, StreamId, Data), - ipa_proto:loop(S, StreamMap, CcmOptions); + ipa_proto:loop(S, StreamMap, CcmOptions, RxPendingData); {tcp, S, Data} -> % process incoming IPA message and mark socket active once more - ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, Data), + {ok, NewRxPendingData} = ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, <>), inet:setopts(S, [{active, once}]), - ipa_proto:loop(S, StreamMap, CcmOptions); + ipa_proto:loop(S, StreamMap, CcmOptions, NewRxPendingData); {tcp_closed, S} -> io:format("Socket ~w closed [~w]~n", [S,self()]), ipa_proto:process_tcp_closed(S, StreamMap),