from __future__ import print_function

import errno
import os
import re
import select
import shutil
import socket
import subprocess
import sys
import time
import traceback
from subprocess import check_call, Popen
from optparse import OptionParser, Values

__all__ = []

# Command-line options.  run_tests() replaces this with the parsed
# command line; the defaults here keep color() and QEMU usable when the
# module is imported and used without going through run_tests().
options = Values({"verbose": False, "color": "auto"})

##################################################################
# Test structure
#

__all__ += ["test", "end_part", "run_tests", "get_current_test"]

# Registered test wrappers (and part-score printers), in declaration order.
TESTS = []
# Points earned / points attempted so far.
TOTAL = POSSIBLE = 0
# Snapshot of TOTAL / POSSIBLE taken at the end of the previous part.
PART_TOTAL = PART_POSSIBLE = 0
# The test wrapper that is currently executing, or None.
CURRENT_TEST = None


def test(points, title=None, parent=None):
    """Decorator for declaring test functions.

    points is the score awarded when the test passes.  If title is
    None, the title of the test will be derived from the function name
    by stripping the leading "test_" and replacing underscores with
    spaces.  If parent is given, it is another registered test that is
    run before this one.

    The decorated name is bound to a wrapper that runs the test at most
    once, prints its result and updates the global score."""

    def register_test(fn, title=title):
        if not title:
            assert fn.__name__.startswith("test_")
            title = fn.__name__[5:].replace("_", " ")
        if parent:
            title = " " + title

        def run_test():
            global TOTAL, POSSIBLE, CURRENT_TEST

            # Handle test dependencies
            if run_test.complete:
                return
            run_test.complete = True
            if parent:
                parent()

            # Run the test
            fail = None
            start = time.time()
            CURRENT_TEST = run_test
            sys.stdout.write("%s: " % title)
            sys.stdout.flush()
            try:
                fn()
            except AssertionError as e:
                fail = "".join(traceback.format_exception_only(type(e), e))

            # Display and handle test result
            POSSIBLE += points
            if points:
                verdict = color("red", "FAIL") if fail else color("green", "OK")
                print("%s" % verdict, end=' ')
            if time.time() - start > 0.1:
                print("(%.1fs)" % (time.time() - start), end=' ')
            print()
            if fail:
                print(" %s" % fail.replace("\n", "\n "))
            else:
                TOTAL += points

            # Callbacks run while CURRENT_TEST is still set so that they
            # can call get_current_test().
            for callback in run_test.on_finish:
                callback(fail)
            CURRENT_TEST = None

        # Record test metadata on the test wrapper function
        run_test.__name__ = fn.__name__
        run_test.title = title
        run_test.complete = False
        run_test.on_finish = []
        TESTS.append(run_test)
        return run_test
    return register_test


# The decorator's name starts with "test"; mark it so that test
# collectors honouring __test__ do not treat it as a test case.
test.__test__ = False


def end_part(name):
    """Register a pseudo-test that prints the score of part 'name'.

    The score covers the tests run since the previous end_part."""

    def show_part():
        global PART_TOTAL, PART_POSSIBLE
        print("Part %s score: %d/%d" %
              (name, TOTAL - PART_TOTAL, POSSIBLE - PART_POSSIBLE))
        print()
        PART_TOTAL, PART_POSSIBLE = TOTAL, POSSIBLE
    # An empty title never matches a command-line filter, so part
    # scores are only shown when every test is run.
    show_part.title = ""
    TESTS.append(show_part)


def run_tests():
    """Set up for testing and run the registered test functions.

    Positional command-line arguments are case-insensitive substring
    filters on test titles.  Exits with status 1 if any points were
    lost."""

    # Handle command line
    global options
    parser = OptionParser(usage="usage: %prog [-v] [filters...]")
    parser.add_option("-v", "--verbose", action="store_true",
                      help="print commands")
    parser.add_option("--color", choices=["never", "always", "auto"],
                      default="auto", help="never, always, or auto")
    (options, args) = parser.parse_args()

    # Start with a full build to catch build errors
    make()

    # Clean the file system if there is one
    reset_fs()

    # Run tests
    limit = list(map(str.lower, args))
    try:
        for entry in TESTS:
            if not limit or any(l in entry.title.lower() for l in limit):
                entry()
        if not limit:
            print("Score: %d/%d" % (TOTAL, POSSIBLE))
    except KeyboardInterrupt:
        pass
    if TOTAL < POSSIBLE:
        sys.exit(1)


