'''Test driver for scripts in the cli directory.'''
-import json, errno, shutil, subprocess
+import json
+import errno
+import shutil
+import subprocess
+import difflib
from base_driver import *
class CliDriver(Driver):
def do_apitrace(self, args):
- cmd = [self.options.apitrace] + args[1:]
+ cmd = [self.options.apitrace] + args.split()
- self.output = subprocess.check_output(cmd)
+ print " ".join(cmd)
+ proc = subprocess.Popen(cmd, stdout = subprocess.PIPE)
+ self.output = proc.communicate()[0]
+
+ proc.wait()
+
+ if (self.expect_failure):
+ if (proc.returncode == 0):
+ fail("Command unexpectedly passed when expecting failure:\n " + " ".join(cmd))
+ else:
+ if (proc.returncode != 0):
+ fail("Command failed (returned non-zero):\n " + " ".join(cmd))
def do_expect(self, args):
- expected = json.loads(" ".join(args[1:]))
+ expected = eval(args)
if (self.output != expected):
- fail("Unexpected output:\n Expected: %s\n Received: %s\n" % (expected, self.output))
+ differ = difflib.Differ()
+ diff = differ.compare(expected.splitlines(1), self.output.splitlines(1))
+ diff = ''.join(diff)
+ fail("Unexpected output:\n%s\n" % diff)
def do_rm_and_mkdir(self, args):
+ args = args.split()
+
# Operate only on local directories
- dir = './' + args[1]
+ dir = './' + args[0]
# Failing to delete a directory that doesn't exist is no failure
def rmtree_onerror(function, path, excinfo):
os.makedirs(dir)
def unknown_command(self, args):
- fail('Broken test script: Unknown command: %s' % (args[0]))
+ fail('Broken test script: Unknown command: %s' % (args))
def run_script(self, cli_script):
"Execute the commands in the given cli script."
script = open(cli_script, 'rt')
while True:
+
+ self.expect_failure = False
+
line = script.readline()
+ # Exit loop on EOF
if (line == ''):
break
- cmd = line.split()
-
- if (len(cmd) == 0):
+ line = line.rstrip()
+
+ if " " in line:
+ (cmd, args) = line.split(None,1)
+ if args.startswith('r"""'):
+ while not line.endswith('"""'):
+ line = script.readline()
+ line = line.rstrip()
+ args += '\n' + line
+ else:
+ cmd = line
+ args = ''
+
+ # Ignore blank lines and comments
+ if (len(cmd) == 0 or cmd == '\n' or cmd[0] == '#'):
continue
- commands.get(cmd[0], self.unknown_command)(cmd)
+ if (cmd == 'EXPECT_FAILURE:'):
+ self.expect_failure = True
+ if " " in args:
+ (cmd, args) = args.split(None, 1)
+ else:
+ cmd = args
+ args = ''
+
+ commands.get(cmd, self.unknown_command)(args)
def run(self):
self.parseOptions()