Closing the gen_server socket

What I'm trying to do is have the gen_server process accept a new client and immediately spawn a new child to handle the next one. The problem I see is that when a client socket is closed and its process therefore terminates, the listening socket gets closed as well, and I cannot understand why, since that process no longer refers to it.

Any idea what I'm doing wrong?

gen_server:

%% simple_tcp: a gen_server that either opens a listening socket on a port or
%% is handed an existing socket, then blocks in accept and echoes back what
%% the connected client sends.
-module(simple_tcp).
-behaviour(gen_server).

%% API
-export([start_link/1, stop/0, start/0, start/1]).

%% gen-server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

-define(SERVER, ?MODULE).
-define(DEFAULT_PORT, 1055).

%% port  - port number reported by inet:port/1 for the socket
%% lsock - the socket this process currently works with
-record(state, {port, lsock}).

%% Starts an unregistered server from either {port, Port} or {socket, Socket}.
%% The argument is passed to init/1 wrapped in a one-element list.
start_link({port, _Port} = Arg) ->
    gen_server:start_link(?MODULE, [Arg], []);
start_link({socket, _Socket} = Arg) ->
    gen_server:start_link(?MODULE, [Arg], []).

%% Asks the supervisor to start a new server child for the given port/socket.
start({port, _Port} = Arg) ->
    simple_tcp_sup:start_child(Arg);
start({socket, _Socket} = Arg) ->
    simple_tcp_sup:start_child(Arg).

%% Starts a child listening on the default port.
start() ->
    start({port, ?DEFAULT_PORT}).

%% Casts 'stop' to the name ?SERVER.
%% NOTE(review): start_link/1 does not register the process under a name.
stop() ->
    gen_server:cast(?SERVER, stop).

% Callback functions

%% With {port, Port}: opens the listening socket in this process, then
%% continues as if that socket had been handed in.
%% With {socket, Socket}: records the socket and its port and returns a
%% timeout of 0 so that handle_info(timeout, ...) runs straight away.
init([{port, Port}]) ->
    {ok, LSock} = gen_tcp:listen(Port, [{active, true}, {reuseaddr, true}]),
    init([{socket, LSock}]);
init([{socket, Socket}]) ->
    io:fwrite("Starting server with socket: ~p~n", [self()]),
    {ok, Port} = inet:port(Socket),
    {ok, #state{port=Port, lsock=Socket}, 0}.

%% No synchronous requests are supported; note that no reply is sent.
handle_call(_Msg, _From, State) ->
    {noreply, State}.

%% Stops the server; the stop reason used is the atom 'ok'.
handle_cast(stop, State) ->
    {stop, ok, State}.

%% {tcp, ...}        - echo the received data back to the sender
%% {tcp_error, ...}  - log and stop
%% timeout           - accept one connection on the stored socket, start a
%%                     sibling child with the listening socket, and keep
%%                     the accepted socket in this process's state
%% {tcp_closed, ...} - log, ask the supervisor to kill this child, and stop
handle_info({tcp, Socket, RawData}, State) ->
    gen_tcp:send(Socket, io_lib:fwrite("Received raw data: ~p~n", [RawData])),
    {noreply, State};
handle_info({tcp_error, _Socket, Reason}, State) ->
    io:fwrite("Error: ~p~n", [Reason]),
    {stop, normal, State};
handle_info(timeout, #state{lsock = LSock} = State) ->
    case gen_tcp:accept(LSock) of
        {ok, Sock} ->
            io:fwrite("Accepting connection...~p~n", [self()]),
            start({socket, LSock}),
            {noreply, #state{lsock=Sock}};
        {error, Reason} ->
            io:fwrite("Error: ~p, ~p~n", [Reason, self()]),
            {stop, normal, State}
    end;
handle_info({tcp_closed, _Port}, State) ->
    io:fwrite("Socket closed: ~p~n", [self()]),
    %% NOTE(review): kill_child/1 is not in the export list of the
    %% simple_tcp_sup module shown with this question.
    simple_tcp_sup:kill_child(self()),
    {stop, normal, State}.

%% Logs the shutdown; nothing else is cleaned up here.
terminate(_Reason, _State) ->
    io:fwrite("Shutting down server: ~p~n", [self()]),
    ok.

%% No state migration on code upgrade.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

supervisor:

 %% simple_tcp_sup: simple_one_for_one supervisor for simple_tcp workers.
 -module(simple_tcp_sup).
 -behaviour(supervisor).

 %% API
 -export([start_link/0, start_child/1]).

 %% supervisor callback
 -export([init/1]).

 -define(SERVER, ?MODULE).

 %% Starts the supervisor, registered locally under the module name.
 start_link() ->
     supervisor:start_link({local, ?SERVER}, ?MODULE, []).

 %% Starts a new simple_tcp child. The {socket, _} or {port, _} tuple becomes
 %% the single argument appended to the child's start_link call.
 start_child({socket, _Socket} = Arg) ->
     io:fwrite("Spawning child with socket...~n"),
     supervisor:start_child(?SERVER, [Arg]);
 start_child({port, _Port} = Arg) ->
     io:fwrite("Spawning child with port...~n"),
     supervisor:start_child(?SERVER, [Arg]).

 %% Children are temporary workers (never restarted) and are shut down with
 %% brutal_kill.
 init([]) ->
     ChildSpec = {simple_tcp,
                  {simple_tcp, start_link, []},
                  temporary, brutal_kill, worker,
                  [simple_tcp]},
     {ok, {{simple_one_for_one, 0, 1}, [ChildSpec]}}.
+4
source share
2 answers

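Your third handle_info clause reverses the roles of Sock and LSock. It should pass Sock to the child process and leave its own state unchanged. (The listening socket is owned by the process that opened it, so it is closed when that process exits.)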

BTW: It is bad karma to rebuild State from scratch ( #state{lsock=Sock} ); you should always derive the new State from the current State ( State#state{lsock=Sock} ), in case you add more state variables later. In fact, this already causes an error right now (albeit a benign one), since you are throwing away the port number.

+5
source

Well, I suggest that you let the sockets be handled by separate processes that are linked to the gen_server and communicate with it asynchronously. I have an example code snippet that shows how to do this. The gen_server starts and spawns the TCP listener, which, after successfully obtaining a listening socket, casts to our gen_server so that it can update its internal state. I have arranged the code from top to bottom. All relevant functions are shown. Focus on the socket handling and its interaction with the gen_server.

  %% Compile-time constants and logging shorthands used by the code below.
  %% PEER_CLIENT_TIMEOUT: 20 seconds, converted by timer:seconds/1.
  -define (PEER_CLIENT_TIMEOUT, timer: seconds (20)).
 %% PORT_RANGE: inclusive {First, Last} range of ports the listener tries.
 -define (PORT_RANGE, {10245,10265}).
 %% DEBUG(Format, Args): informational log line via error_logger.
 -define (DEBUG (X, Y), error_logger: info_msg (X, Y)).
 %% ERROR(Report): error report via error_logger.
 -define (ERROR (L), error_logger: error_report (L)).
 %% SOCKET_OPTS(IP): listen options for an IPv4, binary-mode, active socket
 %% bound to the given IP address.
 -define (SOCKET_OPTS (IP), [inet, binary, {backlog, 100}, {packet, 0},
                             {reuseaddr, true}, {active, true},
                             {ip, IP}]).

 %% ------------------------------------------------ ----
 %% gen_server starts here ....

 %% Starts the gen_server linked to the caller and registered locally under
 %% the module name. PeerName is passed through to init/1.
 start(PeerName) ->
     ServerName = {local, ?MODULE},
     gen_server:start_link(ServerName, ?MODULE, PeerName, []).

 %%% -------------------------------------------
 %% Gen_server init / 1 function

 %% Traps exits (the listener process is linked to this server), kicks off
 %% the socket-handling chain, and starts with an empty list as state.
 %% The PeerName argument is not used.
 init(_PeerName) ->
     process_flag(trap_exit, true),
     %% Everything socket-related runs in the process spawned here; the
     %% gen_server itself only waits for asynchronous messages.
     _ListenerPid = start_link_listener(),
     {ok, []}.

 %%% ---- Socket handling functions ---------

 %% Function: start_link_listener/0
 %% Purpose: Called by the gen_server process. Resolves the local IP
 %% address in the caller, then spawns a linked process that runs
 %% listener/1 with ?SOCKET_OPTS built for that address.
 %% Returns the pid of the spawned process.

 start_link_listener() ->
     ListenOpts = ?SOCKET_OPTS(get_myaddr()),
     spawn_link(fun() -> listener(ListenOpts) end).

 %%% ----------------------------------------------   
 %% Function: get_myaddr/0
 %% Purpose: Looks up this machine's host name and resolves it to an IPv4
 %% address tuple, which is the address the listener binds to.
 %% Crashes with a badmatch if either lookup fails.

 get_myaddr() ->
     ?DEBUG("Server> Trying to extract My Local Ip Address ....", []),
     {ok, Name} = inet:gethostname(),
     {ok, IP} = inet:getaddr(Name, inet),
     %% "~p" / "~n" must not contain a space after the tilde, otherwise the
     %% format string is invalid.
     ?DEBUG("Server> Found Alive Local IP address: ~p ..... ~n", [IP]),
     IP.

 %%% ----------------------------------------------- ---
 %% Function: listener/1, executed in a separate process
 %% Purpose: Tries each port in ?PORT_RANGE with the given socket options.
 %% On the first successful listen it reports the port, bound IP and listen
 %% socket through ?MODULE:started_listener/3 and then starts accepting.
 %% Returns {error, failed, SocketOpts} if no port could be opened.
 %% NOTE(review): started_listener/3 is not shown in this excerpt;
 %% presumably it casts a {listener_starts, ...} message to the gen_server.

 listener(SocketOpts) ->
     process_flag(trap_exit, true),
     {FirstPort, LastPort} = ?PORT_RANGE,
     Candidates = lists:seq(FirstPort, LastPort),
     case try_listening(SocketOpts, Candidates) of
         {ok, Port, LSocket} ->
             BoundIp = proplists:get_value(ip, SocketOpts),
             ?MODULE:started_listener(Port, BoundIp, LSocket),
             accept_connection(LSocket);
         {error, failed} ->
             {error, failed, SocketOpts}
     end.

 %% Walks the candidate ports in order and returns {ok, Port, ListenSocket}
 %% for the first one gen_tcp:listen/2 accepts, or {error, failed} once the
 %% list is exhausted.
 try_listening(_Opts, []) ->
     {error, failed};
 try_listening(Opts, [Candidate | Remaining]) ->
     listen_result(gen_tcp:listen(Candidate, Opts), Candidate, Opts, Remaining).

 %% Decides whether to stop with the opened socket or move on to the next port.
 listen_result({ok, ListenSocket}, Port, _Opts, _Remaining) ->
     {ok, Port, ListenSocket};
 listen_result({error, _}, _Port, Opts, Remaining) ->
     try_listening(Opts, Remaining).
 %%% ----------------------------------------------- ----------
 %% Helper Functions for Converting IP Address from tuple
 %% to string and vice versa

 %% Decimal string for an integer; any other term is a function_clause error.
 str(X) when is_integer(X) ->
     integer_to_list(X).

 %% Renders an IPv4 address tuple as dotted-decimal text,
 %% e.g. {192,168,0,1} -> "192.168.0.1".
 formalise_ipaddress({A, B, C, D}) ->
     Octets = [str(N) || N <- [A, B, C, D]],
     lists:append(lists:join(".", Octets)).

 %% Parses dotted-decimal text back into a 4-tuple of integers,
 %% e.g. "10.0.0.1" -> {10,0,0,1}. Input that does not split into exactly
 %% four parts fails with a badmatch.
 unformalise_address(String) ->
     [_, _, _, _] = Parts = string:tokens(String, "."),
     list_to_tuple([list_to_integer(Part) || Part <- Parts]).

 %%% ----------------------------------------------- ---
 %% Function: get_source_connection/1
 %% Purpose: Describes the remote end of a connected socket as
 %% [{ipAddress, DottedString}, {port, Port}]. Returns the atom
 %% failed_to_retrieve_address when inet:peername/1 raises or returns
 %% anything other than {ok, {IP, Port}}.

 get_source_connection(Socket) ->
     %% Only the peername lookup itself is protected by the try.
     PeerResult = try inet:peername(Socket) catch _:_ -> lookup_raised end,
     describe_peer(PeerResult).

 %% Turns the lookup result into the description used by the callers.
 describe_peer({ok, {IpTuple, PeerPort}}) ->
     [{ipAddress, formalise_ipaddress(IpTuple)}, {port, PeerPort}];
 describe_peer(_Other) ->
     failed_to_retrieve_address.

 %%% ----------------------------------------------- ------
 %% Function: accept_connection/1
 %% Purpose: Blocks until a client connects. On success it spawns a linked
 %% process that goes back to accepting on the same ListenSocket, reports
 %% the new connection through ?MODULE:accepted_connection/1, and then
 %% serves the accepted socket in loop/2. On failure it logs an error
 %% report and returns.

 accept_connection(ListenSocket) ->
     handle_accept(gen_tcp:accept(ListenSocket, infinity), ListenSocket).

 %% Acts on the outcome of gen_tcp:accept/2.
 handle_accept({ok, Socket}, ListenSocket) ->
     %% Hand the listen socket to a fresh acceptor before serving this client.
     spawn_link(fun() -> accept_connection(ListenSocket) end),
     Peer = get_source_connection(Socket),
     ?MODULE:accepted_connection(Peer),
     loop(Socket, Peer);
 handle_accept({error, _} = Reason, _ListenSocket) ->
     ?ERROR(["Listener has failed to accept a connection",
             {listener, self()}, {reason, Reason}]).

 %%% ----------------------------------------------- --------------------------
 %% Function: loop/2
 %% Purpose: TCP reception loop for one accepted socket.
 %%   {tcp, Socket, Data}  - asks the gen_server for a reply through
 %%                          incoming_binary_message/2, sends it, closes the
 %%                          socket and exits normally.
 %%   {tcp_closed, Socket} - reports the closure to the gen_server and
 %%                          exits normally.
 %%   anything else        - is logged, and the loop keeps waiting.
 %% OtherEnd ::= [{ipAddress, StringIPAddress}, {port, Port}]
 %%            | failed_to_retrieve_address

 loop(Socket, OtherEnd) ->
     receive
         {tcp, Socket, Data} ->
             ?DEBUG("Acceptor: ~p has received a binary message from: ~p ~n",
                    [self(), OtherEnd]),
             Reply = ?MODULE:incoming_binary_message(Data, OtherEnd),
             gen_tcp:send(Socket, Reply),
             gen_tcp:close(Socket),
             exit(normal);
         {tcp_closed, Socket} ->
             ?DEBUG("Acceptor: ~p. Socket closed by other end: ~p ~n",
                    [self(), OtherEnd]),
             ?MODULE:socket_closed(OtherEnd),
             exit(normal);
         Any ->
             ?DEBUG("Acceptor: ~p has received a message: ~p ~n", [self(), Any]),
             %% Keep serving the socket. Returning here would end the process
             %% on any stray message (for example an 'EXIT' from a linked
             %% acceptor when exits are trapped) and close its sockets.
             loop(Socket, OtherEnd)
     end.

 %%% ----------------------------------------------
 %% Gen_server Asynchronous APIs

 %% Tells the gen_server (asynchronously) that a client has connected.
 %% Nothing is reported when the peer address could not be determined.
 %% The second element is tagged with the atom 'port'; a pattern of
 %% {Port, Port} would only match a tuple whose two elements are equal.
 accepted_connection(failed_to_retrieve_address) ->
     ok;
 accepted_connection([{ipAddress, StringIPAddress}, {port, Port}]) ->
     gen_server:cast(?MODULE, {connected, StringIPAddress, Port}).

 %% Tells the gen_server (asynchronously) that a client socket was closed.
 %% Nothing is reported when the peer address could not be determined.
 %% The second element is tagged with the atom 'port'; a pattern of
 %% {Port, Port} would only match a tuple whose two elements are equal.
 socket_closed(failed_to_retrieve_address) ->
     ok;
 socket_closed([{ipAddress, StringIPAddress}, {port, Port}]) ->
     gen_server:cast(?MODULE, {socket_closed, StringIPAddress, Port}).

 %% Produces the reply for data received from a client (a binary reply is
 %% expected). Data rejected by analyse_protocol/1 gets a fixed
 %% protocol-violation term; anything else is forwarded to the gen_server
 %% as a synchronous {request, Val} call with no timeout.
 incoming_binary_message(Data, _OtherEnd) ->
     respond_to(analyse_protocol(Data)).

 %% Maps the protocol analysis result to the reply.
 respond_to(wrong) ->
     term_to_binary("protocol violation!");
 respond_to(Val) ->
     gen_server:call(?MODULE, {request, Val}, infinity).

 %%% -------------------- handle cast ------------------------- -----------------

 %% Handles the asynchronous reports sent by the socket processes:
 %%   {listener_starts, Port, IP, LSocket} - a listen socket was opened
 %%   {connected, StringIP, Port}          - a client connected
 %%   {socket_closed, StringIP, Port}      - a client socket was closed
 %% Each report is passed to a do_something_with_* function (not shown in
 %% this excerpt) whose result becomes the new state. Any other cast is
 %% logged and leaves the state untouched.
 handle_cast({listener_starts, _Port, _MyTupleIP, _LSocket} = Object, _State) ->
     NewState = do_something_with_the_listen_report(Object),
     {noreply, NewState};
 handle_cast({connected, _StringIPAddress, _Port} = Object, _State) ->
     NewState = do_something_with_the_connection_report(Object),
     {noreply, NewState};
 handle_cast({socket_closed, _StringIPAddress, _Port} = Object, _State) ->
     NewState = do_something_with_the_closed_connection_report(Object),
     {noreply, NewState};
 handle_cast(Any, State) ->
     %% "~p" / "~n" must not contain a space after the tilde.
     ?DEBUG("Server> I have been casted some unknown message: ~p ~n", [Any]),
     {noreply, State}.


 %%%% ---------------------- handle call --------------
 %% {request, Val} is answered through req/2, which yields both the new
 %% state and the reply. Every other call is answered with an empty list.
 handle_call({request, Val}, _From, State) ->
     {NewState, Reply} = req(Val, State),
     {reply, Reply, NewState};
 handle_call(_Request, _From, State) ->
     {reply, [], State}.

 %% Updates the gen_server state for a request and builds its reply.
 %% Returns {NewState, Reply}; any other result shape is a badmatch.
 req(Val, State) ->
     {_NewState, _Reply} = Result = modify_state_and_get_reply(State, Val),
     Result.

 %% ------------------- terminate / 2 --------------------

 %% Nothing to clean up when the server stops.
 terminate(_Reason, _State) ->
     ok.

 %% ----------------- code_change / 3 ------------------

 %% The state is carried over unchanged on a code upgrade.
 code_change(_OldVsn, State, _Extra) ->
     {ok, State}.

With the asynchronous capability of gen_server, we can handle the socket details in separate linked processes. These processes then talk to the gen_server via cast, so they do not block it and its concurrent nature is preserved.

+1
source

All Articles