def get_current_test():
    """Return the running test wrapper; raise RuntimeError if none."""
    if not CURRENT_TEST:
        raise RuntimeError("No test is running")
    return CURRENT_TEST

##################################################################
# Assertions
#

__all__ += ["assert_equal", "assert_lines_match"]


def assert_equal(got, expect, msg=""):
    """Raise AssertionError showing both values unless got == expect.

    msg, if given, is placed on its own line before the report."""
    if got == expect:
        return
    if msg:
        msg += "\n"
    raise AssertionError("%sgot:\n %s\nexpected:\n %s" %
                         (msg, str(got).replace("\n", "\n "),
                          str(expect).replace("\n", "\n ")))


def assert_lines_match(text, *regexps, **kw):
    """Assert that all of regexps match some line in text.

    If a 'no' keyword argument is given, it must be a list of regexps
    that must *not* match any line in text.  Any other keyword argument
    raises TypeError.  On failure, raises AssertionError with an
    annotated excerpt of text."""

    no = kw.pop("no", ())
    if kw:
        raise TypeError("assert_lines_match() got an unexpected keyword "
                        "argument %r" % sorted(kw)[0])

    # Check text against regexps
    lines = text.splitlines()
    good = set()
    bad = set()
    for i, line in enumerate(lines):
        if any(re.match(r, line) for r in regexps):
            good.add(i)
            # Each regexp only needs to match once; keep the unmatched.
            regexps = [r for r in regexps if not re.match(r, line)]
        if any(re.match(r, line) for r in no):
            bad.add(i)

    if not regexps and not bad:
        return

    # We failed; construct an informative failure message
    show = set()
    for lineno in good.union(bad):
        # Two lines of context on either side of each interesting line
        for offset in range(-2, 3):
            show.add(lineno + offset)
    if regexps:
        # Something is missing: also show the tail of the output
        show.update(n for n in range(len(lines) - 5, len(lines)))

    msg = []
    last = -1
    for lineno in sorted(show):
        if 0 <= lineno < len(lines):
            if lineno != last + 1:
                msg.append("...")
            last = lineno
            if lineno in bad:
                tag = color("red", "BAD ")
            elif lineno in good:
                tag = color("green", "GOOD")
            else:
                # Same width as the "BAD "/"GOOD" tags so lines align
                tag = "    "
            msg.append("%s %s" % (tag, lines[lineno]))
    if last != len(lines) - 1:
        msg.append("...")
    if bad:
        msg.append("unexpected lines in output")
    for r in regexps:
        msg.append(color("red", "MISSING") + " '%s'" % r)
    raise AssertionError("\n".join(msg))

##################################################################
# Utilities
#

__all__ += ["make", "maybe_unlink", "reset_fs", "color"]

# Whole-second timestamp recorded by the last post_make().
MAKE_TIMESTAMP = 0


def pre_make():
    """Delay prior to running make to ensure file mtimes change."""
    while int(time.time()) == MAKE_TIMESTAMP:
        time.sleep(0.1)


def post_make():
    """Record the time after make completes so that the next run of
    make can be delayed if needed."""
    global MAKE_TIMESTAMP
    MAKE_TIMESTAMP = int(time.time())


def make(*target):
    """Run make on the given targets; exit with status 1 if it fails."""
    pre_make()
    if Popen(("make",) + target).wait():
        sys.exit(1)
    post_make()


def show_command(cmd):
    """Print the argument sequence cmd as a shell-quoted command line."""
    try:
        from shlex import quote
    except ImportError:
        # Python 2: shlex has no quote(); pipes is gone in Python 3.13+
        from pipes import quote
    print("\n$", " ".join(map(quote, cmd)))


