summaryrefslogtreecommitdiff
path: root/lib/operation_unittest.py
blob: ea9e184afc5e9bea623395a0db6fc846e011dd33 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Unittest for operation"""

from __future__ import print_function

import multiprocessing
import os
import sys

from chromite.lib import cros_logging as logging
from chromite.lib import cros_test_lib
from chromite.lib import operation
from chromite.lib import osutils
from chromite.lib import parallel
from chromite.lib import workspace_lib


class TestWrapperProgressBarOperation(operation.ProgressBarOperation):
  """ProgressBarOperation subclass whose ParseOutput echoes captured stdout."""

  def ParseOutput(self, output=None):
    """Print a marker line, then whatever can be read from self._stdout.

    Args:
      output: Unused by this override.
    """
    print("Calling ParseOutput")
    captured = self._stdout.read()
    print(captured)


class FakeParallelEmergeOperation(operation.ParallelEmergeOperation):
  """ParallelEmergeOperation double that signals a queue after each parse."""

  def __init__(self, queue):
    """Initialize the parent operation and remember the signalling queue.

    Args:
      queue: Object with a put() method; it receives 'advance' after every
        ParseOutput call.
    """
    super(FakeParallelEmergeOperation, self).__init__()
    self._queue = queue

  def ParseOutput(self, output=None):
    """Run the parent's ParseOutput, then put 'advance' on the queue.

    Args:
      output: Unused; the parent implementation is called without arguments.
    """
    parent = super(FakeParallelEmergeOperation, self)
    parent.ParseOutput()
    self._queue.put('advance')


class FakeException(Exception):
  """Sentinel exception raised by test functions to exercise error paths."""


class ProgressBarOperationTest(cros_test_lib.WorkspaceTestCase,
                               cros_test_lib.OutputTestCase,
                               cros_test_lib.LoggingTestCase):
  """Test the Progress Bar Operation class."""
  # pylint: disable=protected-access

  def setUp(self):
    """Patch the terminal size lookup and os.isatty for every test."""
    terminal_width = 20
    # Keep a handle on the patch so individual tests can change the reported
    # terminal size (see _VerifyProgressBar).
    self._terminal = self.PatchObject(
        operation.ProgressBarOperation, '_GetTerminalSize',
        return_value=operation._TerminalSize(100, terminal_width))
    self.PatchObject(os, 'isatty', return_value=True)

  def _GetStdoutPath(self):
    """Return path to the file where stdout is captured."""
    return os.path.join(self.workspace_path, workspace_lib.WORKSPACE_LOGS_DIR,
                        operation.STDOUT_FILE)

  def _GetStderrPath(self):
    """Return path to the file where stderr is captured."""
    return os.path.join(self.workspace_path, workspace_lib.WORKSPACE_LOGS_DIR,
                        operation.STDERR_FILE)

  def _VerifyProgressBar(self, width, percent, expected_shaded,
                         expected_unshaded):
    """Helper to test progress bar with different percentages and lengths.

    Args:
      width: Bar width to test; the patched terminal width is set to this
        value plus ProgressBarOperation._PROGRESS_BAR_BORDER_SIZE.
      percent: Value passed to ProgressBarOperation.ProgressBar.
      expected_shaded: Expected number of '#' characters in stdout.
      expected_unshaded: Expected number of '-' characters in stdout.
    """
    terminal_width = width + (
        operation.ProgressBarOperation._PROGRESS_BAR_BORDER_SIZE)
    self._terminal.return_value = operation._TerminalSize(100, terminal_width)
    op = operation.ProgressBarOperation()
    with self.OutputCapturer() as output:
      op.ProgressBar(percent)
    stdout = output.GetStdout()

    # Check that the shaded and unshaded regions are the expected size.
    self.assertEqual(stdout.count('#'), expected_shaded)
    self.assertEqual(stdout.count('-'), expected_unshaded)

  def testProgressBar(self):
    """Test progress bar at different percentages."""
    self._VerifyProgressBar(10, 0.7, 7, 3)
    self._VerifyProgressBar(10, 0, 0, 10)
    self._VerifyProgressBar(10, 1, 10, 0)
    self._VerifyProgressBar(1, 0.9, 0, 1)
    # If width of progress bar is less than _PROGRESS_BAR_BORDER_SIZE, the width
    # defaults to 1.
    self._VerifyProgressBar(-5, 0, 0, 1)
    self._VerifyProgressBar(-5, 1, 1, 0)

  def testWaitUntilComplete(self):
    """Test WaitUntilComplete returns False if background task isn't complete.

    As the background task is not started in this test, we expect it not to
    complete.
    """
    op = operation.ProgressBarOperation()
    self.assertFalse(op.WaitUntilComplete(0))

  def testCaptureOutputInBackground(self):
    """Test CaptureOutputInBackground finishes in reasonable time."""
    def func():
      print('hi')

    op = operation.ProgressBarOperation()
    op.CaptureOutputInBackground(func)

    # This function should really finish in < 1 sec. However, we wait for a
    # longer time so the test does not fail on highly loaded builders.
    self.assertTrue(op.WaitUntilComplete(10))

  def testRun(self):
    """Test that ParseOutput is called and func is run in background."""
    expected_output = 'hi'
    def func():
      print(expected_output)

    op = TestWrapperProgressBarOperation()
    with self.OutputCapturer():
      op.Run(func, update_period=0.05)

    # Check that func is executed and its output is captured.
    self.AssertOutputContainsLine(expected_output)
    # Check that ParseOutput is executed at least once. It can be called twice:
    #   Once in the while loop.
    #   Once after the while loop.
    #   However, it is possible for func to execute and finish before the while
    #   statement is executed even once in which case ParseOutput would only be
    #   called once.
    self.AssertOutputContainsLine('Calling ParseOutput')

  def testExceptionHandlingNotInWorkspace(self):
    """Test exception handling if not in a workspace."""
    def func():
      print('foo')
      print('bar', file=sys.stderr)
      raise FakeException()

    op = TestWrapperProgressBarOperation()
    with self.OutputCapturer():
      try:
        with cros_test_lib.LoggingCapturer() as logs:
          op.Run(func)
      except parallel.BackgroundFailure:
        # func always raises; the failure itself is not under test here, only
        # the output and logs that are checked below.
        pass

    # Check that the output was dumped correctly.
    self.AssertLogsContain(logs, 'Something went wrong.')
    self.AssertOutputContainsLine('Captured stdout was')
    self.AssertOutputContainsLine('Captured stderr was')
    self.AssertOutputContainsLine('foo')
    self.AssertOutputContainsLine('bar', check_stderr=True)

  def testExceptionHandlingInWorkspace(self):
    """Test that stdout/stderr files are moved correctly if in a workspace."""
    def func():
      print('foo')
      print('bar', file=sys.stderr)
      raise FakeException()

    self.CreateWorkspace()
    op = TestWrapperProgressBarOperation()
    stdout_file = self._GetStdoutPath()
    stderr_file = self._GetStderrPath()

    # Check that the files don't exist before the operation is called.
    self.assertNotExists(stdout_file)
    self.assertNotExists(stderr_file)

    try:
      with cros_test_lib.LoggingCapturer() as logs:
        op.Run(func)
    except parallel.BackgroundFailure as e:
      if not e.HasFailureType(FakeException):
        # Unexpected failure: re-raise with the original traceback intact.
        raise

    # Check that the files have been moved to the right location.
    self.assertExists(stdout_file)
    self.assertExists(stderr_file)

    # Check that the log message contains the path.
    self.AssertLogsContain(logs, self.workspace_path)

  def testExceptionHandlingInWorkspaceFilesAlreadyExist(self):
    """Test that old stdout/stderr files are removed from log directory."""
    def func():
      print('foo')
      print('bar', file=sys.stderr)
      raise FakeException()

    self.CreateWorkspace()
    op = TestWrapperProgressBarOperation()
    stdout_file = self._GetStdoutPath()
    stderr_file = self._GetStderrPath()
    osutils.Touch(stdout_file, makedirs=True)
    osutils.Touch(stderr_file, makedirs=True)

    # Assert that the files are empty.
    self.assertEqual(osutils.ReadFile(stdout_file), '')
    self.assertEqual(osutils.ReadFile(stderr_file), '')

    try:
      op.Run(func)
    except parallel.BackgroundFailure as e:
      if not e.HasFailureType(FakeException):
        # Unexpected failure: re-raise with the original traceback intact.
        raise

    # Check that the files contain the right information.
    self.assertIn('foo', osutils.ReadFile(stdout_file))
    self.assertIn('bar', osutils.ReadFile(stderr_file))

  def testLogLevel(self):
    """Test that the log level of the function running is set correctly."""
    func_log_level = logging.DEBUG
    test_log_level = logging.NOTICE
    expected_output = 'hi'
    def func():
      if logging.getLogger().getEffectiveLevel() == func_log_level:
        print(expected_output)

    root_logger = logging.getLogger()
    # Put the root logger back to its previous level once the test finishes so
    # the level set below does not leak into tests that run afterwards.
    self.addCleanup(root_logger.setLevel, root_logger.level)
    root_logger.setLevel(test_log_level)
    op = TestWrapperProgressBarOperation()
    with self.OutputCapturer():
      op.Run(func, update_period=0.05, log_level=func_log_level)

    # Check that OutputCapturer contains the expected output. This means that
    # the log level was changed.
    self.AssertOutputContainsLine(expected_output)
    # Check that the log level was restored after the function executed.
    self.assertEqual(logging.getLogger().getEffectiveLevel(), test_log_level)

  def testParallelEmergeOperationParseOutputTotalNotFound(self):
    """Test ParallelEmergeOperation.ParseOutput if total is not set."""
    def func():
      print('hi')

    op = operation.ParallelEmergeOperation()
    with self.OutputCapturer():
      op.Run(func)

    # Check that 'hi' does not appear in the captured output.
    self.AssertOutputContainsLine('hi', check_stderr=True, invert=True)

  def testParallelEmergeOperationParseOutputTotalIsZero(self):
    """Test ParallelEmergeOperation.ParseOutput if total is zero."""
    def func():
      print('Total: 0 packages.')

    op = operation.ParallelEmergeOperation()
    with self.OutputCapturer():
      with cros_test_lib.LoggingCapturer() as logs:
        op.Run(func)

    # Check that no progress bar is printed.
    self.AssertOutputContainsLine('%', check_stderr=True, invert=True)
    # Check logs contain message.
    self.AssertLogsContain(logs, 'No packages to build.')

  def testParallelEmergeOperationParseOutputTotalNonZero(self):
    """Test that ParallelEmergeOperation.ParseOutput's progress bar updates."""
    def func(queue):
      print('Total: 2 packages.')
      # Each 'Completed ' line is printed only after an item arrives on the
      # queue; FakeParallelEmergeOperation.ParseOutput puts one per call.
      for _ in range(2):
        queue.get()
        print('Completed ')

    queue = multiprocessing.Queue()
    op = FakeParallelEmergeOperation(queue)
    with self.OutputCapturer():
      op.Run(func, queue, update_period=0.005)

    # Check that progress bar prints correctly at 0%, 50%, and 100%.
    self.AssertOutputContainsLine('0%')
    self.AssertOutputContainsLine('50%')
    self.AssertOutputContainsLine('100%')