Like I've said in previous posts, UNIX pipes are pretty neat. So let's say I want to replicate them in Python. First: is it even possible? Turns out yeah; Python has operator overloading. In general I am opposed to operator overloading, since it makes it possible to write really horrible code. Really horrible. On the other hand, it also enables Stupid Language Tricks, which might just balance out. :D
I'll do this in two parts, 'cos the full trick is really a combination of two stupid tricks.
First: we want a pipe-like syntax for doing stuff. We can do this by overloading the | operator in Python, using the __or__ and __ror__ special methods. Initially, I tried writing classes whose __or__ implementations made instances sort of stick together, and strung instances of these classes into a pipeline. Let's say I want to take the absolute value of an input, convert it to a string, and reverse the digits. (No reason - it's just an operation that takes several functions to do.) It'd look like this:
pipe_wrap(abs) | pipe_wrap(str) | pipe_wrap(lambda s: s[::-1])
This turned out to be really clunky. I want to build a pipe out of regular functions, and having to wrap every function like that defeats the point of a clean syntax.
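Before getting to the real approach, it's worth seeing exactly how Python dispatches the | operator, since the trick depends on it: for a | b, Python tries a.__or__(b) first, and falls back to b.__ror__(a) if the left side returns NotImplemented (or doesn't define __or__ at all). A quick self-contained illustration (the Tag class is purely for demonstration):

```python
class Tag(object):
    # Toy class to show which special method Python actually calls for "|".
    def __init__(self, name):
        self.name = name

    def __or__(self, other):
        return "%s.__or__(%r)" % (self.name, other)

    def __ror__(self, other):
        return "%s.__ror__(%r)" % (self.name, other)

t = Tag("t")
print(t | 1)  # the left operand's __or__ wins: t.__or__(1)
print(1 | t)  # int.__or__ returns NotImplemented, so Python tries t.__ror__(1)
```

This fallback is what lets plain functions like abs sit on the left of a | without knowing anything about pipes.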
The approach I ended up with is having "joiner" objects in between. First, I defined a joiner class which implements both __or__ and __ror__, so it can handle |s on both the left and right sides. When it's received things on both sides, it magically turns into the result of calling a provided function on the two things. Here's the class:
class incomplete_join(object):
    def __init__(self, compose, l=None, r=None):
        self.compose = compose
        self.l = l
        self.r = r

    def __call__(self, *args, **kwargs):
        raise Exception("Incomplete join: l=%s r=%s" % (repr(self.l), repr(self.r)))

    def _do_join(self, l=None, r=None):
        if l is None:
            l = self.l
        if r is None:
            r = self.r
        if l is not None and r is not None:
            return self.compose(l, r)
        return incomplete_join(self.compose, l, r)

    def __or__(self, right):
        return self._do_join(r=right)

    def __ror__(self, left):
        return self._do_join(l=left)


def compose(f1, f2):
    return lambda x: f2(f1(x))


# Print function, because Python doesn't yet let you treat "print" as a function
def prnt(x):
    print x


j = incomplete_join(compose)
This is all we need to start composing functions together using pipes. compose implements standard functional composition, and incomplete_join handles the pipe-like syntax. To use our previous example, the following two things are equivalent:
temp = abs |j| str |j| (lambda s: s[::-1]) |j| prnt
temp2 = compose(compose(compose(abs, str), (lambda s: s[::-1])), prnt)
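If you want to check that equivalence without the print step at the end, here's a self-contained, condensed restatement of compose and incomplete_join (same logic as above, just compacted so you can paste and run it on its own):

```python
def compose(f1, f2):
    return lambda x: f2(f1(x))

class incomplete_join(object):
    # Condensed version of the joiner class above.
    def __init__(self, compose, l=None, r=None):
        self.compose, self.l, self.r = compose, l, r

    def _do_join(self, l=None, r=None):
        l = self.l if l is None else l
        r = self.r if r is None else r
        if l is not None and r is not None:
            return self.compose(l, r)
        return incomplete_join(self.compose, l, r)

    def __or__(self, right):
        return self._do_join(r=right)

    def __ror__(self, left):
        return self._do_join(l=left)

j = incomplete_join(compose)

# Piped and nested spellings build the same function:
f = abs |j| str |j| (lambda s: s[::-1])
g = compose(compose(abs, str), lambda s: s[::-1])
assert f(-250) == g(-250) == "052"
```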
Pretty neat! However, we started out wanting to implement pipes. We've got pipe syntax, sort of, but not pipe behavior. Ideally, we'd like to have a set of objects which can run in parallel in separate processes, which have user-defined behaviors, and which can be combined into a pipeline. So:
In the first example, compose was just a simple function composer. What if we used something fancier in its place? What if, for instance, we had a function that took two objects (each storing its results in a multiprocessing queue), and spawned a new process to shuffle data from the output queue of the first into the second?
import multiprocessing


class queue_pipe(object):
    def __init__(self, func):
        self.func = func
        self.output_queue = multiprocessing.Queue(1)

    def __call__(self, item):
        if item is None:
            self.output_queue.put(None)
        else:
            self.output_queue.put(self.func(item))

    def next(self):
        item = self.output_queue.get()
        if item is None:
            raise StopIteration()
        return item


class queue_sink(object):
    def __init__(self, func):
        self.func = func

    def __call__(self, item):
        if item is not None:
            self.func(item)


def queue_compose(q1, q2):
    def transfer(q1, q2):
        try:
            item = q1.next()
            while True:
                q2(item)
                item = q1.next()
        except StopIteration:
            q2(None)
    proc = multiprocessing.Process(target=transfer, args=(q1, q2))
    proc.start()
    return q2


q = incomplete_join(queue_compose)
So what does this do? Using this, you can create a pipeline out of callable iterators. queue_pipe implements both the callable and iterator interfaces, and queue_sink is a (sort of unnecessary) callable which filters out the terminal None value in the pipe. transfer() runs in a new process, and bridges the gap between two pipe components.
An example would be a lot more descriptive here.
@queue_pipe
def double(x):
    return x * 2

qprnt = queue_sink(prnt)

iter(range(50)) |q| double |q| qprnt
This example demonstrates two different ways you can use the wrapper classes, just for the heck of it. The single line of code at the end constructs a pipeline across three processes, and executes each function for each item in the input, in a separate process. (Just like a UNIX pipe!) As an added bonus, the ends of the pipe are standard Python objects, so you can use any iterable at the front and any callable at the back. It even cleans up the processes afterward, since they exit when the end of the input is reached.
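If you want to poke at the same dataflow without multiprocessing's pickling quirks, here's a hypothetical thread-based analogue (Python 3; the pipe_stage and run_pipeline names are mine, not from the code above). It keeps the key ideas — bounded queues between stages and a None sentinel for end-of-stream — but collects results instead of printing them:

```python
import queue
import threading


def pipe_stage(func, inq, outq):
    # Move items from inq to outq, applying func; None marks end-of-stream.
    while True:
        item = inq.get()
        if item is None:
            outq.put(None)
            return
        outq.put(func(item))


def run_pipeline(source, funcs):
    # One bounded queue between each pair of stages, like Queue(1) above.
    qs = [queue.Queue(1) for _ in range(len(funcs) + 1)]
    threads = [threading.Thread(target=pipe_stage, args=(f, qs[i], qs[i + 1]))
               for i, f in enumerate(funcs)]
    for t in threads:
        t.start()

    # Feed from a separate thread so the main thread can drain the output
    # concurrently (with bounded queues, feeding and draining from the same
    # thread would deadlock).
    def feed():
        for item in source:
            qs[0].put(item)
        qs[0].put(None)

    feeder = threading.Thread(target=feed)
    feeder.start()

    out = []
    while True:
        item = qs[-1].get()
        if item is None:
            break
        out.append(item)
    feeder.join()
    for t in threads:
        t.join()
    return out


print(run_pipeline(range(5), [abs, lambda x: x * 2]))  # [0, 2, 4, 6, 8]
```

Threads don't give you the parallelism of separate processes, but the stage-per-worker structure and sentinel-based shutdown are the same.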
So, yeah. This is the kind of stuff I work on when I'm procrastinating on my thesis.
Disclaimer: I do not recommend using this trick in any critical code. It is a hack, and an incomplete one at that. I take no responsibility for bad things that may happen if you actually use this in production code. I cannot guarantee that this code will not fork-bomb your systems, give you the flu, or kidnap your pets.