6from recipe_engine
import recipe_api
11"""SSH flavor, used for running code on a remote device via SSH.
13Must be subclassed to set self.device_dirs. The default implementation assumes
21 super(SSHFlavor, self).
__init__(m, app_name)
27 path =
'/tmp/ssh_machine.json'
28 ssh_info = self.
m.file.read_json(
'read ssh_machine.json', path,
29 test_data={
'user_ip':
'foo@127.0.0.1'})
30 self.
_user_ip = ssh_info.get(
u'user_ip')
33 def ssh(self, title, *cmd, **kwargs):
34 if 'infra_step' not in kwargs:
35 kwargs[
'infra_step'] =
True
37 ssh_cmd = [
'ssh',
'-oConnectTimeout=15',
'-oBatchMode=yes',
40 return self.
_run(title, ssh_cmd, **kwargs)
43 self.
ssh(
'mkdir %s' % path,
'mkdir',
'-p', path)
56 self.
ssh(
'rm %s' % path,
'rm',
'-rf', path)
60 rv = self.
ssh(
'read %s' % path,
61 'cat', path, stdout=self.
m.raw_io.output(),
63 return rv.stdout.decode(
'utf-8').rstrip()
if rv
and rv.stdout
else None
67 self.
ssh(
'rm %s' % path,
'rm',
'-f', path)
74 self.
_run(
'scp %s %s' % (host_path, device_path),
75 [
'scp', host_path, device_path], infra_step=
True)
83 def step(self, name, cmd, **kwargs):
86 self.
ssh(str(name), *cmd)
copy_file_to_device(self, host_path, device_path)
_run(self, title, cmd, infra_step=False, **kwargs)
device_path_join(self, *args)
create_clean_device_dir(self, path)
ensure_device_dir(self, path)
read_file_on_device(self, path, **kwargs)
copy_file_to_device(self, host_path, device_path)
scp_device_path(self, device_path)
create_clean_device_dir(self, path)
__init__(self, m, app_name)
ssh(self, title, *cmd, **kwargs)
step(self, name, cmd, **kwargs)