def maybe_unlink(*paths):
    """Remove each of paths, ignoring those that do not exist."""
    for path in paths:
        try:
            os.unlink(path)
        except EnvironmentError as e:
            if e.errno != errno.ENOENT:
                raise


COLORS = {"default": "\033[0m", "red": "\033[31m", "green": "\033[32m"}


def color(name, text):
    """Return text wrapped in the escape codes for color 'name'.

    The text is returned unchanged unless --color is "always", or is
    "auto" and file descriptor 1 is a terminal."""
    if options.color == "always" or (options.color == "auto" and os.isatty(1)):
        return COLORS[name] + text + COLORS["default"]
    return text


def reset_fs():
    """Restore obj/fs/fs.img from the clean image, if one exists."""
    if os.path.exists("obj/fs/clean-fs.img"):
        shutil.copyfile("obj/fs/clean-fs.img", "obj/fs/fs.img")

##################################################################
# Controllers
#

__all__ += ["QEMU", "GDBClient"]


class QEMU(object):
    """A QEMU process started through make, readable as a reactor.

    A reactor exposes fileno() and handle_read() for Runner's select
    loop."""

    # GDB stub port, cached by get_gdb_port()
    _GDBPORT = None

    def __init__(self, *make_args):
        """Start "make -s --no-print-directory" with make_args.

        Exits with status 1 if a GDB stub already answers on the
        configured port."""
        # Check that QEMU is not currently running
        try:
            GDBClient(self.get_gdb_port(), timeout=0).close()
        except socket.error:
            pass
        else:
            print("""\
GDB stub found on port %d.
QEMU appears to already be running. Please exit it if possible or use
'killall qemu' or 'killall qemu.real'.""" % self.get_gdb_port(),
                  file=sys.stderr)
            sys.exit(1)

        if options.verbose:
            show_command(("make",) + make_args)
        cmd = ("make", "-s", "--no-print-directory") + make_args
        self.proc = Popen(cmd, stdout=subprocess.PIPE,
                          stderr=subprocess.STDOUT,
                          stdin=subprocess.PIPE)
        # Accumulated output as a string
        self.output = ""
        # Accumulated output as a bytearray
        self.outbytes = bytearray()
        # Callbacks invoked with each chunk of raw output bytes
        self.on_output = []

    @staticmethod
    def get_gdb_port():
        """Return the GDB port printed by "make print-gdbport" (cached).

        Raises RuntimeError if make exits with a non-zero status."""
        if QEMU._GDBPORT is None:
            p = Popen(["make", "-s", "--no-print-directory", "print-gdbport"],
                      stdout=subprocess.PIPE)
            (out, _) = p.communicate()
            if p.returncode:
                raise RuntimeError(
                    "Failed to get gdbport: make exited with %d" %
                    p.returncode)
            QEMU._GDBPORT = int(out)
        return QEMU._GDBPORT

    def fileno(self):
        """Return the stdout descriptor, or None once the process is
        reaped."""
        if self.proc:
            return self.proc.stdout.fileno()

    def handle_read(self):
        """Read available output, record it and notify on_output.

        An empty read (end of file) is also passed to the callbacks
        before the process is reaped."""
        buf = os.read(self.proc.stdout.fileno(), 4096)
        self.outbytes.extend(buf)
        self.output = self.outbytes.decode("utf-8", "replace")
        for callback in self.on_output:
            callback(buf)
        if buf == b"":
            self.wait()
            return

    def wait(self):
        """Wait for the process to exit and forget it."""
        if self.proc:
            self.proc.wait()
            self.proc = None

    def kill(self):
        """Terminate the process if it is still being tracked."""
        if self.proc:
            self.proc.terminate()


