aboutsummaryrefslogtreecommitdiff
path: root/pw_emu/py/tests/qemu_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'pw_emu/py/tests/qemu_test.py')
-rw-r--r--pw_emu/py/tests/qemu_test.py280
1 files changed, 280 insertions, 0 deletions
diff --git a/pw_emu/py/tests/qemu_test.py b/pw_emu/py/tests/qemu_test.py
new file mode 100644
index 000000000..c84fb2b6d
--- /dev/null
+++ b/pw_emu/py/tests/qemu_test.py
@@ -0,0 +1,280 @@
+#!/usr/bin/env python
+# Copyright 2023 The Pigweed Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may not
+# use this file except in compliance with the License. You may obtain a copy of
+# the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations under
+# the License.
+"""QEMU emulator tests."""
+
+import json
+import os
+import socket
+import sys
+import tempfile
+import time
+import unittest
+
+from pathlib import Path
+from typing import Any, Dict, Optional
+
+from pw_emu.core import InvalidChannelName, InvalidChannelType
+from tests.common import check_prog, ConfigHelperWithEmulator
+
+
+# TODO: b/301382004 - The Python Pigweed package install (into python-venv)
+# races with running this test and there is no way to add that package as a test
+# depedency without creating circular depedencies. This means we can't rely on
+# using Pigweed tools like pw cli or the arm-none-eabi-gdb wrapper.
+#
+# run the arm_gdb.py wrapper directly
+_arm_none_eabi_gdb_path = Path(
+ os.path.join(
+ os.environ['PW_ROOT'],
+ 'pw_env_setup',
+ 'py',
+ 'pw_env_setup',
+ 'entry_points',
+ 'arm_gdb.py',
+ )
+).resolve()
+
+
+class TestQemu(ConfigHelperWithEmulator):
+ """Tests for a valid qemu configuration."""
+
+ _config = {
+ 'gdb': ['python', str(_arm_none_eabi_gdb_path)],
+ 'qemu': {
+ 'executable': 'qemu-system-arm',
+ },
+ 'targets': {
+ 'test-target': {
+ 'ignore1': None,
+ 'qemu': {
+ 'machine': 'lm3s6965evb',
+ 'channels': {
+ 'chardevs': {
+ 'test_uart': {
+ 'id': 'serial0',
+ }
+ }
+ },
+ },
+ 'ignore2': None,
+ }
+ },
+ }
+
+ def setUp(self) -> None:
+ super().setUp()
+ # No image so start paused to avoid crashing.
+ self._emu.start(target='test-target', pause=True)
+
+ def tearDown(self) -> None:
+ self._emu.stop()
+ super().tearDown()
+
+ def test_running(self) -> None:
+ self.assertTrue(self._emu.running())
+
+ def test_list_properties(self) -> None:
+ self.assertIsNotNone(self._emu.list_properties('/machine'))
+
+ def test_get_property(self) -> None:
+ self.assertEqual(
+ self._emu.get_property('/machine', 'type'), 'lm3s6965evb-machine'
+ )
+
+ def test_set_property(self) -> None:
+ self._emu.set_property('/machine', 'graphics', False)
+ self.assertFalse(self._emu.get_property('/machine', 'graphics'))
+
+ def test_bad_channel_name(self) -> None:
+ with self.assertRaises(InvalidChannelName):
+ self._emu.get_channel_addr('serial1')
+
+ def get_reg(self, addr: int) -> bytes:
+ temp = tempfile.NamedTemporaryFile(delete=False)
+ temp.close()
+
+ res = self._emu.run_gdb_cmds(
+ [
+ f'dump val {temp.name} *(char*){addr}',
+ 'disconnect',
+ ]
+ )
+ self.assertEqual(res.returncode, 0, res.stderr.decode('ascii'))
+
+ with open(temp.name, 'rb') as file:
+ ret = file.read(1)
+
+ self.assertNotEqual(ret, b'', res.stderr.decode('ascii'))
+
+ os.unlink(temp.name)
+
+ return ret
+
+ def poll_data(self, timeout: int) -> Optional[bytes]:
+ uartris = 0x4000C03C
+ uartrd = 0x4000C000
+
+ deadline = time.monotonic() + timeout
+ while self.get_reg(uartris) == b'\x00':
+ time.sleep(0.1)
+ if time.monotonic() > deadline:
+ return None
+ return self.get_reg(uartrd)
+
+ def test_channel_stream(self) -> None:
+ ok, msg = check_prog('arm-none-eabi-gdb')
+ if not ok:
+ self.skipTest(msg)
+
+ stream = self._emu.get_channel_stream('test_uart')
+ stream.write('test\n'.encode('ascii'))
+
+ self.assertEqual(self.poll_data(5), b't')
+ self.assertEqual(self.poll_data(5), b'e')
+ self.assertEqual(self.poll_data(5), b's')
+ self.assertEqual(self.poll_data(5), b't')
+
+ def test_gdb(self) -> None:
+ self._emu.run_gdb_cmds(['c'])
+ deadline = time.monotonic() + 5
+ while self._emu.running():
+ if time.monotonic() > deadline:
+ return
+ self.assertFalse(self._emu.running())
+
+
+class TestQemuChannelsTcp(TestQemu):
+ """Tests for configurations using TCP channels."""
+
+ _config: Dict[str, Any] = {}
+ _config.update(json.loads(json.dumps(TestQemu._config)))
+ _config['qemu']['channels'] = {'type': 'tcp'}
+
+ def test_get_channel_addr(self) -> None:
+ host, port = self._emu.get_channel_addr('test_uart')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((host, port))
+ sock.close()
+
+
+class TestQemuChannelsPty(TestQemu):
+ """Tests for configurations using PTY channels."""
+
+ _config: Dict[str, Any] = {}
+ _config.update(json.loads(json.dumps(TestQemu._config)))
+ _config['qemu']['channels'] = {'type': 'pty'}
+
+ def setUp(self):
+ if sys.platform == 'win32':
+ self.skipTest('pty not supported on win32')
+ super().setUp()
+
+ def test_get_path(self) -> None:
+ self.assertTrue(os.path.exists(self._emu.get_channel_path('test_uart')))
+
+
+class TestQemuInvalidChannelType(ConfigHelperWithEmulator):
+ """Test invalid channel type configuration."""
+
+ _config = {
+ 'qemu': {
+ 'executable': 'qemu-system-arm',
+ 'channels': {'type': 'invalid'},
+ },
+ 'targets': {
+ 'test-target': {
+ 'qemu': {
+ 'machine': 'lm3s6965evb',
+ }
+ }
+ },
+ }
+
+ def test_start(self) -> None:
+ with self.assertRaises(InvalidChannelType):
+ self._emu.start('test-target', pause=True)
+
+
+class TestQemuTargetChannelsMixed(ConfigHelperWithEmulator):
+ """Test configuration with mixed channels types."""
+
+ _config = {
+ 'qemu': {
+ 'executable': 'qemu-system-arm',
+ },
+ 'targets': {
+ 'test-target': {
+ 'qemu': {
+ 'machine': 'lm3s6965evb',
+ 'channels': {
+ 'chardevs': {
+ 'test_uart0': {
+ 'id': 'serial0',
+ },
+ 'test_uart1': {
+ 'id': 'serial1',
+ 'type': 'tcp',
+ },
+ 'test_uart2': {
+ 'id': 'serial2',
+ 'type': 'pty',
+ },
+ }
+ },
+ }
+ }
+ },
+ }
+
+ def setUp(self) -> None:
+ if sys.platform == 'win32':
+ self.skipTest('pty not supported on win32')
+ super().setUp()
+ # no image to run so start paused
+ self._emu.start('test-target', pause=True)
+
+ def tearDown(self) -> None:
+ self._emu.stop()
+ super().tearDown()
+
+ def test_uart0_addr(self) -> None:
+ host, port = self._emu.get_channel_addr('test_uart0')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((host, port))
+ sock.close()
+
+ def test_uart1_addr(self) -> None:
+ host, port = self._emu.get_channel_addr('test_uart1')
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect((host, port))
+ sock.close()
+
+ def test_uart2_path(self) -> None:
+ self.assertTrue(
+ os.path.exists(self._emu.get_channel_path('test_uart2'))
+ )
+
+
+def main() -> None:
+ ok, msg = check_prog('qemu-system-arm')
+ if not ok:
+ print(f'skipping tests: {msg}')
+ sys.exit(0)
+
+ unittest.main()
+
+
+if __name__ == '__main__':
+ main()