Merge pull request #96703 from andersk/revert-test-logging

Revert “nixos/test-driver: use pythons logging module” (#96254)
This commit is contained in:
WORLDofPEACE 2020-08-30 19:08:06 -04:00 committed by GitHub
commit 5c67236602
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 222 additions and 156 deletions

View File

@ -1,14 +1,19 @@
#! /somewhere/python3
from contextlib import contextmanager, _GeneratorContextManager
from queue import Queue, Empty
from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List
from xml.sax.saxutils import XMLGenerator
import queue
import io
import _thread
import argparse
import atexit
import base64
import io
import itertools
import logging
import codecs
import os
import pathlib
import ptpython.repl
import pty
import queue
import re
import shlex
import shutil
@ -16,12 +21,9 @@ import socket
import subprocess
import sys
import tempfile
import _thread
import time
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple
import ptpython.repl
import traceback
import unicodedata
CHAR_TO_KEY = {
"A": "shift-a",
@ -86,24 +88,13 @@ CHAR_TO_KEY = {
")": "shift-0x0B",
}
# Forward reference
# Forward references
log: "Logger"
machines: "List[Machine]"
logging.basicConfig(format="%(message)s")
logger = logging.getLogger("test-driver")
logger.setLevel(logging.INFO)
machine_colours_iter = (
"\x1b[{}m".format(x) for x in itertools.cycle(reversed(range(31, 37)))
)
class MachineLogAdapter(logging.LoggerAdapter):
def process(self, msg: str, kwargs: Any) -> Tuple[str, Any]:
return (
f"{self.extra['colour_code']}{self.extra['machine']}\x1b[39m: {msg}",
kwargs,
)
def eprint(*args: object, **kwargs: Any) -> None:
print(*args, file=sys.stderr, **kwargs)
def make_command(args: list) -> str:
@ -111,7 +102,8 @@ def make_command(args: list) -> str:
def create_vlan(vlan_nr: str) -> Tuple[str, str, "subprocess.Popen[bytes]", Any]:
logger.info(f"starting VDE switch for network {vlan_nr}")
global log
log.log("starting VDE switch for network {}".format(vlan_nr))
vde_socket = tempfile.mkdtemp(
prefix="nixos-test-vde-", suffix="-vde{}.ctl".format(vlan_nr)
)
@ -150,6 +142,70 @@ def retry(fn: Callable) -> None:
raise Exception("action timed out")
class Logger:
def __init__(self) -> None:
self.logfile = os.environ.get("LOGFILE", "/dev/null")
self.logfile_handle = codecs.open(self.logfile, "wb")
self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
self.queue: "Queue[Dict[str, str]]" = Queue()
self.xml.startDocument()
self.xml.startElement("logfile", attrs={})
def close(self) -> None:
self.xml.endElement("logfile")
self.xml.endDocument()
self.logfile_handle.close()
def sanitise(self, message: str) -> str:
return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")
def maybe_prefix(self, message: str, attributes: Dict[str, str]) -> str:
if "machine" in attributes:
return "{}: {}".format(attributes["machine"], message)
return message
def log_line(self, message: str, attributes: Dict[str, str]) -> None:
self.xml.startElement("line", attributes)
self.xml.characters(message)
self.xml.endElement("line")
def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
eprint(self.maybe_prefix(message, attributes))
self.drain_log_queue()
self.log_line(message, attributes)
def enqueue(self, message: Dict[str, str]) -> None:
self.queue.put(message)
def drain_log_queue(self) -> None:
try:
while True:
item = self.queue.get_nowait()
attributes = {"machine": item["machine"], "type": "serial"}
self.log_line(self.sanitise(item["msg"]), attributes)
except Empty:
pass
@contextmanager
def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
eprint(self.maybe_prefix(message, attributes))
self.xml.startElement("nest", attrs={})
self.xml.startElement("head", attributes)
self.xml.characters(message)
self.xml.endElement("head")
tic = time.time()
self.drain_log_queue()
yield
self.drain_log_queue()
toc = time.time()
self.log("({:.2f} seconds)".format(toc - tic))
self.xml.endElement("nest")
class Machine:
def __init__(self, args: Dict[str, Any]) -> None:
if "name" in args:
@ -179,11 +235,8 @@ class Machine:
self.pid: Optional[int] = None
self.socket = None
self.monitor: Optional[socket.socket] = None
self.logger: Logger = args["log"]
self.allow_reboot = args.get("allowReboot", False)
self.logger = MachineLogAdapter(
logger,
extra=dict(machine=self.name, colour_code=next(machine_colours_iter)),
)
@staticmethod
def create_startcommand(args: Dict[str, str]) -> str:
@ -239,6 +292,14 @@ class Machine:
def is_up(self) -> bool:
return self.booted and self.connected
def log(self, msg: str) -> None:
self.logger.log(msg, {"machine": self.name})
def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager:
my_attrs = {"machine": self.name}
my_attrs.update(attrs)
return self.logger.nested(msg, my_attrs)
def wait_for_monitor_prompt(self) -> str:
assert self.monitor is not None
answer = ""
@ -253,7 +314,7 @@ class Machine:
def send_monitor_command(self, command: str) -> str:
message = ("{}\n".format(command)).encode()
self.logger.info(f"sending monitor command: {command}")
self.log("sending monitor command: {}".format(command))
assert self.monitor is not None
self.monitor.send(message)
return self.wait_for_monitor_prompt()
@ -320,9 +381,9 @@ class Machine:
return self.execute("systemctl {}".format(q))
def require_unit_state(self, unit: str, require_state: str = "active") -> None:
self.logger.info(
f"checking if unit {unit} has reached state '{require_state}'"
)
with self.nested(
"checking if unit {} has reached state '{}'".format(unit, require_state)
):
info = self.get_unit_info(unit)
state = info["ActiveState"]
if state != require_state:
@ -331,9 +392,6 @@ class Machine:
+ "'{}' but it is in state {}".format(require_state, state)
)
def log(self, message: str) -> None:
self.logger.info(message)
def execute(self, command: str) -> Tuple[int, str]:
self.connect()
@ -356,10 +414,10 @@ class Machine:
"""Execute each command and check that it succeeds."""
output = ""
for command in commands:
self.logger.info(f"must succeed: {command}")
with self.nested("must succeed: {}".format(command)):
(status, out) = self.execute(command)
if status != 0:
self.logger.info(f"output: {out}")
self.log("output: {}".format(out))
raise Exception(
"command `{}` failed (exit code {})".format(command, status)
)
@ -370,10 +428,12 @@ class Machine:
"""Execute each command and check that it fails."""
output = ""
for command in commands:
self.logger.info(f"must fail: {command}")
with self.nested("must fail: {}".format(command)):
(status, out) = self.execute(command)
if status == 0:
raise Exception("command `{}` unexpectedly succeeded".format(command))
raise Exception(
"command `{}` unexpectedly succeeded".format(command)
)
output += out
return output
@ -388,7 +448,7 @@ class Machine:
status, output = self.execute(command)
return status == 0
self.logger.info(f"waiting for success: {command}")
with self.nested("waiting for success: {}".format(command)):
retry(check_success)
return output
@ -403,7 +463,7 @@ class Machine:
status, output = self.execute(command)
return status != 0
self.logger.info(f"waiting for failure: {command}")
with self.nested("waiting for failure: {}".format(command)):
retry(check_failure)
return output
@ -411,7 +471,7 @@ class Machine:
if not self.booted:
return
self.logger.info("waiting for the VM to power off")
with self.nested("waiting for the VM to power off"):
sys.stdout.flush()
self.process.wait()
@ -435,17 +495,17 @@ class Machine:
def tty_matches(last: bool) -> bool:
text = self.get_tty_text(tty)
if last:
self.logger.info(
self.log(
f"Last chance to match /{regexp}/ on TTY{tty}, "
f"which currently contains: {text}"
)
return len(matcher.findall(text)) > 0
self.logger.info(f"waiting for {regexp} to appear on tty {tty}")
with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)):
retry(tty_matches)
def send_chars(self, chars: List[str]) -> None:
self.logger.info(f"sending keys {chars}")
with self.nested("sending keys {}".format(chars)):
for char in chars:
self.send_key(char)
@ -456,7 +516,7 @@ class Machine:
status, _ = self.execute("test -e {}".format(filename))
return status == 0
self.logger.info(f"waiting for file {filename}")
with self.nested("waiting for file {}".format(filename)):
retry(check_file)
def wait_for_open_port(self, port: int) -> None:
@ -464,7 +524,7 @@ class Machine:
status, _ = self.execute("nc -z localhost {}".format(port))
return status == 0
self.logger.info(f"waiting for TCP port {port}")
with self.nested("waiting for TCP port {}".format(port)):
retry(port_is_open)
def wait_for_closed_port(self, port: int) -> None:
@ -487,7 +547,7 @@ class Machine:
if self.connected:
return
self.logger.info("waiting for the VM to finish booting")
with self.nested("waiting for the VM to finish booting"):
self.start()
tic = time.time()
@ -495,8 +555,8 @@ class Machine:
# TODO: Timeout
toc = time.time()
self.logger.info("connected to guest root shell")
self.logger.info(f"(connecting took {toc - tic:.2f} seconds)")
self.log("connected to guest root shell")
self.log("(connecting took {:.2f} seconds)".format(toc - tic))
self.connected = True
def screenshot(self, filename: str) -> None:
@ -506,7 +566,10 @@ class Machine:
filename = os.path.join(out_dir, "{}.png".format(filename))
tmp = "{}.ppm".format(filename)
self.logger.info(f"making screenshot {filename}")
with self.nested(
"making screenshot {}".format(filename),
{"image": os.path.basename(filename)},
):
self.send_monitor_command("screendump {}".format(tmp))
ret = subprocess.run("pnmtopng {} > {}".format(tmp, filename), shell=True)
os.unlink(tmp)
@ -587,7 +650,7 @@ class Machine:
tess_args = "-c debug_file=/dev/null --psm 11 --oem 2"
self.logger.info("performing optical character recognition")
with self.nested("performing optical character recognition"):
with tempfile.NamedTemporaryFile() as tmpin:
self.send_monitor_command("screendump {}".format(tmpin.name))
@ -596,7 +659,9 @@ class Machine:
)
ret = subprocess.run(cmd, shell=True, capture_output=True)
if ret.returncode != 0:
raise Exception("OCR failed with exit code {}".format(ret.returncode))
raise Exception(
"OCR failed with exit code {}".format(ret.returncode)
)
return ret.stdout.decode("utf-8")
@ -606,15 +671,15 @@ class Machine:
matches = re.search(regex, text) is not None
if last and not matches:
self.logger.info(f"Last OCR attempt failed. Text was: {text}")
self.log("Last OCR attempt failed. Text was: {}".format(text))
return matches
self.logger.info(f"waiting for {regex} to appear on screen")
with self.nested("waiting for {} to appear on screen".format(regex)):
retry(screen_matches)
def wait_for_console_text(self, regex: str) -> None:
self.logger.info(f"waiting for {regex} to appear on console")
self.log("waiting for {} to appear on console".format(regex))
# Buffer the console output, this is needed
# to match multiline regexes.
console = io.StringIO()
@ -637,7 +702,7 @@ class Machine:
if self.booted:
return
self.logger.info("starting vm")
self.log("starting vm")
def create_socket(path: str) -> socket.socket:
if os.path.exists(path):
@ -694,7 +759,7 @@ class Machine:
# Store last serial console lines for use
# of wait_for_console_text
self.last_lines: queue.Queue = queue.Queue()
self.last_lines: Queue = Queue()
def process_serial_output() -> None:
assert self.process.stdout is not None
@ -702,7 +767,8 @@ class Machine:
# Ignore undecodable bytes that may occur in boot menus
line = _line.decode(errors="ignore").replace("\r", "").rstrip()
self.last_lines.put(line)
self.logger.info(line)
eprint("{} # {}".format(self.name, line))
self.logger.enqueue({"msg": line, "machine": self.name})
_thread.start_new_thread(process_serial_output, ())
@ -711,10 +777,10 @@ class Machine:
self.pid = self.process.pid
self.booted = True
self.logger.info(f"QEMU running (pid {self.pid})")
self.log("QEMU running (pid {})".format(self.pid))
def cleanup_statedir(self) -> None:
self.logger.info("delete the VM state directory")
self.log("delete the VM state directory")
if os.path.isfile(self.state_dir):
shutil.rmtree(self.state_dir)
@ -729,7 +795,7 @@ class Machine:
if not self.booted:
return
self.logger.info("forced crash")
self.log("forced crash")
self.send_monitor_command("quit")
self.wait_for_shutdown()
@ -749,7 +815,7 @@ class Machine:
status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
return status == 0
self.logger.info("waiting for the X11 server")
with self.nested("waiting for the X11 server"):
retry(check_x)
def get_window_names(self) -> List[str]:
@ -763,13 +829,14 @@ class Machine:
def window_is_visible(last_try: bool) -> bool:
names = self.get_window_names()
if last_try:
self.logger.info(
f"Last chance to match {regexp} on the window list, "
+ f"which currently contains: {', '.join(names)}"
self.log(
"Last chance to match {} on the window list,".format(regexp)
+ " which currently contains: "
+ ", ".join(names)
)
return any(pattern.search(name) for name in names)
self.logger.info("Waiting for a window to appear")
with self.nested("Waiting for a window to appear"):
retry(window_is_visible)
def sleep(self, secs: int) -> None:
@ -799,20 +866,21 @@ class Machine:
def create_machine(args: Dict[str, Any]) -> Machine:
global log
args["log"] = log
args["redirectSerial"] = os.environ.get("USE_SERIAL", "0") == "1"
return Machine(args)
def start_all() -> None:
global machines
logger.info("starting all VMs")
with log.nested("starting all VMs"):
for machine in machines:
machine.start()
def join_all() -> None:
global machines
logger.info("waiting for all VMs to finish")
with log.nested("waiting for all VMs to finish"):
for machine in machines:
machine.wait_for_shutdown()
@ -825,11 +893,12 @@ def run_tests() -> None:
global machines
tests = os.environ.get("tests", None)
if tests is not None:
logger.info("running the VM test script")
with log.nested("running the VM test script"):
try:
exec(tests, globals())
except Exception:
logging.exception("error:")
except Exception as e:
eprint("error: ")
traceback.print_exc()
sys.exit(1)
else:
ptpython.repl.embed(locals(), globals())
@ -843,19 +912,18 @@ def run_tests() -> None:
@contextmanager
def subtest(name: str) -> Iterator[None]:
logger.info(name)
with log.nested(name):
try:
yield
return True
except Exception as e:
logger.info(f'Test "{name}" failed with error: "{e}"')
log.log(f'Test "{name}" failed with error: "{e}"')
raise e
return False
def main() -> None:
global machines
if __name__ == "__main__":
arg_parser = argparse.ArgumentParser()
arg_parser.add_argument(
"-K",
@ -865,6 +933,8 @@ def main() -> None:
)
(cli_args, vm_scripts) = arg_parser.parse_known_args()
log = Logger()
vlan_nrs = list(dict.fromkeys(os.environ.get("VLANS", "").split()))
vde_sockets = [create_vlan(v) for v in vlan_nrs]
for nr, vde_socket, _, _ in vde_sockets:
@ -875,27 +945,23 @@ def main() -> None:
if not cli_args.keep_vm_state:
machine.cleanup_statedir()
machine_eval = [
"global {0}; {0} = machines[{1}]".format(m.name, idx)
for idx, m in enumerate(machines)
"{0} = machines[{1}]".format(m.name, idx) for idx, m in enumerate(machines)
]
exec("\n".join(machine_eval))
@atexit.register
def clean_up() -> None:
logger.info("cleaning up")
with log.nested("cleaning up"):
for machine in machines:
if machine.pid is None:
continue
logger.info(f"killing {machine.name} (pid {machine.pid})")
log.log("killing {} (pid {})".format(machine.name, machine.pid))
machine.process.kill()
for _, _, process, _ in vde_sockets:
process.terminate()
log.close()
tic = time.time()
run_tests()
toc = time.time()
print("test script finished in {:.2f}s".format(toc - tic))
if __name__ == "__main__":
main()

View File

@ -62,7 +62,7 @@ rec {
''
mkdir -p $out
tests='exec(os.environ["testScript"])' ${driver}/bin/nixos-test-driver
LOGFILE=/dev/null tests='exec(os.environ["testScript"])' ${driver}/bin/nixos-test-driver
for i in */xchg/coverage-data; do
mkdir -p $out/coverage-data