class GDBClient(object):
    """Minimal GDB remote-protocol client, usable as a reactor."""

    def __init__(self, port, timeout=15):
        """Connect to localhost:port, retrying until timeout seconds
        have elapsed; the last socket.error is then re-raised."""
        start = time.time()
        while True:
            self.sock = socket.socket()
            try:
                self.sock.settimeout(1)
                self.sock.connect(("localhost", port))
                break
            except socket.error:
                # Every retry creates a new socket; release the failed
                # one instead of leaking its descriptor.
                self.sock.close()
                if time.time() >= start + timeout:
                    raise
        self.__buf = ""

    def fileno(self):
        """Return the socket descriptor, or None once closed."""
        if self.sock:
            return self.sock.fileno()

    def handle_read(self):
        """Consume received packets; raise TerminateTest on a "T05"
        stop reply.  The socket is closed on error or end of stream."""
        try:
            data = self.sock.recv(4096).decode("ascii", "replace")
        except socket.error:
            data = ""
        if data == "":
            self.sock.close()
            self.sock = None
            return
        self.__buf += data

        while True:
            # A packet is "$<payload>#<two checksum characters>"
            m = re.search(r"\$([^#]*)#[0-9a-zA-Z]{2}", self.__buf)
            if not m:
                break
            pkt = m.group(1)
            self.__buf = self.__buf[m.end():]

            if pkt.startswith("T05"):
                # Breakpoint
                raise TerminateTest

    def __send(self, cmd):
        """Send cmd framed as a packet with its modulo-256 checksum."""
        packet = "$%s#%02x" % (cmd, sum(map(ord, cmd)) % 256)
        self.sock.sendall(packet.encode("ascii"))

    def __send_break(self):
        """Send the single byte 0x03."""
        self.sock.sendall(b"\x03")

    def close(self):
        """Close the socket if it is open."""
        if self.sock:
            self.sock.close()
            self.sock = None

    def cont(self):
        """Send the "c" command."""
        self.__send("c")

    def breakpoint(self, addr):
        """Send a "Z1" breakpoint request for address addr."""
        self.__send("Z1,%x,1" % addr)

##################################################################
# QEMU test runner
#

__all__ += ["TerminateTest", "Runner"]


class TerminateTest(Exception):
    """Raised by reactors and monitor callbacks to stop the run loop."""
    pass


class Runner():
    """Runs QEMU-based tests, wiring monitors to QEMU and GDB."""

    def __init__(self, *default_monitors):
        """default_monitors are applied to every run, before the
        monitors passed to run_qemu."""
        self.__default_monitors = default_monitors

    def run_qemu(self, *monitors, **kw):
        """Run a QEMU-based test.  monitors should be functions that
        will be called with this Runner instance once QEMU and GDB are
        started.  Typically, they should register callbacks that throw
        TerminateTest when stop events occur.  The target_base
        argument gives the make target to run.  The make_args argument
        should be a list of additional arguments to pass to make.  The
        timeout argument bounds how long to run before returning."""

        # Unpack through a helper so that unknown keywords raise
        # TypeError; the make_args default is immutable on purpose.
        def run_qemu_kw(target_base="qemu", make_args=(), timeout=30):
            return target_base, make_args, timeout
        target_base, make_args, timeout = run_qemu_kw(**kw)

        # Start QEMU
        pre_make()
        self.qemu = QEMU(target_base + "-nox-gdb", *make_args)
        self.gdb = None

        try:
            # Wait for QEMU to start or make to fail.  This will set
            # self.gdb if QEMU starts.
            self.qemu.on_output = [self.__monitor_start]
            self.__react([self.qemu], timeout=30)
            self.qemu.on_output = []
            if self.gdb is None:
                print("Failed to connect to QEMU; output:")
                print(self.qemu.output)
                sys.exit(1)
            post_make()

            # QEMU and GDB are up
            self.reactors = [self.qemu, self.gdb]

            # Start monitoring
            for m in self.__default_monitors + monitors:
                m(self)

            # Run and react
            self.gdb.cont()
            self.__react(self.reactors, timeout)
        finally:
            # Shutdown QEMU
            try:
                if self.gdb is None:
                    sys.exit(1)
                self.qemu.kill()
                self.__react(self.reactors, 5)
                self.gdb.close()
                self.qemu.wait()
            except BaseException:
                # Includes SystemExit: warn that QEMU may still be
                # running, then let the exception continue.
                print("""\
Failed to shutdown QEMU. You might need to 'killall qemu' or
'killall qemu.real'.
""")
                raise

    def __monitor_start(self, output):
        """Startup callback: connect GDB once QEMU has printed a line,
        and stop waiting on success or when the output ends."""
        if b"\n" in output:
            try:
                self.gdb = GDBClient(self.qemu.get_gdb_port(), timeout=30)
                raise TerminateTest
            except socket.error:
                pass
        if not len(output):
            raise TerminateTest

    def __react(self, reactors, timeout):
        """Dispatch reads to reactors until TerminateTest is raised, no
        reactor remains open, or timeout seconds have elapsed."""
        deadline = time.time() + timeout
        try:
            while True:
                timeleft = deadline - time.time()
                if timeleft < 0:
                    sys.stdout.write("Timeout! ")
                    sys.stdout.flush()
                    return

                # Closed reactors report a fileno() of None
                rset = [r for r in reactors if r.fileno() is not None]
                if not rset:
                    return

                rset, _, _ = select.select(rset, [], [], timeleft)
                for reactor in rset:
                    reactor.handle_read()
        except TerminateTest:
            pass

    def user_test(self, binary, *monitors, **kw):
        """Run a user test using the specified binary.  Monitors and
        keyword arguments are as for run_qemu.  This runs on a disk
        snapshot unless the keyword argument 'snapshot' is False."""

        maybe_unlink("obj/kern/init.o", "obj/kern/kernel")
        if kw.pop("snapshot", True):
            # Build a new list so the caller's make_args is not modified
            kw["make_args"] = (list(kw.get("make_args", ())) +
                               ["QEMUEXTRA+=-snapshot"])
        self.run_qemu(target_base="run-%s" % binary, *monitors, **kw)

    def match(self, *args, **kwargs):
        """Shortcut to call assert_lines_match on the most recent QEMU
        output."""
        assert_lines_match(self.qemu.output, *args, **kwargs)

