Posted by Jim Morris
on Sat May 02 00:09:36 -0700 2009
Background
I've been using cucumber to do all my integration testing. Usually
testing over the wire to a live system, regardless of what the target
system is written in (Ruby Rails, Merb, Java etc).
I have been writing a conference server in Erlang (actually
rewriting), and have been testing it over the wire using JRuby and
Cucumber.
I have a Java library which talks the binary protocol, so
using JRuby allows me to use that library. However I need to test a
lot of edge conditions in the server, and these cannot easily be
tested using the wire protocol.
So after discovering JInterface I realized I could probably use JRuby
and Cucumber to test my Erlang gen_server based module directly.
It works pretty well. First I wrap the JInterface code in a Ruby
library (actually JRuby), then I write my steps to talk to that
wrapper. I can get pretty deep in my testing, even to the point of
directly testing the returned tuples.
I could use one of the Ruby to Erlang libraries, but the two I found
are either no longer supported (Erlectricity) or too new to be stable
(erlix), so using JInterface under JRuby seems a good alternative for
now. I'll be keeping my eye on erlix as it is under development and
seems to be written the right way.
The Code
Erlang Wrapper
OK, down to business. Here is the wrapper. I have tried to put all
the JInterface knowledge and knowledge of Erlang types in this
wrapper. I would also put any specific calls to the Erlang node and
unpacking of any return data here as well, but for now I'll keep it
simple.
Put this in the libs directory of the cucumber test directory as
erlang_wrapper.rb (the name that env.rb requires), given the standard
cucumber directory hierarchy...
workdir
  features
    step_definitions
    support
  libs
You need to put the OtpErlang.jar file in the libs directory as well.
require 'java'
$LOAD_PATH.unshift(File.dirname(__FILE__))
require 'OtpErlang.jar'
import 'com.ericsson.otp.erlang.OtpNode'
import 'com.ericsson.otp.erlang.OtpSelf'
import 'com.ericsson.otp.erlang.OtpPeer'
import 'com.ericsson.otp.erlang.OtpConnection'
import 'com.ericsson.otp.erlang.OtpErlangList'
import 'com.ericsson.otp.erlang.OtpErlangObject'
import 'com.ericsson.otp.erlang.OtpErlangTuple'
import 'com.ericsson.otp.erlang.OtpErlangAtom'
import 'com.ericsson.otp.erlang.OtpErlangBinary'
import 'com.ericsson.otp.erlang.OtpErlangInt'
import 'com.ericsson.otp.erlang.OtpErlangString'
def add_method(meth, klass, &block)
klass.instance_eval { define_method meth, &block }
end
add_method(:to_erl, Symbol) {OtpErlangAtom.new(self.to_s)}
add_method(:to_erl, Integer) {OtpErlangInt.new(self)}
class Array
def to_erl
a= self.convert_to_erl
OtpErlangList.new(a.to_java(OtpErlangObject))
end
def to_tuple
a= self.convert_to_erl
OtpErlangTuple.new(a.to_java(OtpErlangObject))
end
def convert_to_erl
ar= []
self.each do |a|
if a.instance_of? Symbol
ar << a.to_erl
elsif a.kind_of? Integer
ar << a.to_erl
elsif a.instance_of? String
ar << a.to_erl
elsif a.kind_of? Array
ar << a.to_erl
else
ar << a
end
end
ar
end
end
class String
def to_erl(t=nil)
if t == :list
OtpErlangString.new(self.to_java_bytes)
else
OtpErlangBinary.new(self.to_java_bytes)
end
end
end
EList= com.ericsson.otp.erlang.OtpErlangList
Tuple= com.ericsson.otp.erlang.OtpErlangTuple
Atom= com.ericsson.otp.erlang.OtpErlangAtom
EInt= com.ericsson.otp.erlang.OtpErlangInt
ELong= com.ericsson.otp.erlang.OtpErlangLong
Binary= com.ericsson.otp.erlang.OtpErlangBinary
add_method(:to_s, Tuple) {self.toString()}
add_method(:to_s, EList) {self.toString()}
add_method(:to_s, Atom) {self.toString()}
add_method(:to_s, EInt) {self.toString()}
add_method(:to_s, ELong) {self.toString()}
add_method(:to_s, Binary) {self.binaryValue.toString()}
class Tuple
def [](i)
self.elementAt(i)
end
end
class EList
def [](i)
self.elementAt(i)
end
end
add_method(:to_ruby, Atom) {self.toString().to_sym}
add_method(:to_ruby, EInt) {self.intValue}
add_method(:to_ruby, ELong) {self.intValue}
add_method(:to_ruby, Binary) {self.binaryValue.to_a}
class ErlangWrapper
def initialize
@msgs= []
end
def connect(erlang_node, my_node)
begin
@self = OtpSelf.new(my_node)
other = OtpPeer.new(erlang_node)
@connection = @self.connect(other)
rescue
raise "could not connect to #{erlang_node} as #{my_node}: #{$!}"
end
end
def self_pid
@self.pid()
end
def close
@connection.close
end
def connected?
@connection && @connection.isConnected()
end
def call(mod, fun, args=nil)
if args
a= args.to_erl
else
a= OtpErlangList.new
end
@connection.sendRPC(mod, fun, a)
gotrpc= false
while(!gotrpc) do
reply = @connection.receive(5000)
if reply[0].toString != 'rex'
parse_oobd_message(reply)
else
gotrpc= true
end
end
reply.elementAt(1)
end
def arity(v)
v.arity()
end
def make_args_erlang(*args)
ar= []
args.each do |a|
ar << a.to_erl
end
ar
end
def make_tuple(*args)
ar= make_args_erlang(*args)
tuple = OtpErlangTuple.new(ar.to_java(OtpErlangObject))
end
def parse_oobd_message(m)
@msgs << m
end
def get_oobd()
while @connection.msgCount > 0
m= @connection.receive()
parse_oobd_message(m)
end
unless @msgs.empty?
res= @msgs.shift
else
nil
end
end
end
So this is the wrapper. You will probably add some convenience methods
here, but these are the basic ones for this article.
What I am doing here is monkey patching some of the Ruby types and
Erlang types to be able to easily convert to and from each type, and
to produce strings etc. You can extend this to handle types that occur
commonly in your programs.
In connect I do the JInterface incantations necessary to use the RPC
call API. When using RPC to call functions on other nodes, it uses
rex, and the return tuple always starts with the atom rex. However, I
have a few out-of-band messages coming in from the server under test,
and these can come in pretty much anytime. So rather than using the
receiveRPC call, I extract the messages myself with a timeout, and if
a message isn't a rex reply I put it in a FIFO for later testing.
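The receive loop described above can be tried out without JInterface. What follows is only a minimal pure-Ruby sketch, assuming messages are modelled as plain arrays whose first element is a tag. FakeConnection and wait_for_rpc_reply are hypothetical names used for illustration and are not part of JInterface or the wrapper.

```ruby
# Stand-in for the JInterface connection: hands out queued messages in order.
# (Hypothetical class, for illustration only.)
class FakeConnection
  def initialize(messages)
    @messages = messages.dup
  end

  # Returns the next message, or raises when nothing is left
  # (the real call takes a timeout instead).
  def receive
    raise 'timed out waiting for a message' if @messages.empty?
    @messages.shift
  end
end

# Reads messages until the rex reply arrives. Anything else is treated as
# out-of-band and appended to the oob FIFO for later inspection.
def wait_for_rpc_reply(connection, oob)
  loop do
    msg = connection.receive
    return msg[1] if msg[0] == :rex
    oob << msg
  end
end

conn = FakeConnection.new([
  [:user_joined, 'alice'],   # out-of-band, arrives before the reply
  [:rex, [:value, 42]]       # the RPC reply
])
oob = []
reply = wait_for_rpc_reply(conn, oob)
```

After this runs, reply holds the payload of the rex message and oob holds the message that arrived ahead of it, in arrival order.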
To try to make things easier in the steps, I try to convert Ruby types
to Erlang types in any arguments I pass in, and I make some
assumptions, like a Ruby symbol becomes an Erlang atom, and that Ruby
strings become Erlang binaries (as I use binaries to store all my
strings in the server I am writing, you can change that to list based
strings if you prefer).
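To make those conversion assumptions concrete, here is a small pure-Ruby sketch that renders Ruby values as Erlang term text rather than JInterface objects. The helper name to_erl_text is hypothetical, and the sketch makes no attempt to quote unusual atoms or escape characters inside strings.

```ruby
# Renders a Ruby value as Erlang term text, following the same conventions
# as the wrapper: Symbol -> atom, Integer -> integer, String -> binary,
# Array -> list. (to_erl_text is a hypothetical helper for illustration;
# it does not quote atoms or escape string contents.)
def to_erl_text(value)
  case value
  when Symbol  then value.to_s
  when Integer then value.to_s
  when String  then "<<\"#{value}\">>"
  when Array   then "[#{value.map { |v| to_erl_text(v) }.join(',')}]"
  else raise ArgumentError, "no Erlang mapping for #{value.class}"
  end
end

text = to_erl_text([:join, 42, 'lobby', [:a, :b]])
```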
env.rb
Next is features/support/env.rb, which sets up the environment and
provides a few helper functions to talk to the Erlang wrapper. It
creates a new ErlangWrapper instance for each connection and stores it
so it can clean up and close the connections when done. I actually
create multiple nodes so I can have them talk with each other, but
I'll leave that for another article.
require "spec"
$LOAD_PATH.unshift(File.dirname(__FILE__) + '/../../libs')
require 'erlang_wrapper'
class Helpers
def initialize
@nodes= []
end
def create_node
n= ErlangWrapper.new
@nodes << n
n
end
def create_node_to(to_node)
n= create_node
n.connect(to_node, "test1")
n
end
def get_node(user)
@nodes[user.to_i-1]
end
end
World do
Helpers.new
end
After do
@nodes.each do |n|
if n.connected?
n.close
end
end
end
at_exit do
end
The Feature
Here is the feature for the simple server I am testing. It is a
gen_server called id_server that has one job: to return an integer
every time next_value is called. This is used globally in my system
to return a unique id for anything that calls it.
This is put in features/id_server.feature
Feature: test connection to an Erlang node
  In order to test my Erlang code with Cucumber
  Cucumber needs to be able to communicate with Erlang nodes
  So I can write cool features to test the nodes directly

  Scenario: ask id_server for next value
    Given a connection to node "vs1@localhost"
    When I call id_server:next_value
    Then the result is a tuple of arity 2
    And the tuple has the following elements:
      | idx | result |
      | 0   | value  |
      | 1   | :int   |
    When I call id_server:next_value
    Then the 2nd value will be incremented by 1
The Steps
Lastly the basic steps go in features/step_definitions/erlang_steps.rb
Given /^a connection to node "([^"]*)"$/ do |node|
  @erl= create_node_to(node)
end

When /^I call ([^:]+):([^ ]+)$/ do |mod, fun|
  erl= get_node(1)
  @result= erl.call(mod, fun)
end

Then /^the result is a tuple of arity (\d+)$/ do |n|
  erl= get_node(1)
  erl.arity(@result).should == n.to_i
end

Then /^the tuple has the following elements:$/ do |table|
  erl= get_node(1)
  # table is a Cucumber::Ast::Table
  table.hashes.each do |h|
    i = h['idx']
    v = h['result']
    e = @result[i.to_i]
    # remember the value at this index for further tests
    @values ||= []
    @values[i.to_i]= e
    # a leading ':' means "check the type", otherwise compare the value
    if v[0] == ?:
      case v[1..-1].to_sym
      when :int then e.to_ruby.should be_kind_of(Numeric)
      when :str then e.to_ruby.should be_kind_of(String)
      when :atom then e.to_ruby.should be_kind_of(Symbol)
      else raise "unknown type: #{v}"
      end
    else
      e.to_s.should == v
    end
  end
end

Then /^the (\d+)(st|nd|rd|th) value will be incremented by (\d+)$/ do |idx, _, inc|
  e = @result[idx.to_i-1]
  o = @values[idx.to_i-1]
  e.to_ruby.should == (o.to_ruby + inc.to_i)
end
Some BDD'ists in the audience may start to howl at me when they
see I carry state around between steps, specifically the @result and
@values variables, but to be honest I don't think it is a big deal
and I don't really see any way to avoid it in this case (if someone
can enlighten me please leave a comment).
What I am doing here is telling my ErlangWrapper to connect to the
specified Erlang node, make an RPC call to that node, then analyze the
results. The result stored in @result contains a tuple, whose arity
I check, then I use the neat Cucumber tables feature to test each
element in the tuple. I use a convention here where I test either the
specific value or, if I prefix a : to the value, the returned type. I
have to do this because I do not know what the initial value will be
that the id_server will return. I store each element of the tuple in a
variable @values so I can test it later in another step.
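The value-or-type convention can be isolated from Cucumber and JInterface. The following is a rough sketch in plain Ruby, assuming the Erlang element has already been converted to a Ruby value. TYPE_TAGS and cell_matches? are hypothetical names, not part of the steps file.

```ruby
# Maps the type tags used in the feature table to Ruby classes.
TYPE_TAGS = { int: Numeric, str: String, atom: Symbol }.freeze

# Checks one table cell against an already-converted Ruby value.
# A cell starting with ':' names an expected type; anything else is
# compared literally. (cell_matches? is a hypothetical helper.)
def cell_matches?(expected, actual)
  if expected.start_with?(':')
    klass = TYPE_TAGS.fetch(expected[1..-1].to_sym) {
      raise ArgumentError, "unknown type: #{expected}"
    }
    actual.is_a?(klass)
  else
    actual.to_s == expected
  end
end

literal_ok = cell_matches?('value', :value)  # literal comparison
type_ok    = cell_matches?(':int', 17)       # type-only comparison
```

Keeping the check in one small function like this makes it easy to add further tags later without touching the step definition itself.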
I then call next_value again and use that stored value to see if
the integer has incremented. Presuming I am the only one talking to
the server at this time, that would be the correct result. Of course,
if this were a live system it could have been incremented by someone
else in the meantime, but I am not going to worry about that right now.
I hope this goes to show just how powerful Cucumber can be, with a
little help from JRuby.
Posted in
Erlang,Cucumber
|
Tags
erlang,cucumber,bdd
Posted by Jim Morris
on Mon Apr 13 01:11:57 -0700 2009
Erlang has a very neat way of reading TCP packets that have a
header specifying how big the following packet is. So long as you
send that header as a big-endian integer, you can use the built-in
mechanism: gen_tcp then takes care of making sure the entire packet
is read before passing it on to you.
Here is an example of a simple server getting packets from a client
using some simple binary protocol... For instance sending this binary
packet,
<<0,0,0,10,1,2,3,4,5,6,7,8,9,0>>
The first 4 bytes are the length of the rest of the packet, in this
case 10 bytes, followed by 10 bytes of data. This will be delivered to
the receive statement below when all the bytes have been read by the
low-level gen_tcp code.
-module(example1).
-export([listen/1]).
%% state to hold the clients state
-record(state, {stuff=default}).
%% Receives a packet whose header is 4 bytes of
%% packet count in big-endian order followed by that
%% length of data as binary
listen(Port) ->
{ok, LSocket} = gen_tcp:listen(Port, [binary, {packet, 4},
{active, false}]),
do_accept(LSocket).
%% Each client gets its own process
do_accept(LSocket) ->
case gen_tcp:accept(LSocket) of
{ok, Socket} ->
Pid = spawn(fun() -> init(Socket) end),
gen_tcp:controlling_process(Socket, Pid);
{error, Reason} ->
io:format("Socket accept error: ~p~n", [Reason])
end,
do_accept(LSocket).
init(Socket) ->
State = #state{stuff=something},
loop(Socket, State).
%% the {active,once} stuff means that it will receive one message for each
%% iteration of the loop, this is good as it means TCP/IP flow control will
%% be used to throttle the client, rather than filling up the servers buffers.
%% We get one receive message for each complete packet received, and we know the packet
%% is a complete packet
loop(S, State) ->
inet:setopts(S,[{active,once}]),
receive
{tcp,S,Data} ->
NewState = process(Data, State),
loop(S, NewState);
{tcp_closed,S} ->
io:format("Socket ~w closed [~w]~n",[S,self()]),
ok
end.
process(Data, State) ->
%% do something cool
io:format("We got the following Data: ~p~n", [Data]),
%% return the (possibly updated) state for the next loop iteration
State.
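For the client side of this exchange, the example packet shown earlier can be built with a 4-byte big-endian length prefix. This is a minimal Ruby sketch; the helper name frame is hypothetical, and writing the bytes to a socket is left out.

```ruby
# Builds a frame in the layout {packet, 4} expects: a 4-byte big-endian
# length followed by the payload. (frame is a hypothetical helper name.)
def frame(payload)
  [payload.bytesize].pack('N') + payload
end

payload = [1, 2, 3, 4, 5, 6, 7, 8, 9, 0].pack('C*')
packet  = frame(payload)
packet_bytes = packet.bytes  # 4 header bytes followed by the 10 data bytes
```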
However if (like me) 10 years ago you made the wrong decision and used a
little-endian packet count in your binary protocol (because you were
only talking to Intel based Windows PCs back then), you are SOL. Well
not quite. A fairly simple state machine will handle the packets quite
nicely, although not as efficiently as the built-in mechanism I am
sure.
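To see why the built-in mechanism cannot simply be pointed at a little-endian header, this short Ruby sketch decodes the same 4 header bytes under both byte orders. It is only an illustration of the mismatch, not part of the server.

```ruby
header = [10].pack('V')                 # little-endian length: bytes 10, 0, 0, 0

as_little = header.unpack('V').first    # what the old protocol means
as_big    = header.unpack('N').first    # what a big-endian reader would see
```

Here as_little is 10, while as_big is 167772160, so a big-endian reader would wait for a packet that never arrives.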
There are at least two ways to go for this one.
1. If you can spare two processes for each connected client (sure,
processes are really cheap in Erlang), then have a process that does a
blocking gen_tcp:recv (using {active, false}, {packet, 0}) until all
the bytes are read, and sends a message to the other client process
just as the gen_tcp example above does. This is pretty easy and
clean, but does use two processes per client, and my goal is to have
100,000 clients connected per server, which may be stretching even
Erlang. The example code for this case is shown at the end as example3.
2. Write your own state machine in the one process that handles each
client. This is the approach I took, and the example code is below.
-module(example2).
-export([listen/1]).
%% state to hold the clients state, and the read state of the packet
-record(state, {rdstate, pktsize, pktbuf, testcount= 0}).
%% definitions for a little endian count, a little endian short and a
%% byte
-define(LONG, 32/unsigned-little-integer).
-define(SHORT, 16/unsigned-little-integer).
-define(BYTE, 8/unsigned-little-integer).
%% Receives a packet whose header is 4 bytes of packet count in
%% little-endian order followed by that length of data as binary Note
%% that packet is set to 0 as we can't use the built-in packet stuff
listen(Port) ->
{ok, LSocket} = gen_tcp:listen(Port, [binary, {packet, 0},
{active, false}]),
do_accept(LSocket).
%% Each client gets its own process
do_accept(LSocket) ->
case gen_tcp:accept(LSocket) of
{ok, Socket} ->
Pid = spawn(fun() -> init(Socket) end),
%% we need this so that the client process gets its own tcp messages
gen_tcp:controlling_process(Socket, Pid);
{error, Reason} ->
io:format("Socket accept error: ~p~n", [Reason])
end,
do_accept(LSocket).
init(Socket) ->
%% this sets the initial state of packet read state to read the
%% size and the initial number of bytes to read as 4, with an empty
%% packet buffer
State = #state{rdstate=rdsize, pktbuf= <<>>, pktsize=4},
loop(Socket, State).
%% the {active,once} stuff means that it will receive one message for each
%% iteration of the loop, this is good as it means TCP/IP flow control will
%% be used to throttle the client, rather than filling up the server's buffers.
%% With {packet, 0} each message is an arbitrary chunk of the byte stream,
%% not necessarily a complete packet, so the state machine below reassembles it
loop(S, State) ->
inet:setopts(S,[{active,once}]),
receive
{tcp,S,Data} ->
NewState = handle_data_loop(Data, State),
loop(S, NewState);
{tcp_closed,S} ->
io:format("Socket ~w closed [~w]~n",[S,self()]),
ok
end.
%% handles reading the data packet until a complete packet is read,
%% and accumulate any excess which may be used towards the next packet
%% may execute several packets/commands if it got more than one packet.
%% stores packet information in State between reads from the client.
handle_data_loop(Data, State) ->
{NewState, Acc, ReadState, Size} =
handle_data_loop(Data, State#state.pktbuf, State#state.rdstate, State#state.pktsize, State),
NewState#state{pktbuf=Acc, pktsize=Size, rdstate=ReadState}.
%% We need this second version so we can loop and process multiple
%% packets we may have received in a single TCP/IP packet
handle_data_loop(Data, Acc, ReadState, Size, State) ->
case handle_data(ReadState, <<Acc/binary, Data/binary>>, Size) of
%% need more data from client, so needs to go back to the read
%% loop to wait for more data from the client
{need_more, Rest, NewState, NewSize} ->
{State, Rest, NewState, NewSize};
%% we got a complete packet so process it, then do a tail
%% recursive loop until all complete packets have been
%% processed
{packet, Pkt, Rest} ->
NewState= process(Pkt, State),
handle_data_loop(Rest, <<>>, rdsize, 4, NewState)
end.
%% the actual state machine that reads the packet size then the data
%% each of these handles a different state (the first parameter)
%% state 1 where we are reading the packet header but we don't have
%% enough yet
handle_data(rdsize, Data, Size) when byte_size(Data) < Size ->
{need_more, Data, rdsize, Size};
%% state 2 where we are still reading the packet header and we now
%% have enough, we extract the little endian packet count and switch
%% state to read data
handle_data(rdsize, Data, Size) when byte_size(Data) >= Size ->
<<N:?LONG, Rest/binary>> = Data,
handle_data(rddata, Rest, N);
%% state 3 where we are reading the data portion of the packet but
%% don't have enough yet
handle_data(rddata, Data, Size) when byte_size(Data) < Size ->
{need_more, Data, rddata, Size};
%% state 4 where we are still reading the data portion of the packet,
%% and now have enough for a complete packet
%% extract the complete pkt, and any excess data
handle_data(rddata, Data, Size) when byte_size(Data) >= Size ->
<<Pkt:Size/binary, Rest/binary>> = Data,
{packet, Pkt, Rest}.
%% Actually process the packet we received
process(Data, State) ->
%% do something cool and maybe change the State
io:format("We got the following Data: ~p~n", [Data]),
%% for test purposes we just count the number of packets we got
Cnt= State#state.testcount,
State#state{testcount= Cnt+1}.
%%
%% We even have Unit tests for the state machine to test the two extreme
%% cases, 1 byte at a time, and multiple packets in one chunk
%%
-define(TEST, 1).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
-ifdef(TEST).
handle_data_1byte_at_a_time_test() ->
?assertMatch({need_more, <<3>>, rdsize, 4}, handle_data(rdsize, <<3>>, 4)),
?assertMatch({need_more, <<3,0>>, rdsize, 4}, handle_data(rdsize, <<3,0>>, 4)),
?assertMatch({need_more, <<3,0,0>>, rdsize, 4}, handle_data(rdsize, <<3,0,0>>, 4)),
?assertMatch({need_more, <<>>, rddata, 3}, handle_data(rdsize, <<3,0,0,0>>, 4)),
?assertMatch({need_more, <<11>>, rddata, 3}, handle_data(rddata, <<11>>, 3)),
?assertMatch({need_more, <<11,22>>, rddata, 3}, handle_data(rddata, <<11,22>>, 3)),
?assertMatch({packet, <<11,22,33>>, <<>>}, handle_data(rddata, <<11,22,33>>, 3)).
handle_data_loop_2_packets_all_bytes_at_once_test() ->
C1 = #state{rdstate=rdsize, pktsize=4, pktbuf = << >>, testcount=0},
Bin = <<4, 0, 0, 0, 250, 1, 0, $z, 5, 0, 0, 0, 251, 2, 0, $a, $b>>,
C2 = handle_data_loop(Bin, C1),
?assertMatch(#state{pktsize=4, rdstate=rdsize, pktbuf = <<>>, testcount=2}, C2).
-endif.
It is a little complicated because with TCP/IP you may receive too
little data, too much data, or exactly the amount you want. There is
usually a minimum size of TCP/IP packet; however, I take the worst-case
scenario and allow for a case where we could get one byte at a time
per read. There is also the case where the client actually sends
multiple packets in one TCP/IP packet. This could easily happen
depending on how you write the client, so we need to store all excess
data, and maybe process multiple packets from a single read.
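The same three cases (too little, too much, exactly enough) can be exercised in a few lines of Ruby. This is a sketch of the idea only, not a port of the Erlang code: LittleEndianFramer is a hypothetical class that buffers incoming chunks and returns every complete packet found so far.

```ruby
# Incremental decoder for frames with a 4-byte little-endian length header.
# (LittleEndianFramer is a hypothetical name; it mirrors the idea of the
# Erlang state machine, not its exact structure.)
class LittleEndianFramer
  HEADER_SIZE = 4

  def initialize
    @buffer = ''.b   # unconsumed bytes carried over between reads
    @needed = nil    # payload size, known once the header has been read
  end

  # Feeds one chunk of received bytes; returns every packet completed by it.
  def feed(chunk)
    @buffer << chunk.b
    packets = []
    loop do
      if @needed.nil?
        break if @buffer.bytesize < HEADER_SIZE
        @needed = @buffer.slice!(0, HEADER_SIZE).unpack('V').first
      end
      break if @buffer.bytesize < @needed
      packets << @buffer.slice!(0, @needed)
      @needed = nil
    end
    packets
  end
end

# Worst case: one byte per read.
framer = LittleEndianFramer.new
data = [3, 0, 0, 0, 11, 22, 33].pack('C*')
one_at_a_time = data.each_char.flat_map { |byte| framer.feed(byte) }

# Other extreme: two packets arriving in a single read.
framer2 = LittleEndianFramer.new
two = framer2.feed([4, 0, 0, 0, 250, 1, 0, 122, 5, 0, 0, 0, 251, 2, 0, 97, 98].pack('C*'))
```

The two usage cases use the same byte sequences as the eunit tests, so the results can be compared directly.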
Included at the end are two eunit tests to test the two extreme cases.
Now remember this is my first Erlang project. The state machine I am
using went through many iterations: as I read more about Erlang I
refactored and cleaned up the code until I got to something similar to
the above example. I welcome any feedback from Erlang gurus.
The moral of this story is to make the right decision in the first
place and pass big-endian integers over the network, that is why it is
also called network byte order. (Hey I was young and naive ;)
To be fair this only added about 23 lines of extra code overall.
Here is the code for example 3, the case where we have two processes
per client and use a blocking recv to get the packets. It is slightly
less code, but not nearly as sexy ;)
-module(example3).
-export([listen/1]).
%% state to hold the clients state
-record(state, {pid, testcount}).
-record(cc, {testcount}).
listen(Port) ->
{ok, LSocket} = gen_tcp:listen(Port, [binary, {packet, 0},
{active, false}]),
do_accept(LSocket).
%% Each client gets its own process
do_accept(LSocket) ->
case gen_tcp:accept(LSocket) of
{ok, Socket} ->
Pid = spawn(fun() -> init(Socket) end),
gen_tcp:controlling_process(Socket, Pid);
{error, Reason} ->
io:format("Socket accept error: ~p~n", [Reason])
end,
do_accept(LSocket).
%% Each client also gets another process to handle the packets
init(Socket) ->
Pid = spawn(fun() -> server(#cc{testcount=0}) end),
State = #state{testcount=0, pid=Pid},
loop(Socket, State).
%% This is a blocking loop that blocks to get exactly 4 bytes which is
%% a count then blocks reading that count number of bytes, sending a
%% message to the server process when done
loop(S, State) ->
case gen_tcp:recv(S, 4) of
{ok, B} ->
<<Cnt:32/unsigned-little-integer>> = B,
case gen_tcp:recv(S, Cnt) of
{ok, Data} ->
State#state.pid ! {mytcp, Data},
loop(S, State);
{error, closed} ->
State#state.pid ! {mytcp_closed}
end;
{error, closed} ->
State#state.pid ! {mytcp_closed}
end.
%% the actual server that handles the packets asynchronously
server(State) ->
receive
{mytcp,Data} ->
NewState = process(Data, State),
server(NewState);
{mytcp_closed} ->
io:format("my Socket closed [~w]~n",[self()]),
ok
end.
process(Data, State) ->
%% do something cool
io:format("We got the following Data: ~p, State: ~p~n", [Data, State]),
Cnt= State#cc.testcount,
State#cc{testcount= Cnt+1}.
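The blocking approach is easy to mirror in Ruby as well. In this sketch a StringIO stands in for the socket and read_frame is a hypothetical helper; it reads exactly 4 header bytes, then exactly that many payload bytes, much like the two gen_tcp:recv calls in example3.

```ruby
require 'stringio'

# Reads exactly one frame from a blocking IO: 4 bytes of little-endian
# length, then that many bytes of payload. Returns nil at end of stream.
# (read_frame is a hypothetical helper; StringIO stands in for a socket.)
def read_frame(io)
  header = io.read(4)
  return nil if header.nil? || header.bytesize < 4
  size = header.unpack('V').first
  payload = io.read(size)
  return nil if payload.nil? || payload.bytesize < size
  payload
end

stream = StringIO.new([3, 0, 0, 0, 11, 22, 33, 1, 0, 0, 0, 99].pack('C*'))
frames = []
while (f = read_frame(stream))
  frames << f
end
```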
Posted in
Erlang
|
Tags
erlang,gen_tcp
Posted by Jim Morris
on Sun Apr 12 16:56:50 -0700 2009
Background
I recently undertook a project to rewrite a voice conference server I
wrote over 10 years ago. This server was written in C++ running on a
Windows Server (originally NT and then win2k). The reason it was
written for win2k was to use I/O Completion Ports, which were not (and
still are not) available on Linux. Solaris has an equivalent AIO
feature, but it is not quite the same. I/O Completion Ports make very
efficient use of multiple CPUs, and the conference server had to be
very fast and handle a large number of concurrent connections.
The program uses a limited number of threads, basically enough to keep
all the CPUs busy, and it uses the Proactor Pattern to handle the large
number of concurrent connections. Each connected client has its own
state, and every asynchronous I/O needs to switch to that context.
Needless to say there was a lot of shared memory and locks everywhere.
The program was very complex to write, trying to keep all those locks
and memory synchronized yet avoid deadlocks. I guess I did a good job
as the servers are still running to this day, with barely a reboot in
5 years.
However, the code is tortuous, and making any changes risks introducing
deadlocks. I also wanted to get off the Windows servers and onto
Linux, which all my other servers run.
I have written several scalable servers using Java NIO. Even though
they use the Reactor Pattern and avoid creating hundreds of threads
(instead I have several threads working through the ready sockets),
they are still very hard to write, even with the great new concurrency
libraries. An article comparing the two patterns is here.
The trick is to write servers so that you can simply add new servers to
increase capacity, and to allow for redundancy so if one server goes
down the service continues to run.
So having heard a lot about Erlang and how it auto-magically allowed
you to run scalable and redundant servers, I had to give it a go.
I bought Joe Armstrong's Book to learn Erlang. It is a good book for
learning the basics, but is very short on details about OTP and how to
actually use Erlang to write scalable servers. I guess the next book
in the series will cover that ;)
I actually found it very hard to wrap my brain around Erlang
initially. Having programmed in procedural and object oriented
languages for so many years the switch to a functional, non object
oriented language was tough.
Once I got over that initial hurdle (but am still struggling with it),
I was able to quickly write the basic operations of the voice server
in Erlang. I used a number of gen_servers, and tried to follow the OTP
principles as closely as possible. I still have not figured out how to
achieve the transparent scalability and redundancy that Erlang is
touted for and that my old server has. However I do see the
possibilities of making that transition.
What made the whole thing very easy is that every client connection
can have its own process and state, totally independent of other
connections. No locks are needed as there is no shared memory (except
for the large ETS tables I use to store the list of users that are
online etc). It turns out that not having to use either the Reactor or
Proactor pattern makes things very simple, although as I am using the
asynchronous mode of gen_tcp it is almost like a Proactor, as I get a
callback (actually an Erlang message) every time a packet comes in. As
every client has its own process, and there could be about 100,000
clients concurrently connected, that is more than enough processes to
keep all the processors busy. Distributing those processes over
several physical servers should not be a problem either (if I can
figure out a way to load balance, probably using a H/W load balancer).
I won't go into details here of gen_servers and the Erlang language as
there is plenty of documentation on the web covering those. One
particularly good series I found is here.
I use a binary protocol between the clients and servers and Erlang is
particularly good at handling binary protocols, with its Binary type
and easy pattern matching to extract parts of the message. Packing and
unpacking binary messages is a breeze once you get the binary matching
syntax.
Java and Erlang
However (and finally getting to the point of this article) one thing I
found Erlang is particularly bad at is database access. It does have
ODBC built in, and there is a Postgresql driver too, however the code
is basically using raw SQL, and gets very ugly very fast. Now I am
used to using Java and Spring and SpringJdbc to do all my database
work, and it is a pleasure to work with. I use a common pattern of
Dao, Business logic and I/O separation, and it keeps things very clean.
Plus I already had all the database access written in Java.
So being one to use the best tool for the given situation, I wanted to
continue to use Java/Spring for all the database accesses, and use
Erlang as the server frontend.
Turns out Erlang ships with a pretty robust Java to Erlang interface
called JInterface.
Basically this allows me to write an Erlang Node in Java, which Erlang
can communicate with seamlessly using the normal Erlang message
passing systems.
I have structured the database back end as 3 basic packages. The I/O
package contains all the knowledge of the server/server protocol
for database requests and actually runs as an Erlang node. It calls
methods in a POJO package which handles all business logic for those
requests, and the business logic POJOs access the database entirely
through calls to methods in the DAO package. The whole system is
hooked together using Spring IoC, and the DAO is written using
Spring JDBC.
I won't go into details here of how to write Spring based JDBC access
as that is also well documented on the Spring site. I may write
another article on Spring JDBC as it is very elegant and easy to use.
So the focus of this article is the JInterface based part of the Java
system, which receives database requests from the Erlang based
front-end server, gets whatever data it needs from the database, then
sends the reply back to the calling Erlang process.
Here is a portion of the Java code setting up and handling the Erlang
requests; only the JInterface portions are shown.
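import java.io.IOException;
// assumption: the Logger calls used below match the log4j 1.x API
import org.apache.log4j.Logger;
import com.ericsson.otp.erlang.*;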
public class ServerNodeImpl implements Server {
private static final Logger jlog = Logger.getLogger(ServerNodeImpl.class);
private String nodeName = "databasenode@localhost";
private String mboxName = "database";
...
private OtpMbox mbox;
// create some convenient atoms
private final OtpErlangAtom request= new OtpErlangAtom("request");
private final OtpErlangAtom authenticate= new OtpErlangAtom("authenticate");
private final OtpErlangAtom failed= new OtpErlangAtom("failed");
private final OtpErlangAtom ok= new OtpErlangAtom("ok");
// start the node up
public boolean start() {
try{
init();
}catch(IOException e){
jlog.error("start()", e);
return false;
}
// this will block and the caller will block until the loop exits
loop();
return true;
}
// stop the node
public boolean stop() {
// to stop it, try sending myself a stop message
OtpErlangObject[] msg = new OtpErlangObject[2];
msg[0] = mbox.self();
msg[1] = new OtpErlangAtom("stop");
OtpErlangTuple tuple = new OtpErlangTuple(msg);
mbox.send(mbox.self(), tuple);
return true;
}
private void init() throws IOException {
jlog.info("Node: " + nodeName + ", MBox: " + mboxName);
OtpNode self = new OtpNode(getNodeName());
mbox = self.createMbox(getMboxName());
}
/**
* We get requests which are tuples.. {from,cmd,ref,...}
* where cmd is currently an atom stop|request
*/
private void loop() {
jlog.info("loop(): starting...");
boolean running= true;
while(running){
try{
OtpErlangObject o = mbox.receive();
jlog.debug("loop(): Received Request: " + o);
if(o instanceof OtpErlangTuple){
OtpErlangTuple msg = (OtpErlangTuple) o;
OtpErlangPid from = (OtpErlangPid) msg.elementAt(0);
OtpErlangObject cmd = msg.elementAt(1);
OtpErlangRef ref = (OtpErlangRef) msg.elementAt(2);
if(cmd instanceof OtpErlangAtom){
OtpErlangAtom atom= (OtpErlangAtom)cmd;
String c= atom.atomValue();
if(c.equals("stop")){
jlog.info("loop(): got stop message");
running= false;
}else if(c.equals("request")){
try{
byte [] reply= handleRequest(from, msg);
sendResponse(from, ref, reply);
}catch(Exception e){
jlog.error("loop(): got exception from handleRequest()", e);
}
}else{
jlog.error("loop(): got unknown cmd: " + c);
}
}else{
jlog.error("loop(): got unknown cmd type: " + cmd);
}
}else{
jlog.error("loop(): Unknown message type: " + o);
}
}catch(Exception e){
jlog.error("loop(): got exception", e);
running= false;
}
}
jlog.info("loop(): ...leaving");
}
...
// sends a tuple back to requesting Pid {From, Ref, Data}
private void sendResponse(OtpErlangPid from, OtpErlangRef ref, byte [] ba) {
OtpErlangBinary data= new OtpErlangBinary(ba);
OtpErlangObject [] eoa= new OtpErlangObject[]{mbox.self(), ref, data};
OtpErlangTuple tuple= new OtpErlangTuple(eoa);
mbox.send(from, tuple);
}
That is it, a very simple Erlang node which sits and receives
Erlang messages from another Erlang node and processes them then sends
its responses back to the requesting Erlang node.
You only need the jinterface package from the Erlang distribution, and
import it... import com.ericsson.otp.erlang.*;
To start the node listening you simply do this...
OtpNode self = new OtpNode(getNodeName());
mbox = self.createMbox(getMboxName());
Now you have a node which can be sent to using...
{database, databasenode@localhost} ! message.
The loop has the following...
OtpErlangObject o = mbox.receive();
which receives a message, which you can then process. This is a little
wordy but fairly simple...
// first check that the message is a tuple
if(o instanceof OtpErlangTuple){
// cast it into a tuple object
OtpErlangTuple msg = (OtpErlangTuple) o;
// extract the first element of the tuple
OtpErlangPid from = (OtpErlangPid) (msg.elementAt(0));
// extract the second element of the tuple
OtpErlangObject cmd = msg.elementAt(1);
// extract the ref of the request
OtpErlangRef ref = (OtpErlangRef) msg.elementAt(2);
// which should be an atom of stop | request
if(cmd instanceof OtpErlangAtom){
OtpErlangAtom atom= (OtpErlangAtom)cmd;
// convert the atom to a string
String c= atom.atomValue();
// if it is stop then we exit the loop
if(c.equals("stop")){
jlog.info("loop(): got stop message");
running= false;
// if it is request we pass the rest of the message
// onto the code to handle the request
}else if(c.equals("request")){
try{
byte [][] reply= handleRequest(from, msg);
// send the response back to the requesting node
sendResponse(from, ref, reply);
}catch(Exception e){
jlog.error("loop(): got exception from handleRequest()", e);
}
}else{
jlog.error("loop(): got unknown cmd: " + c);
}
}else{
jlog.error("loop(): got unknown cmd type: " + cmd);
}
// sends a tuple back to requesting Pid {From, Ref, Data}
private void sendResponse(OtpErlangPid from, OtpErlangRef ref, byte [] ba) {
// convert the binary reply into an Erlang Binary
OtpErlangBinary data= new OtpErlangBinary(ba);
// make an array of Erlang Objects
OtpErlangObject [] eoa= new OtpErlangObject[]{mbox.self(), ref, data};
// make an Erlang tuple out of the Object array above
OtpErlangTuple tuple= new OtpErlangTuple(eoa);
// send the message back to requesting process Pid
mbox.send(from, tuple);
}
}
The worst part is converting back and forth between Erlang objects and
Java objects.
You could have several of these nodes running so you can balance the
load across several servers if needed.
Posted in
Erlang
|
Tags
erlang,jinterface
Posted by Jim Morris
on Sun Apr 12 01:26:06 -0700 2009
I haven't blogged much about Java even though it is my primary
programming language. Since I have been doing a lot of Java recently I
thought I'd post something about some of the Java idioms I've used
over the years.
One I have been using a lot recently is a command dispatcher where the
command is text.
In some languages you would probably use a switch statement with the
command string as the case label. However Java does not allow
strings as the match part of a case (at least not before Java 7).
A common idiom is to use enums (in Java 5 and greater), as they
are constants and can be used in a switch statement.
Now enums are very powerful constructs in Java. They are not simply
convenient #define work-arounds, although they are commonly used that
way...
enum Days { MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY,
SATURDAY, SUNDAY }
switch(day){
case MONDAY: /* do something with monday */ break;
case TUESDAY: /* do something with tuesday */ break;
}
See
this
for all the info on enums.
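For reference, a runnable version of that kind of enum switch might look like the sketch below. DayDemo and describe() are names made up for this illustration, and the weekend/weekday split is just an example of something to do in each case.

```java
// Hypothetical demo class, not part of the article's downloadable code.
class DayDemo {
    enum Day { MONDAY, TUESDAY, WEDNESDAY, THURSDAY, FRIDAY, SATURDAY, SUNDAY }

    // Switch on an enum constant. Each branch returns, so nothing
    // falls through into the next case by accident.
    static String describe(Day day) {
        switch (day) {
            case SATURDAY:
            case SUNDAY:
                return "weekend";
            default:
                return "weekday";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(Day.MONDAY));
        System.out.println(describe(Day.SUNDAY));
    }
}
```

Inside the switch the case labels are the bare constant names (SATURDAY, not Day.SATURDAY); the compiler works out the enum type from the switch expression.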
However you can do so much more with enums...
I use them as a super duper switch statement to dispatch text
commands, and the cool thing is they are pretty efficient as they use
a hash table lookup to do the dispatch. Here's an example of a command
line app that uses the Java GNU
Getopt package with long option names to
dispatch the different commands that can be typed on the command line...
> doit --froboz one two three
> doit --bazznot glozny frobod
where --froboz is a command that takes three parameters, you get the
idea.
package com.e4net;
import gnu.getopt.Getopt;
import gnu.getopt.LongOpt;
public class Test {
private static void usage() {
System.out.println("Usage: myapp [-x filename] [--command] [args...]");
System.out.println(" where commands is one of:");
System.out.println(" froboz param1 param2 param3");
System.out.println(" bazznot param1 param2");
System.exit(1);
}
// The various supported command line commands
// code is the short form command argument
// size is the number of arguments needed for this command
private enum Commands {
froboz('f', 3) {
boolean doit(int off, String [] argv){
// do whatever froboz does with three arguments
System.out.println("In froboz with: " + argv[off] + ", " + argv[off+1] + ", " + argv[off+2]);
return true;
}
},
bazznot('b', 2) {
boolean doit(int off, String [] argv){
// do whatever bazznot does with two arguments
System.out.println("In bazznot with: " + argv[off] + ", " + argv[off+1]);
return true;
}
};
// Just add more commands here....
private final int code, size;
Commands(int code, int size){
this.code= code;
this.size= size;
}
public int code() { return code; }
public int size() { return size; }
// Do the command, off is the offset within argv the commands
// parameters start
abstract boolean doit(int off, String [] argv);
};
public static void main(String[] args) {
// build the long commands from the enum
LongOpt[] longopts = new LongOpt[Commands.values().length+1];
longopts[0] = new LongOpt("help", LongOpt.NO_ARGUMENT, null, 'h');
// loops through each enum and generates the long command
// based on the enums value, the code() is the short form of
// the command so either --froboz can be used or -f
int off= 1;
for(Commands c : Commands.values()){
longopts[off++] = new LongOpt(c.toString(), LongOpt.NO_ARGUMENT, null, c.code());
}
Getopt g = new Getopt("myprog", args, "x:", longopts);
String file= "default.file";
int c;
boolean found= false;
while ((c = g.getopt()) != -1 && !found) {
int i= g.getOptind();
switch (c) {
case 'x': // get some file name with the -x
// option
file = g.getOptarg();
break;
// this is the help which runs with --help or -h
case '?':
case 'h':
usage();
break;
// this will lookup the command and execute it
default:
// see if in commands
String cmd= longopts[g.getLongind()].getName();
Commands cm= null;
try {
// lookup the text command in the enum
cm= Commands.valueOf(cmd);
} catch (IllegalArgumentException e) {
usage();
}
// check we have enough arguments for this command
if(args.length - i == cm.size){
// do the command
cm.doit(g.getOptind(), args);
found= true;
}else {
// not enough arguments so show usage
System.err.println("Not enough arguments for: " + cmd);
usage();
}
}
}
}
}
OK, that looks like a lot of code for a command lookup/dispatcher, but
the nice thing about it is that it is fairly DRY: to add a new command you
simply add another enum term, and the rest is taken care of automatically.
The command is looked up in the switch's default clause using the
valueOf method of the enum, which finds the enum constant whose name
matches the given string.
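The usage text is the one place in the listing that still repeats the command names by hand. As a rough sketch (not part of the downloadable code), it could be derived from the enum as well. UsageDemo, Command and usageLines() are names made up for this illustration, and the paramN placeholders are an assumption about how you would want the help to read.

```java
// Hypothetical sketch: build the per-command usage lines from the enum itself.
class UsageDemo {
    enum Command {
        froboz('f', 3),
        bazznot('b', 2);

        final int code, size;   // short option character and argument count

        Command(int code, int size) {
            this.code = code;
            this.size = size;
        }
    }

    // One line per enum constant: the command name followed by
    // as many "paramN" placeholders as the command takes arguments.
    static String usageLines() {
        StringBuilder sb = new StringBuilder();
        for (Command c : Command.values()) {
            sb.append("  ").append(c.name());
            for (int i = 1; i <= c.size; i++) {
                sb.append(" param").append(i);
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.print(usageLines());
    }
}
```

With something like this in place, adding a new enum term would update the option table, the dispatch and the help text in one go.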
It can be run thusly...
> java -cp ./java-getopt.jar:./classes com.e4net.Test --bazznot 1 2
In bazznot with: 1, 2
> java -cp ./java-getopt.jar:./classes com.e4net.Test --froboz 1 2 3
In froboz with: 1, 2, 3
> java -cp ./java-getopt.jar:./classes com.e4net.Test --blahblah 1 2 3
myprog: unrecognized option '--blahblah'
Usage: myapp [-x filename] [--command] [args...]
where commands is one of:
froboz param1 param2 param3
bazznot param1 param2
A variation on this theme: say you have an Internet server that
receives text commands over a socket, and you want to look up the command
you received and dispatch it the same way as above. This turns out to
be easy, again using the valueOf method of the enum. Below we call
handle() with the command we got over the wire, and it is neatly
dispatched. valueOf() is the method that does the
lookup in a hash table, so it is quite efficient even if the command list is
long.
private Commands getCommand(String command) {
Commands cmd;
try {
cmd= Commands.valueOf(command.toLowerCase());
} catch (IllegalArgumentException e) {
cmd= null;
}
return cmd;
}
public boolean handle(String command, String [] args) {
// lookup and execute the command
Commands cmd= getCommand(command);
if(cmd != null) {
return cmd.doit(0, args);
}else{
System.err.println("handle() - Unknown command: " + command);
return false;
}
}
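To make the server variation concrete, here is a minimal self-contained sketch that takes one line of text, as it might arrive over a socket, and dispatches it through an enum. The class name LineDispatcher, the handleLine() method, the whitespace-separated wire format and the "ERR ..." replies are all assumptions made for this example, not something taken from the real server.

```java
import java.util.Arrays;
import java.util.Locale;

// Hypothetical sketch of dispatching a text line through an enum.
class LineDispatcher {
    enum Command {
        froboz(3) {
            String doit(String[] args) { return "froboz: " + String.join(",", args); }
        },
        bazznot(2) {
            String doit(String[] args) { return "bazznot: " + String.join(",", args); }
        };

        final int size;   // number of arguments the command expects

        Command(int size) { this.size = size; }

        abstract String doit(String[] args);
    }

    // Split the line into words, look the first word up in the enum,
    // check the argument count, then run the command.
    static String handleLine(String line) {
        String[] words = line.trim().split("\\s+");
        Command cmd;
        try {
            // valueOf is case sensitive, so normalise the name first
            cmd = Command.valueOf(words[0].toLowerCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return "ERR unknown command: " + words[0];
        }
        String[] args = Arrays.copyOfRange(words, 1, words.length);
        if (args.length != cmd.size) {
            return "ERR " + cmd + " expects " + cmd.size + " arguments";
        }
        return cmd.doit(args);
    }

    public static void main(String[] args) {
        System.out.println(handleLine("froboz one two three"));
        System.out.println(handleLine("bazznot glozny"));
        System.out.println(handleLine("blahblah 1 2 3"));
    }
}
```

Returning the reply as a string rather than printing it keeps the dispatcher independent of how the server writes back to the socket.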
The code for this article can be downloaded
from here
Posted in
Java
|
Tags
java,enum,dispatcher