diff --git a/tests/run-multitests.py b/tests/run-multitests.py new file mode 100755 index 0000000000..f27af3b0ed --- /dev/null +++ b/tests/run-multitests.py @@ -0,0 +1,396 @@ +#!/usr/bin/env python3 + +# This file is part of the MicroPython project, http://micropython.org/ +# The MIT License (MIT) +# Copyright (c) 2020 Damien P. George + +import sys, os, time, re, select +import argparse +import subprocess + +sys.path.append("../tools") +import pyboard + +if os.name == "nt": + CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3.exe") + MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/windows/micropython.exe") +else: + CPYTHON3 = os.getenv("MICROPY_CPYTHON3", "python3") + MICROPYTHON = os.getenv("MICROPY_MICROPYTHON", "../ports/unix/micropython") + +PYTHON_TRUTH = CPYTHON3 + +INSTANCE_READ_TIMEOUT_S = 10 + +APPEND_CODE_TEMPLATE = """ +import sys +class multitest: + @staticmethod + def flush(): + if hasattr(sys.stdout, "flush"): + sys.stdout.flush() + @staticmethod + def skip(): + print("SKIP") + multitest.flush() + raise SystemExit + @staticmethod + def next(): + print("NEXT") + multitest.flush() + @staticmethod + def globals(**gs): + for g in gs: + print("SET {{}} = {{!r}}".format(g, gs[g])) + multitest.flush() + @staticmethod + def get_network_ip(): + try: + import network + ip = network.WLAN().ifconfig()[0] + except: + ip = "127.0.0.1" + return ip + +{} + +instance{}() +multitest.flush() +""" + + +class PyInstance: + def __init__(self): + pass + + def close(self): + pass + + def prepare_script_from_file(self, filename, prepend, append): + with open(filename, "rb") as f: + script = f.read() + if prepend: + script = bytes(prepend, "ascii") + b"\n" + script + if append: + script += b"\n" + bytes(append, "ascii") + return script + + def run_file(self, filename, prepend="", append=""): + return self.run_script(self.prepare_script_from_file(filename, prepend, append)) + + def start_file(self, filename, prepend="", append=""): + return self.start_script(self.prepare_script_from_file(filename, prepend, append)) + + +class PyInstanceSubProcess(PyInstance): + def __init__(self, cmd): + self.cmd = cmd + self.popen = None + self.finished = True + + def __str__(self): + return self.cmd[0].rsplit("/")[-1] + + def run_script(self, script): + output = b"" + err = None + try: + p = subprocess.run( + self.cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, input=script + ) + output = p.stdout + except subprocess.CalledProcessError as er: + err = er + return str(output.strip(), "ascii"), err + + def start_script(self, script): + self.popen = subprocess.Popen( + self.cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT + ) + self.popen.stdin.write(script) + self.popen.stdin.close() + self.finished = False + + def stop(self): + if self.popen and self.popen.poll() is None: + self.popen.terminate() + + def readline(self): + sel = select.select([self.popen.stdout.raw], [], [], 0.1) + if not sel[0]: + self.finished = self.popen.poll() is not None + return None, None + out = self.popen.stdout.raw.readline() + if out == b"": + self.finished = self.popen.poll() is not None + return None, None + else: + return str(out.rstrip(), "ascii"), None + + def is_finished(self): + return self.finished + + def wait_finished(self): + self.popen.wait() + out = self.popen.stdout.read() + return str(out, "ascii"), "" + + +class PyInstancePyboard(PyInstance): + def __init__(self, device): + self.device = device + self.pyb = pyboard.Pyboard(device) + self.pyb.enter_raw_repl() + self.finished = True + + def __str__(self): + return self.device.rsplit("/")[-1] + + def close(self): + self.pyb.exit_raw_repl() + self.pyb.close() + + def run_script(self, script): + output = b"" + err = None + try: + self.pyb.enter_raw_repl() + output = self.pyb.exec_(script) + except pyboard.PyboardError as er: + err = er + return str(output.strip(), "ascii"), err + + def start_script(self, script): + self.pyb.enter_raw_repl() + self.pyb.exec_raw_no_follow(script) + self.finished = False + + def stop(self): + self.pyb.serial.write(b"\r\x03") + + def readline(self): + if self.finished: + return None, None + if self.pyb.serial.inWaiting() == 0: + return None, None + out = self.pyb.read_until(1, (b"\r\n", b"\x04")) + if out.endswith(b"\x04"): + self.finished = True + out = out[:-1] + err = str(self.pyb.read_until(1, b"\x04"), "ascii") + err = err[:-1] + if not out and not err: + return None, None + else: + err = None + return str(out.rstrip(), "ascii"), err + + def is_finished(self): + return self.finished + + def wait_finished(self): + out, err = self.pyb.follow(10, None) + return str(out, "ascii"), str(err, "ascii") + + +def prepare_test_file_list(test_files): + test_files2 = [] + for test_file in sorted(test_files): + num_instances = 0 + with open(test_file) as f: + for line in f: + m = re.match(r"def instance([0-9]+)\(\):", line) + if m: + num_instances = max(num_instances, int(m.group(1)) + 1) + test_files2.append((test_file, num_instances)) + return test_files2 + + +def trace_instance_output(instance_idx, line): + if cmd_args.trace_output: + t_ms = round((time.time() - trace_t0) * 1000) + print("{:6} i{} :".format(t_ms, instance_idx), line) + + +def run_test_on_instances(test_file, num_instances, instances): + global trace_t0 + trace_t0 = time.time() + + error = False + skip = False + injected_globals = "" + output = [[] for _ in range(num_instances)] + + # Start all instances running, in order, waiting until they signal they are ready + for idx in range(num_instances): + append_code = APPEND_CODE_TEMPLATE.format(injected_globals, idx) + instance = instances[idx] + instance.start_file(test_file, append=append_code) + last_read_time = time.time() + while True: + if instance.is_finished(): + break + out, err = instance.readline() + if out is None and err is None: + if time.time() > last_read_time + INSTANCE_READ_TIMEOUT_S: + output[idx].append("TIMEOUT") + error = True + break + time.sleep(0.1) + continue + last_read_time = time.time() + if out is not None: + trace_instance_output(idx, out) + if out.startswith("SET "): + injected_globals += out[4:] + "\n" + elif out == "SKIP": + skip = True + break + elif out == "NEXT": + break + else: + output[idx].append(out) + if err is not None: + trace_instance_output(idx, err) + output[idx].append(err) + error = True + + if error or skip: + break + + if not error and not skip: + # Capture output and wait for all instances to finish running + last_read_time = [time.time() for _ in range(num_instances)] + while True: + num_running = 0 + num_output = 0 + for idx in range(num_instances): + instance = instances[idx] + if instance.is_finished(): + continue + num_running += 1 + out, err = instance.readline() + if out is None and err is None: + if time.time() > last_read_time[idx] + INSTANCE_READ_TIMEOUT_S: + output[idx].append("TIMEOUT") + error = True + continue + num_output += 1 + last_read_time[idx] = time.time() + if out is not None: + trace_instance_output(idx, out) + output[idx].append(out) + if err is not None: + trace_instance_output(idx, err) + output[idx].append(err) + error = True + + if not num_output: + time.sleep(0.1) + if not num_running or error: + break + + # Stop all instances + for idx in range(num_instances): + instances[idx].stop() + + output_str = "" + for idx, lines in enumerate(output): + output_str += "--- instance{} ---\n".format(idx) + output_str += "\n".join(lines) + "\n" + + return error, skip, output_str + + +def run_tests(test_files, instances_truth, instances_test): + for test_file, num_instances in test_files: + instances_str = "|".join(str(instances_test[i]) for i in range(num_instances)) + print("{} on {}: ".format(test_file, instances_str), end="") + if cmd_args.show_output or cmd_args.trace_output: + print() + sys.stdout.flush() + + # Run test on test instances + error, skip, output_test = run_test_on_instances(test_file, num_instances, instances_test) + + if cmd_args.show_output: + print("### TEST ###") + print(output_test, end="") + + if not skip: + # Check if truth exists in a file, and read it in + test_file_expected = test_file + ".exp" + if os.path.isfile(test_file_expected): + with open(test_file_expected) as f: + output_truth = f.read() + else: + # Run test on truth instances to get expected output + _, _, output_truth = run_test_on_instances( + test_file, num_instances, instances_truth + ) + if cmd_args.show_output: + print("### TRUTH ###") + print(output_truth, end="") + + # Print result of test + if skip: + print("skip") + elif output_test == output_truth: + print("pass") + else: + print("FAIL") + if not cmd_args.show_output: + print("### TEST ###") + print(output_test, end="") + print("### TRUTH ###") + print(output_truth, end="") + + if cmd_args.show_output: + print() + + +def main(): + global cmd_args + + cmd_parser = argparse.ArgumentParser(description="Run network tests for MicroPython") + cmd_parser.add_argument( + "-s", "--show-output", action="store_true", help="show test output after running" + ) + cmd_parser.add_argument( + "-t", "--trace-output", action="store_true", help="trace test output while running" + ) + cmd_parser.add_argument( + "-i", "--instance", action="append", default=[], help="instance(s) to run the tests on" + ) + cmd_parser.add_argument("files", nargs="+", help="input test files") + cmd_args = cmd_parser.parse_args() + + test_files = prepare_test_file_list(cmd_args.files) + max_instances = max(t[1] for t in test_files) + + instances_truth = [PyInstanceSubProcess([PYTHON_TRUTH]) for _ in range(max_instances)] + + instances_test = [] + for i in cmd_args.instance: + if i.startswith("exec:"): + instances_test.append(PyInstanceSubProcess([i[len("exec:") :]])) + elif i.startswith("pyb:"): + instances_test.append(PyInstancePyboard(i[len("pyb:") :])) + else: + print("unknown instance string: {}".format(i), file=sys.stderr) + sys.exit(1) + + for _ in range(max_instances - len(instances_test)): + instances_test.append(PyInstanceSubProcess([MICROPYTHON])) + + try: + run_tests(test_files, instances_truth, instances_test) + finally: + for i in instances_truth: + i.close() + for i in instances_test: + i.close() + + +if __name__ == "__main__": + main()