Browse Source

Initial commit

Adam Rutkowski 6 years ago
commit
365eb4f028
6 changed files with 216 additions and 0 deletions
  1. 8 0
      .gitignore
  2. 9 0
      rebar.config
  3. 18 0
      src/README.md
  4. 9 0
      src/example.erl
  5. 13 0
      src/ranch_proxy.app.src
  6. 159 0
      src/ranch_proxy.erl

+ 8 - 0
.gitignore

@@ -0,0 +1,8 @@
+.eunit
+ebin
+deps
+priv
+*.o
+*.beam
+*.dump
+

+ 9 - 0
rebar.config

@@ -0,0 +1,9 @@
+%M% -*- tab-width: 4;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%%% ex: ft=erlang ts=4 sw=4 et
+%%%
+%%% This file is part of cowboy_revproxy released under the MIT license.
+%%% See the NOTICE for more information.
+
+{deps,[
+    {ranch, ".*", {git, "git://github.com/extend/ranch.git", {tag, "0.6.1"}}}
+]}.

+ 18 - 0
src/README.md

@@ -0,0 +1,18 @@
+# ranch_proxy
+
+
+Simple TCP routing proxy based on @benoitc's cowboy_proxy.
+This one uses ranch 0.6.1 underneath.
+
+Expect more stuff soon, for now it's just a quick outline.
+
+## Example usage:
+
+
+    application:start(ranch).
+    ranch_proxy:serve(100, ranch_tcp, [{port, 5555}], [{proxy, {example_module, proxy_function}}]).
+
+### See also:
+
+    * [cowboy_revproxy](https://github.com/benoitc/cowboy_revproxy)
+    * [ranch](https://github.com/extend/ranch)

+ 9 - 0
src/example.erl

@@ -0,0 +1,9 @@
+-module(example).
+-export([proxy/1]).
+-export([start/0]).
+
+start() ->
+    ranch_proxy:serve(100, ranch_tcp, [{port, 5555}], [{proxy, {?MODULE, proxy}}]).
+
+proxy(_Data) ->
+    {remote, {"localhost", 7777}}.

+ 13 - 0
src/ranch_proxy.app.src

@@ -0,0 +1,13 @@
+{application, ranch_proxy,
+ [
+  {description, ""},
+  {vsn, "1"},
+  {registered, []},
+  {applications, [
+                  kernel,
+                  stdlib,
+                  ranch
+                 ]},
+  {mod, {}},
+  {env, []}
+ ]}.

+ 159 - 0
src/ranch_proxy.erl

@@ -0,0 +1,159 @@
+-module(ranch_proxy).
+-behaviour(ranch_protocol).
+
+%% API
+-export([serve/4]).
+
+%% Behaviour callbacks
+-export([start_link/4, init/4]).
+
+%% Defaults
+-define(DEFAULT_SOCKET_OPTS, [{packet, 0}, {active, once}]).
+-define(DEFAULT_TIMEOUT, 5000).
+
+%% Internal state
+-record(state, {
+        socket                                    :: inet:socket(),
+        socket_opts        = ?DEFAULT_SOCKET_OPTS :: list(gen_tcp:option()),
+        transport                                 :: module(),
+        proxy                                     :: {module(), function()},
+        buffer             = <<>>                 :: binary(),
+        remote_endpoint                           :: any(),
+        remote_socket                             :: inet:socket(),
+        remote_transport                          :: module(),
+        remote_socket_opts = ?DEFAULT_SOCKET_OPTS :: list(gen_tcp:option()),
+        remote_connect_fun = fun remote_connect/1 :: function(),
+        timeout                                   :: non_neg_integer()
+    }).
+
+%% ----------------------------------------------------------
+%% API
+%% ----------------------------------------------------------
+
+serve(Listeners, Protocol, ProtocolOpts, ProxyOpts) ->
+    {ok, _} = ranch:start_listener(?MODULE, Listeners, Protocol,
+                                   ProtocolOpts, ?MODULE, ProxyOpts).
+
+%% ----------------------------------------------------------
+%% Callbacks
+%% ----------------------------------------------------------
+
+start_link(ListenerPid, Socket, Transport, Opts) ->
+        Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, Opts]),
+        {ok, Pid}.
+
+init(ListenerPid, Socket, Transport, Opts) ->
+        ok       = ranch:accept_ack(ListenerPid),
+        {M, F}   = proplists:get_value(proxy, Opts),
+        Timeout  = proplists:get_value(timeout, Opts, ?DEFAULT_TIMEOUT),
+        SOpts    = proplists:get_value(source_opts, Opts, ?DEFAULT_SOCKET_OPTS),
+        ROpts    = proplists:get_value(remote_opts, Opts, ?DEFAULT_SOCKET_OPTS),
+        RConnFun = proplists:get_value(remote_connect, Opts, fun remote_connect/1),
+
+        loop(#state{
+                socket             = Socket,
+                transport          = Transport,
+                proxy              = {M, F},
+                timeout            = Timeout,
+                socket_opts        = SOpts,
+                remote_socket_opts = ROpts,
+                remote_connect_fun = RConnFun
+            }).
+
+%% ----------------------------------------------------------
+%% Proxy internals
+%% ----------------------------------------------------------
+
+loop(State = #state{ socket    = Socket,
+                     transport = Transport,
+                     proxy     = Proxy,
+                     buffer    = Buffer,
+                     timeout   = Timeout }) ->
+        case Transport:recv(Socket, 0, Timeout) of
+                {ok, Data} ->
+                        Buffer1 = <<Buffer/binary, Data/binary>>,
+                        case run_proxy(Proxy, Buffer1) of
+                            stop ->
+                                terminate(State);
+                            ignore ->
+                                loop(State);
+                            {buffer, NewData} ->
+                                loop(State#state{ buffer = NewData });
+                            {remote, Remote} ->
+                                start_proxy_loop(State#state{
+                                    buffer          = Buffer1,
+                                    remote_endpoint = Remote
+                                });
+                            [{remote, Remote}, {data, NewData}] ->
+                                start_proxy_loop(State#state{
+                                    buffer          = NewData,
+                                    remote_endpoint = Remote
+                                });
+                            [{remote, Remote}, {data, NewData}, {reply, Reply}] ->
+                                Transport:send(Socket, Reply),
+                                start_proxy_loop(State#state{
+                                    buffer          = NewData,
+                                    remote_endpoint = Remote
+                                });
+                            _ ->
+                                loop(State#state{ buffer = Buffer1 })
+                        end;
+                _ ->
+                        terminate(State)
+        end.
+
+start_proxy_loop(State = #state{ remote_endpoint = Remote, buffer = Buffer }) ->
+    case remote_connect(Remote) of
+        {Transport, {ok, Socket}} ->
+            Transport:send(Socket, Buffer),
+            proxy_loop(State#state{ remote_socket = Socket,
+                                    remote_transport = Transport,
+                                    buffer = <<>> });
+        {_, {error, _Error}} ->
+            terminate(State)
+    end.
+
+proxy_loop(State = #state{ socket             = SSock,
+                           transport          = STrans,
+                           socket_opts        = SOpts,
+                           remote_socket      = RSock,
+                           remote_transport   = RTrans,
+                           remote_socket_opts = ROpts }) ->
+
+    STrans:setopts(SSock, SOpts),
+    RTrans:setopts(RSock, ROpts),
+
+    receive
+        {_, SSock, Data} ->
+            RTrans:send(RSock, Data),
+            proxy_loop(State);
+        {_, RSock, Data} ->
+            STrans:send(SSock, Data),
+            proxy_loop(State);
+        {tcp_closed, RSock} ->
+            terminate(State);
+        {tcp_closed, SSock} ->
+            terminate_remote(State);
+        _ ->
+            terminate_all(State)
+    end.
+
+remote_connect({Ip, Port}) ->
+    {ranch_tcp, gen_tcp:connect(Ip, Port, [binary,
+                {packet, 0}, {delay_send, true}])}.
+
+run_proxy({M, F}, Data) ->
+    M:F(Data).
+
+terminate(#state{ socket = Socket, transport = Transport }) ->
+    Transport:close(Socket).
+
+terminate_remote(#state{remote_socket = Socket, remote_transport = Transport}) ->
+    Transport:close(Socket),
+    ok.
+
+terminate_all(State) ->
+    terminate_remote(State),
+    terminate(State).
+
+