summaryrefslogtreecommitdiff
path: root/cli/cros/cros_payload.py
blob: 9581b597db069c7a2b29d19617cb5e1fef2c346f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""cros payload: Show information about an update payload."""

from __future__ import print_function

import itertools
import os
import sys
import textwrap

from chromite.cbuildbot import constants
from chromite.cli import command

# Needed for the dev.host.lib import below.
sys.path.insert(0, os.path.join(constants.SOURCE_ROOT, 'src', 'platform'))

MAJOR_PAYLOAD_VERSION_CHROMEOS = 1
MAJOR_PAYLOAD_VERSION_BRILLO = 2

def DisplayValue(key, value):
  """Print out a key, value pair with values left-aligned."""
  if value != None:
    print('%-*s %s' % (24, key + ':', value))
  else:
    raise ValueError('Cannot display an empty value.')


def DisplayHexData(data, indent=0):
  """Print out binary data as a hex values."""
  for off in range(0, len(data), 16):
    chunk = data[off:off + 16]
    print(' ' * indent +
          ' '.join('%.2x' % ord(c) for c in chunk) +
          '   ' * (16 - len(chunk)) +
          ' | ' +
          ''.join(c if 32 <= ord(c) and ord(c) < 127 else '.' for c in chunk))


@command.CommandDecorator('payload')
class PayloadCommand(command.CliCommand):
  """Show basic information about an update payload.

  This command parses an update payload and displays information from
  its header and manifest.
  """

  EPILOG = """
Example:
  cros payload show chromeos_6716.0.0_daisy_canary-channel_full_snow-mp-v3.bin
