Tuesday, January 1, 2008

Pipelines Using Fibers in Ruby 1.9—Part II

In the previous post, I developed a class called PipelineElement. This made it relatively easy to create elements that act as producers and filters in a programmatic pipeline. Using it, we could write Ruby 1.9 code like:


    10.times do
puts (evens | multiples_of_three | multiples_of_seven).resume
end

The construct in the loop is a pipeline containing three chunks of code: a generator of even numbers, a filter that only passes multiples of three, and another filter that passes multiples of seven. Numbers are passed from the producer to the first filter, and then from that filter to the next, until finally popping out and being made available to puts.


However, creating these pipeline elements is still something of a pain. It turns out that we can simplify things when it comes to creating filters. In the implementation I’ll show here, we’ll only handle the case of simple transforming filters—filters that take an input, do something to it, and write the result to the filter chain.


Let’s revisit the PipelineElement class


    class PipelineElement

attr_accessor :source

def initialize
@fiber_delegate = Fiber.new do
process
end
end

def |(other)
other.source = self
other
end

def resume
@fiber_delegate.resume
end

def process
while value = input
handle_value(value)
end
end

def handle_value(value)
output(value)
end

def input
source.resume
end

def output(value)
Fiber.yield(value)
end
end

The process method is the driving loop. It reads the next input from the pipeline, then callshandle_value to deal with it. In the base class, handle_value simply echoes the input to the output-real filters subclass PipelineElement and subclass this method.


Let’s make a small change to the handle_value method.


    def handle_value(value)
output(transform(value))
end

def transform(value)
value
end

By doing this, we’ve split the transformation of the incoming value into a separate method. And the work done by this method no longer uses any of the state in the PipelineElement object, which means we can also do it in a block in the caller’s context. Let’s change our PipelineElement class to allow this. We’ll have the constructor take an optional block, and we’ll use that block in preference to thetransform. Here’s another listing, showing just the changed methods.


    class PipelineElement

def initialize(&block)
@transformer = block || method(:transform)
@fiber_delegate = Fiber.new do
process
end
end

# ...

def handle_value(value)
output(@transformer.call(value))
end
end

This illustrates a cool (and underused) feature of Ruby. Method objects (created with the method(...)call) are duck-typed with proc objects: we can use .call(params) on both. This is a great way of letting users of a class change its behavior either by subclassing and overriding a method, or by simply passing in a block.


With this change in place, we can now write transforming filters using blocks. This is a lot more compact that the previous subclassing approach.


    class Evens < PipelineElement
def process
value = 0
loop do
output(value)
value += 2
end
end
end

evens = Evens.new

tripler = PipelineElement.new {|val| val * 3}
incrementer = PipelineElement.new {|val| val + 1}

5.times do
puts (evens | tripler | incrementer ).resume
end

This outputs 1, 7, 13, 19, and 25.


Different Kinds of Filter


This approach works well if all we want is transforming filters. But what if we would also like to simplify filters that either pass of don’t pass values based on some criteria? A block would seem like a great way of specifying the condition, but we’ve already used our one block parameter up. Subclassing to the rescue. We can create two subclasses, Transformer and Filter. One sets the @transformer instance variable to any block it is passed. The other sets @filter. Here’s the relevant code:


    class PipelineElement

attr_accessor :source

def initialize(&block)
@transformer ||= method(:transform)
@filter ||= method(:filter)
@fiber_delegate = Fiber.new do
process
end
end

# ...

def handle_value(value)
output(@transformer.call(value)) if @filter.call(value)
end

def transform(value)
value
end

def filter(value)
true
end
end

class Transformer < PipelineElement
def initialize(&block)
@transformer = block
super
end
end

class Filter < PipelineElement
def initialize(&block)
@filter = block
super
end
end

Thus equipped, we can write:


    tripler          = Transformer.new {|val| val * 3}
incrementer = Transformer.new {|val| val + 1}
multiple_of_five = Filter.new {|val| val % 5 == 0}

5.times do
puts (evens | tripler | incrementer | multiple_of_five ).resume
end

