diff options
Diffstat (limited to 'pw_emu/py/pw_emu/core.py')
-rw-r--r-- | pw_emu/py/pw_emu/core.py | 956 |
1 files changed, 956 insertions, 0 deletions
diff --git a/pw_emu/py/pw_emu/core.py b/pw_emu/py/pw_emu/core.py new file mode 100644 index 000000000..1e5c2ea9f --- /dev/null +++ b/pw_emu/py/pw_emu/core.py @@ -0,0 +1,956 @@ +# Copyright 2023 The Pigweed Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +"""Infrastructure used by the user interface or specific emulators.""" + +import io +import json +import logging +import os +import re +import socket +import subprocess +import sys +import time + +from abc import ABC, abstractmethod +from importlib import import_module +from pathlib import Path +from typing import Optional, Dict, List, Union, Any, Type + +import psutil # type: ignore + +from pw_emu.pigweed_emulators import pigweed_emulators +from pw_env_setup.config_file import load as pw_config_load +from pw_env_setup.config_file import path as pw_config_path +from serial import Serial + + +_LAUNCHER_LOG = logging.getLogger('pw_qemu.core.launcher') + + +def _stop_process(pid: int) -> None: + """Gracefully stop a running process.""" + + try: + proc = psutil.Process(pid) + proc.terminate() + try: + proc.wait(timeout=5) + except psutil.TimeoutExpired: + proc.kill() + except psutil.NoSuchProcess: + pass + + +def _get_class(name: str) -> type: + """Returns a class from a full qualified class name + (e.g. "package.module.Class"). + + """ + try: + module_path, class_name = name.rsplit('.', 1) + module = import_module(module_path) + return getattr(module, class_name) + except (ImportError, AttributeError): + raise ImportError(name) + + +class Error(Exception): + """Generic pw_emu exception.""" + + +class ConfigError(Error): + """Exception raised for configuration errors.""" + + def __init__(self, config: Optional[Path], err: str) -> None: + msg = f'{config}: {err}\n' + try: + if config: + with open(config, 'r') as file: + msg += json.dumps(json.load(file), indent=4) + except (OSError, json.decoder.JSONDecodeError): + pass + super().__init__(msg) + + +class AlreadyRunning(Error): + """Exception raised if an emulator process is already running.""" + + def __init__(self, wdir: Path) -> None: + super().__init__(f'{wdir}: emulator already started') + + +class NotRunning(Error): + """Exception raised if an emulator process is not running.""" + + def __init__(self, wdir: Path) -> None: + super().__init__(f'{wdir}: emulator not started') + + +class InvalidEmulator(Error): + """Exception raised if an different backend is running.""" + + def __init__(self, emu: str) -> None: + super().__init__(f'invalid emulator `{emu}`') + + +class InvalidTarget(Error): + """Exception raised if the target is invalid.""" + + def __init__(self, config: Path, emu: Optional[str], target: str) -> None: + emu_str = f'for `{emu}`' if emu else '' + super().__init__(f'{config}: invalid target `{target}` {emu_str}') + + +class InvalidChannelName(Error): + """Exception raised if a channel name is invalid.""" + + def __init__(self, name: str, target: str, valid: str) -> None: + msg = f""" + `{name}` is not a valid device name for {target}` + try: {valid} + """ + super().__init__(msg) + + +class InvalidChannelType(Error): + """Exception raised if a channel type is invalid.""" + + def __init__(self, name: str) -> None: + super().__init__(f'`{name}` is not a valid channel type') + + +class WrongEmulator(Error): + """Exception raised if an different backend is running.""" + + def __init__(self, exp: str, found: str) -> None: + super().__init__(f'wrong emulator: expected `{exp}, found {found}`') + + +class RunError(Error): + """Exception raised when a command failed to run.""" + + def __init__(self, proc: str, msg: str) -> None: + super().__init__(f'error running `{proc}`: {msg}') + + +class InvalidPropertyPath(Error): + """Exception raised for an invalid property path.""" + + def __init__(self, path: str) -> None: + super().__init__(f'invalid property path `{path}`') + + +class InvalidProperty(Error): + """Exception raised for an invalid property path.""" + + def __init__(self, path: str, name: str) -> None: + super().__init__(f'invalid property `{name}` at `{path}`') + + +class HandlesError(Error): + """Exception raised while trying to load emulator handles.""" + + def __init__(self, msg: str) -> None: + super().__init__(f'error loading handles: {msg}') + + +class Handles: + """Running emulator handles.""" + + class Channel: + def __init__(self, chan_type: str): + self.type = chan_type + + class PtyChannel(Channel): + def __init__(self, path: str): + super().__init__('pty') + self.path = path + + class TcpChannel(Channel): + def __init__(self, host: str, port: int): + super().__init__('tcp') + self.host = host + self.port = port + + class Proc: + def __init__(self, pid: int): + self.pid = pid + + @staticmethod + def _ser_obj(obj) -> Any: + if isinstance(obj, dict): + data = {} + for key, val in obj.items(): + data[key] = Handles._ser_obj(val) + return data + if hasattr(obj, "__iter__") and not isinstance(obj, str): + return [Handles._ser_obj(item) for item in obj] + if hasattr(obj, "__dict__"): + return Handles._ser_obj(obj.__dict__) + return obj + + def _serialize(self): + return Handles._ser_obj(self) + + def save(self, wdir: Path) -> None: + """Saves handles to the given working directory.""" + + with open(os.path.join(wdir, 'handles.json'), 'w') as file: + json.dump(self._serialize(), file) + + @staticmethod + def load(wdir: Path): + try: + with open(os.path.join(wdir, 'handles.json'), 'r') as file: + data = json.load(file) + except (KeyError, OSError, json.decoder.JSONDecodeError): + raise NotRunning(wdir) + + handles = Handles(data['emu'], data['config']) + handles.set_target(data['target']) + gdb_cmd = data.get('gdb_cmd') + if gdb_cmd: + handles.set_gdb_cmd(gdb_cmd) + for name, chan in data['channels'].items(): + chan_type = chan['type'] + if chan_type == 'tcp': + handles.add_channel_tcp(name, chan['host'], chan['port']) + elif chan_type == 'pty': + handles.add_channel_pty(name, chan['path']) + else: + raise InvalidChannelType(chan_type) + for name, proc in data['procs'].items(): + handles.add_proc(name, proc['pid']) + return handles + + def __init__(self, emu: str, config: str) -> None: + self.emu = emu + self.config = config + self.gdb_cmd: List[str] = [] + self.target = '' + self.channels: Dict[str, Handles.Channel] = {} + self.procs: Dict[str, Handles.Proc] = {} + + def add_channel_tcp(self, name: str, host: str, port: int) -> None: + """Add a TCP channel.""" + + self.channels[name] = self.TcpChannel(host, port) + + def add_channel_pty(self, name: str, path: str) -> None: + """Add a pty channel.""" + + self.channels[name] = self.PtyChannel(path) + + def add_proc(self, name: str, pid: int) -> None: + """Add a pid.""" + + self.procs[name] = self.Proc(pid) + + def set_target(self, target: str) -> None: + """Set the target.""" + + self.target = target + + def set_gdb_cmd(self, cmd: List[str]) -> None: + """Set the gdb command.""" + + self.gdb_cmd = cmd.copy() + + +def _stop_processes(handles: Handles, wdir: Path) -> None: + """Stop all processes for a (partially) running emulator instance. + + Remove pid files as well. + """ + + for _, proc in handles.procs.items(): + _stop_process(proc.pid) + path = os.path.join(wdir, f'{proc}.pid') + if os.path.exists(path): + os.unlink(path) + + +class Config: + """Get and validate options from the configuration file.""" + + def __init__( + self, + config_path: Optional[Path] = None, + target: Optional[str] = None, + emu: Optional[str] = None, + ) -> None: + """Load the emulator configuration. + + If no configuration file path is given, the root project + configuration is used. + + This method set ups the generic configuration (e.g. gdb). + + It loads emulator target files and gathers them under the 'targets' key + for each emulator backend. The 'targets' settings in the configuration + file takes precedence over the loaded target files. + + """ + try: + if config_path: + with open(config_path, 'r') as file: + config = json.load(file)['pw']['pw_emu'] + else: + config_path = pw_config_path() + config = pw_config_load()['pw']['pw_emu'] + except KeyError: + raise ConfigError(config_path, 'missing `pw_emu` configuration') + + if not config_path: + raise ConfigError(None, 'unable to deterine config path') + + if config.get('target_files'): + tmp = {} + for path in config['target_files']: + if not os.path.isabs(path): + path = os.path.join(os.path.dirname(config_path), path) + with open(path, 'r') as file: + tmp.update(json.load(file).get('targets')) + if config.get('targets'): + tmp.update(config['targets']) + config['targets'] = tmp + + self.path = config_path + self._config = {'emulators': pigweed_emulators} + self._config.update(config) + self._emu = emu + self._target = target + + def set_target(self, target: str) -> None: + """Sets the current target. + + The current target is used by the get_target method. + + """ + + self._target = target + try: + self.get(['targets', target], optional=False, entry_type=dict) + except ConfigError: + raise InvalidTarget(self.path, self._emu, self._target) + + def get_targets(self) -> List[str]: + return list(self.get(['targets'], entry_type=dict).keys()) + + def _subst(self, string: str) -> str: + """Substitutes $pw_<subst_type>{arg} statements.""" + + match = re.search(r'\$pw_([^{]+){([^}]+)}', string) + if not match: + return string + + subst_type = match.group(1) + arg = match.group(2) + + if subst_type == 'env': + value = os.environ.get(arg) + if value is None: + msg = f'Environment variable `{arg}` not set' + raise ConfigError(self.path, msg) + return string.replace(f'$pw_{subst_type}{{{arg}}}', value) + + raise ConfigError(self.path, f'Invalid substitution type: {subst_type}') + + def _subst_list(self, items: List[Any]) -> List[Any]: + new_list = [] + for item in items: + if isinstance(item, str): + new_list.append(self._subst(item)) + else: + new_list.append(item) + return new_list + + def get( + self, + keys: List[str], + optional: bool = True, + entry_type: Optional[Type] = None, + ) -> Any: + """Get a config entry. + + keys is a list of string that identifies the config entry, e.g. + ['targets', 'test-target'] is going to look in the config dicionary for + ['targets']['test-target']. + + If the option is not found and optional is True it returns None if + entry_type is none or a new (empty) object of type entry_type. + + If the option is not found an optional is False it raises ConfigError. + + If entry_type is not None it will check the option to be of + that type. If it is not it will raise ConfigError. + + """ + + keys_str = ': '.join(keys) + entry: Optional[Dict[str, Any]] = self._config + + for key in keys: + if not isinstance(entry, dict): + if optional: + if entry_type: + return entry_type() + return None + raise ConfigError(self.path, f'{keys_str}: not found') + entry = entry.get(key) + + if entry is None: + if optional: + if entry_type: + return entry_type() + return None + raise ConfigError(self.path, f'{keys_str}: not found') + + if entry_type and not isinstance(entry, entry_type): + msg = f'{keys_str}: expected entry of type `{entry_type}`' + raise ConfigError(self.path, msg) + + if isinstance(entry, str): + entry = self._subst(entry) + elif isinstance(entry, list): + entry = self._subst_list(entry) + + return entry + + def get_target( + self, + keys: List[str], + optional: bool = True, + entry_type: Optional[Type] = None, + ) -> Any: + """Get a config option starting at ['targets'][target].""" + + if not self._target: + raise Error('target not set') + return self.get(['targets', self._target] + keys, optional, entry_type) + + def get_emu( + self, + keys: List[str], + optional: bool = True, + entry_type: Optional[Type] = None, + ) -> Any: + """Get a config option starting at [emu].""" + + if not self._emu: + raise Error('emu not set') + return self.get([self._emu] + keys, optional, entry_type) + + def get_target_emu( + self, + keys: List[str], + optional: bool = True, + entry_type: Optional[Type] = None, + ) -> Any: + """Get a config option starting at ['targets'][target][emu].""" + + if not self._emu or not self._target: + raise Error('emu or target not set') + return self.get( + ['targets', self._target, self._emu] + keys, optional, entry_type + ) + + +class Connector(ABC): + """Interface between a running emulator and the user visible APIs.""" + + def __init__(self, wdir: Path) -> None: + self._wdir = wdir + self._handles = Handles.load(wdir) + self._channels = self._handles.channels + self._target = self._handles.target + + @staticmethod + def get(wdir: Path) -> Any: + """Return a connector instace for a given emulator type.""" + handles = Handles.load(wdir) + config = Config(handles.config) + emu = handles.emu + try: + name = config.get(['emulators', emu, 'connector']) + cls = _get_class(name) + except (ConfigError, ImportError): + raise InvalidEmulator(emu) + return cls(wdir) + + def get_emu(self) -> str: + """Returns the emulator type.""" + + return self._handles.emu + + def get_gdb_cmd(self) -> List[str]: + """Returns the configured gdb command.""" + return self._handles.gdb_cmd + + def get_config_path(self) -> Path: + """Returns the configuration path.""" + + return self._handles.config + + def get_procs(self) -> Dict[str, Handles.Proc]: + """Returns the running processes indexed by the process name.""" + + return self._handles.procs + + def get_channel_type(self, name: str) -> str: + """Returns the channel type.""" + + try: + return self._channels[name].type + except KeyError: + channels = ' '.join(self._channels.keys()) + raise InvalidChannelName(name, self._target, channels) + + def get_channel_path(self, name: str) -> str: + """Returns the channel path. Raises InvalidChannelType if this + is not a pty channel. + + """ + + try: + if self._channels[name].type != 'pty': + raise InvalidChannelType(self._channels[name].type) + return self._channels[name].path + except KeyError: + raise InvalidChannelName(name, self._target, self._channels.keys()) + + def get_channel_addr(self, name: str) -> tuple: + """Returns a pair of (host, port) for the channel. Raises + InvalidChannelType if this is not a tcp channel. + + """ + + try: + if self._channels[name].type != 'tcp': + raise InvalidChannelType(self._channels[name].type) + return (self._channels[name].host, self._channels[name].port) + except KeyError: + raise InvalidChannelName(name, self._target, self._channels.keys()) + + def get_channel_stream( + self, + name: str, + timeout: Optional[float] = None, + ) -> io.RawIOBase: + """Returns a file object for a given host exposed device. + + If timeout is None than reads and writes are blocking. If + timeout is zero the stream is operating in non-blocking + mode. Otherwise read and write will timeout after the given + value. + + """ + + chan_type = self.get_channel_type(name) + if chan_type == 'tcp': + host, port = self.get_channel_addr(name) + if ':' in host: + sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) + else: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((host, port)) + sock.settimeout(timeout) + ret = sock.makefile('rwb', buffering=0) + sock.close() + return ret + if chan_type == 'pty': + ser = Serial(self.get_channel_path(name)) + ser.timeout = timeout + return ser + raise InvalidChannelType(chan_type) + + def get_channels(self) -> List[str]: + return self._handles.channels.keys() + + def stop(self) -> None: + """Stop the emulator.""" + + _stop_processes(self._handles, self._wdir) + + try: + os.unlink(os.path.join(self._wdir, 'handles.json')) + except OSError: + pass + + def proc_running(self, proc: str) -> bool: + try: + return psutil.pid_exists(self._handles.procs[proc].pid) + except (NotRunning, KeyError): + return False + + def running(self) -> bool: + """Check if the main emulator process is already running.""" + + try: + return psutil.pid_exists(self._handles.procs[self._handles.emu].pid) + except (NotRunning, KeyError): + return False + + @abstractmethod + def reset(self) -> None: + """Perform a software reset.""" + + @abstractmethod + def cont(self) -> None: + """Resume the emulator's execution.""" + + @abstractmethod + def list_properties(self, path: str) -> List[Any]: + """Returns the property list for an emulator object.""" + + @abstractmethod + def set_property(self, path: str, prop: str, value: str) -> None: + """Sets the value of an emulator's object property.""" + + @abstractmethod + def get_property(self, path: str, prop: str) -> Any: + """Returns the value of an emulator's object property.""" + + +class Launcher(ABC): + """Starts an emulator based on the target and configuration file.""" + + def __init__( + self, + emu: str, + config_path: Optional[Path] = None, + ) -> None: + """Initializes a Launcher instance.""" + + self._wdir: Optional[Path] = None + """Working directory""" + + self._emu = emu + """Emulator type (e.g. "qemu", "renode").""" + + self._target: Optional[str] = None + """Target, initialized to None and set with _prep_start.""" + + self._config = Config(config_path, emu=emu) + """Global, emulator and target configuration.""" + + self._handles = Handles(self._emu, str(self._config.path)) + """Handles for processes, channels, etc.""" + + gdb_cmd = self._config.get(['gdb'], entry_type=list) + if gdb_cmd: + self._handles.set_gdb_cmd(gdb_cmd) + + @staticmethod + def get(emu: str, config_path: Optional[Path] = None) -> Any: + """Returns a launcher for a given emulator type.""" + config = Config(config_path) + try: + name = config.get(['emulators', emu, 'launcher']) + cls = _get_class(name) + except (ConfigError, ImportError): + raise InvalidEmulator(str(emu)) + return cls(config_path) + + @abstractmethod + def _pre_start( + self, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + args: Optional[str] = None, + ) -> List[str]: + """Pre start work, returns command to start the emulator. + + The target and emulator configuration can be accessed through + :py:attr:`pw_emu.core.Launcher._config` with + :py:meth:`pw_emu.core.Config.get`, + :py:meth:`pw_emu.core.Config.get_target`, + :py:meth:`pw_emu.core.Config.get_emu`, + :py:meth:`pw_emu.core.Config.get_target_emu`. + """ + + @abstractmethod + def _post_start(self) -> None: + """Post start work, finalize emulator handles. + + Perform any post start emulator initialization and finalize the emulator + handles information. + + Typically an internal monitor channel is used to inquire information + about the the configured channels (e.g. TCP ports, pty paths) and + :py:attr:`pw_emu.core.Launcher._handles` is updated via + :py:meth:`pw_emu.core.Handles.add_channel_tcp`, + :py:meth:`pw_emu.core.Handles.add_channel_pty`, etc. + + """ + + @abstractmethod + def _get_connector(self, wdir: Path) -> Connector: + """Get a connector for this emulator type.""" + + def _path(self, name: Union[Path, str]) -> Path: + """Returns the full path for a given emulator file.""" + if self._wdir is None: + raise Error('internal error') + return Path(os.path.join(self._wdir, name)) + + def _subst_channel(self, subst_type: str, arg: str, string: str) -> str: + """Substitutes $pw_emu_channel_{func}{arg} statements.""" + + try: + chan = self._handles.channels[arg] + except KeyError: + return string + + if subst_type == 'channel_port': + if not isinstance(chan, Handles.TcpChannel): + return string + return str(chan.port) + + if subst_type == 'channel_host': + if not isinstance(chan, Handles.TcpChannel): + return string + return chan.host + + if subst_type == 'channel_path': + if not isinstance(chan, Handles.PtyChannel): + return string + return chan.path + + return string + + def _subst(self, string: str) -> str: + """Substitutes $pw_emu_<subst_type>{arg} statements.""" + + match = re.search(r'\$pw_emu_([^{]+){([^}]+)}', string) + if not match: + return string + + subst_type = match.group(1) + arg = match.group(2) + + if subst_type == 'wdir': + if self._wdir: + return os.path.join(self._wdir, arg) + return string + + if 'channel_' in subst_type: + return self._subst_channel(subst_type, arg, string) + + return string + + # pylint: disable=protected-access + # use os._exit after fork instead of os.exit + def _daemonize( + self, + name: str, + cmd: List[str], + ) -> None: + """Daemonize process for UNIX hosts.""" + + if sys.platform == 'win32': + raise Error('_daemonize not supported on win32') + + # pylint: disable=no-member + # avoid pylint false positive on win32 + pid = os.fork() + if pid < 0: + raise RunError(name, f'fork failed: {pid}') + if pid > 0: + return + + path: Path = Path('/dev/null') + fd = os.open(path, os.O_RDONLY) + os.dup2(fd, sys.stdin.fileno()) + os.close(fd) + + path = self._path(f'{name}.log') + fd = os.open(path, os.O_WRONLY | os.O_TRUNC | os.O_CREAT) + os.dup2(fd, sys.stdout.fileno()) + os.dup2(fd, sys.stderr.fileno()) + os.close(fd) + + os.setsid() + + if os.fork() > 0: + os._exit(0) + + try: + # Make the pid file create and pid write operations atomic to avoid + # races with readers. + with open(self._path(f'{name}.pid.tmp'), 'w') as file: + file.write(f'{os.getpid()}') + os.rename(self._path(f'{name}.pid.tmp'), self._path(f'{name}.pid')) + os.execvp(cmd[0], cmd) + finally: + os._exit(1) + + def _start_proc( + self, + name: str, + cmd: List[str], + foreground: bool = False, + ) -> Union[subprocess.Popen, None]: + """Run the main emulator process. + + The process pid is stored and can later be accessed by its name to + terminate it when the emulator is stopped. + + If foreground is True the process run in the foreground and a + subprocess.Popen object is returned. Otherwise the process is started in + the background and None is returned. + + When running in the background stdin is redirected to the NULL device + and stdout and stderr are redirected to a file named <name>.log which is + stored in the emulator's instance working directory. + + """ + for idx, item in enumerate(cmd): + cmd[idx] = self._subst(item) + + pid_file_path = self._path(f'{name}.pid') + if os.path.exists(pid_file_path): + os.unlink(pid_file_path) + + if foreground: + proc = subprocess.Popen(cmd) + self._handles.add_proc(name, proc.pid) + with open(pid_file_path, 'w') as file: + file.write(f'{proc.pid}') + return proc + + if sys.platform == 'win32': + file = open(self._path(f'{name}.log'), 'w') + proc = subprocess.Popen( + cmd, + stdin=subprocess.DEVNULL, + stdout=file, + stderr=file, + creationflags=subprocess.DETACHED_PROCESS, + ) + file.close() + with open(pid_file_path, 'w') as file: + file.write(f'{proc.pid}') + self._handles.add_proc(name, proc.pid) + # avoids resource warnings due to not calling wait which + # we don't want to do since we've started the process in + # the background + proc.returncode = 0 + else: + self._daemonize(name, cmd) + + # wait for the pid file to avoid double start race conditions + timeout = time.monotonic() + 30 + while not os.path.exists(self._path(f'{name}.pid')): + time.sleep(0.1) + if time.monotonic() > timeout: + break + if not os.path.exists(self._path(f'{name}.pid')): + raise RunError(name, 'pid file timeout') + try: + with open(pid_file_path, 'r') as file: + pid = int(file.readline()) + self._handles.add_proc(name, pid) + except (OSError, ValueError) as err: + raise RunError(name, str(err)) + + return None + + def _stop_procs(self): + """Stop all registered processes.""" + + for name, proc in self._handles.procs.items(): + _stop_process(proc.pid) + if os.path.exists(self._path(f'{name}.pid')): + os.unlink(self._path(f'{name}.pid')) + + def _start_procs(self, procs_list: str) -> None: + """Start additional processes besides the main emulator one.""" + + procs = self._config.get_target([procs_list], entry_type=dict) + for name, cmd in procs.items(): + self._start_proc(name, cmd) + + def start( + self, + wdir: Path, + target: str, + file: Optional[Path] = None, + pause: bool = False, + debug: bool = False, + foreground: bool = False, + args: Optional[str] = None, + ) -> Connector: + """Start the emulator for the given target. + + If file is set that the emulator will load the file before starting. + + If pause is True the emulator is paused. + + If debug is True the emulator is run in foreground with debug output + enabled. This is useful for seeing errors, traces, etc. + + If foreground is True the emulator is run in foreground otherwise it is + started in daemon mode. This is useful when there is another process + controlling the emulator's life cycle (e.g. cuttlefish) + + args are passed directly to the emulator + + """ + + try: + handles = Handles.load(wdir) + if psutil.pid_exists(handles.procs[handles.emu].pid): + raise AlreadyRunning(wdir) + except NotRunning: + pass + + self._wdir = wdir + self._target = target + self._config.set_target(target) + self._handles.set_target(target) + gdb_cmd = self._config.get_target(['gdb'], entry_type=list) + if gdb_cmd: + self._handles.set_gdb_cmd(gdb_cmd) + os.makedirs(wdir, mode=0o700, exist_ok=True) + + cmd = self._pre_start( + target=target, file=file, pause=pause, debug=debug, args=args + ) + + if debug: + foreground = True + _LAUNCHER_LOG.setLevel(logging.DEBUG) + + _LAUNCHER_LOG.debug('starting emulator with command: %s', ' '.join(cmd)) + + try: + self._start_procs('pre-start-cmds') + proc = self._start_proc(self._emu, cmd, foreground) + self._start_procs('post-start-cmds') + except RunError as err: + self._stop_procs() + raise err + + self._post_start() + self._handles.save(wdir) + + if proc: + proc.wait() + self._stop_procs() + + return self._get_connector(self._wdir) |