Wednesday, October 10, 2007

Osh one-liner of the day (October 11, 2007)

The problem is to take a list of words as input (one per line) and print the distribution of word lengths, (e.g. 5 words of length 1, 27 words of length 2, ...). This can be done in one osh command as follows (split into multiple lines for readability):


zack$ cat /usr/share/dict/words | osh ^ \
agg -g 'word: len(word)' 0 'count, word: count + 1' ^ \
sort $
(1, 52)
(2, 155)
(3, 1351)
(4, 5110)
(5, 9987)
(6, 17477)
(7, 23734)
(8, 29926)
(9, 32380)
(10, 30867)
(11, 26010)
(12, 20460)
(13, 14937)
(14, 9763)
(15, 5924)
(16, 3377)
(17, 1813)
(18, 842)
(19, 428)
(20, 198)
(21, 82)
(22, 41)
(23, 17)
(24, 5)


How this works:

  • cat /usr/share/dict/words | : Pipes the contents of the input file to the rest of the command.
  • osh ^: Invokes the osh interpreter and pipes stdin to the first osh command. Each line of input turns into a tuple containing a string, because osh pipes tuples from one command to the next. (The piping here is done within the osh process; it's not using a pipe from the host OS.)
  • agg -g 'word: len(word)': agg is the osh command for doing aggregation, somewhat like python's reduce. -g means that multiple groups will be tracked during the aggregation, with a counter maintained for each. Groups are defined by the value of a grouping function, 'word: len(word)', which returns the length of an input word. I.e., we're going to maintain a counter for each word length observed.
  • 0: The initial value of each counter.
  • 'count, word: count + 1': A function for maintaining a group's counter. The arguments are the current count for the group, and the input word. The output from the function is an incremented count (for the word's group). Output from this agg command is a stream of pairs of the form (word length, #words of that length).
  • ^ sort: Pipe output from agg to the sort command which sorts by word length.
  • $: Print to stdout.

Tuesday, October 9, 2007

Scalable data structures in Erlang, and gen_leader

A scalable data structure is a data structure distributed across multiple nodes. You can add nodes to get more capacity, more throughput, and better availability. Erlang/OTP is ideal for building such structures, but is missing one critical piece.

I've been working on a distributed hash table (DHT) based on the idea of linear hashing. (The inventor of linear hashing, Witold Litwin, is also working along these lines.) Each node can serve a put or get request, and also owns a set of hash buckets. A mapping of hash buckets to nodes is used to dispatch a request to the right node.

When a node fails, or when a node is added, a new mapping of hash buckets to nodes is required. The question is how to create this new mapping. It is difficult or maybe impossible to guarantee that each node will derive the identical map, especially since map creation isn't perfectly synchronous across the set of nodes, and there may be changes in the node population (e.g. another node failure) during map creation.

The alternative is for one node to create the map and distribute it to other nodes. But which node? What is needed is for the set of participating nodes to agree on a node to carry out this task. This is the "leader election" problem. When a leader is needed, it is needed right away. There must be exactly one leader selected, and all participants must know who the leader is.

Leader election is a suprisingly difficult problem. Erlang/OTP does not include a leader election module. However, I ran across an implementation of leader election that would be right at home in OTP: gen_leader.

gen_leader includes a demo application, gdict. Updates go to the leader and are distributed to all nodes, while reads can be carried out locally. I had some trouble getting gdict to work. One of the gen_leader authors, Ulf Wiger, was kind enough to set me straight. My test program appears below. Some comments on the code:

  • I first wrote the test to run from one node, z@zack, and access a two-node hash table, on nodes a@zack and b@zack. I also specified that nodes a and b were both candidate leaders (second argument to gdict:new) and workers (not eligible to be leader). All that was wrong.
  • The current version of my test runs on node z, makes a, b, and z candidates, has no workers, and runs gdict:new on z.
  • It is necessary to have the nodes see each other, which I accomplished using the hack of the two rpc:call invocations. I still don't completely understand how the nodes are supposed to discover each other, but I haven't gotten very far in reading about OTP.

I haven't done any work with gen_leader other than to watch gdict work. But assuming it works as advertised, (and the authors seem to have been fanatical about testing), it is a crucial piece of software in implementing a scalable data structure in Erlang.

The gdict test code:

-module(test).
-export([main/0]).

-define(DUMP(X), io:format("~p:~p - ~p = ~p~n", [?MODULE, ?LINE, ??X, X])).


main() ->
rpc:call(a@zack, hello, hello, []),
rpc:call(b@zack, hello, hello, []),
?DUMP(node()),
?DUMP(nodes()),
Nodes = [node() | nodes()],
{ok, D} = gdict:new(test, Nodes, []),
?DUMP(D),
?DUMP(gdict:append(a, 1, D)),
?DUMP(gdict:append(b, 2, D)),
?DUMP(gdict:append(c, 3, D)),
?DUMP(gdict:find(a, D)),
?DUMP(gdict:find(b, D)),
?DUMP(gdict:find(c, D)).

Sunday, October 7, 2007

New version of osh

Osh (Object SHell) is a scripting tool I wrote for working with clusters and databases. I just released version 0.9.1. It's based on the idea of piping Python objects from one command to another. Python functions can be used to filter and transform objects. Rows from a database query turn into Python tuples. Commands can be run locally, or on all nodes of a cluster in parallel. You can use osh from the command-line, but it also has a Python API for use within Python scripts.

Osh is three years old and pretty stable, and I use it routinely for working on clusters in my work at Hitachi Data Systems. More information on osh can be found here.