Welcome to mirror list, hosted at ThFree Co, Russian Federation.

pgsql_tcp.erl « pgsql « src - github.com/processone/ejabberd.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 21740258c58495a16cd2377ce3d0b59aa93f4d13 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
%%% File    : pgsql_tcp.erl
%%% Author  : Blah <cos@local>
%%% Description : Unwrapping of TCP line protocol packets into postgres messages.
%%% Created : 22 Jul 2005

-module(pgsql_tcp).

-behaviour(gen_server).

-export([start/3, start_link/3]).

%% gen_server callbacks
-export([init/1,
	 handle_call/3,
	 handle_cast/2,
	 code_change/3,
	 handle_info/2,
	 terminate/2]).

%% gen_server state.
%%   socket    - the socket this server reads from; handle_info/2 only acts on
%%               tcp events whose socket matches this field.
%%   protopid  - process that is sent {pgsql, Message} for each decoded message
%%               and {socket, Sock, ...} notifications on close or error.
%%   buffer    - binary holding received bytes that do not yet form a complete
%%               message; starts as <<>>.
%%   as_binary - passed through unchanged to pgsql_proto:decode_packet/3
%%               (presumably selects binary vs. list output; confirm there).
-record(state, {socket, protopid, buffer, as_binary}).

%% Starts the server without linking it to the caller.
%% Sock, ProtoPid and AsBin are handed to init/1 as a three-element list.
%% Returns whatever gen_server:start/3 returns.
start(Sock, ProtoPid, AsBin) ->
    InitArgs = [Sock, ProtoPid, AsBin],
    gen_server:start(?MODULE, InitArgs, []).

%% Starts the server linked to the caller.
%% Sock, ProtoPid and AsBin are handed to init/1 as a three-element list.
%% Returns whatever gen_server:start_link/3 returns.
start_link(Sock, ProtoPid, AsBin) ->
    InitArgs = [Sock, ProtoPid, AsBin],
    gen_server:start_link(?MODULE, InitArgs, []).

%% gen_server callback. Builds the initial state with an empty receive buffer
%% and asks the socket to deliver one message ({active, once}); handle_info/2
%% re-arms it after each tcp message it processes.
%% The result of inet:setopts/2 is not inspected.
init([Sock, ProtoPid, AsBin]) ->
    InitialState = #state{socket = Sock,
                          protopid = ProtoPid,
                          buffer = <<>>,
                          as_binary = AsBin},
    inet:setopts(Sock, [{active, once}]),
    {ok, InitialState}.

%% gen_server callback. No synchronous requests are defined: every call is
%% answered with 'ok' and the state is left untouched.
handle_call(_Req, _Caller, State) ->
    {reply, ok, State}.

%% gen_server callback. No casts are defined: every cast is ignored and the
%% state is left untouched.
handle_cast(_Cast, St) ->
    {noreply, St}.

%% gen_server callback. The state needs no conversion on code upgrade, so it
%% is returned as is.
code_change(_FromVsn, St, _Extra) ->
    {ok, St}.

%% gen_server callback handling events from the socket held in the state.
%%
%% {tcp, Sock, Data}:
%%     Data is appended to the buffered bytes and the result is handed to
%%     process_buffer/3; whatever it returns as remainder becomes the new
%%     buffer. The socket is then re-armed with {active, once}.
%% {tcp_closed, Sock}:
%%     prints a notice, sends {socket, Sock, closed} to the protocol process
%%     and stops with reason tcp_close.
%% {tcp_error, Sock, Reason}:
%%     prints a notice, sends {socket, Sock, {error, Reason}} to the protocol
%%     process and stops with reason tcp_error.
%%
%% Any other message, including socket events for a different socket, is
%% ignored.
handle_info({tcp, Sock, Data}, #state{socket = Sock} = State) ->
    #state{protopid = ProtoPid,
           as_binary = AsBin,
           buffer = Pending} = State,
    Joined = <<Pending/binary, Data/binary>>,
    {ok, Leftover} = process_buffer(ProtoPid, AsBin, Joined),
    %% Only one message is delivered per {active, once}; ask for the next.
    inet:setopts(Sock, [{active, once}]),
    {noreply, State#state{buffer = Leftover}};
handle_info({tcp_closed, Sock}, #state{socket = Sock} = State) ->
    io:format("Sock closed~n", []),
    State#state.protopid ! {socket, Sock, closed},
    {stop, tcp_close, State};
handle_info({tcp_error, Sock, Reason}, #state{socket = Sock} = State) ->
    io:format("Sock error~n", []),
    State#state.protopid ! {socket, Sock, {error, Reason}},
    {stop, tcp_error, State};
handle_info(_Other, State) ->
    {noreply, State}.


%% gen_server callback. Nothing is cleaned up here; always returns ok.
terminate(_Why, _St) ->
    ok.


%% Splits a receive buffer into messages.
%%
%% Each message starts with a one-byte code and a four-byte length. The length
%% counts its own four bytes, so the body is Size - 4 bytes long. Every
%% complete message is decoded with pgsql_proto:decode_packet/3 and sent to
%% ProtoPid as {pgsql, Message}.
%%
%% Returns {ok, Remainder}, where Remainder is the unconsumed tail of the
%% buffer: an incomplete message (header included), fewer than five bytes,
%% or <<>>.
process_buffer(ProtoPid, AsBin,
               <<Code:8, Size:32, Tail/binary>> = Buffer) ->
    BodyLen = Size - 4,
    case byte_size(Tail) < BodyLen of
        true ->
            %% Body not fully received yet; keep everything, header included.
            {ok, Buffer};
        false ->
            <<Body:BodyLen/binary, Remaining/binary>> = Tail,
            {ok, Message} = pgsql_proto:decode_packet(Code, Body, AsBin),
            ProtoPid ! {pgsql, Message},
            process_buffer(ProtoPid, AsBin, Remaining)
    end;
process_buffer(_ProtoPid, _AsBin, Buffer) when is_binary(Buffer) ->
    %% Too short to contain a header.
    {ok, Buffer}.