Source code for devlib.host

#    Copyright 2015-2024 ARM Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import socket
import select
import signal
import shutil
import subprocess
import logging
import sys
from getpass import getpass
from shlex import quote
import threading

from devlib.exception import (
    TargetStableError, TargetTransientCalledProcessError, TargetStableCalledProcessError
)
from devlib.utils.misc import check_output
from devlib.connection import ConnectionBase, PopenBackgroundCommand
from devlib.utils.network import forward_server, get_free_host_port


if sys.version_info < (3, 8):
    def copy_tree(src, dst):
        """
        Copy the content of directory ``src`` into directory ``dst``.

        Fallback for interpreters older than Python 3.8, implemented on top
        of ``distutils``.

        :param src: Path of the directory to copy from.
        :param dst: Path of the directory to copy into.
        """
        from distutils import dir_util
        # Only the content is copied, without mode or timestamps, to mirror
        # the behavior of all the other targets.
        dir_util.copy_tree(src, dst, preserve_mode=False, preserve_times=False)
else:
    def copy_tree(src, dst):
        """
        Copy the content of directory ``src`` into directory ``dst``.

        ``dst`` is allowed to exist already, in which case the copied content
        is added to it.

        :param src: Path of the directory to copy from.
        :param dst: Path of the directory to copy into.
        """
        shutil.copytree(
            src,
            dst,
            # Files are copied with shutil.copy() so that creation and
            # modification times are not carried over, like other targets.
            copy_function=shutil.copy,
            # dirs_exist_ok is only available in Python >= 3.8
            dirs_exist_ok=True,
        )


# Path of the 'bin' directory located next to this module's file.
PACKAGE_BIN_DIRECTORY = os.path.join(os.path.dirname(__file__), 'bin')


# pylint: disable=redefined-outer-name
def kill_children(pid, signal=signal.SIGKILL):
    """
    Recursively send ``signal`` to every descendant of process ``pid``.

    The children are discovered through ``/proc/<pid>/task/<pid>/children``.
    The process ``pid`` itself is not signalled.

    :param pid: PID of the process whose descendants get signalled.
    :param signal: Signal number to deliver, ``SIGKILL`` by default.
    """
    children_path = '/proc/{0}/task/{0}/children'.format(pid)
    with open(children_path, 'r') as children_file:
        for entry in children_file.read().split():
            child_pid = int(entry)
            # Depth first: the grand-children are signalled before their
            # parent.
            kill_children(child_pid, signal)
            os.kill(child_pid, signal)


