Random musings on random stuff.
Earlier this evening an announcement by Pete Fein regarding the formation of a python-concurrency mailing list (aka Python Concurrency Special Interest Group) bounced past my inbox in the unladen-swallow mailing list. Naturally, given my work on Axon (a concurrency library) and Kamaelia (a bunch of components that use it), it jumped out at me as interesting. (5 minute overview for those that don't know what Kamaelia is...)
Pete then posted to the list a small collection of different ways of implementing...

    #!/bin/sh
    tail -f /var/log/system.log |grep pants

... in python.

    import time
    import re

    def follow(fname):
        f = file(fname)
        f.seek(0,2) # go to the end
        while True:
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                yield l

    def grep(lines, pattern):
        regex = re.compile(pattern)
        for l in lines:
            if regex.match(l):
                yield l

    def printer(lines):
        for l in lines:
            print l.strip()

    f = follow('/var/log/system.log')
    g = grep(f, ".*pants.*")
    p = printer(g)
    for i in p:
        pass

Now, this is nice, but the core logic:

    tail -f /var/log/system.log |grep pants

has somehow become obfuscated at the end - which is a shame, because it doesn't need to be. Taking the Kamaelia version step by step, let's take that end part:

    f = follow('/var/log/system.log')
    g = grep(f, ".*pants.*")
    p = printer(g)
    for i in p:
        pass

... and turn it into something which looks like what you'd do in Kamaelia:

    from Kamaelia.Chassis.Pipeline import Pipeline

    Pipeline(
        Follow('/var/log/system.log'),  # Follow takes the file name, not the shell command
        Grep(".*pants.*"),
        Printer(),
    ).run()

(Incidentally, to actually use separate processes, à la shell, you'd use ProcessPipeline, not Pipeline.)

    def follow(fname):
        f = file(fname)
        f.seek(0,2) # go to the end
        while True:
            l = f.readline()
            if not l: # no data
                time.sleep(.1)
            else:
                yield l

First of all, we note that this follow source blocks - specifically, it calls time.sleep. Now there are other ways of doing this, but the simplest way of keeping this code structure is to just create a threaded component. We also need to capture the argument (fname), which isn't optional and has no logical default, so we need to have an __init__ method. So we grab that, store it somewhere handy, and call the superclass initialiser. Pretty standard stuff.

    import time
    import Axon

    class Follow(Axon.ThreadedComponent.threadedcomponent):
        def __init__(self, fname, **argv):
            self.fname = fname
            super(Follow,self).__init__(**argv)
        def main(self):
            f = file(self.fname)
            f.seek(0,2) # go to the end
            while not self.dataReady("control"):  # control logic
                l = f.readline()
                if not l: # no data
                    time.sleep(.1)
                else:
                    self.send(l, "outbox")
            f.close()
            self.send(self.recv("control"), "signal")  # control logic

The main() method of this component now follows the same core logic as the follow() generator. The added logic, marked with comments above, is simply control logic, and follows a fairly commonly used pattern, which looks like this:

    while not self.dataReady("control"):
        <Body of generator>
    self.send(self.recv("control"), "signal")

This allows someone to shut down our component cleanly. Other than that, the other major difference is that this:

    yield l

has been replaced with this:

    self.send(l, "outbox")
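
The shutdown pattern described above can be sketched without Axon at all. The following is a minimal stand-in written in Python 3, using only the standard library; the names follow_worker, demo, and the ready event are hypothetical, and plain queue.Queue objects play the role of the "outbox", "control" and "signal" boxes. It is not how Axon implements threaded components, just the same loop shape under those assumptions.

```python
import os
import queue
import tempfile
import threading
import time


def follow_worker(fname, outbox, control, signal, ready, poll=0.01):
    """Tail fname, putting new lines on outbox until control has a message."""
    with open(fname) as f:
        f.seek(0, 2)  # go to the end
        ready.set()   # tell the caller we are now watching the file
        while control.empty():      # same shape as: while not dataReady("control")
            line = f.readline()
            if not line:            # no data yet
                time.sleep(poll)
            else:
                outbox.put(line)    # stands in for self.send(l, "outbox")
    # Forward the shutdown message, as the component does on "signal".
    signal.put(control.get())


def demo():
    """Run the worker against a temporary file and shut it down cleanly."""
    fd, path = tempfile.mkstemp(suffix=".log")
    os.close(fd)
    outbox, control, signal = queue.Queue(), queue.Queue(), queue.Queue()
    ready = threading.Event()
    worker = threading.Thread(
        target=follow_worker, args=(path, outbox, control, signal, ready))
    worker.start()
    ready.wait(timeout=5)
    with open(path, "a") as log:
        log.write("pants on line one\nline two\n")
    lines = [outbox.get(timeout=5), outbox.get(timeout=5)]
    control.put("shutdown")         # ask the worker to stop
    worker.join(timeout=5)
    forwarded = signal.get(timeout=5)
    os.remove(path)
    return lines, forwarded, worker.is_alive()
```

Calling demo() returns the lines the worker saw, the shutdown message it forwarded, and whether the thread is still alive. The point is that the loop condition doubles as the "please stop" check, so the core tailing logic is untouched.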

    def grep(lines, pattern):
        regex = re.compile(pattern)
        for l in lines:
            if regex.match(l):
                yield l

With the Kamaelia equivalent looking like this:

    def __init__(self, *args, **argd):
        ..
        self.__dict__.update(argd)

    def printer(lines):
        for l in lines:
            print l.strip()

Which transforms into this:

    class Printer(Axon.Component.component):
        def main(self):
            while not self.dataReady("control"):
                for l in self.Inbox("inbox"):
                    print l.strip()
                self.pause()
                yield 1
            self.send(self.recv("control"), "signal")

Again the core logic is the same, with the control logic wrapped around it. Again, rather than needing the explicit "lines" iterator, we have a standard place for data to come into the component - the inbox "inbox" - leaving the rest of the logic essentially unchanged.
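
To make the "standard place for data to come in" idea concrete, here is a rough Python 3 sketch of a filter written in the same shape as Printer. MiniComponent is a hypothetical stand-in invented for this example, not Axon's component class, and this Grep is only a guess by analogy with Printer rather than the component used in the pipelines here. It leaves out self.pause(), since there is no scheduler in this sketch.

```python
import re
from collections import deque


class MiniComponent(object):
    """Hypothetical stand-in for a component: named boxes holding messages."""

    def __init__(self):
        self.boxes = {"inbox": deque(), "control": deque(),
                      "outbox": deque(), "signal": deque()}

    def dataReady(self, box):
        return len(self.boxes[box]) > 0

    def recv(self, box):
        return self.boxes[box].popleft()

    def send(self, message, box):
        self.boxes[box].append(message)

    def Inbox(self, box):
        # Yield (and remove) every message currently waiting in the box.
        while self.dataReady(box):
            yield self.recv(box)


class Grep(MiniComponent):
    """Filter sketch: same core logic as the grep() generator."""

    def __init__(self, pattern):
        super().__init__()
        self.regex = re.compile(pattern)

    def main(self):
        while not self.dataReady("control"):
            for l in self.Inbox("inbox"):
                if self.regex.match(l):
                    self.send(l, "outbox")
            yield 1  # hand control back to whoever is driving us
        self.send(self.recv("control"), "signal")


# Drive the component by hand, standing in for a scheduler.
g = Grep(".*pants.*")
runner = g.main()
g.boxes["inbox"].extend(["no match\n", "pants!\n"])
next(runner)                              # one pass over the inbox
matched = list(g.boxes["outbox"])
g.boxes["control"].append("shutdown")     # request a clean shutdown
finished = next(runner, "done")           # generator exits after forwarding it
```

Neither the filter nor the printer needs to be handed its upstream neighbour; each only knows about its own boxes, and something else decides what is connected to what.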

    f = follow('/var/log/system.log')
    g = grep(f, ".*pants.*")
    p = printer(g)
    for i in p:
        pass

We can put the Kamaelia components together:

    Pipeline(
        Follow('/var/log/system.log'),  # Follow takes the file name, not the shell command
        Grep(".*pants.*"),
        Printer(),
    ).run()

This is very different. The generator version creates a chain of iterators, passing each one in as the first parameter to the next one in the chain, with the last item in the chain (the for loop) being the one that pulls things along. The Kamaelia version instantiates a Pipeline component which is very clearly piping the outputs from one component into the inputs of the following one.
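
The "last item pulls things along" behaviour of the generator version is easy to see with a small Python 3 experiment. This is only an illustrative sketch: the logged_source helper and the log list are hypothetical additions used to record when each stage actually runs, and a fixed list of lines stands in for the log file.

```python
import re


def logged_source(lines, log):
    """Source stage: yields lines one at a time, recording each pull."""
    for line in lines:
        log.append("source yields %r" % line)
        yield line


def grep(lines, pattern, log):
    """Filter stage: passes on only the lines matching the pattern."""
    regex = re.compile(pattern)
    for line in lines:
        if regex.match(line):
            log.append("grep passes %r" % line)
            yield line


log = []
source = logged_source(["no match", "my pants", "more pants"], log)
matches = grep(source, ".*pants.*", log)

# Building the chain only creates generator objects; nothing has run yet.
log_before_pull = list(log)

# The consumer at the end of the chain is what drives everything.
first = next(matches)
log_after_first = list(log)

# Pulling the rest drains the source through the filter.
rest = list(matches)
```

After building the chain the log is still empty; the source is only read once something asks the final generator for a value, and then only as far as needed to produce the next match.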

    from Kamaelia.Chassis.ConnectedServer import ServerCore

    def FollowPantsInMyLogs(*args):
        return Pipeline(
            Follow('/var/log/system.log'),  # Follow takes the file name, not the shell command
            Grep(".*pants.*"),
        )

    ServerCore(protocol=FollowPantsInMyLogs, port=8000).run()

Though that opens a Follow & Grep component for every client. To use just one Follow component, you could just follow once and publish the data for all clients:

    from Kamaelia.Util.Backplane import *

    Backplane("PANTSINMYLOGS").activate()

    Pipeline(
        Follow('/var/log/system.log'),  # Follow takes the file name, not the shell command
        Grep(".*pants.*"),
        PublishTo("PANTSINMYLOGS"),
    ).activate()

    def clientProtocol(*args):
        return SubscribeTo("PANTSINMYLOGS")

    ServerCore(protocol=clientProtocol, port=8000).run()
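
The publish-once, subscribe-many arrangement can be sketched in a few lines of plain Python 3. MiniBackplane, drain and the reads list are hypothetical names made up for this illustration, and this says nothing about how Kamaelia's Backplane is actually implemented; it only shows the fan-out idea, with a fixed list of lines standing in for the followed file.

```python
import re
from collections import deque


class MiniBackplane(object):
    """Hypothetical fan-out point: every subscriber gets every message."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self):
        box = deque()
        self.subscribers.append(box)
        return box

    def publish(self, message):
        for box in self.subscribers:
            box.append(message)


def grep(lines, pattern):
    regex = re.compile(pattern)
    for line in lines:
        if regex.match(line):
            yield line


def drain(box):
    """Collect everything currently waiting for one subscriber."""
    items = []
    while box:
        items.append(box.popleft())
    return items


reads = []  # records how many times the source is actually read


def source():
    for line in ["no match\n", "my pants\n", "more pants\n"]:
        reads.append(line)
        yield line


backplane = MiniBackplane()
client_a = backplane.subscribe()
client_b = backplane.subscribe()

# One follow-and-grep pass, published to everyone who subscribed.
for line in grep(source(), ".*pants.*"):
    backplane.publish(line)

seen_by_a = drain(client_a)
seen_by_b = drain(client_b)
```

Both clients see the same matching lines, while the source is read only once, which is the difference from building a fresh Follow and Grep per connection.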

    Backplane("PANTSINMYLOGS").activate()

    Pipeline(
        Follow('/var/log/system.log'),  # Follow takes the file name, not the shell command
        Grep(".*pants.*"),
        PublishTo("PANTSINMYLOGS"),
    ).activate()

    Pipeline(
        SubscribeTo("PANTSINMYLOGS"),
        Printer(),
    ).activate()

    def clientProtocol(*args):
        return SubscribeTo("PANTSINMYLOGS")

    ServerCore(protocol=clientProtocol, port=8000).run()

... which all fairly naturally describes the higher-level co-ordination going on. Now you can do all this from the ground up using plain old generators, but personally I find this approach easier to follow. Some people find other approaches simpler :)