mirror of
https://github.com/NixOS/nixpkgs.git
synced 2024-11-26 08:53:21 +00:00
nixos/lib/test-driver: add wait_for_qmp_event
Adds a function to wait for a new QMP event with a model filter so that you can expect specific type of events with specific payloads. e.g. a guest-reset-induced shutdown event.
This commit is contained in:
parent
5545bc83ec
commit
1196ae6e6b
@ -740,6 +740,30 @@ class Machine:
|
||||
self.booted = False
|
||||
self.connected = False
|
||||
|
||||
def wait_for_qmp_event(self, event_filter: Callable[[dict[str, Any]], bool], timeout: int = 60 * 10) -> dict[str, Any]:
|
||||
"""
|
||||
Wait for a QMP event which you can filter with the `event_filter` function.
|
||||
The function takes as an input a dictionary of the event and if it returns True, we return that event,
|
||||
if it does not, we wait for the next event and retry.
|
||||
|
||||
It will skip all events received in the meantime, if you want to keep them,
|
||||
you have to do the bookkeeping yourself and store them somewhere.
|
||||
|
||||
By default, it will wait up to 10 minutes, `timeout` is in seconds.
|
||||
"""
|
||||
if self.qmp_client is None:
|
||||
raise RuntimeError('QMP API is not ready yet, is the VM ready?')
|
||||
|
||||
start = time.time()
|
||||
while True:
|
||||
evt = self.qmp_client.wait_for_event(timeout=timeout)
|
||||
if event_filter(evt):
|
||||
return evt
|
||||
|
||||
elapsed = time.time() - start
|
||||
if elapsed >= timeout:
|
||||
raise TimeoutError
|
||||
|
||||
def get_tty_text(self, tty: str) -> str:
|
||||
status, output = self.execute(
|
||||
f"fold -w$(stty -F /dev/tty{tty} size | "
|
||||
|
Loading…
Reference in New Issue
Block a user