diff options
author | Alexey Shchepin <alexey@process-one.net> | 2006-01-13 04:55:20 +0300 |
---|---|---|
committer | Alexey Shchepin <alexey@process-one.net> | 2006-01-13 04:55:20 +0300 |
commit | 6bb510d99e4ad29dbf85b4ffec9d1d8c6516b4ba (patch) | |
tree | 1fe149f5ee097f2770c8e4693bd833a0e2724b46 /src | |
parent | 8401a5ac55e756f7a4e74afd7916b38e0d65e02a (diff) |
* src/ejabberd_service.erl: Bugfix
* src/ejabberd_receiver.erl: Rewritten to use {active, once} mode
for socket
* src/ejabberd_c2s.erl: Update
* src/ejabberd_listener.erl: Likewise
* src/ejabberd_s2s_in.erl: Likewise
* src/ejabberd_s2s_out.erl: Likewise
* src/ejabberd_service.erl: Likewise
* src/shaper.erl: Likewise
* src/tls/tls.erl: Likewise
* src/web/ejabberd_http.erl: Likewise
SVN Revision: 483
Diffstat (limited to 'src')
-rw-r--r-- | src/ejabberd_c2s.erl | 17 | ||||
-rw-r--r-- | src/ejabberd_listener.erl | 2 | ||||
-rw-r--r-- | src/ejabberd_receiver.erl | 270 | ||||
-rw-r--r-- | src/ejabberd_s2s_in.erl | 18 | ||||
-rw-r--r-- | src/ejabberd_s2s_out.erl | 11 | ||||
-rw-r--r-- | src/ejabberd_service.erl | 12 | ||||
-rw-r--r-- | src/shaper.erl | 27 | ||||
-rw-r--r-- | src/tls/tls.erl | 8 | ||||
-rw-r--r-- | src/web/ejabberd_http.erl | 4 |
9 files changed, 267 insertions, 102 deletions
diff --git a/src/ejabberd_c2s.erl b/src/ejabberd_c2s.erl index 64ac65d81..891756acf 100644 --- a/src/ejabberd_c2s.erl +++ b/src/ejabberd_c2s.erl @@ -17,6 +17,7 @@ start_link/2, send_text/2, send_element/2, + become_controller/1, get_presence/1]). %% gen_fsm callbacks @@ -97,6 +98,9 @@ start(SockData, Opts) -> start_link(SockData, Opts) -> gen_fsm:start_link(ejabberd_c2s, [SockData, Opts], ?FSMOPTS). +become_controller(Pid) -> + gen_fsm:send_all_state_event(Pid, become_controller). + %% Return Username, Resource and presence information get_presence(FsmRef) -> gen_fsm:sync_send_all_state_event(FsmRef, {get_presence}, 1000). @@ -791,6 +795,12 @@ session_established(closed, StateData) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +handle_event(become_controller, StateName, StateData) -> + ok = (StateData#state.sockmod):controlling_process( + StateData#state.socket, + StateData#state.receiver), + ejabberd_receiver:become_controller(StateData#state.receiver), + {next_state, StateName, StateData}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -975,7 +985,10 @@ handle_info({route, From, To, Packet}, StateName, StateData) -> {next_state, StateName, NewState}; true -> {next_state, StateName, NewState} - end. + end; +handle_info(Info, StateName, StateData) -> + ?ERROR_MSG("Unexpected info: ~p", [Info]), + {next_state, StateName, StateData}. %%---------------------------------------------------------------------- %% Func: terminate/3 @@ -1035,7 +1048,7 @@ terminate(_Reason, StateName, StateData) -> _ -> ok end, - (StateData#state.sockmod):close(StateData#state.socket), + ejabberd_receiver:close(StateData#state.receiver), ok. %%%---------------------------------------------------------------------- diff --git a/src/ejabberd_listener.erl b/src/ejabberd_listener.erl index 6e84b65e2..d387de7d4 100644 --- a/src/ejabberd_listener.erl +++ b/src/ejabberd_listener.erl @@ -99,6 +99,7 @@ accept(ListenSocket, Module, Opts) -> {error, _Reason} -> gen_tcp:close(Socket) end, + Module:become_controller(Pid), accept(ListenSocket, Module, Opts); {error, Reason} -> ?INFO_MSG("(~w) Failed TCP accept: ~w", @@ -146,6 +147,7 @@ accept_ssl(ListenSocket, Module, Opts) -> end, {ok, Pid} = Module:start({ssl, Socket}, Opts), catch ssl:controlling_process(Socket, Pid), + Module:become_controller(Pid), accept_ssl(ListenSocket, Module, Opts); {error, timeout} -> accept_ssl(ListenSocket, Module, Opts); diff --git a/src/ejabberd_receiver.erl b/src/ejabberd_receiver.erl index 204771c1a..d09bef666 100644 --- a/src/ejabberd_receiver.erl +++ b/src/ejabberd_receiver.erl @@ -10,20 +10,68 @@ -author('alexey@sevcom.net'). -vsn('$Revision$ '). +-behaviour(gen_server). + +%% API -export([start/3, - receiver/4, change_shaper/2, reset_stream/1, - starttls/2]). + starttls/2, + become_controller/1, + close/1]). --include("ejabberd.hrl"). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). +-include("ejabberd.hrl"). +-record(state, {socket, + sock_mod, + shaper_state, + c2s_pid, + xml_stream_state, + timeout}). + +%%==================================================================== +%% API +%%==================================================================== +%%-------------------------------------------------------------------- +%% Function: start() -> {ok,Pid} | ignore | {error,Error} +%% Description: Starts the server +%%-------------------------------------------------------------------- start(Socket, SockMod, Shaper) -> - proc_lib:spawn(?MODULE, receiver, [Socket, SockMod, Shaper, self()]). + {ok, Pid} = gen_server:start( + ?MODULE, [Socket, SockMod, Shaper, self()], []), + Pid. + +change_shaper(Pid, Shaper) -> + gen_server:cast(Pid, {change_shaper, Shaper}). +reset_stream(Pid) -> + gen_server:call(Pid, reset_stream). -receiver(Socket, SockMod, Shaper, C2SPid) -> +starttls(Pid, TLSSocket) -> + gen_server:call(Pid, {starttls, TLSSocket}). + +become_controller(Pid) -> + gen_server:call(Pid, become_controller). + +close(Pid) -> + gen_server:cast(Pid, close). + +%%==================================================================== +%% gen_server callbacks +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: init(Args) -> {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%-------------------------------------------------------------------- +init([Socket, SockMod, Shaper, C2SPid]) -> XMLStreamState = xml_stream:new(C2SPid), ShaperState = shaper:new(Shaper), Timeout = case SockMod of @@ -32,77 +80,149 @@ receiver(Socket, SockMod, Shaper, C2SPid) -> _ -> infinity end, - receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout). - -receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout) -> - Res = (catch SockMod:recv(Socket, 0, Timeout)), - receive - {starttls, TLSSocket} -> - xml_stream:close(XMLStreamState), - XMLStreamState1 = xml_stream:new(C2SPid), - TLSRes = case Res of - {ok, Data} -> - tls:recv_data(TLSSocket, Data); - _ -> - tls:recv_data(TLSSocket, "") - end, - receiver1(TLSSocket, tls, - ShaperState, C2SPid, XMLStreamState1, Timeout, - TLSRes); - {change_timeout, NewTimeout} -> % Dirty hack - receiver1(Socket, SockMod, - ShaperState, C2SPid, XMLStreamState, NewTimeout, - Res) - after 0 -> - receiver1(Socket, SockMod, - ShaperState, C2SPid, XMLStreamState, Timeout, - Res) - end. - - -receiver1(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, Timeout, Res) -> - case Res of - {ok, Text} -> - ShaperSt1 = receive - {change_shaper, Shaper} -> - shaper:new(Shaper) - after 0 -> - ShaperState - end, - NewShaperState = shaper:update(ShaperSt1, size(Text)), - XMLStreamState1 = receive - reset_stream -> - xml_stream:close(XMLStreamState), - xml_stream:new(C2SPid) - after 0 -> - XMLStreamState - end, - XMLStreamState2 = xml_stream:parse(XMLStreamState1, Text), - receiver(Socket, SockMod, NewShaperState, C2SPid, XMLStreamState2, - Timeout); - {error, timeout} -> - receiver(Socket, SockMod, ShaperState, C2SPid, XMLStreamState, - Timeout); - {error, Reason} -> - xml_stream:close(XMLStreamState), - gen_fsm:send_event(C2SPid, closed), - ok; - {'EXIT', Reason} -> - ?ERROR_MSG("(~w) abnormal ~w:recv termination:~n\t~p~n", - [Socket, SockMod, Reason]), - xml_stream:close(XMLStreamState), - gen_fsm:send_event(C2SPid, closed), - ok + {ok, #state{socket = Socket, + sock_mod = SockMod, + shaper_state = ShaperState, + c2s_pid = C2SPid, + xml_stream_state = XMLStreamState, + timeout = Timeout}}. + +%%-------------------------------------------------------------------- +%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | +%% {stop, Reason, State} +%% Description: Handling call messages +%%-------------------------------------------------------------------- +handle_call({starttls, TLSSocket}, _From, + #state{xml_stream_state = XMLStreamState, + c2s_pid = C2SPid} = State) -> + xml_stream:close(XMLStreamState), + NewXMLStreamState = xml_stream:new(C2SPid), + NewState = State#state{socket = TLSSocket, + sock_mod = tls, + xml_stream_state = NewXMLStreamState}, + case tls:recv_data(TLSSocket, "") of + {ok, TLSData} -> + {reply, ok, process_data(TLSData, NewState)}; + {error, _Reason} -> + {stop, normal, ok, NewState} + end; +handle_call(reset_stream, _From, + #state{xml_stream_state = XMLStreamState, + c2s_pid = C2SPid} = State) -> + xml_stream:close(XMLStreamState), + NewXMLStreamState = xml_stream:new(C2SPid), + Reply = ok, + {reply, Reply, State#state{xml_stream_state = NewXMLStreamState}}; +handle_call(become_controller, _From, State) -> + activate_socket(State), + Reply = ok, + {reply, Reply, State}; +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_cast(Msg, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling cast messages +%%-------------------------------------------------------------------- +handle_cast({change_shaper, Shaper}, State) -> + NewShaperState = shaper:new(Shaper), + {noreply, State#state{shaper_state = NewShaperState}}; +handle_cast(close, State) -> + {stop, normal, State}; +handle_cast(_Msg, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: handle_info(Info, State) -> {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} +%% Description: Handling all non call/cast messages +%%-------------------------------------------------------------------- +handle_info({Tag, _TCPSocket, Data}, + #state{socket = Socket, + sock_mod = SockMod} = State) + when (Tag == tcp) or (Tag == ssl) -> + case SockMod of + tls -> + case tls:recv_data(Socket, Data) of + {ok, TLSData} -> + {noreply, process_data(TLSData, State)}; + {error, _Reason} -> + {stop, normal, State} + end; + _ -> + {noreply, process_data(Data, State)} + end; +handle_info({Tag, _TCPSocket}, State) + when (Tag == tcp_closed) or (Tag == ssl_closed) -> + {stop, normal, State}; +handle_info({Tag, _TCPSocket, Reason}, State) + when (Tag == tcp_error) or (Tag == ssl_error) -> + case Reason of + timeout -> + {noreply, State}; + _ -> + {stop, normal, State} + end; +handle_info({timeout, _Ref, activate}, State) -> + activate_socket(State), + {noreply, State}; +handle_info(_Info, State) -> + {noreply, State}. + +%%-------------------------------------------------------------------- +%% Function: terminate(Reason, State) -> void() +%% Description: This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any necessary +%% cleaning up. When it returns, the gen_server terminates with Reason. +%% The return value is ignored. +%%-------------------------------------------------------------------- +terminate(_Reason, #state{xml_stream_state = XMLStreamState, + c2s_pid = C2SPid} = State) -> + xml_stream:close(XMLStreamState), + gen_fsm:send_event(C2SPid, closed), + catch (State#state.sock_mod):close(State#state.socket), + ok. + +%%-------------------------------------------------------------------- +%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState} +%% Description: Convert process state when code is changed +%%-------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + +activate_socket(#state{socket = Socket, + sock_mod = SockMod}) -> + case SockMod of + gen_tcp -> + inet:setopts(Socket, [{active, once}]); + _ -> + SockMod:setopts(Socket, [{active, once}]) end. -change_shaper(Pid, Shaper) -> - Pid ! {change_shaper, Shaper}. - -reset_stream(Pid) -> - Pid ! reset_stream. - -starttls(Pid, TLSSocket) -> - Pid ! {starttls, TLSSocket}. - +process_data(Data, + #state{xml_stream_state = XMLStreamState, + shaper_state = ShaperState} = State) -> + XMLStreamState1 = xml_stream:parse(XMLStreamState, Data), + {NewShaperState, Pause} = shaper:update(ShaperState, size(Data)), + if + Pause > 0 -> + erlang:start_timer(Pause, self(), activate); + true -> + activate_socket(State) + end, + State#state{xml_stream_state = XMLStreamState1, + shaper_state = NewShaperState}. diff --git a/src/ejabberd_s2s_in.erl b/src/ejabberd_s2s_in.erl index d8280d1fb..ed4a7af0f 100644 --- a/src/ejabberd_s2s_in.erl +++ b/src/ejabberd_s2s_in.erl @@ -1,7 +1,7 @@ %%%---------------------------------------------------------------------- %%% File : ejabberd_s2s_in.erl %%% Author : Alexey Shchepin <alexey@sevcom.net> -%%% Purpose : +%%% Purpose : Serve incoming s2s connection %%% Created : 6 Dec 2002 by Alexey Shchepin <alexey@sevcom.net> %%% Id : $Id$ %%%---------------------------------------------------------------------- @@ -14,7 +14,9 @@ %% External exports -export([start/2, - start_link/2,match_domain/2]). + start_link/2, + become_controller/1, + match_domain/2]). %% gen_fsm callbacks -export([init/1, @@ -29,7 +31,6 @@ -include("ejabberd.hrl"). -include("jlib.hrl"). -%-include_lib("ssl/pkix/SSL-PKIX.hrl"). -include_lib("ssl/pkix/PKIX1Explicit88.hrl"). -include_lib("ssl/pkix/PKIX1Implicit88.hrl"). -include("XmppAddr.hrl"). @@ -87,6 +88,9 @@ start(SockData, Opts) -> start_link(SockData, Opts) -> gen_fsm:start_link(ejabberd_s2s_in, [SockData, Opts], ?FSMOPTS). +become_controller(Pid) -> + gen_fsm:send_all_state_event(Pid, become_controller). + %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- @@ -455,6 +459,12 @@ stream_established(closed, StateData) -> %% {next_state, NextStateName, NextStateData, Timeout} | %% {stop, Reason, NewStateData} %%---------------------------------------------------------------------- +handle_event(become_controller, StateName, StateData) -> + ok = (StateData#state.sockmod):controlling_process( + StateData#state.socket, + StateData#state.receiver), + ejabberd_receiver:become_controller(StateData#state.receiver), + {next_state, StateName, StateData}; handle_event(_Event, StateName, StateData) -> {next_state, StateName, StateData}. @@ -499,7 +509,7 @@ handle_info(_, StateName, StateData) -> %%---------------------------------------------------------------------- terminate(Reason, _StateName, StateData) -> ?INFO_MSG("terminated: ~p", [Reason]), - (StateData#state.sockmod):close(StateData#state.socket), + ejabberd_receiver:close(StateData#state.receiver), ok. %%%---------------------------------------------------------------------- diff --git a/src/ejabberd_s2s_out.erl b/src/ejabberd_s2s_out.erl index 6b3fb659d..13ffb81a5 100644 --- a/src/ejabberd_s2s_out.erl +++ b/src/ejabberd_s2s_out.erl @@ -164,6 +164,8 @@ open_socket(init, StateData) -> case Res of {ok, Socket} -> ReceiverPid = ejabberd_receiver:start(Socket, gen_tcp, none), + ok = gen_tcp:controlling_process(Socket, ReceiverPid), + ejabberd_receiver:become_controller(ReceiverPid), Version = if StateData#state.use_v10 -> " version='1.0'"; @@ -342,7 +344,6 @@ wait_for_features({xmlstreamelement, El}, StateData) -> StateData#state{try_auth = false}}; StartTLS and StateData#state.tls and (not StateData#state.tls_enabled) -> - StateData#state.receiver ! {change_timeout, 100}, send_element(StateData, {xmlelement, "starttls", [{"xmlns", ?NS_TLS}], []}), @@ -462,7 +463,6 @@ wait_for_starttls_proceed({xmlstreamelement, El}, StateData) -> {ok, TLSSocket} = tls:tcp_to_tls(Socket, TLSOpts), ejabberd_receiver:starttls( StateData#state.receiver, TLSSocket), - StateData#state.receiver ! {change_timeout, infinity}, NewStateData = StateData#state{sockmod = tls, socket = TLSSocket, streamid = new_id(), @@ -630,8 +630,7 @@ handle_info(_, StateName, StateData) -> %%---------------------------------------------------------------------- terminate(Reason, StateName, StateData) -> ?INFO_MSG("terminated: ~p", [Reason]), - Error = ?ERR_REMOTE_SERVER_NOT_FOUND, - bounce_queue(StateData#state.queue, Error), + bounce_queue(StateData#state.queue, ?ERR_REMOTE_SERVER_NOT_FOUND), case StateData#state.new of false -> ok; @@ -642,8 +641,8 @@ terminate(Reason, StateName, StateData) -> case StateData#state.socket of undefined -> ok; - Socket -> - (StateData#state.sockmod):close(Socket) + _Socket -> + ejabberd_receiver:close(StateData#state.receiver) end, ok. diff --git a/src/ejabberd_service.erl b/src/ejabberd_service.erl index d1003295b..a8c1989b4 100644 --- a/src/ejabberd_service.erl +++ b/src/ejabberd_service.erl @@ -13,7 +13,12 @@ -behaviour(gen_fsm). %% External exports --export([start/2, start_link/2, receiver/3, send_text/2, send_element/2]). +-export([start/2, + start_link/2, + receiver/3, + send_text/2, + send_element/2, + become_controller/1]). %% gen_fsm callbacks -export([init/1, @@ -75,6 +80,9 @@ start(SockData, Opts) -> start_link(SockData, Opts) -> gen_fsm:start_link(ejabberd_service, [SockData, Opts], ?FSMOPTS). +become_controller(_Pid) -> + ok. + %%%---------------------------------------------------------------------- %%% Callback functions from gen_fsm %%%---------------------------------------------------------------------- @@ -175,7 +183,7 @@ wait_for_handshake({xmlstreamelement, El}, StateData) -> {stop, normal, StateData} end; _ -> - {next_state, wait_for_key, StateData} + {next_state, wait_for_handshake, StateData} end; wait_for_handshake({xmlstreamend, _Name}, StateData) -> diff --git a/src/shaper.erl b/src/shaper.erl index 811315f0f..19a557c73 100644 --- a/src/shaper.erl +++ b/src/shaper.erl @@ -33,25 +33,26 @@ new1({maxrate, MaxRate}) -> lasttime = now_to_usec(now())}. -update(none, Size) -> - none; +update(none, _Size) -> + {none, 0}; update(#maxrate{} = State, Size) -> MinInterv = 1000 * Size / (2 * State#maxrate.maxrate - State#maxrate.lastrate), Interv = (now_to_usec(now()) - State#maxrate.lasttime) / 1000, %io:format("State: ~p, Size=~p~nM=~p, I=~p~n", % [State, Size, MinInterv, Interv]), - if - MinInterv > Interv -> - timer:sleep(1 + trunc(MinInterv - Interv)); - true -> - ok - end, - Now = now_to_usec(now()), - State#maxrate{ - lastrate = (State#maxrate.lastrate + - 1000000 * Size / (Now - State#maxrate.lasttime))/2, - lasttime = Now}. + Pause = if + MinInterv > Interv -> + 1 + trunc(MinInterv - Interv); + true -> + 0 + end, + NextNow = now_to_usec(now()) + Pause * 1000, + {State#maxrate{ + lastrate = (State#maxrate.lastrate + + 1000000 * Size / (NextNow - State#maxrate.lasttime))/2, + lasttime = NextNow}, + Pause}. now_to_usec({MSec, Sec, USec}) -> diff --git a/src/tls/tls.erl b/src/tls/tls.erl index 7290160e1..7c7d07bad 100644 --- a/src/tls/tls.erl +++ b/src/tls/tls.erl @@ -16,6 +16,8 @@ tcp_to_tls/2, tls_to_tcp/1, send/2, recv/2, recv/3, recv_data/2, + setopts/2, + controlling_process/2, close/1, get_peer_certificate/1, get_verify_result/1, @@ -175,6 +177,12 @@ send(#tlssock{tcpsock = TCPSocket, tlsport = Port}, Packet) -> end. +setopts(#tlssock{tcpsock = TCPSocket}, Opts) -> + inet:setopts(TCPSocket, Opts). + +controlling_process(#tlssock{tcpsock = TCPSocket}, Pid) -> + gen_tcp:controlling_process(TCPSocket, Pid). + close(#tlssock{tcpsock = TCPSocket, tlsport = Port}) -> gen_tcp:close(TCPSocket), port_close(Port). diff --git a/src/web/ejabberd_http.erl b/src/web/ejabberd_http.erl index 7d1f074e8..9e166d694 100644 --- a/src/web/ejabberd_http.erl +++ b/src/web/ejabberd_http.erl @@ -13,6 +13,7 @@ %% External exports -export([start/2, start_link/2, + become_controller/1, receive_headers/1, url_encode/1]). @@ -81,6 +82,9 @@ start_link({SockMod, Socket}, Opts) -> use_web_admin = UseWebAdmin}])}. +become_controller(_Pid) -> + ok. + send_text(State, Text) -> (State#state.sockmod):send(State#state.socket, Text). |