##################################################################
# Monitors
#

__all__ += ["save", "stop_breakpoint", "call_on_line", "stop_on_line"]


def save(path):
    """Return a monitor that writes QEMU's output to path.  If the
    test fails, copy the output to path.test-name."""

    def setup_save(runner):
        # The file is shared by every run using this monitor; start
        # each run from an empty file.
        f.seek(0)
        f.truncate()
        runner.qemu.on_output.append(f.write)
        get_current_test().on_finish.append(save_on_finish)

    def save_on_finish(fail):
        f.flush()
        save_path = path + "." + get_current_test().__name__[5:]
        if fail:
            shutil.copyfile(path, save_path)
            print(" QEMU output saved to %s" % save_path)
        elif os.path.exists(save_path):
            os.unlink(save_path)
            print(" (Old %s failure log removed)" % save_path)

    # Opened once when the monitor is created and deliberately left
    # open: setup_save reuses it for every run.
    f = open(path, "wb")
    return setup_save


def stop_breakpoint(addr):
    """Returns a monitor that stops when addr is reached.  addr may be
    a number or the name of a symbol."""

    def setup_breakpoint(runner):
        if isinstance(addr, str):
            # Each line holds a hex address in columns 0-7 and the
            # symbol name from column 11 onwards.
            with open("obj/kern/kernel.sym") as symfile:
                addrs = [int(sym[:8], 16) for sym in symfile
                         if sym[11:].strip() == addr]
            assert len(addrs), "Symbol %s not found" % addr
            runner.gdb.breakpoint(addrs[0])
        else:
            runner.gdb.breakpoint(addr)
    return setup_breakpoint


def call_on_line(regexp, callback):
    """Returns a monitor that calls 'callback' when QEMU prints a line
    matching 'regexp'."""

    def setup_call_on_line(runner):
        buf = bytearray()

        def handle_output(output):
            buf.extend(output)
            while b"\n" in buf:
                # Take one complete line; leave the remainder in buf
                line, buf[:] = buf.split(b"\n", 1)
                line = line.decode("utf-8", "replace")
                if re.match(regexp, line):
                    callback(line)
        runner.qemu.on_output.append(handle_output)
    return setup_call_on_line


def stop_on_line(regexp):
    """Returns a monitor that stops when QEMU prints a line matching
    'regexp'."""

    def stop(line):
        raise TerminateTest
    return call_on_line(regexp, stop)