"""

  def __init__(self, options):
    # TODO(alliewood)(chromium:454629) update once update_payload is moved
    # into chromite. google.protobuf may not be available outside the chroot.
    from dev.host.lib import update_payload
    self._update_payload = update_payload

    super(PayloadCommand, self).__init__(options)
    self.payload = None

  @classmethod
  def AddParser(cls, parser):
    super(PayloadCommand, cls).AddParser(parser)
    parser.add_argument(
        'action', choices=['show'],
        help='Show information about an update payload.')
    parser.add_argument(
        'payload_file', type=file,
        help='The payload file that you want information from.')
    parser.add_argument('--list_ops', default=False, action='store_true',
                        help='List the install operations and their extents.')
    parser.add_argument('--stats', default=False, action='store_true',
                        help='Show information about overall input/output.')
    parser.add_argument('--signatures', default=False, action='store_true',
                        help='Show signatures stored in the payload.')

  def _DisplayHeader(self):
    """Show information from the payload header."""
    header = self.payload.header
    DisplayValue('Payload version', header.version)
    DisplayValue('Manifest length', header.manifest_len)

  def _DisplayManifest(self):
    """Show information from the payload manifest."""
    manifest = self.payload.manifest
    if self.payload.header.version == MAJOR_PAYLOAD_VERSION_BRILLO:
      DisplayValue('Number of partitions', len(manifest.partitions))
      for partition in manifest.partitions:
        DisplayValue('  Number of "%s" ops' % partition.partition_name,
                     len(partition.operations))
    else:
      DisplayValue('Number of operations', len(manifest.install_operations))
      DisplayValue('Number of kernel ops',
                   len(manifest.kernel_install_operations))
    DisplayValue('Block size', manifest.block_size)
    DisplayValue('Minor version', manifest.minor_version)

  def _DisplaySignatures(self):
    """Show information about the signatures from the manifest."""
    header = self.payload.header
    if header.metadata_signature_len:
      offset = header.size + header.manifest_len
      DisplayValue('Metadata signatures blob',
                   'file_offset=%d (%d bytes)' %
                   (offset, header.metadata_signature_len))
      signatures_blob = self.payload.ReadDataBlob(
          -header.metadata_signature_len,
          header.metadata_signature_len)
      self._DisplaySignaturesBlob('Metadata', signatures_blob)
    else:
      print('No metadata signatures stored in the payload')

    manifest = self.payload.manifest
    if manifest.HasField('signatures_offset'):
      signature_msg = 'blob_offset=%d' % manifest.signatures_offset
      if manifest.signatures_size:
        signature_msg += ' (%d bytes)' % manifest.signatures_size
      DisplayValue('Payload signatures blob', signature_msg)
      signatures_blob = self.payload.ReadDataBlob(manifest.signatures_offset,
                                                  manifest.signatures_size)
      self._DisplaySignaturesBlob('Payload', signatures_blob)
    else:
      print('No payload signatures stored in the payload')

  @staticmethod
  def _DisplaySignaturesBlob(signature_name, signatures_blob):
    from dev.host.lib.update_payload import update_metadata_pb2
    signatures = update_metadata_pb2.Signatures()
    signatures.ParseFromString(signatures_blob)
    print('%s signatures: (%d entries)' %
          (signature_name, len(signatures.signatures)))
    for signature in signatures.signatures:
      print('  version=%s, hex_data: (%d bytes)' %
            (signature.version if signature.HasField('version') else None,
             len(signature.data)))
      DisplayHexData(signature.data, indent=4)


  def _DisplayOps(self, name, operations):
    """Show information about the install operations from the manifest.

    The list shown includes operation type, data offset, data length, source
    extents, source length, destination extents, and destinations length.

    Args:
      name: The name you want displayed above the operation table.
      operations: The install_operations object that you want to display
                  information about.
    """
    def _DisplayExtents(extents, name):
      num_blocks = sum([ext.num_blocks for ext in extents])
      ext_str = ' '.join(
          '(%s,%s)' % (ext.start_block, ext.num_blocks) for ext in extents)
      # Make extent list wrap around at 80 chars.
      ext_str = '\n      '.join(textwrap.wrap(ext_str, 74))
      extent_plural = 's' if len(extents) > 1 else ''
      block_plural = 's' if num_blocks > 1 else ''
      print('    %s: %d extent%s (%d block%s)' %
            (name, len(extents), extent_plural, num_blocks, block_plural))
      print('      %s' % ext_str)

    op_dict = self._update_payload.common.OpType.NAMES
    print('%s:' % name)
    for op, op_count in itertools.izip(operations, itertools.count()):
      print('  %d: %s' % (op_count, op_dict[op.type]))
      if op.HasField('data_offset'):
        print('    Data offset: %s' % op.data_offset)
      if op.HasField('data_length'):
        print('    Data length: %s' % op.data_length)
      if len(op.src_extents):
        _DisplayExtents(op.src_extents, 'Source')
      if len(op.dst_extents):
        _DisplayExtents(op.dst_extents, 'Destination')

  def _GetStats(self, manifest):
    """Returns various statistics about a payload file.

    Returns a dictionary containing the number of blocks read during payload
    application, the number of blocks written, and the number of seeks done
    when writing during operation application.
    """
    read_blocks = 0
    written_blocks = 0
    num_write_seeks = 0
    if self.payload.header.version == MAJOR_PAYLOAD_VERSION_BRILLO:
      partitions_operations = [part.operations for part in manifest.partitions]
    else:
      partitions_operations = [manifest.install_operations,
                               manifest.kernel_install_operations]
    for operations in partitions_operations:
      last_ext = None
      for curr_op in operations:
        read_blocks += sum([ext.num_blocks for ext in curr_op.src_extents])
        written_blocks += sum([ext.num_blocks for ext in curr_op.dst_extents])
        for curr_ext in curr_op.dst_extents:
          # See if the extent is contiguous with the last extent seen.
          if last_ext and (curr_ext.start_block !=
                           last_ext.start_block + last_ext.num_blocks):
            num_write_seeks += 1
          last_ext = curr_ext

    if manifest.minor_version == 1:
      # Rootfs and kernel are written during the filesystem copy in version 1.
      written_blocks += manifest.old_rootfs_info.size / manifest.block_size
      written_blocks += manifest.old_kernel_info.size / manifest.block_size
    # Old and new rootfs and kernel are read once during verification
    read_blocks += manifest.old_rootfs_info.size / manifest.block_size
    read_blocks += manifest.old_kernel_info.size / manifest.block_size
    read_blocks += manifest.new_rootfs_info.size / manifest.block_size
    read_blocks += manifest.new_kernel_info.size / manifest.block_size
    stats = {'read_blocks': read_blocks,
             'written_blocks': written_blocks,
             'num_write_seeks': num_write_seeks}
    return stats

  def _DisplayStats(self, manifest):
    stats = self._GetStats(manifest)
    DisplayValue('Blocks read', stats['read_blocks'])
    DisplayValue('Blocks written', stats['written_blocks'])
    DisplayValue('Seeks when writing', stats['num_write_seeks'])

  def Run(self):
    """Parse the update payload and display information from it."""
    self.payload = self._update_payload.Payload(self.options.payload_file)
    self.payload.Init()
    self._DisplayHeader()
    self._DisplayManifest()
    if self.options.signatures:
      self._DisplaySignatures()
    if self.options.stats:
      self._DisplayStats(self.payload.manifest)
    if self.options.list_ops:
      print()
      if self.payload.header.version == MAJOR_PAYLOAD_VERSION_BRILLO:
        for partition in self.payload.manifest.partitions:
          self._DisplayOps('%s install operations' % partition.partition_name,
                           partition.operations)
      else:
        self._DisplayOps('Install operations',
                         self.payload.manifest.install_operations)
        self._DisplayOps('Kernel install operations',
                         self.payload.manifest.kernel_install_operations)