Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBadlop <badlop@process-one.net>2009-03-06 14:42:56 +0300
committerBadlop <badlop@process-one.net>2009-03-06 14:42:56 +0300
commit75a8ebf29320e70352000a72509bb30b0bbf6435 (patch)
tree9d507119cc2904db5ba27b95a85845af119f85bb
parent4e17b1463b5a291c0c31d05c7c5ab26ce5757f47 (diff)
Merge 1855 from trunk.
* src/eldap/eldap.erl: implemented queue for pending queries (thanks to Evgeniy Khramtsov) SVN Revision: 1973
-rw-r--r--ChangeLog3
-rw-r--r--src/eldap/eldap.erl208
2 files changed, 109 insertions, 102 deletions
diff --git a/ChangeLog b/ChangeLog
index 1ab428396..ff3490c77 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,8 @@
2009-03-06 Badlop <badlop@process-one.net>
+ * src/eldap/eldap.erl: implemented queue for pending
+ queries (thanks to Evgeniy Khramtsov)
+
* src/eldap/eldap.erl: Close a connection on tcp_error (thanks to
Evgeniy Khramtsov)
diff --git a/src/eldap/eldap.erl b/src/eldap/eldap.erl
index 799012fc5..24e234cf7 100644
--- a/src/eldap/eldap.erl
+++ b/src/eldap/eldap.erl
@@ -85,6 +85,10 @@
-define(RETRY_TIMEOUT, 500).
-define(BIND_TIMEOUT, 10000).
-define(CMD_TIMEOUT, 100000).
+%% Used in gen_fsm sync calls.
+-define(CALL_TIMEOUT, ?CMD_TIMEOUT + ?BIND_TIMEOUT + ?RETRY_TIMEOUT).
+%% Used as a timeout for gen_tcp:send/2
+-define(SEND_TIMEOUT, 30000).
-define(MAX_TRANSACTION_ID, 65535).
-define(MIN_TRANSACTION_ID, 0).
@@ -98,7 +102,7 @@
id = 0, % LDAP Request ID
bind_timer, % Ref to bind timeout
dict, % dict holding operation params and results
- bind_q % Queue for bind() requests
+ req_q % Queue for requests
}).
%%%----------------------------------------------------------------------
@@ -141,7 +145,8 @@ close(Handle) ->
%%% --------------------------------------------------------------------
add(Handle, Entry, Attributes) when list(Entry),list(Attributes) ->
Handle1 = get_handle(Handle),
- gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)}).
+ gen_fsm:sync_send_event(Handle1, {add, Entry, add_attrs(Attributes)},
+ ?CALL_TIMEOUT).
%%% Do sanity check !
add_attrs(Attrs) ->
@@ -166,7 +171,7 @@ add_attrs(Attrs) ->
%%% --------------------------------------------------------------------
delete(Handle, Entry) when list(Entry) ->
Handle1 = get_handle(Handle),
- gen_fsm:sync_send_event(Handle1, {delete, Entry}).
+ gen_fsm:sync_send_event(Handle1, {delete, Entry}, ?CALL_TIMEOUT).
%%% --------------------------------------------------------------------
%%% Modify an entry. Given an entry a number of modification
@@ -181,7 +186,7 @@ delete(Handle, Entry) when list(Entry) ->
%%% --------------------------------------------------------------------
modify(Handle, Object, Mods) when list(Object), list(Mods) ->
Handle1 = get_handle(Handle),
- gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}).
+ gen_fsm:sync_send_event(Handle1, {modify, Object, Mods}, ?CALL_TIMEOUT).
%%%
%%% Modification operations.
@@ -214,7 +219,10 @@ m(Operation, Type, Values) ->
modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup)
when list(Entry),list(NewRDN),atom(DelOldRDN),list(NewSup) ->
Handle1 = get_handle(Handle),
- gen_fsm:sync_send_event(Handle1, {modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)}).
+ gen_fsm:sync_send_event(
+ Handle1,
+ {modify_dn, Entry, NewRDN, bool_p(DelOldRDN), optional(NewSup)},
+ ?CALL_TIMEOUT).
%%% --------------------------------------------------------------------
@@ -228,7 +236,7 @@ modify_dn(Handle, Entry, NewRDN, DelOldRDN, NewSup)
bind(Handle, RootDN, Passwd)
when list(RootDN),list(Passwd) ->
Handle1 = get_handle(Handle),
- gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, infinity).
+ gen_fsm:sync_send_event(Handle1, {bind, RootDN, Passwd}, ?CALL_TIMEOUT).
%%% Sanity checks !
@@ -273,7 +281,7 @@ search(Handle, L) when list(L) ->
call_search(Handle, A) ->
Handle1 = get_handle(Handle),
- gen_fsm:sync_send_event(Handle1, {search, A}, infinity).
+ gen_fsm:sync_send_event(Handle1, {search, A}, ?CALL_TIMEOUT).
parse_search_args(Args) ->
parse_search_args(Args, #eldap_search{scope = wholeSubtree}).
@@ -382,7 +390,7 @@ init({Hosts, Port, Rootdn, Passwd}) ->
passwd = Passwd,
id = 0,
dict = dict:new(),
- bind_q = queue:new()}, 0}.
+ req_q = queue:new()}, 0}.
%%----------------------------------------------------------------------
%% Func: StateName/2
@@ -405,38 +413,20 @@ connecting(timeout, S) ->
%% {stop, Reason, NewStateData} |
%% {stop, Reason, Reply, NewStateData}
%%----------------------------------------------------------------------
-connecting(_Event, _From, S) ->
- Reply = {error, connecting},
- {reply, Reply, connecting, S}.
+connecting(Event, From, S) ->
+ Q = queue:in({Event, From}, S#eldap.req_q),
+ {next_state, connecting, S#eldap{req_q=Q}}.
-wait_bind_response(_Event, _From, S) ->
- Reply = {error, wait_bind_response},
- {reply, Reply, wait_bind_response, S}.
+wait_bind_response(Event, From, S) ->
+ Q = queue:in({Event, From}, S#eldap.req_q),
+ {next_state, wait_bind_response, S#eldap{req_q=Q}}.
-active(Event, From, S) ->
- case catch send_command(Event, From, S) of
- {ok, NewS} ->
- case Event of
- {bind, _, _} ->
- {next_state, active_bind, NewS};
- _ ->
- {next_state, active, NewS}
- end;
- {error, Reason} ->
- {reply, {error, Reason}, active, S};
- {'EXIT', Reason} ->
- {reply, {error, Reason}, active, S}
- end.
-
-active_bind({bind, RootDN, Passwd}, From, #eldap{bind_q=Q} = S) ->
- NewQ = queue:in({{bind, RootDN, Passwd}, From}, Q),
- {next_state, active_bind, S#eldap{bind_q=NewQ}};
active_bind(Event, From, S) ->
- case catch send_command(Event, From, S) of
- {ok, NewS} -> {next_state, active_bind, NewS};
- {error, Reason} -> {reply, {error, Reason}, active_bind, S};
- {'EXIT', Reason} -> {reply, {error, Reason}, active_bind, S}
- end.
+ Q = queue:in({Event, From}, S#eldap.req_q),
+ {next_state, active_bind, S#eldap{req_q=Q}}.
+
+active(Event, From, S) ->
+ process_command(S, Event, From).
%%----------------------------------------------------------------------
%% Func: handle_event/3
@@ -446,21 +436,8 @@ active_bind(Event, From, S) ->
%% {stop, Reason, NewStateData}
%%----------------------------------------------------------------------
handle_event(close, _StateName, S) ->
- gen_tcp:close(S#eldap.fd),
- {stop, closed, S};
-
-handle_event(process_bind_q, active_bind, #eldap{bind_q=Q} = S) ->
- case queue:out(Q) of
- {{value, {BindEvent, To}}, NewQ} ->
- NewStateData = case catch send_command(BindEvent, To, S) of
- {ok, NewS} -> NewS;
- {error, Reason} -> gen_fsm:reply(To, {error, Reason}), S;
- {'EXIT', Reason} -> gen_fsm:reply(To, {error, Reason}), S
- end,
- {next_state, active_bind, NewStateData#eldap{bind_q=NewQ}};
- {empty, Q} ->
- {next_state, active, S}
- end;
+ catch gen_tcp:close(S#eldap.fd),
+ {stop, normal, S};
handle_event(_Event, StateName, S) ->
{next_state, StateName, S}.
@@ -489,50 +466,61 @@ handle_sync_event(_Event, _From, StateName, S) ->
%% Packets arriving in various states
%%
handle_info({tcp, _Socket, Data}, connecting, S) ->
- ?DEBUG("eldap. tcp packet received when disconnected!~n~p", [Data]),
+ ?DEBUG("tcp packet received when disconnected!~n~p", [Data]),
{next_state, connecting, S};
handle_info({tcp, _Socket, Data}, wait_bind_response, S) ->
cancel_timer(S#eldap.bind_timer),
case catch recvd_wait_bind_response(Data, S) of
- bound -> {next_state, active, S};
- {fail_bind, _Reason} -> close_and_retry(S),
- {next_state, connecting, S#eldap{fd = null}};
- {'EXIT', _Reason} -> close_and_retry(S),
- {next_state, connecting, S#eldap{fd = null}};
- {error, _Reason} -> close_and_retry(S),
- {next_state, connecting, S#eldap{fd = null}}
+ bound ->
+ dequeue_commands(S);
+ {fail_bind, _Reason} ->
+ {next_state, connecting, close_and_retry(S)};
+ {'EXIT', _Reason} ->
+ {next_state, connecting, close_and_retry(S)};
+ {error, _Reason} ->
+ {next_state, connecting, close_and_retry(S)}
end;
handle_info({tcp, _Socket, Data}, StateName, S)
- when StateName==active; StateName==active_bind ->
+ when StateName == active orelse StateName == active_bind ->
case catch recvd_packet(Data, S) of
- {reply, Reply, To, NewS} -> gen_fsm:reply(To, Reply),
- {next_state, StateName, NewS};
- {ok, NewS} -> {next_state, StateName, NewS};
- {'EXIT', _Reason} -> {next_state, StateName, S};
- {error, _Reason} -> {next_state, StateName, S}
+ {response, Response, RequestType} ->
+ NewS = case Response of
+ {reply, Reply, To, S1} ->
+ gen_fsm:reply(To, Reply),
+ S1;
+ {ok, S1} ->
+ S1
+ end,
+ if (StateName == active_bind andalso
+ RequestType == bindRequest) orelse
+ (StateName == active) ->
+ dequeue_commands(NewS);
+ true ->
+ {next_state, StateName, NewS}
+ end;
+ _ ->
+ {next_state, StateName, S}
end;
handle_info({tcp_closed, _Socket}, Fsm_state, S) ->
?WARNING_MSG("LDAP server closed the connection: ~s:~p~nIn State: ~p",
[S#eldap.host, S#eldap.port ,Fsm_state]),
- {ok, NextState, NewS} = close_and_rebind(S, tcp_closed),
- {next_state, NextState, NewS};
+ {next_state, connecting, close_and_retry(S)};
handle_info({tcp_error, _Socket, Reason}, Fsm_state, S) ->
?DEBUG("eldap received tcp_error: ~p~nIn State: ~p", [Reason, Fsm_state]),
- {ok, NextState, NewS} = close_and_rebind(S, tcp_error),
- {next_state, NextState, NewS};
+ {next_state, connecting, close_and_retry(S)};
%%
%% Timers
%%
-handle_info({timeout, Timer, {cmd_timeout, Id}}, active, S) ->
+handle_info({timeout, Timer, {cmd_timeout, Id}}, StateName, S) ->
case cmd_timeout(Timer, Id, S) of
{reply, To, Reason, NewS} -> gen_fsm:reply(To, Reason),
- {next_state, active, NewS};
- {error, _Reason} -> {next_state, active, S}
+ {next_state, StateName, NewS};
+ {error, _Reason} -> {next_state, StateName, S}
end;
handle_info({timeout, retry_connect}, connecting, S) ->
@@ -540,8 +528,7 @@ handle_info({timeout, retry_connect}, connecting, S) ->
{next_state, NextState, NewS};
handle_info({timeout, _Timer, bind_timeout}, wait_bind_response, S) ->
- close_and_retry(S),
- {next_state, connecting, S#eldap{fd = null}};
+ {next_state, connecting, close_and_retry(S)};
%%
%% Make sure we don't fill the message queue with rubbish
@@ -570,6 +557,34 @@ code_change(_OldVsn, StateName, S, _Extra) ->
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
+dequeue_commands(S) ->
+ case queue:out(S#eldap.req_q) of
+ {{value, {Event, From}}, Q} ->
+ case process_command(S#eldap{req_q=Q}, Event, From) of
+ {_, active, NewS} ->
+ dequeue_commands(NewS);
+ Res ->
+ Res
+ end;
+ {empty, _} ->
+ {next_state, active, S}
+ end.
+
+process_command(S, Event, From) ->
+ case send_command(Event, From, S) of
+ {ok, NewS} ->
+ case Event of
+ {bind, _, _} ->
+ {next_state, active_bind, NewS};
+ _ ->
+ {next_state, active, NewS}
+ end;
+ {error, _Reason} ->
+ Q = queue:in_r({Event, From}, S#eldap.req_q),
+ NewS = close_and_retry(S#eldap{req_q=Q}),
+ {next_state, connecting, NewS}
+ end.
+
send_command(Command, From, S) ->
Id = bump_id(S),
{Name, Request} = gen_req(Command),
@@ -640,6 +655,7 @@ recvd_packet(Pkt, S) ->
Dict = S#eldap.dict,
Id = Msg#'LDAPMessage'.messageID,
{Timer, From, Name, Result_so_far} = get_op_rec(Id, Dict),
+ Answer =
case {Name, Op} of
{searchRequest, {searchResEntry, R}} when
record(R,'SearchResultEntry') ->
@@ -687,14 +703,14 @@ recvd_packet(Pkt, S) ->
New_dict = dict:erase(Id, Dict),
cancel_timer(Timer),
Reply = check_bind_reply(Result, From),
- gen_fsm:send_all_state_event(self(), process_bind_q),
{reply, Reply, From, S#eldap{dict = New_dict}};
{OtherName, OtherResult} ->
New_dict = dict:erase(Id, Dict),
cancel_timer(Timer),
{reply, {error, {invalid_result, OtherName, OtherResult}},
From, S#eldap{dict = New_dict}}
- end;
+ end,
+ {response, Answer, Name};
Error -> Error
end.
@@ -775,13 +791,9 @@ check_tag(Data) ->
end.
close_and_retry(S) ->
- gen_tcp:close(S#eldap.fd),
- retry_connect().
-
-retry_connect() ->
- erlang:send_after(?RETRY_TIMEOUT, self(),
- {timeout, retry_connect}).
-
+ catch gen_tcp:close(S#eldap.fd),
+ erlang:send_after(?RETRY_TIMEOUT, self(), {timeout, retry_connect}),
+ S#eldap{fd = null}.
%%-----------------------------------------------------------------------
%% Sort out timed out commands
@@ -832,7 +844,8 @@ polish([], Res, Ref) ->
%%-----------------------------------------------------------------------
connect_bind(S) ->
Host = next_host(S#eldap.host, S#eldap.hosts),
- TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true}, binary],
+ TcpOpts = [{packet, asn1}, {active, true}, {keepalive, true},
+ {send_timeout, ?SEND_TIMEOUT}, binary],
?INFO_MSG("LDAP connection on ~s:~p", [Host, S#eldap.port]),
case gen_tcp:connect(Host, S#eldap.port, TcpOpts) of
{ok, Socket} ->
@@ -844,15 +857,16 @@ connect_bind(S) ->
host = Host,
bind_timer = Timer}};
{error, Reason} ->
- ?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]),
- gen_tcp:close(Socket),
- retry_connect(),
- {ok, connecting, S#eldap{host = Host}}
+ ?ERROR_MSG("LDAP bind failed on ~s:~p~nReason: ~p",
+ [Host, S#eldap.port, Reason]),
+ NewS = close_and_retry(S),
+ {ok, connecting, NewS#eldap{host = Host}}
end;
{error, Reason} ->
- ?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p", [Host, S#eldap.port, Reason]),
- retry_connect(),
- {ok, connecting, S#eldap{host = Host}}
+ ?ERROR_MSG("LDAP connection failed on ~s:~p~nReason: ~p",
+ [Host, S#eldap.port, Reason]),
+ NewS = close_and_retry(S),
+ {ok, connecting, NewS#eldap{host = Host}}
end.
bind_request(Socket, S) ->
@@ -997,13 +1011,3 @@ bump_id(#eldap{id = Id}) when Id > ?MAX_TRANSACTION_ID ->
?MIN_TRANSACTION_ID;
bump_id(#eldap{id = Id}) ->
Id + 1.
-
-close_and_rebind(State, Err) ->
- F = fun(_Id, [{Timer, From, _Name}|_]) ->
- gen_fsm:reply(From, {error, Err}),
- cancel_timer(Timer)
- end,
- dict:map(F, State#eldap.dict),
- connect_bind(State#eldap{fd = null,
- dict = dict:new(),
- bind_q=queue:new()}).