Moving The Blocks Inline


Our final hack lets us move the blocks directly into the pipeline.


Let’s look at the actual pipeline code:


    puts (evens | tripler | incrementer | multiple_of_five ).resume

Those pipe characters are simply calls to the | method in class PipelineElement. And methods can take block arguments, right? So what stops us writing


    puts (evens | {|v| v*3} | {|v| v+1} | multiple_of_five ).resume

It turns out that Ruby stops us. The brace characters are taken to be hash parameters, not blocks, so Ruby gets its knickers in a twist. Fortunately, that’s easily fixed by making the method calls explicit.


    puts (evens .| {|v| v*3} .| {|v| v+1} .| multiple_of_five ).resume

Now we just need to make the | method accept an optional block. If the block is present, we use it to create a new transformer.


    def |(other=nil, &block)
other = Transformer.new(&block) if block
other.source = self
other
end

Ruby 1.9 lets you chain method calls across lines, so we can tidy up our pipeline visually.


    5.times do
puts (evens
.| {|v| v*3}
.| {|v| v+1}
.| multiple_of_five
).resume
end

A Palindrome Finder


Let’s finish with another trivial example. We’ll create a generic producer class that takes a collection and passes it, one element at a time, into the pipeline.


    class Pump < PipelineElement
def initialize(source)
@source = source
super()
end
def process
@source.each {|item| Fiber.yield item}
nil
end
end

Now we can write a simple palindrome finder (a palindrome is a word which is the same when spelled backwards).


    words = Pump.new %w{Madam, the civic radar rotator is not level.}
is_palindrome = Filter.new {|word| word == word.reverse}

pipeline = words .| {|word| word.downcase.tr("^a-z", '') } .| is_palindrome

while word = pipeline.resume
puts word
end

This outputs: madam, civic, radar, rotator, level.


But what if we instead want to show each word in the input stream, and flag it if it is a palindrome? That’s easily done, but we won’t do it the easy way. Instead, let’s show a more convoluted method, because it might be useful in the general case.


There’s no law to say that a transformer that receives a string as input has to write a string as output. It could, if it wanted to, write an array. Or a structure. So we could write:


    WordInfo = Struct.new(:original, :forwards, :backwords)

words = Pump.new %w{Madam, the civic radar rotator is not level.}

normalize = Transformer.new {|word| [word, word.downcase.tr("^a-z", '')] }

to_word_info = Transformer.new do |word, normalized|
reversed = normalized.reverse
WordInfo.new(word, normalized, reversed)
end

formatter = Transformer.new do |word_info|
if word_info.forwards == word_info.backwords
"'#{word_info.original}' is a palindrome"
else
"'#{word_info.original}' is not a palindrome"
end
end

pipeline = words | normalize | to_word_info | formatter

while word = pipeline.resume
puts word
end

This outputs


    'Madam,' is a palindrome
'the' is not a palindrome
'civic' is a palindrome
'radar' is a palindrome
'rotator' is a palindrome
'is' is not a palindrome
'not' is not a palindrome
'level.' is a palindrome

So, What’s the Point?


>


Is this a great way of writing a palindrome finder? Not really. But…


What we’ve done here is turned the way a program works on it’s head. We’ve written chunks of isolated code, each of which either filters or transforms an input. We’ve then independently knitted these chunks together. That’s a high degree of decoupling. We can also leave it until runtime to determine what gets put into the pipeline (and the order that it appears in the pipeline), which means we can move more power into the hands of our users.


Could we have done all this without Fibers? Of course. Could we do it without Ruby 1.9? Absolutely. But sometimes factors come together which lead us to experiment with new ways of thinking about our code.


This pipeline stuff is not revolutionary, and it isn’t generally applicable. But it’s fun to play with. And, for me, that’s the main thing.


A Wee Postscript


All this content is stuff that I decided not to include in the third edition of the PickAxe. It didn’t work in the section on fibers, because it uses programming techniques not yet covered. It didn’t work later because, as an example of various programming techniques, it is just too long.

No comments:

Post a Comment