Wednesday, October 10, 2007

Osh one-liner of the day (October 11, 2007)

The problem is to take a list of words as input (one per line) and print the distribution of word lengths (e.g., 5 words of length 1, 27 words of length 2, ...). This can be done in one osh command as follows (split into multiple lines for readability):


zack$ cat /usr/share/dict/words | osh ^ \
agg -g 'word: len(word)' 0 'count, word: count + 1' ^ \
sort $
(1, 52)
(2, 155)
(3, 1351)
(4, 5110)
(5, 9987)
(6, 17477)
(7, 23734)
(8, 29926)
(9, 32380)
(10, 30867)
(11, 26010)
(12, 20460)
(13, 14937)
(14, 9763)
(15, 5924)
(16, 3377)
(17, 1813)
(18, 842)
(19, 428)
(20, 198)
(21, 82)
(22, 41)
(23, 17)
(24, 5)


How this works:

  • cat /usr/share/dict/words | : Pipes the contents of the input file to the rest of the command.
  • osh ^: Invokes the osh interpreter and pipes stdin to the first osh command. Each line of input turns into a tuple containing a string, because osh pipes tuples from one command to the next. (The piping here is done within the osh process; it's not using a pipe from the host OS.)
  • agg -g 'word: len(word)': agg is the osh command for doing aggregation, somewhat like Python's reduce. -g means that multiple groups will be tracked during the aggregation, with a counter maintained for each. Groups are defined by the value of a grouping function, 'word: len(word)', which returns the length of an input word. I.e., we're going to maintain a counter for each word length observed.
  • 0: The initial value of each counter.
  • 'count, word: count + 1': A function for maintaining a group's counter. The arguments are the current count for the group, and the input word. The output from the function is an incremented count (for the word's group). Output from this agg command is a stream of pairs of the form (word length, #words of that length).
  • ^ sort: Pipes output from agg to the sort command, which sorts the (word length, count) pairs by word length.
  • $: Prints to stdout.
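
For readers without osh installed, the steps above can be approximated in plain Python. This is only a sketch of what agg -g appears to do based on the description above, not osh's implementation; the helper names agg_by_group and word_length_distribution are made up for the illustration.

```python
from collections import defaultdict


def agg_by_group(items, group_fn, initial, update_fn):
    """Fold items into one accumulator per group (hypothetical stand-in for agg -g)."""
    accumulators = defaultdict(lambda: initial)
    for item in items:
        key = group_fn(item)                      # e.g. len(word)
        accumulators[key] = update_fn(accumulators[key], item)
    return accumulators


def word_length_distribution(words):
    """Return sorted (word length, number of words of that length) pairs."""
    counts = agg_by_group(words, len, 0, lambda count, word: count + 1)
    return sorted(counts.items())
```

Calling word_length_distribution on the lines of /usr/share/dict/words (with trailing newlines stripped) should produce the same kind of (length, count) pairs as the osh command.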

Tuesday, October 9, 2007

Scalable data structures in Erlang, and gen_leader

A scalable data structure is a data structure distributed across multiple nodes. You can add nodes to get more capacity, more throughput, and better availability. Erlang/OTP is ideal for building such structures, but is missing one critical piece.

I've been working on a distributed hash table (DHT) based on the idea of linear hashing. (The inventor of linear hashing, Witold Litwin, is also working along these lines.) Each node can serve a put or get request, and also owns a set of hash buckets. A mapping of hash buckets to nodes is used to dispatch a request to the right node.
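
As a rough illustration of how a request could be dispatched, here is a minimal Python sketch of the textbook linear hashing address calculation followed by a lookup in a bucket-to-node map. It is not the code of the DHT described here: the function names, the parameters (n0, level, split), the choice of SHA-1, and the sample map are all assumptions made for the example.

```python
import hashlib


def stable_hash(key):
    """Hash that is identical on every node (Python's built-in str hash varies per process)."""
    digest = hashlib.sha1(str(key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


def bucket_for(key, n0, level, split):
    """Linear hashing address.

    n0:    number of buckets the table started with
    level: number of completed doubling rounds
    split: buckets numbered below this have already been split in the current round
    """
    h = stable_hash(key)
    bucket = h % (n0 * 2 ** level)
    if bucket < split:
        # This bucket was already split, so use the next round's hash function.
        bucket = h % (n0 * 2 ** (level + 1))
    return bucket


def node_for(key, bucket_map, n0, level, split):
    """Look up the node that owns the key's bucket."""
    return bucket_map[bucket_for(key, n0, level, split)]


# Illustrative map only: 5 buckets (n0=2, level=1, split=1) spread over two nodes.
bucket_map = {0: "a@zack", 1: "b@zack", 2: "a@zack", 3: "b@zack", 4: "a@zack"}
owner = node_for("some-key", bucket_map, n0=2, level=1, split=1)
```

Any node holding the same bucket_map and the same (n0, level, split) values computes the same owner for a key, which is why every node needs an identical copy of the map.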

When a node fails, or when a node is added, a new mapping of hash buckets to nodes is required. The question is how to create this new mapping. It is difficult or maybe impossible to guarantee that each node will derive the identical map, especially since map creation isn't perfectly synchronous across the set of nodes, and there may be changes in the node population (e.g. another node failure) during map creation.
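
To make the problem concrete, here is a small Python sketch in which two nodes each build a map from their own view of the cluster. The round-robin rule in build_map and the node names are assumptions chosen for the illustration; the point is only that a deterministic rule still gives different maps when the inputs differ.

```python
def build_map(nodes, n_buckets):
    """Assign buckets round-robin over the sorted node names (hypothetical rule)."""
    ordered = sorted(nodes)
    return {b: ordered[b % len(ordered)] for b in range(n_buckets)}


# Node a has not yet noticed that b failed; node c already has.
view_on_a = {"a@zack", "b@zack", "c@zack"}
view_on_c = {"a@zack", "c@zack"}

map_a = build_map(view_on_a, 6)
map_c = build_map(view_on_c, 6)

# Buckets for which the two nodes would dispatch requests to different owners.
disagreements = [b for b in range(6) if map_a[b] != map_c[b]]
```

With these inputs the two maps disagree on four of the six buckets, so a put routed via one node and a get routed via the other could end up on different owners.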

The alternative is for one node to create the map and distribute it to other nodes. But which node? What is needed is for the set of participating nodes to agree on a node to carry out this task. This is the "leader election" problem. When a leader is needed, it is needed right away. There must be exactly one leader selected, and all participants must know who the leader is.

Leader election is a surprisingly difficult problem. Erlang/OTP does not include a leader election module. However, I ran across an implementation of leader election that would be right at home in OTP: gen_leader.

gen_leader includes a demo application, gdict. Updates go to the leader and are distributed to all nodes, while reads can be carried out locally. I had some trouble getting gdict to work. One of the gen_leader authors, Ulf Wiger, was kind enough to set me straight. My test program appears below. Some comments on the code:

  • I first wrote the test to run from one node, z@zack, and access a two-node hash table, on nodes a@zack and b@zack. I also specified that nodes a and b were both candidate leaders (second argument to gdict:new) and workers (not eligible to be leader). All that was wrong.
  • The current version of my test runs on node z, makes a, b, and z candidates, has no workers, and runs gdict:new on z.
  • It is necessary to have the nodes see each other, which I accomplished using the hack of the two rpc:call invocations. I still don't completely understand how the nodes are supposed to discover each other, but I haven't gotten very far in reading about OTP.

I haven't done any work with gen_leader other than to watch gdict work. But assuming it works as advertised (and the authors seem to have been fanatical about testing), it is a crucial piece of software for implementing a scalable data structure in Erlang.

The gdict test code:

-module(test).
-export([main/0]).

-define(DUMP(X), io:format("~p:~p - ~p = ~p~n", [?MODULE, ?LINE, ??X, X])).


main() ->
    rpc:call(a@zack, hello, hello, []),
    rpc:call(b@zack, hello, hello, []),
    ?DUMP(node()),
    ?DUMP(nodes()),
    Nodes = [node() | nodes()],
    {ok, D} = gdict:new(test, Nodes, []),
    ?DUMP(D),
    ?DUMP(gdict:append(a, 1, D)),
    ?DUMP(gdict:append(b, 2, D)),
    ?DUMP(gdict:append(c, 3, D)),
    ?DUMP(gdict:find(a, D)),
    ?DUMP(gdict:find(b, D)),
    ?DUMP(gdict:find(c, D)).

Sunday, October 7, 2007

New version of osh

Osh (Object SHell) is a scripting tool I wrote for working with clusters and databases. I just released version 0.9.1. It's based on the idea of piping Python objects from one command to another. Python functions can be used to filter and transform objects. Rows from a database query turn into Python tuples. Commands can be run locally, or on all nodes of a cluster in parallel. You can use osh from the command-line, but it also has a Python API for use within Python scripts.
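
The piping idea can be sketched with ordinary Python generators. This is not the osh API or its implementation; keep_if, apply_fn, and the sample rows are hypothetical names and data, chosen only to show tuples flowing from one stage to the next while Python functions filter and transform them.

```python
def keep_if(stream, predicate):
    """Pass along only the tuples for which predicate(*tuple) is true."""
    for item in stream:
        if predicate(*item):
            yield item


def apply_fn(stream, fn):
    """Replace each tuple with fn(*tuple), wrapping non-tuple results in a 1-tuple."""
    for item in stream:
        result = fn(*item)
        yield result if isinstance(result, tuple) else (result,)


# Made-up rows, standing in for the tuples a database query might produce.
rows = [("alice", 3), ("bob", 12), ("carol", 7)]

busy = keep_if(iter(rows), lambda name, count: count > 5)
scaled = apply_fn(busy, lambda name, count: (name.upper(), count * 2))
result = list(scaled)
```

Because each stage is a generator, nothing is computed until the final list() pulls tuples through the chain, one at a time.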

Osh is three years old and pretty stable, and I use it routinely for working on clusters in my work at Hitachi Data Systems. More information on osh can be found here.

Saturday, September 29, 2007

Getting started with Erlang and OTP

At work I've been helping to build a distributed system in Java for the past four years. The system has no centralized anything. We built a lot of things from scratch: a messaging layer, thread pools, node monitoring and management, leader election. And while all this was going on, we built our application. It all works, and the system is very reliable and highly available. But it took a while to get there, and we probably spent at least half our time on the distributed stuff that supports the application. I recently came across Erlang, and discovered that it solved nearly all the system-level problems we faced. If we had known about it when we started the company, it might have saved us a lot of time.

I've been going through the Armstrong book, and tried building a little multi-node program using the OTP gen_server module. I ran into a few problems, and thought I'd document the gotchas as a public service. The program is a "Hello World" server. You send a hello message to one of the server processes, and the server echoes your message and includes a count of messages processed so far.

The main problem was realizing how to use gen_server in such an environment. The various forms of gen_server:start don't appear to have any option for starting a gen_server remotely. The OTP Introduction (chapter 16) doesn't discuss this point, and the my_bank example shows everything running on the same node. Also, the my_bank example identifies the server process by name (?MODULE), so it wouldn't work for multiple banks.

Here is my test module:

-module(test).
-export([main/0]).

-include_lib("definitions.hrl").

main() ->
    A = hello:start(a@zack),
    B = hello:start(b@zack),
    ?DUMP(main, hello:hello(A, world)),
    ?DUMP(main, hello:hello(B, world)),
    ?DUMP(main, hello:hello(A, world)),
    ?DUMP(main, hello:hello(A, world)),
    ?DUMP(main, hello:hello(B, world)).

Here is the output:

test:9 - main: "hello : hello ( A , world )" =
{hello,world,1}
test:10 - main: "hello : hello ( B , world )" =
{hello,world,1}
test:11 - main: "hello : hello ( A , world )" =
{hello,world,2}
test:12 - main: "hello : hello ( A , world )" =
{hello,world,3}
test:13 - main: "hello : hello ( B , world )" =
{hello,world,2}

I started two nodes, in two different shells, as follows:

erl -noshell -sname a
erl -noshell -sname b

(Running the shell in the background, as in "erl ... &", doesn't seem to work. My guess is that the OS process blocks when it needs to write to the console. I don't really understand this part, and it's kind of irritating.)

The hostname is zack, which is why main() refers to nodes a@zack and b@zack. ?DUMP is a debugging macro from definitions.hrl. hello:hello is the hello function in the hello module. I pass the PID of the server I want to send the request to. The payload is world (so the message is {hello, world}).
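
The counts in the output are per server, because each server process keeps its own state. The following Python model is only an analogy for that behavior, not a translation of gen_server: the Server class is a made-up stand-in for one server process, and handle_call here mirrors the shape of the {hello, Who} clause in hello.erl (request and current state in, reply and new state out).

```python
def handle_call(request, count):
    """Take a request and the current state; return (reply, new_state)."""
    tag, who = request
    new_count = count + 1
    return (tag, who, new_count), new_count


class Server:
    """Stand-in for one server process: it owns its state and nothing else touches it."""

    def __init__(self):
        self.state = 0  # plays the role of the initial state returned by init

    def call(self, request):
        reply, self.state = handle_call(request, self.state)
        return reply


a, b = Server(), Server()
# Same order of requests as in test:main: A, B, A, A, B.
replies = [server.call(("hello", "world")) for server in (a, b, a, a, b)]
```

The third element of each reply follows the same 1, 1, 2, 3, 2 pattern as the output above, since A and B count their requests independently.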

The entire code of hello.erl is at the end of this posting, but here is the important part:

start(Node) ->
    {ok, Hello} =
        rpc:call(Node, gen_server, start,
                 [{local, ?MODULE}, ?MODULE, [], []]),
    Hello.

A direct call to gen_server:start would start a gen_server locally, i.e., on the node running the test code (this is how the my_bank example in chapter 16 is written). spawn(fun() -> gen_server:start ...) doesn't work, because then there are two processes, one started by gen_server and one from the spawn. The latter gets returned to the caller (test:main), and then test:main can't contact the gen_server. The rpc:call starts a service on Node (a@zack or b@zack, supplied by test:main), and returns the service's PID back to test:main.

I ran into one other little problem. The simple Makefile provided in chapter 6 doesn't recompile everything if an hrl file changes. So instead of supplying a rule for .erl.beam, I did this:

HEADERS = definitions.hrl

%.beam: %.erl ${HEADERS}
	erlc -W $<

Here is hello.erl:

-module(hello).

-behavior(gen_server).

%% gen_server API
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).


%% hello API
-export([start/1,
         stop/1,
         hello/2]).

-include_lib("definitions.hrl").

%% gen_server

init([]) ->
    {ok, 0}.

handle_call(stop, _From, RequestCount) ->
    {stop, normal, stopped, RequestCount};
handle_call({hello, Who}, _From, RequestCount) ->
    NewRequestCount = RequestCount + 1,
    {reply, {hello, Who, NewRequestCount}, NewRequestCount}.

handle_cast(_Request, RequestCount) ->
    {noreply, RequestCount}.

handle_info(_Info, RequestCount) ->
    {noreply, RequestCount}.

terminate(_Reason, _RequestCount) ->
    ok.

code_change(_OldVersion, RequestCount, _Extra) ->
    {ok, RequestCount}.


%% hello

start(Node) ->
    {ok, Hello} = rpc:call(Node, gen_server, start,
                           [{local, ?MODULE}, ?MODULE, [], []]),
    Hello.

stop(P) ->
    gen_server:call(P, stop).

hello(P, Who) ->
    gen_server:call(P, {hello, Who}).

And here is definitions.hrl:

-define(DUMP(Label, X),
        io:format("~p:~p - ~p: ~p = ~p~n",
                  [?MODULE, ?LINE, Label, ??X, X])).