class LocalConnection(ConnectionBase):
    """
    Connection running commands through a shell on the local host and
    transferring files with plain local copies.

    :param platform: Unused.
    :param keep_password: When true, a password typed at the prompt is stored
        and reused by later calls.
    :param unrooted: When true, any attempt to elevate privileges with sudo
        raises :class:`TargetStableError`.
    :param password: Password given to sudo. When not set, it is asked
        interactively the first time it is needed.
    :param timeout: Unused.
    """

    name = 'local'
    host = 'localhost'

    @property
    def connected_as_root(self):
        """
        ``True`` if commands already run as uid 0.

        The value is worked out once from the output of ``id`` and then
        cached.
        """
        if self._connected_as_root is not None:
            return self._connected_as_root

        id_output = self.execute('id', as_root=False)
        self._connected_as_root = 'uid=0(' in id_output
        return self._connected_as_root

    @connected_as_root.setter
    def connected_as_root(self, state):
        self._connected_as_root = state

    # pylint: disable=unused-argument
    def __init__(self, platform=None, keep_password=True, unrooted=False,
                 password=None, timeout=None):
        super().__init__()
        # None means "not probed yet", see the connected_as_root property.
        self._connected_as_root = None
        self.logger = logging.getLogger('local_connection')
        self.keep_password = keep_password
        self.unrooted = unrooted
        self.password = password

    def _copy_path(self, source, dest):
        """
        Copy the file or directory ``source`` to ``dest``.
        """
        self.logger.debug('copying {} to {}'.format(source, dest))
        copier = copy_tree if os.path.isdir(source) else shutil.copy
        copier(source, dest)

    def _copy_paths(self, sources, dest):
        """
        Copy each path of ``sources`` to ``dest``, in order.
        """
        for path in sources:
            self._copy_path(path, dest)

    def push(self, sources, dest, timeout=None, as_root=False):  # pylint: disable=unused-argument
        """
        Copy ``sources`` to ``dest``. ``timeout`` and ``as_root`` are ignored.
        """
        self._copy_paths(sources, dest)

    def pull(self, sources, dest, timeout=None, as_root=False):  # pylint: disable=unused-argument
        """
        Copy ``sources`` to ``dest``. ``timeout`` and ``as_root`` are ignored.
        """
        self._copy_paths(sources, dest)

    @staticmethod
    def _write_process_input(popen, inputtext):
        """
        Write ``inputtext`` to the stdin of ``popen`` and close that stdin.

        Nothing is done when ``inputtext`` is ``None``.

        :param popen: Process object exposing a ``stdin`` attribute.
        :param inputtext: ``str`` or bytes to send. A ``str`` is encoded with
            the encoding of ``sys.stdin``, or UTF-8 if it has none.
        :raises RuntimeError: If ``popen.stdin`` is ``None``.
        """
        if inputtext is None:
            return

        stdin = popen.stdin
        if stdin is None:
            raise RuntimeError("process stdin is not piped")

        if isinstance(inputtext, str):
            payload = inputtext.encode(sys.stdin.encoding or "utf-8")
        else:
            payload = inputtext

        try:
            stdin.write(payload)
            stdin.flush()
        except BrokenPipeError:
            # The child process exited early
            pass
        finally:
            # Closing can hit the same broken pipe, which is ignored as well.
            try:
                stdin.close()
            except BrokenPipeError:
                pass

    # pylint: disable=unused-argument
    def execute(self, command, timeout=None, check_exit_code=True,
                as_root=False, strip_colors=True, will_succeed=False):
        """
        Run ``command`` in a shell and return its stdout followed by its
        stderr.

        :param command: Shell command to run.
        :param timeout: Forwarded to ``check_output()``.
        :param check_exit_code: When false, ``ignore='all'`` is passed to
            ``check_output()``.
        :param as_root: Run the command through sudo, unless the connection
            already runs as root.
        :param strip_colors: Unused.
        :param will_succeed: Selects the exception raised on failure:
            :class:`TargetTransientCalledProcessError` when true,
            :class:`TargetStableCalledProcessError` otherwise.
        :raises TargetStableError: If sudo is needed but the connection is
            flagged as unrooted.
        """
        self.logger.debug(command)

        # The short-circuit matters: connected_as_root can call execute()
        # itself, so it is only looked up when root was asked for.
        use_sudo = as_root and not self.connected_as_root
        sudo_stdin = None
        if use_sudo:
            if self.unrooted:
                raise TargetStableError('unrooted')
            # sudo -S reads the password from stdin, terminated by a newline.
            sudo_stdin = self._get_password() + "\n"
            # Empty prompt with -p '' to minimize sudo prompt output.
            command = "exec sudo -k -p '' -S -- sh -c {}".format(quote(command))

        if check_exit_code:
            ignore = None
        else:
            ignore = 'all'

        try:
            stdout, stderr = check_output(
                command,
                shell=True,
                timeout=timeout,
                ignore=ignore,
                inputtext=sudo_stdin,
            )
        except subprocess.CalledProcessError as e:
            if will_succeed:
                error_cls = TargetTransientCalledProcessError
            else:
                error_cls = TargetStableCalledProcessError
            raise error_cls(
                e.returncode,
                command,
                e.output,
                e.stderr,
            )

        # Remove the one-character prompt of sudo -S -p
        # NOTE(review): the prompt requested above is empty (-p ''), yet one
        # leading character of stderr is still dropped here -- confirm that
        # sudo really emits one character in that configuration.
        if use_sudo and stderr:
            stderr = stderr[1:]

        return stdout + stderr

    def background(self, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, as_root=False):
        """
        Start ``command`` in a shell without waiting for it to finish.

        :param command: Shell command to run.
        :param stdout: Passed as ``stdout`` to :class:`subprocess.Popen`.
        :param stderr: Passed as ``stderr`` to :class:`subprocess.Popen`.
        :param as_root: Run the command through sudo, unless the connection
            already runs as root.
        :returns: The object built by ``PopenBackgroundCommand.from_factory()``.
        :raises TargetStableError: If sudo is needed but the connection is
            flagged as unrooted.
        """
        sudo_stdin = None
        if as_root and not self.connected_as_root:
            if self.unrooted:
                raise TargetStableError('unrooted')
            sudo_stdin = self._get_password() + "\n"
            # Empty prompt with -p '' to avoid adding a leading space to the
            # output.
            command = "exec sudo -k -p '' -S -- sh -c {}".format(quote(command))

        def make_init_kwargs(command):
            process = subprocess.Popen(
                command,
                stdout=stdout,
                stderr=stderr,
                stdin=subprocess.PIPE,
                shell=True,
                # Make sure to get a new PGID so PopenBackgroundCommand() can
                # kill all sub processes that could be started without
                # troubles.
                start_new_session=True,
            )
            self._write_process_input(process, sudo_stdin)
            return {'popen': process}

        return PopenBackgroundCommand.from_factory(
            conn=self,
            cmd=command,
            as_root=as_root,
            make_init_kwargs=make_init_kwargs,
        )

    def _close(self):
        """
        Nothing to release for a local connection.
        """

    def cancel_running_command(self):
        """
        No-op.
        """

    def wait_for_device(self, timeout=30):
        """
        Return immediately, ``timeout`` is ignored.
        """
        return None

    def reboot_bootloader(self, timeout=30):
        """
        Not supported.

        :raises NotImplementedError: Always.
        """
        raise NotImplementedError()

    def _get_password(self):
        """
        Return the sudo password, asking for it on the terminal when none is
        stored.

        The typed password is only remembered if ``keep_password`` is set.
        """
        stored = self.password
        if stored:
            return stored

        typed = getpass('sudo password:')
        if self.keep_password:
            self.password = typed
        return typed

    def forward_port(self, target_port, host_port=None):
        """
        Start a daemon thread running ``forward_server()`` to relay
        connections to ``localhost:target_port``.

        :param target_port: Port on localhost that the relayed data is sent
            to.
        :param host_port: Port handed to ``forward_server()``. Picked with
            ``get_free_host_port()`` when ``None``.
        :returns: The host port in use.
        """
        if host_port is None:
            host_port = get_free_host_port()

        def relay(source_sock, sink_sock):
            # Move one chunk across. False means the source hit end-of-stream.
            chunk = source_sock.recv(1024)
            if not chunk:
                return False
            sink_sock.sendall(chunk)
            return True

        def handler(client_sock, target_port):
            try:
                with client_sock, socket.create_connection(
                    ("localhost", target_port)
                ) as target_sock:
                    while True:
                        readable, _, _ = select.select([client_sock, target_sock], [], [])
                        if client_sock in readable and not relay(client_sock, target_sock):
                            break
                        if target_sock in readable and not relay(target_sock, client_sock):
                            break
            except Exception as e:
                self.logger.error(f"Connection closed: {e}")

        server_thread = threading.Thread(
            target=forward_server,
            args=(host_port, target_port, handler),
            daemon=True,
        )
        server_thread.start()
        return host_port