Wolfmans Howlings

A programmer's Blog about Programming solutions and a few other issues

Using Cucumber to test Erlang Servers

Posted by Jim Morris on Sat May 02 00:09:36 -0700 2009

Background

I've been using Cucumber to do all my integration testing, usually testing over the wire against a live system, regardless of what the target system is written in (Ruby on Rails, Merb, Java etc).

I have been writing a conference server in Erlang (actually rewriting), and have been testing it over the wire using JRuby and Cucumber.

I have a Java library which talks the binary protocol, so using JRuby allows me to use that library. However I need to test a lot of edge conditions in the server, and these cannot easily be tested using the wire protocol.

So after discovering JInterface I realized I could probably use JRuby and Cucumber to test my Erlang gen_server based module directly.

It works pretty well. First I wrap the JInterface code in a Ruby library (actually JRuby), then I write my steps to talk to that wrapper. I can get pretty deep in my testing, even to the point of directly testing the returned tuples.

I could use one of the Ruby to Erlang libraries, but the two I found are either no longer supported (Erlectricity) or too new to be stable (erlix), so using JInterface under JRuby seems a good alternative for now. I'll be keeping my eye on erlix as it is under development and seems to be written the right way.

The Code

Erlang Wrapper

OK, down to business. Here is the wrapper. What I have tried to do is put all the JInterface knowledge and knowledge of Erlang types in this wrapper. I would also put any specific calls to the Erlang node and unpacking of any return data here as well, but for now I'll keep it simple.

Put this in the libs directory of the cucumber test directory, given the standard cucumber directory hierarchy...

workdir
  features
    step_definitions
    support
  libs

You need to put the OtpErlang.jar file in the libs directory as well.

# libs/erlang_wrapper.rb
#
# Wraps an erlang jinterface API to talk to an Erlang node
#
require 'java'

$LOAD_PATH.unshift(File.dirname(__FILE__)) # so the jar is found
require 'OtpErlang.jar'

import 'com.ericsson.otp.erlang.OtpNode'
import 'com.ericsson.otp.erlang.OtpSelf'
import 'com.ericsson.otp.erlang.OtpPeer'
import 'com.ericsson.otp.erlang.OtpConnection'
import 'com.ericsson.otp.erlang.OtpErlangList'
import 'com.ericsson.otp.erlang.OtpErlangObject'
import 'com.ericsson.otp.erlang.OtpErlangTuple'
import 'com.ericsson.otp.erlang.OtpErlangAtom'
import 'com.ericsson.otp.erlang.OtpErlangBinary'
import 'com.ericsson.otp.erlang.OtpErlangInt'
import 'com.ericsson.otp.erlang.OtpErlangString'

# helper method to monkey patch the given class
def add_method(meth, klass, &block)
  klass.instance_eval { define_method meth, &block }
end

# monkey patch to do Ruby-esque conversions to Erlang types
add_method(:to_erl, Symbol) {OtpErlangAtom.new(self.to_s)}
add_method(:to_erl, Integer) {OtpErlangInt.new(self)}

class Array
  # do a deep conversion of the array to an ErlangList of ErlangObjects
  def to_erl
    a= self.convert_to_erl
    OtpErlangList.new(a.to_java(OtpErlangObject))
  end

  # do a deep conversion of the array to an ErlangTuple of ErlangObjects
  def to_tuple
    a= self.convert_to_erl
    OtpErlangTuple.new(a.to_java(OtpErlangObject))
  end

  # do a deep conversion of the list to ErlangObjects
  # produces a Ruby Array of ErlangObjects
  def convert_to_erl
    ar= []
    self.each do |a|
      if a.instance_of? Symbol
        ar << a.to_erl
      elsif a.kind_of? Integer
        ar << a.to_erl
      elsif a.instance_of? String
        ar << a.to_erl
      elsif a.kind_of? Array
        ar << a.to_erl
      else
        # it must already be an OtpErlang type
        ar << a 
      end
    end
    ar
  end
end

class String
  def to_erl(t=nil)
    if t == :list
      OtpErlangString.new(self.to_java_bytes)
    else
      OtpErlangBinary.new(self.to_java_bytes)
    end
  end
end


# monkey patch the Erlang types
EList= com.ericsson.otp.erlang.OtpErlangList
Tuple= com.ericsson.otp.erlang.OtpErlangTuple
Atom= com.ericsson.otp.erlang.OtpErlangAtom
EInt= com.ericsson.otp.erlang.OtpErlangInt
ELong= com.ericsson.otp.erlang.OtpErlangLong
Binary= com.ericsson.otp.erlang.OtpErlangBinary

add_method(:to_s, Tuple) {self.toString()}
add_method(:to_s, EList) {self.toString()}
add_method(:to_s, Atom) {self.toString()}
add_method(:to_s, EInt) {self.toString()}
add_method(:to_s, ELong) {self.toString()}
add_method(:to_s, Binary) {self.binaryValue.toString()}

# convenience methods to index into a tuple and list
class Tuple
  def [](i)
    self.elementAt(i)
  end
end

class EList
  def [](i)
    self.elementAt(i)
  end
end

# convert to the closest Ruby type
add_method(:to_ruby, Atom) {self.toString().to_sym}
add_method(:to_ruby, EInt) {self.intValue}
add_method(:to_ruby, ELong) {self.longValue}
add_method(:to_ruby, Binary) {self.binaryValue.to_a}


# all the interface commands to the Erlang node are wrapped here
# Jinterface does all the work but we wrap those calls here
class ErlangWrapper

  def initialize
    # somewhere to store out-of-band messages
    @msgs= []
  end

  # connect to an Erlang node called erlang_node, call this node my_node
  def connect(erlang_node, my_node)
    begin
      @self = OtpSelf.new(my_node)
      other = OtpPeer.new(erlang_node)
      @connection = @self.connect(other)
    rescue
      raise "could not connect to #{erlang_node} as #{my_node}: #{$!}"
    end
  end

  # returns my nodes pid
  def self_pid
    @self.pid()
  end

  # close the connection
  def close
    @connection.close
  end

  # check we are connected
  def connected?
    @connection && @connection.isConnected()
  end

  # does an rpc to mod:fun on connected node with optional args. args
  # will be a ruby array of ErlangObjects
  def call(mod, fun, args=nil)
    if args
      # convert the ruby array to an Erlang List
      a= args.to_erl
    else
      a= OtpErlangList.new
    end

    #puts "call: #{mod}:#{fun}[#{a.toString}]"
    @connection.sendRPC(mod, fun, a)

    gotrpc= false
    while(!gotrpc) do
      reply = @connection.receive(5000)
      # puts "reply: #{reply.toString}"

      if reply[0].toString != 'rex'
        parse_oobd_message(reply)
      else
        gotrpc= true
      end
    end
    reply.elementAt(1)
  end

  # get the arity of the given ErlangObject
  def arity(v)
    v.arity()
  end

  # convert the args into erlang types returning a ruby array of
  # erlang objects
  def make_args_erlang(*args)
    ar= []
    args.each do |a|
      ar << a.to_erl
    end
    ar
  end

  # convert args to otp type if not already, syms get converted to atoms
  def make_tuple(*args)
    ar= make_args_erlang(*args)
    tuple = OtpErlangTuple.new(ar.to_java(OtpErlangObject))
  end

  # store out-of-band messages in an array, we could parse them here too
  def parse_oobd_message(m)
    @msgs << m
  end

  # pops the next out-of-band message off the fifo
  # first need to read any that may have been queued up
  def get_oobd()
    # read any oob messages in the queue
    while @connection.msgCount > 0
      m= @connection.receive()
      parse_oobd_message(m)
    end

    # take the oldest message off the queue (nil if the queue is empty)
    @msgs.shift
  end

end

So this is the wrapper. You will probably add some convenience methods here, but these are the basic ones for this article.

What I am doing here is monkey patching some of the Ruby types and Erlang types to be able to easily convert to and from each type, and to produce strings etc. You can extend this to handle types that occur commonly in your programs.

In connect I do the JInterface incantations necessary to use the RPC call API. When using RPC to call functions on other nodes, it uses rex, and the return tuple always starts with the atom rex. However, I have a few out-of-band messages coming in from the server under test, and these can come in pretty much anytime. So rather than using the receiveRPC call I extract the messages myself, with a timeout, and if a message isn't a rex reply I put it in a FIFO for later testing.
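
The receive loop is easier to see without JInterface in the way. Here is a minimal plain-Ruby sketch of the same idea. FakeConnection and RpcReader are hypothetical stand-ins (they are not part of the wrapper above), and messages are modelled as Ruby arrays whose first element plays the role of the leading atom.

```ruby
# Hypothetical stand-in for the JInterface connection: it just hands back
# pre-loaded messages in order.
class FakeConnection
  def initialize(messages)
    @messages = messages.dup
  end

  # Returns the next queued message, or raises if nothing is left
  # (the real receive would time out instead).
  def receive
    raise "timeout waiting for reply" if @messages.empty?
    @messages.shift
  end
end

class RpcReader
  def initialize(connection)
    @connection = connection
    @out_of_band = []   # FIFO of messages that were not rpc replies
  end

  # Reads until a [:rex, result] message arrives and returns the result.
  # Everything seen before it is kept, in arrival order, for later steps.
  def read_rpc_reply
    loop do
      msg = @connection.receive
      return msg[1] if msg[0] == :rex
      @out_of_band << msg
    end
  end

  # Oldest out-of-band message, or nil when none are queued.
  def next_out_of_band
    @out_of_band.shift
  end
end

conn = FakeConnection.new([[:user_joined, "alice"], [:rex, [:value, 42]]])
reader = RpcReader.new(conn)
p reader.read_rpc_reply      # the rpc result
p reader.next_out_of_band    # the message that arrived before it
```

The point of the design is that an rpc call never loses a message: anything that arrives ahead of the reply is still there when a later step asks for it.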

To try to make things easier in the steps, I convert Ruby types to Erlang types in any arguments I pass in, and I make some assumptions: a Ruby symbol becomes an Erlang atom, and Ruby strings become Erlang binaries (I use binaries to store all my strings in the server I am writing; you can change that to list-based strings if you prefer).
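
To make those conventions concrete, here is a small plain-Ruby sketch that prints the Erlang term a Ruby value would turn into. The erl_text helper is hypothetical and only produces text for illustration; it does not build JInterface objects, and it assumes simple lowercase atoms and plain ASCII strings.

```ruby
# Renders a Ruby value as the text of the Erlang term it would become under
# the conventions described above. Pass :list as the second argument to see
# the list-based string form instead of the binary form.
def erl_text(value, strings_as = :binary)
  case value
  when Symbol  then value.to_s                 # Ruby symbol -> atom
  when Integer then value.to_s                 # Ruby integer -> integer
  when String
    if strings_as == :list
      value.inspect                            # "abc" as a list string
    else
      "<<#{value.inspect}>>"                   # <<"abc">> as a binary
    end
  when Array
    # Ruby array -> Erlang list, converting each element in turn
    "[" + value.map { |v| erl_text(v, strings_as) }.join(",") + "]"
  else
    raise ArgumentError, "no Erlang mapping for #{value.class}"
  end
end

puts erl_text([:value, 1, "abc"])
puts erl_text("abc", :list)
```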

env.rb

Next is features/support/env.rb, which sets up the environment and provides a few helper functions to talk to the Erlang wrapper. It basically creates a new ErlangWrapper instance for each connection and stores it so it can clean up and close the connections when done. I actually create multiple nodes so I can have them talk with each other, but I'll leave that for another article.

require "spec"

# setup for Java
$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../../libs')
require 'erlang_wrapper'

class Helpers
  def initialize
    @nodes= []
  end

  def create_node
    n= ErlangWrapper.new
    @nodes << n
    n
  end

  def create_node_to(to_node)
    n= create_node
    n.connect(to_node, "test1")
    n
  end

  def get_node(user)
    @nodes[user.to_i-1]
  end

end

World do
  Helpers.new
end

# runs after each Scenario to cleanup
After do
  @nodes.each do |n|
    if n.connected?
      n.close
    end
  end
end

at_exit do
  # Global teardown
end

The Feature

Here is the feature for the simple server I am testing. It is a gen_server called id_server that has one job: to return an integer every time next_value is called. This is used globally in my system to return a unique id for anything that calls it.

This is put in features/id_server.feature

Feature: test connection to an Erlang node
In order to test my Erlang code with Cucumber
Cucumber needs to be able to communicate with Erlang nodes
So I can write cool features to test the nodes directly

  Scenario: ask id_server for next value
    Given a connection to node "vs1@localhost"
    When I call id_server:next_value
    Then the result is a tuple of arity 2
    And the tuple has the following elements:
      | idx | result |
      | 0   | value  |
      | 1   | :int   |

    When I call id_server:next_value
    Then the 2nd value will be incremented by 1

The Steps

Lastly the basic steps go in features/step_definitions/erlang_steps.rb

Given /^a connection to node "([^"]*)"$/ do |node|
  @erl= create_node_to(node)
end

When /^I call ([^:]+):([^ ]+)$/ do |mod, fun|
  erl= get_node(1)
  @result= erl.call(mod, fun)
end

Then /^the result is a tuple of arity (\d+)$/ do |n|
  erl= get_node(1)
  erl.arity(@result).should == n.to_i
end

Then /^the tuple has the following elements:$/ do |table|
  erl= get_node(1)
  # table is a Cucumber::Ast::Table
  table.hashes.each do |h|
    i = h['idx']
    v = h['result']
    e = @result[i.to_i]
    # remember the value at this index for further tests
    @values ||= []
    @values[i.to_i]= e

    if v[0] == ?:
      case v[1..-1].to_sym
      when :int then e.to_ruby.should  be_kind_of(Numeric)
      when :str then e.to_ruby.should be_kind_of(String)
      when :atom then e.to_ruby.should be_kind_of(Symbol)
      else raise "unknown type: #{v}"
      end
    else
      e.to_s.should == v
    end
  end
end

Then /^the (\d+)(st|nd|rd|th) value will be incremented by (\d+)$/ do |idx, _, inc|
  e = @result[idx.to_i-1]
  o = @values[idx.to_i-1]
  e.to_ruby.should == (o.to_ruby + inc.to_i)
end

So some BDD'ists in the audience may start to howl at me when they see I carry state around between steps, specifically the @result and @values variables, but to be honest I don't think it is a big deal and I don't really see any way to avoid it in this case (if someone can enlighten me please leave a comment).

What I am doing here is telling my erlang_wrapper to connect to the specified Erlang node, make an RPC call to that node, then analyze the results. The result stored in @result contains a tuple, whose arity I check, then I use the neat Cucumber tables feature to test each element in the tuple. I use a convention here where I either test the specific value, or the returned type if I prefix a : to the value. I have to do this because I do not know what the initial value will be that the id_server will return. I store each element of the tuple in a variable @values so I can test it later in another step.
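
The table convention can be pulled out into a small helper. This is just a sketch in plain Ruby: cell_matches? and TYPE_NAMES are hypothetical names, and it assumes the actual value has already been converted to a Ruby value.

```ruby
# Maps the type names used in the feature table to Ruby classes.
TYPE_NAMES = { "int" => Numeric, "str" => String, "atom" => Symbol }.freeze

# Returns true when `actual` satisfies the table cell. A cell starting with
# ":" names a type; anything else is compared as literal text.
def cell_matches?(cell, actual)
  if cell.start_with?(":")
    klass = TYPE_NAMES.fetch(cell[1..-1]) { raise ArgumentError, "unknown type: #{cell}" }
    actual.is_a?(klass)
  else
    actual.to_s == cell
  end
end

p cell_matches?("value", :value)   # literal comparison
p cell_matches?(":int", 17)        # type comparison
```

Keeping the convention in one place means a new type name only needs one more entry in the hash.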

I then call next_value again and use that stored value to see if the integer has incremented. Presuming I am the only one talking to the server at this time, that would be the correct result. Of course if this were a live system it could have been incremented by someone else in the meantime, but I am not going to worry about that right now.

I hope this goes to show just how powerful Cucumber can be, with a little help from JRuby.

Posted in Erlang,Cucumber  |  Tags erlang,cucumber,bdd  |  2 comments


Dealing with a TCP packet with a little endian header in Erlang gen_tcp

Posted by Jim Morris on Mon Apr 13 01:11:57 -0700 2009

Erlang has a very neat way of reading TCP packets that have a header specifying how big the following packet is. So long as you send that header as a big-endian integer, you can use the built-in mechanism, and gen_tcp takes care of making sure the entire packet is read before passing it on to you.

Here is an example of a simple server getting packets from a client using some simple binary protocol... For instance sending this binary packet,

<<0,0,0,10,1,2,3,4,5,6,7,8,9,0>>

The first 4 bytes are the length of the rest of the packet, in this case 10 bytes, followed by 10 bytes of data. This will be delivered in the receive statement below when all the bytes have been read by the low-level gen_tcp code (with {packet, 4} the header itself is stripped before delivery).

-module(example1).
-export([listen/1]).

%% state to hold the clients state
-record(state, {stuff=default}).

%% Receives a packet whose header is 4 bytes of 
%% packet count in big-endian order followed by that 
%% length of data as binary
listen(Port) ->
    {ok, LSocket} = gen_tcp:listen(Port, [binary, {packet, 4}, 
                                        {active, false}]),
    do_accept(LSocket).

%% Each client gets its own process
do_accept(LSocket) ->
    case gen_tcp:accept(LSocket) of
        {ok, Socket} ->
            Pid = spawn(fun() -> init(Socket) end),
            gen_tcp:controlling_process(Socket, Pid);

        {error, Reason} ->
            io:format("Socket accept error: ~p~n", [Reason])
    end,
    do_accept(LSocket).

init(Socket) ->
    State = #state{stuff=something},
    loop(Socket, State).


%% the {active,once} stuff means that it will receive one message for each
%% iteration of the loop, this is good as it means TCP/IP flow control will
%% be used to throttle the client, rather than filling up the servers buffers.
%% We get one receive message for each complete packet received, and we know the packet
%% is a complete packet
loop(S, State) ->
    inet:setopts(S,[{active,once}]),
    receive
        {tcp,S,Data} ->
            NewState = process(Data, State),
            loop(S, NewState);
        {tcp_closed,S} ->
            io:format("Socket ~w closed [~w]~n",[S,self()]),
            ok
    end.

process(Data, State) ->
    %% do something cool
    io:format("We got the following Data: ~p~n", [Data]),
    %% return the (possibly updated) state for the next iteration of the loop
    State.
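
For testing a server like this from the client side, the frame is easy to build by hand. Here is a minimal Ruby sketch; frame_packet4 is a hypothetical helper name, and it only builds the bytes (sending them over a socket is left out).

```ruby
# Builds one frame for a server listening with {packet, 4}: a 4-byte
# big-endian length followed by the payload bytes.
def frame_packet4(payload)
  [payload.bytesize].pack("N") + payload.b
end

payload = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0].pack("C*")
frame = frame_packet4(payload)
p frame.bytes   # the same bytes as the binary packet shown earlier
```

The "N" directive packs a 32-bit unsigned integer in network (big-endian) byte order, which is what the built-in packet mechanism expects.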

However if (like me) 10 years ago you made the wrong decision and used a little-endian packet count in your binary protocol (because you were only talking to Intel based Windows PCs back then), you are SOL. Well not quite. A fairly simple state machine will handle the packets quite nicely, although not as efficiently as the built-in mechanism I am sure.
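
To see why the built-in mechanism cannot be used here, it helps to decode the same four header bytes both ways. This is a small Ruby sketch; decode_header is a hypothetical helper.

```ruby
# Decodes a 4-byte header as both a little-endian and a big-endian
# unsigned 32-bit integer.
def decode_header(bytes)
  {
    little: bytes.unpack("V").first,  # what the old protocol means
    big:    bytes.unpack("N").first   # what {packet, 4} would assume
  }
end

header = [10].pack("V")   # a length of 10, little-endian: 0A 00 00 00
p header.bytes
p decode_header(header)
```

A length of 10 sent little-endian reads as 167772160 when treated as big-endian, so a {packet, 4} socket would wait for a packet that never arrives (or reject it as too large).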

There are at least two ways to go for this one.

  1. If you can spare two processes for each connected client (sure, processes are really cheap in Erlang) then have a process that does a blocking gen_tcp:recv (using {active, false}, {packet, 0}) until all the bytes are read, and send a message to the other client process just as the gen_tcp example above does. This is pretty easy and clean, but does use two processes per client, and my goal is to have 100,000 clients connected per server, which may be stretching even Erlang. The example code for this case is shown at the end as example3.

  2. Write your own state machine in the one process that handles each client. This is the approach I took, and the example code is below.

-module(example2).
-export([listen/1]).

%% state to hold the clients state, and the read state of the packet
-record(state, {rdstate, pktsize, pktbuf, testcount= 0}).

%% definitions for a little endian count, a little endian short and a
%% byte
-define(LONG, 32/unsigned-little-integer).
-define(SHORT, 16/unsigned-little-integer).
-define(BYTE, 8/unsigned-little-integer).

%% Receives a packet whose header is 4 bytes of packet count in
%% little-endian order followed by that length of data as binary Note
%% that packet is set to 0 as we can't use the built-in packet stuff
listen(Port) ->
    {ok, LSocket} = gen_tcp:listen(Port, [binary, {packet, 0}, 
                                        {active, false}]),
    do_accept(LSocket).

%% Each client gets its own process
do_accept(LSocket) ->
    case gen_tcp:accept(LSocket) of
        {ok, Socket} ->
            Pid = spawn(fun() -> init(Socket) end),
            %% we need this so that the client process gets its own tcp messages
            gen_tcp:controlling_process(Socket, Pid);

        {error, Reason} ->
            io:format("Socket accept error: ~p~n", [Reason])
    end,
    do_accept(LSocket).

init(Socket) ->
    %% this sets the initial state of packet read state to read the
    %% size and the initial number of bytes to read as 4, with an empty
    %% packet buffer
    State = #state{rdstate=rdsize, pktbuf= <<>>, pktsize=4},
    loop(Socket, State).


%% the {active,once} stuff means that it will receive one message for each
%% iteration of the loop, this is good as it means TCP/IP flow control will
%% be used to throttle the client, rather than filling up the servers buffers.
%% We get one receive message for each chunk of bytes that arrives; a chunk
%% may hold part of a packet or several packets, so handle_data_loop sorts it out
loop(S, State) ->
    inet:setopts(S,[{active,once}]),
    receive
        {tcp,S,Data} ->
            NewState =  handle_data_loop(Data, State),
            loop(S, NewState);
        {tcp_closed,S} ->
            io:format("Socket ~w closed [~w]~n",[S,self()]),
            ok
    end.

%% handles reading the data packet until a complete packet is read,
%% and accumulate any excess which may be used towards the next packet
%% may execute several packets/commands if it got more than one packet.
%% stores packet information in State between reads from the client.
handle_data_loop(Data, State) ->
    {NewState, Acc, ReadState, Size} =
    handle_data_loop(Data, State#state.pktbuf, State#state.rdstate, State#state.pktsize, State),
    NewState#state{pktbuf=Acc, pktsize=Size, rdstate=ReadState}.

%% We need this second version so we can loop and process multiple
%% packets we may have received in a single TCP/IP packet
handle_data_loop(Data, Acc, ReadState, Size, State) ->   
    case handle_data(ReadState, <<Acc/binary, Data/binary>>, Size) of
    %% need more data from client, so needs to go back to the read
    %% loop to wait for more data from the client
    {need_more, Rest, NewState, NewSize} ->
        {State, Rest, NewState, NewSize};

    %% we got a complete packet so process it, then do a tail
    %% recursive loop until all complete packets have been
    %% processed
    {packet, Pkt, Rest} ->
        NewState= process(Pkt, State),
        handle_data_loop(Rest, <<>>, rdsize, 4, NewState)
    end.


%% the actual state machine that reads the packet size then the data
%% each of these handles a different state (the first parameter)

%% state 1 where we are reading the packet header but we don't have
%% enough yet
handle_data(rdsize, Data, Size) when byte_size(Data) < Size ->
    {need_more, Data, rdsize, Size};

%% state 2 where we are still reading the packet header and we now
%% have enough, we extract the little endian packet count and switch
%% state to read data
handle_data(rdsize, Data, Size) when byte_size(Data) >= Size ->
    <<N:?LONG, Rest/binary>> = Data,
    handle_data(rddata, Rest, N);

%% state 3 where we are reading the data portion of the packet but
%% don't have enough yet
handle_data(rddata, Data, Size) when byte_size(Data) < Size ->
    {need_more, Data, rddata, Size};

%% state 4 where we are still reading the data portion of the packet,
%% and now have enough for a complete packet
%% extract the complete pkt, and any excess data
handle_data(rddata, Data, Size) when byte_size(Data) >= Size ->
    <<Pkt:Size/binary, Rest/binary>> = Data,
    {packet, Pkt, Rest}.



%% Actually process the packet we received
process(Data, State) ->
    %% do something cool and maybe change the State
    io:format("We got the following Data: ~p~n", [Data]),
    %% for test purposes we just count the number of packets we got
    Cnt= State#state.testcount,
    State#state{testcount= Cnt+1}.


%%
%% We even have Unit tests for the state machine to test the two extreme
%% cases, 1 byte at a time, and multiple packets in one chunk
%%
-define(TEST, 1).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-ifdef(TEST).

handle_data_1byte_at_a_time_test() ->
    ?assertMatch({need_more, <<3>>, rdsize, 4}, handle_data(rdsize, <<3>>, 4)),    
    ?assertMatch({need_more, <<3,0>>, rdsize, 4}, handle_data(rdsize, <<3,0>>, 4)),
    ?assertMatch({need_more, <<3,0,0>>, rdsize, 4}, handle_data(rdsize, <<3,0,0>>, 4)),   
    ?assertMatch({need_more, <<>>, rddata, 3}, handle_data(rdsize, <<3,0,0,0>>, 4)),   

    ?assertMatch({need_more, <<11>>, rddata, 3}, handle_data(rddata, <<11>>, 3)),   
    ?assertMatch({need_more, <<11,22>>, rddata, 3}, handle_data(rddata, <<11,22>>, 3)),   
    ?assertMatch({packet, <<11,22,33>>, <<>>}, handle_data(rddata, <<11,22,33>>, 3)).

handle_data_loop_2_packets_all_bytes_at_once_test() ->
    C1 = #state{rdstate=rdsize, pktsize=4, pktbuf = << >>, testcount=0},
    Bin =  <<4, 0, 0, 0, 250, 1, 0, $z, 5, 0, 0, 0, 251, 2, 0, $a, $b>>,
    C2 = handle_data_loop(Bin, C1),
    ?assertMatch(#state{pktsize=4, rdstate=rdsize, pktbuf = <<>>, testcount=2}, C2).

-endif.

It is a little complicated because TCP/IP is a byte stream, so a single read may deliver too little data, too much data, or exactly the amount you want. In practice a read usually delivers more than a byte or two, however I take the worst-case scenario and allow for getting one byte at a time per read. There is also the case where the client actually sends multiple packets in one TCP/IP packet. This could easily happen depending on how you write the client, so we need to store all excess data, and maybe process multiple packets from a single read.
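
The same buffering idea can be sketched in a few lines of Ruby, which is handy if you want to check a client against the two extreme cases without an Erlang node. LittleEndianDeframer is a hypothetical name, and this is a comparison sketch, not a port of the Erlang code above.

```ruby
# Keeps unconsumed bytes between reads and emits every complete packet found
# so far. Each packet is a 4-byte little-endian length followed by the data.
class LittleEndianDeframer
  HEADER_SIZE = 4

  def initialize
    @buffer = "".b   # bytes received but not yet consumed
  end

  # Appends a chunk of bytes and returns the complete packets (possibly none).
  def feed(chunk)
    @buffer << chunk.b
    packets = []
    loop do
      break if @buffer.bytesize < HEADER_SIZE            # header incomplete
      size = @buffer.byteslice(0, HEADER_SIZE).unpack("V").first
      break if @buffer.bytesize < HEADER_SIZE + size     # data incomplete
      packets << @buffer.byteslice(HEADER_SIZE, size)
      # keep whatever follows the packet for the next round
      @buffer = @buffer.byteslice(HEADER_SIZE + size, @buffer.bytesize) || "".b
    end
    packets
  end

  # Number of buffered bytes still waiting for the rest of their packet.
  def pending_bytes
    @buffer.bytesize
  end
end

deframer = LittleEndianDeframer.new
two_packets = [4, 0, 0, 0, 250, 1, 0, 122, 5, 0, 0, 0, 251, 2, 0, 97, 98].pack("C*")
p deframer.feed(two_packets).map(&:bytes)
```

Feeding the bytes one at a time gives the same packets, just spread over more calls to feed.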

Included at the end are two eunit tests to test the two extreme cases.

Now remember this is my first Erlang project. The state machine I am using went through many iterations; as I read more about Erlang I refactored and cleaned up the code until I got to something similar to the above example. I welcome any feedback from Erlang gurus.

The moral of this story is to make the right decision in the first place and pass big-endian integers over the network, that is why it is also called network byte order. (Hey I was young and naive ;)

To be fair this only added about 23 lines of extra code overall.

Here is the code for example 3, the case where we have two processes per client and use a blocking recv to get the packets. It is slightly less code, but not nearly as sexy ;)

-module(example3).
-export([listen/1]).

%% state to hold the clients state
-record(state, {pid, testcount}).
-record(cc, {testcount}).

listen(Port) ->
    {ok, LSocket} = gen_tcp:listen(Port, [binary, {packet, 0}, 
                                        {active, false}]),
    do_accept(LSocket).

%% Each client gets its own process
do_accept(LSocket) ->
    case gen_tcp:accept(LSocket) of
        {ok, Socket} ->
            Pid = spawn(fun() -> init(Socket) end),
            gen_tcp:controlling_process(Socket, Pid);

        {error, Reason} ->
            io:format("Socket accept error: ~p~n", [Reason])
    end,
    do_accept(LSocket).

%% Each client also gets another process to handle the packets
init(Socket) ->
    Pid = spawn(fun() -> server(#cc{testcount=0}) end),
    State = #state{testcount=0, pid=Pid},
    loop(Socket, State).


%% This is a blocking loop that blocks to get exactly 4 bytes which is
%% a count then blocks reading that count number of bytes, sending a
%% message to the server process when done
loop(S, State) ->
    case gen_tcp:recv(S, 4) of
    {ok, B} ->
        <<Cnt:32/unsigned-little-integer>> = B,
        case gen_tcp:recv(S, Cnt) of
        {ok, Data} ->
            State#state.pid ! {mytcp, Data},
            loop(S, State);
        {error, closed} ->
            State#state.pid ! {mytcp_closed}
        end;
    {error, closed} ->
        State#state.pid ! {mytcp_closed}
    end.

%% the actual server that handles the packets asynchronously
server(State) ->
    receive
        {mytcp,Data} ->
            NewState = process(Data, State),
            server(NewState);
        {mytcp_closed} ->
            io:format("my Socket closed [~w]~n",[self()]),
            ok
    end.

process(Data, State) ->
    %% do something cool
    io:format("We got the following Data: ~p, State: ~p~n", [Data, State]),
    Cnt= State#cc.testcount,
    State#cc{testcount= Cnt+1}.

Posted in Erlang  |  Tags erlang,gen_tcp  |  1 comments


Using Erlang with JInterface

Posted by Jim Morris on Sun Apr 12 16:56:50 -0700 2009

Background

I recently undertook a project to rewrite a voice conference server I wrote over 10 years ago. This server was written in C++ running on a Windows Server (originally NT and then win2k). The reason it was written for win2k was to use I/O Completion Ports which were not (and still are not) available on Linux.

Solaris has a similar AIO feature, but it is not quite the same. I/O Completion Ports make very efficient use of multiple CPUs, and the conference server had to be very fast and handle a large number of concurrent connections.

The program uses a limited number of threads, basically enough to keep all the CPUs busy, and it uses the Proactor Pattern to handle the large number of concurrent connections. Each connected client has its own state, and every asynchronous I/O completion needs to switch to that context. Needless to say there was a lot of shared memory and locks everywhere. The program was very complex to write, trying to keep all those locks and memory synchronized yet avoid deadlocks. I guess I did a good job as the servers are still running to this day, with barely a reboot in 5 years.

However the code is tortuous, and making any changes risks introducing deadlocks. I also wanted to get off the Windows servers and onto Linux, which all my other servers run.

I have written several scalable servers using Java NIO. Even though they use the Reactor Pattern and I avoid creating hundreds of threads (instead I have several threads working through the ready sockets), they are still very hard to write, even with the great new concurrency libraries.

An article comparing the two patterns is here.

The trick is to write servers so that you can simply add more of them to increase capacity, and to allow for redundancy so that if one server goes down the service continues to run.

So having heard a lot about Erlang and how it auto-magically allowed you to run scalable and redundant servers I had to give it a go.

I bought Joe Armstrong's book to learn Erlang. It is a good book for learning the basics, but is very short on details about OTP and how to actually use Erlang to write scalable servers. I guess the next book in the series will cover that ;)

I actually found it very hard to wrap my brain around Erlang initially. Having programmed in procedural and object oriented languages for so many years the switch to a functional, non object oriented language was tough.

Once I got over that initial hurdle (but am still struggling with it), I was able to quickly write the basic operations of the voice server in Erlang. I used a number of gen_servers, and tried to follow the OTP principles as closely as possible. I still have not figured out how to achieve the transparent scalability and redundancy that Erlang is touted for and that my old server has. However I do see the possibilities of making that transition.

What made the whole thing very easy is that every client connection can have its own process and state, totally independent of other connections. No locks are needed as there is no shared memory (except for the large ETS tables I use to store the list of users that are online etc). It turns out that not having to use either the Reactor or Proactor pattern makes things very simple, although as I am using the asynchronous mode of gen_tcp it is almost like a Proactor, as I get a callback (actually Erlang messages) every time a packet comes in. As every client has its own process, and there could be about 100,000 clients concurrently connected, that is more than enough processes to keep all the processors busy. Distributing those processes over several physical servers should not be a problem either (if I can figure out a way to load balance, probably using a H/W load balancer). I won't go into details here of gen_servers and the Erlang language as there is plenty of documentation on the web covering those; one particularly good series I found is here.

I use a binary protocol between the clients and servers and Erlang is particularly good at handling binary protocols, with its Binary type and easy pattern matching to extract parts of the message. Packing and unpacking binary messages is a breeze once you get the binary matching syntax.

Java and Erlang

However (and finally getting to the point of this article) one thing I found Erlang is particularly bad at is database access. It does have ODBC built in, and there is a Postgresql driver too, however the code is basically using raw SQL, and gets very ugly very fast. Now I am used to using Java and Spring and SpringJdbc to do all my database work, and it is a pleasure to work with. I use a common pattern of Dao, Business logic and I/O separation, and it keeps things very clean. Plus I already had all the database access written in Java.

So being one to use the best tool for the given situation, I wanted to continue to use Java/Spring for all the database accesses, and use Erlang as the server frontend.

Turns out Erlang ships with a pretty robust Java to Erlang interface called JInterface.

Basically this allows me to write an Erlang Node in Java, which Erlang can communicate with seamlessly using the normal Erlang message passing systems.

I have structured the database back end as 3 basic packages. The I/O package that contains all the knowledge of the server/server protocol for database requests and actually runs as an Erlang node. It calls methods in a POJO package which handles all business logic for those requests, and the business logic POJOs access the database entirely through calls to methods in the DAO package. The whole system is hooked together using Spring IoC, and the DAO is written using Spring JDBC.
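
The layering is easier to picture with a toy version. The sketch below is in Ruby rather than Java, wires the objects together by hand instead of using Spring IoC, and every class name and the request shape are made up for illustration; none of them come from the real system.

```ruby
# DAO layer: the only place that knows how users are stored.
# (Hypothetical in-memory stand-in for a database-backed DAO.)
class InMemoryUserDao
  def initialize(passwords)
    @passwords = passwords
  end

  def find_password(user)
    @passwords[user]
  end
end

# Business logic layer: a plain object with no knowledge of messages or SQL.
class AuthService
  def initialize(user_dao)
    @user_dao = user_dao
  end

  def authenticate(user, password)
    stored = @user_dao.find_password(user)
    !stored.nil? && stored == password
  end
end

# I/O layer: turns an incoming request into a call and a reply.
# The assumed request shape is [ref, :authenticate, user, password].
class RequestHandler
  def initialize(auth_service)
    @auth_service = auth_service
  end

  def handle(request)
    ref, command, user, password = request
    return [ref, :failed] unless command == :authenticate
    @auth_service.authenticate(user, password) ? [ref, :ok] : [ref, :failed]
  end
end

# Hand wiring, standing in for what the IoC container would do.
handler = RequestHandler.new(AuthService.new(InMemoryUserDao.new("jim" => "secret")))
p handler.handle([1, :authenticate, "jim", "secret"])
p handler.handle([2, :authenticate, "jim", "wrong"])
```

Because each layer only talks to the one below it, the I/O layer can be swapped (wire protocol, Erlang node, test harness) without touching the business logic or the DAO.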

I won't go into details here of how to write Spring based JDBC access as that is also well documented on the Spring site. I may write another article on Spring JDBC as it is very elegant and easy to use.

So the focus of this article is the JInterface based part of the Java system, which receives database requests from the Erlang based front-end server and does whatever it needs to do getting data from the database then replies to that request back to the calling Erlang process.

Here is a portion of the Java code that sets up and handles the Erlang requests. Only the JInterface portions are shown.

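import java.io.IOException;

import org.apache.log4j.Logger; // assumption: log4j 1.x, inferred from Logger.getLogger(Class)
import com.ericsson.otp.erlang.*;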

public class ServerNodeImpl implements Server {
    private static final Logger jlog = Logger.getLogger(ServerNodeImpl.class);
    private String nodeName = "databasenode@localhost";
    private String mboxName = "database";

...

    private OtpMbox mbox;

    // create some convenient atoms
    private final OtpErlangAtom request= new OtpErlangAtom("request");
    private final OtpErlangAtom authenticate= new OtpErlangAtom("authenticate");
    private final OtpErlangAtom failed= new OtpErlangAtom("failed");
    private final OtpErlangAtom ok= new OtpErlangAtom("ok");

    // start the node up
    public boolean start() {
        try{
            init();
        }catch(IOException e){
            jlog.error("start()", e);
            return false;
        }

        // this will block and the caller will block until the loop exits
        loop();

        return true;
    }

    // stop the node
    public boolean stop() {
        // to stop it, try sending myself a stop message
        OtpErlangObject[] msg = new OtpErlangObject[2];
        msg[0] = mbox.self();
        msg[1] = new OtpErlangAtom("stop");
        OtpErlangTuple tuple = new OtpErlangTuple(msg);  
        mbox.send(mbox.self(), tuple);
        return true;
    }

    private void init() throws IOException {
        jlog.info("Node: " + nodeName + ", MBox: " + mboxName);
        OtpNode self = new OtpNode(getNodeName());
        mbox = self.createMbox(getMboxName());
    }

    /**
     * We get requests which are tuples.. {from,cmd,ref,...}
     * where cmd is currently an atom stop|request
     */
    private void loop() {
        jlog.info("loop(): starting...");
        boolean running= true;
        while(running){
            try{
                OtpErlangObject o = mbox.receive();
                jlog.debug("loop(): Received Request: " + o);
                if(o instanceof OtpErlangTuple){
                    OtpErlangTuple msg = (OtpErlangTuple) o;
                    OtpErlangPid from = (OtpErlangPid) msg.elementAt(0);
                    OtpErlangObject cmd = msg.elementAt(1);
                    OtpErlangRef ref = (OtpErlangRef) msg.elementAt(2);
                    if(cmd instanceof OtpErlangAtom){
                        OtpErlangAtom atom= (OtpErlangAtom)cmd;
                        String c= atom.atomValue();
                        if(c.equals("stop")){
                            jlog.info("loop(): got stop message");
                            running= false;
                        }else if(c.equals("request")){
                            try{
                                byte [] reply= handleRequest(from, msg);
                                sendResponse(from, ref, reply);
                            }catch(Exception e){
                                jlog.error("loop(): got exception from handleRequest()", e);
                            }
                        }else{
                            jlog.error("loop(): got unknown cmd: " + c);                          
                        }
                    }else{
                        jlog.error("loop(): got unknown cmd type: " + cmd);
                    }

                }else{
                    jlog.error("loop(): Unknown message type: " + o);
                }
            }catch(Exception e){
                jlog.error("loop(): got exception", e);
                running= false;
            }
        }
        jlog.info("loop(): ...leaving");
    }

    ...

    // sends a tuple back to requesting Pid {From, Ref, Data}
    private void sendResponse(OtpErlangPid from, OtpErlangRef ref, byte [] ba) {
        OtpErlangBinary data= new OtpErlangBinary(ba);      
        OtpErlangObject [] eoa= new OtpErlangObject[]{mbox.self(), ref, data};
        OtpErlangTuple tuple= new OtpErlangTuple(eoa);
        mbox.send(from, tuple);
    }

That is it: a very simple Erlang node that sits and receives messages from another Erlang node, processes them, then sends its responses back to the requesting node.

You only need the JInterface package (OtpErlang.jar) from the Erlang distribution, imported with... import com.ericsson.otp.erlang.*;

To start the node listening you simply do this...

OtpNode self = new OtpNode(getNodeName());
mbox = self.createMbox(getMboxName());

Now you have a node that Erlang processes can send messages to using...

{database, databasenode@localhost} ! message.

The loop has the following...

OtpErlangObject o = mbox.receive();

which receives a message, which you can then process. This is a little wordy but fairly simple...

   
// first check that the message is a tuple
if(o instanceof OtpErlangTuple){
    // cast it into a tuple object
    OtpErlangTuple msg = (OtpErlangTuple) o;

    // extract the first element of the tuple
    OtpErlangPid from = (OtpErlangPid) (msg.elementAt(0));

    // extract the second element of the tuple
    OtpErlangObject cmd = msg.elementAt(1);

    // extract the ref of the request
    OtpErlangRef ref = (OtpErlangRef) msg.elementAt(2);

    // which should be an atom of stop | request
    if(cmd instanceof OtpErlangAtom){
        OtpErlangAtom atom= (OtpErlangAtom)cmd;

        // convert the atom to a string
        String c= atom.atomValue();

        // if it is stop then we exit the loop
        if(c.equals("stop")){
            jlog.info("loop(): got stop message");
            running= false;

        // if it is request we pass the rest of the message
        // onto the code to handle the request
        }else if(c.equals("request")){
            try{
                byte [] reply= handleRequest(from, msg);

                // send the response back to the requesting node
                sendResponse(from, ref, reply);
            }catch(Exception e){
                jlog.error("loop(): got exception from handleRequest()", e);
            }
        }else{
            jlog.error("loop(): got unknown cmd: " + c);                          
        }
    }else{
        jlog.error("loop(): got unknown cmd type: " + cmd);
    }


    // sends a tuple back to requesting Pid {From, Ref, Data}
    private void sendResponse(OtpErlangPid from, OtpErlangRef ref, byte [] ba) {
        // convert the binary reply into an Erlang Binary
        OtpErlangBinary data= new OtpErlangBinary(ba);

        // make an array of Erlang Objects
        OtpErlangObject [] eoa= new OtpErlangObject[]{mbox.self(), ref, data};

        // make an Erlang tuple out of the Object array above
        OtpErlangTuple tuple= new OtpErlangTuple(eoa);

        // send the message back to requesting process Pid
        mbox.send(from, tuple);
    }

}
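
The shape of that loop (block on receive, dispatch on the command, stop by posting a message to your own mailbox) can be tried without an Erlang node. The sketch below is only an analogy: a java.util.concurrent BlockingQueue stands in for OtpMbox, plain string arrays stand in for Erlang tuples, and the class and method names are made up. It is not JInterface code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class MailboxLoopSketch {
    // Stand-in for OtpMbox: take() blocks much like mbox.receive() does.
    private final BlockingQueue<String[]> mbox = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>();

    // Messages are {cmd, payload} pairs (assumption: simplified from {from,cmd,ref,...}).
    void send(String cmd, String payload) {
        mbox.add(new String[] {cmd, payload});
    }

    // Same idea as stop() in the article: the loop is blocked in receive,
    // so the way to wake it up and end it is to send it a stop message.
    void stop() {
        send("stop", "");
    }

    List<String> handled() {
        return handled;
    }

    // Runs until a stop message is received.
    void loop() {
        boolean running = true;
        while (running) {
            String[] msg;
            try {
                msg = mbox.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (msg[0].equals("stop")) {
                running = false;
            } else if (msg[0].equals("request")) {
                handled.add(msg[1]); // the real node would call handleRequest() here
            } else {
                System.err.println("loop(): got unknown cmd: " + msg[0]);
            }
        }
    }

    static List<String> demo() {
        MailboxLoopSketch node = new MailboxLoopSketch();
        node.send("request", "first");
        node.send("bogus", "ignored");
        node.send("request", "second");
        node.stop(); // queued behind the requests
        node.loop(); // returns once the stop message is reached
        return node.handled();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second]
    }
}
```

Here the messages are queued before the loop starts so the example stays single threaded; in the real node the loop blocks in its own thread while other nodes send to it.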

The worst part is converting back and forth between Erlang objects and Java objects.

You could have several of these nodes running so you can balance the load across several servers if needed.

Posted in Erlang  |  Tags erlang,jinterface  |  2 comments

Using java enum for command dispatching

Posted by Jim Morris on Sun Apr 12 01:26:06 -0700 2009

I haven't blogged much about Java even though it is my primary programming language. Since I have been doing a lot of Java recently I thought I'd post something about some of the Java idioms I've used over the years.

One I have been using a lot recently is a command dispatcher where the command is text.

In some languages you would probably use a switch statement with the command string as the case label. However, Java (before Java 7) does not allow strings as the match part of a case.

A common idiom seen is to use enums (in Java 5 and greater), as they are constants and can be used in a switch statement.

Now enums are very powerful constructs in Java; they are not simply convenient #define work-arounds, although they are commonly used that way...

    enum Days { MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY,
            SATURDAY, SUNDAY }

    switch(day){
      case MONDAY:  /* do something with monday */  break;
      case TUESDAY: /* do something with tuesday */ break;
    }

See this for all the info on enums.
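
As a small illustration of that idiom, the sketch below turns incoming text into an enum constant with valueOf() and then switches on the constant. The Day enum and the describe() helper are made up for the example.

```java
import java.util.Locale;

class EnumSwitchSketch {
    enum Day { MONDAY, TUESDAY, WEDNESDAY }

    // Converts text to an enum constant, then switches on the constant.
    static String describe(String text) {
        Day day;
        try {
            // valueOf() throws IllegalArgumentException when no constant matches
            day = Day.valueOf(text.toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return "unknown day: " + text;
        }
        switch (day) {
            case MONDAY:
                return "start of the week";
            case TUESDAY:
                return "second day";
            default:
                return "some other day";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("monday"));  // prints start of the week
        System.out.println(describe("someday")); // prints unknown day: someday
    }
}
```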

However you can do so much more with enums...

I use them as a super-duper switch statement to dispatch text commands, and the cool thing is that they are pretty efficient, as valueOf() uses a hash table lookup to find the command. Here's an example of a command line app that uses the Java GNU Getopt package with long option names to dispatch the different commands that can be typed on the command line...

> doit --froboz one two three
> doit --bazznot glozny frobod

where --froboz is a command that takes three parameters and --bazznot is one that takes two; you get the idea.

package com.e4net;
import gnu.getopt.Getopt;
import gnu.getopt.LongOpt;

public class Test {
    private static void usage() {

        System.out.println("Usage: myapp [-x filename] [--command] [args...]");
        System.out.println("   where commands is one of:");
        System.out.println("   froboz param1 param2 param3");
        System.out.println("   bazznot param1 param2");
        System.exit(1);
    }

    // The various supported command line commands
    // code is the short form command argument
    // size is the number of arguments needed for this command
    private enum Commands {
        froboz('f', 3) {
            boolean doit(int off, String [] argv){
                // do whatever froboz does with three arguments
                System.out.println("In froboz with: " + argv[off] + ", " + argv[off+1] + ", " + argv[off+2]);
                return true;
            }
        },

        bazznot('b', 2) {
            boolean doit(int off, String [] argv){
                // do whatever bazznot does with two arguments
                System.out.println("In bazznot with: " + argv[off] + ", " + argv[off+1]);
                return true;
            }
        };

        // Just add more commands here....

        private final int code, size;
        Commands(int code, int size){
            this.code= code;
            this.size= size;
        }
        public int code() { return code; }
        public int size() { return size; }

        // Do the command, off is the offset within argv the commands
        // parameters start
        abstract boolean doit(int off, String [] argv);

    };

    public static void main(String[] args) {

        // build the long commands from the enum
        LongOpt[] longopts = new LongOpt[Commands.values().length+1];
        longopts[0] = new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h');

        // loop through each enum constant and generate the long command
        // from its name; code() is the value getopt() returns when the
        // long option (e.g. --froboz) is matched. A short form such as -f
        // would also need to be listed in the option string passed to Getopt
        int off= 1;
        for(Commands c : Commands.values()){
            longopts[off++] = new LongOpt(c.toString(), LongOpt.NO_ARGUMENT, null, c.code());
        }

        Getopt g = new Getopt("myprog", args, "x:", longopts);
        String file= "default.file";
        int c;
        boolean found= false;
        while ((c = g.getopt()) != -1 && !found) {
            int i= g.getOptind();
            switch (c) {
                case 'x': // get some file name with the -x
                    // option
                    file = g.getOptarg();
                    break;

                    // this is the help which runs with --help or -h
                case '?':
                case 'h':
                    usage();
                    break;

                // this will lookup the command and execute it
                default:
                    // see if in commands
                    String cmd= longopts[g.getLongind()].getName();
                    Commands cm= null;
                    try {
                        // lookup the text command in the enum
                        cm= Commands.valueOf(cmd);                      
                    } catch (IllegalArgumentException e) {
                        usage();
                    }

                    // check we have enough arguments for this command
                    if(args.length - i == cm.size){
                        // do the command
                        cm.doit(g.getOptind(), args);
                        found= true;
                    }else {
                        // wrong number of arguments so show usage
                        System.err.println("Wrong number of arguments for: " + cmd);
                        usage();
                    }
            }
        }
    }
}

OK, that looks like a lot of code for a command lookup/dispatcher, but the nice thing about it is that it is fairly DRY: to add a new command you simply add another enum constant, and the rest is taken care of automatically.

The command is looked up in the switch's default clause using the valueOf method of the enum, which finds the enum constant whose name matches the given string and throws IllegalArgumentException if there is none.

It can be run thusly...

> java -cp ./java-getopt.jar:./classes com.e4net.Test --bazznot 1 2
In bazznot with: 1, 2

> java -cp ./java-getopt.jar:./classes com.e4net.Test --froboz 1 2 3
In froboz with: 1, 2, 3


> java -cp ./java-getopt.jar:./classes com.e4net.Test --blahblah 1 2 3
myprog: unrecognized option '--blahblah'
Usage: myapp [-x filename] [--command] [args...]
    where commands is one of:
    froboz param1 param2 param3
    bazznot param1 param2

A variation on this theme: say you have an Internet server that receives text commands over a socket, and you want to look up the command received and dispatch it the same way as above. This turns out to be easy using the valueOf method of the enum. Below we call handle() with the command we got over the wire, and it is neatly dispatched. valueOf() is the method that does the lookup in a hash table, so it is quite efficient even if the command list is long.

private Commands getCommand(String command) {
    Commands cmd;
    try {
        cmd= Commands.valueOf(command.toLowerCase());
    } catch (IllegalArgumentException e) {
        cmd= null;          
    }
    return cmd;
}

public boolean handle(String command, String [] args) {     
    // lookup and execute the command
    Commands cmd= getCommand(command);
    if(cmd != null) {
        return cmd.doit(0, args);
    }else{
        System.err.println("handle() - Unknown command: " + command);
        return false;
    }       
}
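
To show how a line of text read from a socket might reach a dispatcher like handle(), here is a self-contained sketch. The wire format (a command followed by whitespace-separated arguments), the echo and add commands, and the dispatchLine() helper are all assumptions for illustration; they are not taken from the server described above.

```java
import java.util.Arrays;
import java.util.Locale;

class LineDispatchSketch {
    // Hypothetical commands; each constant carries its own implementation.
    enum Commands {
        echo {
            String doit(String[] args) {
                return String.join(" ", args);
            }
        },
        add {
            String doit(String[] args) {
                int sum = 0;
                for (String a : args) {
                    sum += Integer.parseInt(a);
                }
                return Integer.toString(sum);
            }
        };

        abstract String doit(String[] args);
    }

    // Splits "cmd arg1 arg2 ..." and dispatches through Commands.valueOf().
    static String dispatchLine(String line) {
        String[] words = line.trim().split("\\s+");
        String[] args = Arrays.copyOfRange(words, 1, words.length);
        Commands cmd;
        try {
            // Only the lookup sits in the try block, so an exception thrown
            // by a command itself is not reported as an unknown command.
            cmd = Commands.valueOf(words[0].toLowerCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return "error: unknown command " + words[0];
        }
        return cmd.doit(args);
    }

    public static void main(String[] args) {
        System.out.println(dispatchLine("ECHO hello world")); // prints hello world
        System.out.println(dispatchLine("add 1 2 3"));        // prints 6
        System.out.println(dispatchLine("frob 1"));           // prints error: unknown command frob
    }
}
```

Keeping the lookup separate from the call matters here because NumberFormatException is a subclass of IllegalArgumentException, so a single try block around both would misreport bad arguments as an unknown command.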

The code for this article can be downloaded from here

Posted in Java  |  Tags java,enum,dispatcher  |  no comments
