123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- import pipes
- import os
- import string
- import unittest
- from test.test_support import TESTFN, run_unittest, unlink, reap_children
- if os.name != 'posix':
- raise unittest.SkipTest('pipes module only works on posix')
- TESTFN2 = TESTFN + "2"
- # tr a-z A-Z is not portable, so make the ranges explicit
- s_command = 'tr %s %s' % (string.ascii_lowercase, string.ascii_uppercase)
- class SimplePipeTests(unittest.TestCase):
- def tearDown(self):
- for f in (TESTFN, TESTFN2):
- unlink(f)
- def testSimplePipe1(self):
- t = pipes.Template()
- t.append(s_command, pipes.STDIN_STDOUT)
- f = t.open(TESTFN, 'w')
- f.write('hello world #1')
- f.close()
- with open(TESTFN) as f:
- self.assertEqual(f.read(), 'HELLO WORLD #1')
- def testSimplePipe2(self):
- with open(TESTFN, 'w') as f:
- f.write('hello world #2')
- t = pipes.Template()
- t.append(s_command + ' < $IN > $OUT', pipes.FILEIN_FILEOUT)
- t.copy(TESTFN, TESTFN2)
- with open(TESTFN2) as f:
- self.assertEqual(f.read(), 'HELLO WORLD #2')
- def testSimplePipe3(self):
- with open(TESTFN, 'w') as f:
- f.write('hello world #2')
- t = pipes.Template()
- t.append(s_command + ' < $IN', pipes.FILEIN_STDOUT)
- with t.open(TESTFN, 'r') as f:
- self.assertEqual(f.read(), 'HELLO WORLD #2')
- def testEmptyPipeline1(self):
- # copy through empty pipe
- d = 'empty pipeline test COPY'
- with open(TESTFN, 'w') as f:
- f.write(d)
- with open(TESTFN2, 'w') as f:
- f.write('')
- t=pipes.Template()
- t.copy(TESTFN, TESTFN2)
- with open(TESTFN2) as f:
- self.assertEqual(f.read(), d)
- def testEmptyPipeline2(self):
- # read through empty pipe
- d = 'empty pipeline test READ'
- with open(TESTFN, 'w') as f:
- f.write(d)
- t=pipes.Template()
- with t.open(TESTFN, 'r') as f:
- self.assertEqual(f.read(), d)
- def testEmptyPipeline3(self):
- # write through empty pipe
- d = 'empty pipeline test WRITE'
- t = pipes.Template()
- with t.open(TESTFN, 'w') as f:
- f.write(d)
- with open(TESTFN) as f:
- self.assertEqual(f.read(), d)
- def testQuoting(self):
- safeunquoted = string.ascii_letters + string.digits + '@%_-+=:,./'
- unsafe = '"`$\\!'
- self.assertEqual(pipes.quote(''), "''")
- self.assertEqual(pipes.quote(safeunquoted), safeunquoted)
- self.assertEqual(pipes.quote('test file name'), "'test file name'")
- for u in unsafe:
- self.assertEqual(pipes.quote('test%sname' % u),
- "'test%sname'" % u)
- for u in unsafe:
- self.assertEqual(pipes.quote("test%s'name'" % u),
- "'test%s'\"'\"'name'\"'\"''" % u)
- def testRepr(self):
- t = pipes.Template()
- self.assertEqual(repr(t), "<Template instance, steps=[]>")
- t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
- self.assertEqual(repr(t),
- "<Template instance, steps=[('tr a-z A-Z', '--')]>")
- def testSetDebug(self):
- t = pipes.Template()
- t.debug(False)
- self.assertEqual(t.debugging, False)
- t.debug(True)
- self.assertEqual(t.debugging, True)
- def testReadOpenSink(self):
- # check calling open('r') on a pipe ending with
- # a sink raises ValueError
- t = pipes.Template()
- t.append('boguscmd', pipes.SINK)
- self.assertRaises(ValueError, t.open, 'bogusfile', 'r')
- def testWriteOpenSource(self):
- # check calling open('w') on a pipe ending with
- # a source raises ValueError
- t = pipes.Template()
- t.prepend('boguscmd', pipes.SOURCE)
- self.assertRaises(ValueError, t.open, 'bogusfile', 'w')
- def testBadAppendOptions(self):
- t = pipes.Template()
- # try a non-string command
- self.assertRaises(TypeError, t.append, 7, pipes.STDIN_STDOUT)
- # try a type that isn't recognized
- self.assertRaises(ValueError, t.append, 'boguscmd', 'xx')
- # shouldn't be able to append a source
- self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SOURCE)
- # check appending two sinks
- t = pipes.Template()
- t.append('boguscmd', pipes.SINK)
- self.assertRaises(ValueError, t.append, 'boguscmd', pipes.SINK)
- # command needing file input but with no $IN
- t = pipes.Template()
- self.assertRaises(ValueError, t.append, 'boguscmd $OUT',
- pipes.FILEIN_FILEOUT)
- t = pipes.Template()
- self.assertRaises(ValueError, t.append, 'boguscmd',
- pipes.FILEIN_STDOUT)
- # command needing file output but with no $OUT
- t = pipes.Template()
- self.assertRaises(ValueError, t.append, 'boguscmd $IN',
- pipes.FILEIN_FILEOUT)
- t = pipes.Template()
- self.assertRaises(ValueError, t.append, 'boguscmd',
- pipes.STDIN_FILEOUT)
- def testBadPrependOptions(self):
- t = pipes.Template()
- # try a non-string command
- self.assertRaises(TypeError, t.prepend, 7, pipes.STDIN_STDOUT)
- # try a type that isn't recognized
- self.assertRaises(ValueError, t.prepend, 'tr a-z A-Z', 'xx')
- # shouldn't be able to prepend a sink
- self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SINK)
- # check prepending two sources
- t = pipes.Template()
- t.prepend('boguscmd', pipes.SOURCE)
- self.assertRaises(ValueError, t.prepend, 'boguscmd', pipes.SOURCE)
- # command needing file input but with no $IN
- t = pipes.Template()
- self.assertRaises(ValueError, t.prepend, 'boguscmd $OUT',
- pipes.FILEIN_FILEOUT)
- t = pipes.Template()
- self.assertRaises(ValueError, t.prepend, 'boguscmd',
- pipes.FILEIN_STDOUT)
- # command needing file output but with no $OUT
- t = pipes.Template()
- self.assertRaises(ValueError, t.prepend, 'boguscmd $IN',
- pipes.FILEIN_FILEOUT)
- t = pipes.Template()
- self.assertRaises(ValueError, t.prepend, 'boguscmd',
- pipes.STDIN_FILEOUT)
- def testBadOpenMode(self):
- t = pipes.Template()
- self.assertRaises(ValueError, t.open, 'bogusfile', 'x')
- def testClone(self):
- t = pipes.Template()
- t.append('tr a-z A-Z', pipes.STDIN_STDOUT)
- u = t.clone()
- self.assertNotEqual(id(t), id(u))
- self.assertEqual(t.steps, u.steps)
- self.assertNotEqual(id(t.steps), id(u.steps))
- self.assertEqual(t.debugging, u.debugging)
- def test_main():
- run_unittest(SimplePipeTests)
- reap_children()
- if __name__ == "__main__":
- test_main()
|