|
|
|
@ -129,12 +129,18 @@ request({ipa_unblock, Socket}, CcmOptions) ->
|
|
|
|
|
Ret = inet:setopts(Socket, [{active, once}]),
|
|
|
|
|
io:format("Unblocking socket ~p:~p~n", [Socket, Ret]).
|
|
|
|
|
|
|
|
|
|
% split an incoming IPA message and split it into Length/StreamID/Payload
|
|
|
|
|
% split an incoming IPA message and split it into StreamID/Payload/Trailer
|
|
|
|
|
split_ipa_msg(IPALen, _StreamID, DataRemainBin) when byte_size(DataRemainBin) < IPALen ->
|
|
|
|
|
need_more_data;
|
|
|
|
|
split_ipa_msg(IPALen, StreamID, DataRemainBin) ->
|
|
|
|
|
<<Payload:IPALen/binary, Trailer/binary>> = DataRemainBin,
|
|
|
|
|
io:format("Stream ~p, ~p bytes~n", [StreamID, IPALen]),
|
|
|
|
|
{ok, StreamID, Payload, Trailer}.
|
|
|
|
|
split_ipa_msg(DataBin) when byte_size(DataBin) < 3 ->
|
|
|
|
|
need_more_data;
|
|
|
|
|
split_ipa_msg(DataBin) ->
|
|
|
|
|
% FIXME: This will throw an exception if DataBin doesn't contain all payload
|
|
|
|
|
<<Length:16/big-unsigned-integer, StreamID:8, Payload:Length/binary, Trailer/binary>> = DataBin,
|
|
|
|
|
io:format("Stream ~p, ~p bytes~n", [StreamID, Length]),
|
|
|
|
|
{StreamID, Payload, Trailer}.
|
|
|
|
|
<<IPALen:16/big-unsigned-integer, StreamID:8, DataRemainBin/binary>> = DataBin,
|
|
|
|
|
split_ipa_msg(IPALen, StreamID, DataRemainBin).
|
|
|
|
|
|
|
|
|
|
% deliver an incoming message to the process that is registered for the socket/stream_id
|
|
|
|
|
deliver_rx_ipa_msg(Socket, StreamID, StreamMap, DataBin) ->
|
|
|
|
@ -177,19 +183,22 @@ try_encode(StreamID, Data) ->
|
|
|
|
|
|
|
|
|
|
% process (split + deliver) an incoming IPA message
|
|
|
|
|
process_rx_ipa_msg(_S, _StreamMap, _, <<>>) ->
|
|
|
|
|
ok;
|
|
|
|
|
{ok, <<>>};
|
|
|
|
|
process_rx_ipa_msg(S, StreamMap, CcmOptions, Data) ->
|
|
|
|
|
{StreamID, PayloadBin, Trailer} = split_ipa_msg(Data),
|
|
|
|
|
case StreamID of
|
|
|
|
|
?IPAC_PROTO_CCM ->
|
|
|
|
|
process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
|
|
|
|
|
?IPAC_PROTO_OSMO ->
|
|
|
|
|
<<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
|
|
|
|
|
deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
|
|
|
|
|
_ ->
|
|
|
|
|
deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
|
|
|
|
|
end,
|
|
|
|
|
process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer).
|
|
|
|
|
case split_ipa_msg(Data) of
|
|
|
|
|
need_more_data -> {ok, Data};
|
|
|
|
|
{ok, StreamID, PayloadBin, Trailer} ->
|
|
|
|
|
case StreamID of
|
|
|
|
|
?IPAC_PROTO_CCM ->
|
|
|
|
|
process_rx_ccm_msg(S, StreamID, CcmOptions, PayloadBin);
|
|
|
|
|
?IPAC_PROTO_OSMO ->
|
|
|
|
|
<<ExtStreamID:8, PayloadExt/binary>> = PayloadBin,
|
|
|
|
|
deliver_rx_ipa_msg(S, {osmo, ExtStreamID}, StreamMap, PayloadExt);
|
|
|
|
|
_ ->
|
|
|
|
|
deliver_rx_ipa_msg(S, StreamID, StreamMap, PayloadBin)
|
|
|
|
|
end,
|
|
|
|
|
process_rx_ipa_msg(S, StreamMap, CcmOptions, Trailer)
|
|
|
|
|
end.
|
|
|
|
|
|
|
|
|
|
send_close_signal([]) ->
|
|
|
|
|
ok;
|
|
|
|
@ -246,9 +255,9 @@ init_sock(Socket, CallingPid) ->
|
|
|
|
|
StreamMap = ets:new(stream_map, [set]),
|
|
|
|
|
ets:insert(ipa_sockets, #ipa_socket{socket=Socket, ipaPid=self(), streamTbl=StreamMap}),
|
|
|
|
|
CallingPid ! {ipa_init_sock_done, Socket},
|
|
|
|
|
loop(Socket, StreamMap, #ipa_ccm_options{}).
|
|
|
|
|
loop(Socket, StreamMap, #ipa_ccm_options{}, <<>>).
|
|
|
|
|
|
|
|
|
|
loop(S, StreamMap, CcmOptions) ->
|
|
|
|
|
loop(S, StreamMap, CcmOptions, RxPendingData) ->
|
|
|
|
|
receive
|
|
|
|
|
{request, From, Request} ->
|
|
|
|
|
case ipa_proto:request(Request, CcmOptions) of
|
|
|
|
@ -260,15 +269,15 @@ loop(S, StreamMap, CcmOptions) ->
|
|
|
|
|
Reply = EmbeddedReply
|
|
|
|
|
end,
|
|
|
|
|
ipa_proto:reply(From, Reply),
|
|
|
|
|
ipa_proto:loop(S, StreamMap, NextCcmOptions);
|
|
|
|
|
ipa_proto:loop(S, StreamMap, NextCcmOptions, RxPendingData);
|
|
|
|
|
{ipa_send, S, StreamId, Data} ->
|
|
|
|
|
send(S, StreamId, Data),
|
|
|
|
|
ipa_proto:loop(S, StreamMap, CcmOptions);
|
|
|
|
|
ipa_proto:loop(S, StreamMap, CcmOptions, RxPendingData);
|
|
|
|
|
{tcp, S, Data} ->
|
|
|
|
|
% process incoming IPA message and mark socket active once more
|
|
|
|
|
ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, Data),
|
|
|
|
|
{ok, NewRxPendingData} = ipa_proto:process_rx_ipa_msg(S, StreamMap, CcmOptions, <<RxPendingData/binary, Data/binary>>),
|
|
|
|
|
inet:setopts(S, [{active, once}]),
|
|
|
|
|
ipa_proto:loop(S, StreamMap, CcmOptions);
|
|
|
|
|
ipa_proto:loop(S, StreamMap, CcmOptions, NewRxPendingData);
|
|
|
|
|
{tcp_closed, S} ->
|
|
|
|
|
io:format("Socket ~w closed [~w]~n", [S,self()]),
|
|
|
|
|
ipa_proto:process_tcp_closed(S, StreamMap),
|
|
|
|
@ -367,7 +376,7 @@ start_listen(LPort, NumServers, Opts) ->
|
|
|
|
|
{ok, ListenSock} ->
|
|
|
|
|
start_servers(NumServers, ListenSock, self()),
|
|
|
|
|
{ok, Port} = inet:port(ListenSock),
|
|
|
|
|
Port;
|
|
|
|
|
{ok, ListenSock, Port};
|
|
|
|
|
{error, Reason} ->
|
|
|
|
|
{error, Reason}
|
|
|
|
|
end.
|
|
|
|
|