Random musings on random stuff.
A few months ago, there was a thread on the then google group python-concurrency about some standard forms for showing how some libraries deal with concurrent problems. The specific example chosen looked like this:
#!/bin/sh
tail -f /var/log/system.log | grep pants

Pete Fein also posted an example of this using generators, based on David Beazley's talk on Python generators being used as (limited) coroutines:
import time
import re

def follow(fname):
    f = file(fname)
    f.seek(0, 2)  # go to the end
    while True:
        l = f.readline()
        if not l:  # no data
            time.sleep(.1)
        else:
            yield l

def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:
        if regex.match(l):
            yield l

def printer(lines):
    for l in lines:
        print l.strip()
        yield  # hand control back, so the loop at the bottom drives the pipeline

f = follow('/var/log/system.log')
g = grep(f, ".*pants.*")
p = printer(g)
for i in p:
    pass

The question/challenge raised on the list was essentially "what does this look like in your framework or system?". For some reason, someone saw fit to move the mailing list from Google Groups and delete the archives, so I can't point at the thread, but I did repost my answer for what was called "99 bottles" for Kamaelia on the Python wiki.
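The generator version above is pull-driven: chaining the generators together does no work, and lines only move when the final loop asks for them. The following is a minimal sketch of that behaviour, written for Python 3 (the listings in this post are Python 2). The `source` function and the `events` list are illustrative names, not part of the original example; `source` stands in for `follow` so that the sketch terminates.

```python
import re

events = []  # records the order in which the stages actually do work

def source(items):
    """Stand-in for follow(): yields from a list instead of tailing a file."""
    for item in items:
        events.append(("read", item))
        yield item

def grep(lines, pattern):
    """Yield only the lines that match the regular expression."""
    regex = re.compile(pattern)
    for line in lines:
        if regex.search(line):
            yield line

# Building the chain does no work: no line has been read yet.
matches = grep(source(["pants", "shirt", "more pants"]), "pants")
events_before = list(events)

# Only iterating the last stage pulls data through the earlier ones.
for line in matches:
    events.append(("print", line))
    print(line)
```

Each matching line is read and printed before the next line is read, which is why the trailing `for` loop in the original listing is needed at all.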
tail -f /var/log/system.log | grep pants

You aren't interested in the fact this uses 3 processes - tail, grep & parent process - but the fact that by writing it like this you're able to solve a problem quickly and simply. It also isn't particularly pretty, though I personally view the shell version as rather elegant.
grep "foo" somefile

Then grep will open the file "somefile", read it, output lines that match the pattern, and exit.

bla | grep "foo"

Then grep will read values from stdin, and output lines which match the pattern. Furthermore, it will pause outputting values when bla stops pushing values into the chain, and exit when bla exits (after finishing processing stdin). i.e. it essentially has two modes of operating, based on getting a value or having an absent value.
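The two modes described above can be sketched in plain Python 3. This is only an illustration of the idea, not how the real grep is implemented; the `filename` and `stream` parameters are names made up for this sketch, and `io.StringIO` stands in for a pipe on stdin.

```python
import io
import re
import sys

def grep(pattern, filename=None, stream=None):
    """Yield matching lines from `filename` if given, otherwise from a stream.

    When no filename is passed, the stream defaults to stdin, which mirrors
    the "absent value" mode of the command line tool.
    """
    regex = re.compile(pattern)
    if filename is not None:
        source = open(filename)
    else:
        source = stream if stream is not None else sys.stdin
    try:
        for line in source:
            if regex.search(line):
                yield line
    finally:
        # Only close what this function opened itself.
        if filename is not None:
            source.close()

# Equivalent of: bla | grep "foo"
fake_stdin = io.StringIO("foo bar\nbaz\nfood\n")
piped = list(grep("foo", stream=fake_stdin))
print(piped)
```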
def grep(lines, pattern):
    regex = re.compile(pattern)
    for l in lines:  # Note this requires an activated generator, or another iterable
        if regex.match(l):
            yield l

To this:
def grep(lines, pattern):
    "To stop this generator, you need to call its .throw() method. The wrapper could do this"
    regex = re.compile(pattern)
    while 1:
        for l in lines():  # Note we activate the generator here inside instead
            if regex.search(l):
                yield l
        yield

We gain something that can operate very much like the command line grep. That is, it reads from its equivalent to stdin until stdin is exhausted. To indicate stdin is exhausted it simply yields - i.e. yields None. The caller can then go off and get more data to feed grep. Alternatively, the caller can shut down this grep at any point in time by throwing in an exception.
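To make the caller's side of this concrete, here is a minimal Python 3 sketch of driving such a generator by hand. The `inbox` deque, `drain_inbox`, `pump` and the `Shutdown` exception are all hypothetical names for this sketch; they are not taken from decorators.py or from Axon.

```python
import re
from collections import deque

def grep(lines, pattern):
    """Filter whatever lines() currently offers, then yield None to say "drained"."""
    regex = re.compile(pattern)
    while True:
        for line in lines():
            if regex.search(line):
                yield line
        yield  # nothing more for now: hand control back to the caller

inbox = deque()  # plays the role of grep's stdin

def drain_inbox():
    """Yield, and remove, everything currently waiting in the inbox."""
    while inbox:
        yield inbox.popleft()

def pump(gen):
    """Collect output until the generator yields None (input exhausted)."""
    out = []
    for item in gen:
        if item is None:
            break
        out.append(item)
    return out

class Shutdown(Exception):
    """Hypothetical exception used to stop the generator."""

g = grep(drain_inbox, "pants")

inbox.extend(["pants on fire", "shirt"])
first = pump(g)

# The caller goes off, finds more data, and feeds the same generator again.
inbox.append("smarty pants")
second = pump(g)

# Shutting down: the exception is raised inside grep at its bare yield and,
# since grep does not catch it, propagates straight back to the caller.
try:
    g.throw(Shutdown())
    stopped = False
except Shutdown:
    stopped = True
```

Breaking out of the `for` loop in `pump` leaves the generator suspended rather than closed, which is what allows it to be resumed with fresh data.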
import sys
import time
import re

import Axon
from Kamaelia.Chassis.Pipeline import Pipeline
from decorators import blockingProducer, TransformerGenComponent

@blockingProducer
def follow(fname):
    "To stop this generator, you need to call its .throw() method. The wrapper could do this"
    f = file(fname)
    f.seek(0, 2)  # go to the end
    while True:
        l = f.readline()
        if not l:  # no data
            time.sleep(.1)
        else:
            yield l

@TransformerGenComponent
def grep(lines, pattern):
    "To stop this generator, you need to call its .throw() method"
    regex = re.compile(pattern)
    while 1:
        for l in lines():
            if regex.search(l):
                yield l
        yield

@TransformerGenComponent
def printer(lines):
    "To stop this generator, you need to call its .throw() method"
    while 1:
        for line in lines():
            sys.stdout.write(line)
            sys.stdout.flush()
        yield

Pipeline(
    follow('/var/log/system.log'),
    grep(None, ".*pants.*"),
    printer(None)
).run()

The implementations of both decorators.py and example.py above can be found here:
http://code.google.com/p/kamaelia/source/browse/trunk/Sketches/MPS/AxonDecorators/

Similarly, if we wanted to use multiple processes, we could rewrite that final pipeline like this:
from Axon.experimental.Process import ProcessPipeline

ProcessPipeline(
    follow('/var/log/system.log'),
    grep(None, ".*pants.*"),
    printer(None)
).run()

Specifically, the above will use 4 processes: one container process and 3 subprocesses. (ProcessPipeline would benefit from a rewrite using multiprocessing rather than pprocess though.)
def source():
    for i in ["hello", "world", "game", "over"]:
        yield i

You could use that instead of "follow" above like this:
Pipeline(
    grep(source, ".*pants.*"),
    printer(None)
).run()

For me, this has a certain symmetry with the change from this:
tail somefile.txt | grep ".*pants.*" | cat -

to this:

grep ".*pants.*" source | cat -

i.e. if you pass in an absent value, it processes the standard inbox "inbox", rather than stdin. If you pass in a value, it's assumed to be a generator that needs activating.
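The "absent value means inbox, a value means a generator to activate" rule can be sketched without Kamaelia at all. What follows is a toy in Python 3, not the actual implementation in decorators.py: the `Stage` class, the `transformer` decorator and the `step` method are all hypothetical names introduced here for illustration.

```python
import re
from collections import deque

class Stage:
    """Hypothetical stand-in for a component: owns an inbox and wraps a generator."""

    def __init__(self, genfunc, source, args):
        self.inbox = deque()
        # An absent source means "read from my own inbox"; otherwise use the
        # callable that was passed in, which genfunc activates by calling it.
        lines = self._drain if source is None else source
        self.gen = genfunc(lines, *args)

    def _drain(self):
        """Yield, and remove, everything currently waiting in the inbox."""
        while self.inbox:
            yield self.inbox.popleft()

    def step(self):
        """Run until the generator yields None, i.e. has no more output for now."""
        out = []
        for item in self.gen:
            if item is None:
                break
            out.append(item)
        return out

def transformer(genfunc):
    """Turn a generator function taking `lines` into a factory for Stage objects."""
    def make(source, *args):
        return Stage(genfunc, source, args)
    return make

@transformer
def grep(lines, pattern):
    regex = re.compile(pattern)
    while True:
        for line in lines():
            if regex.search(line):
                yield line
        yield

def source():
    for word in ["hello", "world", "game", "over"]:
        yield word

# Absent value: grep filters whatever arrives in its inbox.
from_inbox = grep(None, "o")
from_inbox.inbox.extend(["foo", "bar"])
inbox_result = from_inbox.step()

# A value: grep activates the generator it was given.
from_source = grep(source, "o")
source_result = from_source.step()
```

One limitation of this toy: calling `step()` a second time on `from_source` would run `source` again from the start, since the sketch has no notion of a source being finished.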
import sys
import time
import re

import Axon
from Kamaelia.Util.Backplane import Backplane, SubscribeTo, PublishTo
from Kamaelia.Chassis.Pipeline import Pipeline
from decorators import blockingProducer, TransformerGenComponent

@blockingProducer
def follow(fname):
    f = file(fname)
    f.seek(0, 2)  # go to the end
    while True:
        l = f.readline()
        if not l:  # no data
            time.sleep(.1)
        else:
            yield l

@TransformerGenComponent
def grep(lines, pattern):
    regex = re.compile(pattern)
    while 1:
        for l in lines():
            if regex.search(l):
                yield l
        yield

@TransformerGenComponent
def printer(lines):
    while 1:
        for line in lines():
            sys.stdout.write(line)
            sys.stdout.flush()
        yield

Backplane("RESULTS").activate()

for logfile in ["com.example.1", "com.example.2", "com.example.3", "com.example.4", "com.example.5"]:
    Pipeline(
        follow(logfile + "-access.log"),
        grep(None, "POST"),
        PublishTo("RESULTS")
    ).activate()

Pipeline(
    SubscribeTo("RESULTS"),
    printer(None)
).run()

Now, I don't particularly like the word pythonic - maybe it is, maybe it isn't - but hopefully this example does look better, perhaps, than last time! The biggest area needing work, from my perspective, in this example is the names of the decorators.
For some people, comments appear bust - I'm guessing due to the version of the dojo editor I'm using. If that's the case, and you want to send feedback, places like the kamaelia google group or twitter would be good too :) Need a better blog I guess :)
Commenting before I've read the whole article.
Can you say "create the generator" rather than "activate the generator" please?
Hi,
looks nice :)
I think something closer to the unix way would be to have separate pipeline objects, rather than one. Then be able to compose those pipeline objects together.
I guess you could also overload '|' to take the component on the left and pipe to the component on the right.
Then you could have...
follow('/var/log/system.log') | grep(".*pants.*") | printer()
(well, I made some default arguments, and moved the grep search for expression to the first argument).
Well, you probably don't want to do the last one, but would be a fun hack.
cu!
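As a rough illustration of the '|' idea in the comment above, here is a Python 3 sketch using plain generators rather than Kamaelia components. The `Stage` class and the `source`, `grep` and `upper` helpers are hypothetical names for this sketch only; nothing here is an existing Kamaelia or Axon API.

```python
import re

class Stage:
    """Hypothetical wrapper around a function from an iterable to an iterable."""

    def __init__(self, func):
        self.func = func

    def __or__(self, other):
        # self | other: feed this stage's output into the next stage.
        return Stage(lambda items: other.func(self.func(items)))

    def __call__(self, items=()):
        return self.func(items)

def source(items):
    """A stage that ignores its input and produces the given items."""
    return Stage(lambda _ignored: iter(items))

def grep(pattern):
    regex = re.compile(pattern)
    return Stage(lambda lines: (l for l in lines if regex.search(l)))

def upper():
    return Stage(lambda lines: (l.upper() for l in lines))

pipeline = source(["pants", "shirt", "hot pants"]) | grep("pants") | upper()
result = list(pipeline())
print(result)
```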
It's perhaps unclear from that example, but a Pipeline is a component. As a result, where you say:
"I think something closer to the unix way would be to have separate pipeline objects, rather than one. Then be able to compose those pipeline objects together."

You can happily do that. For example, you can write this:

Pipeline(
    follow('/var/log/system.log'),
    grep(None, ".*pants.*"),
    printer(None)
).run()

as this:
Pipeline(
    Pipeline(
        follow('/var/log/system.log'),
        grep(None, ".*pants.*"),
    ),
    printer(None)
).run()

or this:
Pipeline(
    Pipeline(
        follow('/var/log/system.log'),
    ),
    Pipeline(
        grep(None, ".*pants.*"),
    ),
    Pipeline(
        printer(None)
    ),
).run()

or this:
Pipeline(
    follow('/var/log/system.log'),
    Pipeline(
        grep(None, ".*pants.*"),
        printer(None)
    ),
).run()

etc. If you want circular pipelines, you can have those as well, or arbitrary shapes using graphlines.