summaryrefslogtreecommitdiff
path: root/lib/paygen/gslock_unittest.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/paygen/gslock_unittest.py')
-rw-r--r--lib/paygen/gslock_unittest.py178
1 files changed, 85 insertions, 93 deletions
diff --git a/lib/paygen/gslock_unittest.py b/lib/paygen/gslock_unittest.py
index 70a60a638..19347e60f 100644
--- a/lib/paygen/gslock_unittest.py
+++ b/lib/paygen/gslock_unittest.py
@@ -7,8 +7,6 @@
from __future__ import print_function
import multiprocessing
-import os
-import socket
from chromite.lib import cros_build_lib
from chromite.lib import cros_test_lib
@@ -108,42 +106,33 @@ def _InProcessDataUpdate(lock_uri_data_uri):
class GSLockTest(cros_test_lib.MockTestCase):
"""This test suite covers the GSLock file."""
+ # For contention tests, how many parallel workers to spawn. To really
+ # stress test, you can bump it up to 200, but 20 seems to provide good
+ # coverage w/out sucking up too many resources.
+ NUM_THREADS = 20
+
@cros_test_lib.NetworkTest()
def setUp(self):
self.ctx = gs.GSContext()
- # Use the unique id to make sure the tests can be run multiple places.
- unique_id = '%s.%d' % (socket.gethostname(), os.getpid())
-
- self.lock_uri = 'gs://chromeos-releases-test/test-%s-gslock' % unique_id
- self.data_uri = 'gs://chromeos-releases-test/test-%s-data' % unique_id
-
- # Clear out any flags left from previous failure
- self.ctx.Remove(self.lock_uri, ignore_missing=True)
- self.ctx.Remove(self.data_uri, ignore_missing=True)
-
- @cros_test_lib.NetworkTest()
- def tearDown(self):
- self.assertFalse(self.ctx.Exists(self.lock_uri))
- self.assertFalse(self.ctx.Exists(self.data_uri))
-
@cros_test_lib.NetworkTest()
def testLock(self):
"""Test getting a lock."""
# Force a known host name.
self.PatchObject(cros_build_lib, 'MachineDetails', return_value='TestHost')
- lock = gslock.Lock(self.lock_uri)
+ with gs.TemporaryURL('gslock') as lock_uri:
+ lock = gslock.Lock(lock_uri)
- self.assertFalse(self.ctx.Exists(self.lock_uri))
- lock.Acquire()
- self.assertTrue(self.ctx.Exists(self.lock_uri))
+ self.assertFalse(self.ctx.Exists(lock_uri))
+ lock.Acquire()
+ self.assertTrue(self.ctx.Exists(lock_uri))
- contents = self.ctx.Cat(self.lock_uri)
- self.assertEqual(contents, 'TestHost')
+ contents = self.ctx.Cat(lock_uri)
+ self.assertEqual(contents, 'TestHost')
- lock.Release()
- self.assertFalse(self.ctx.Exists(self.lock_uri))
+ lock.Release()
+ self.assertFalse(self.ctx.Exists(lock_uri))
@cros_test_lib.NetworkTest()
def testLockRepetition(self):
@@ -151,117 +140,120 @@ class GSLockTest(cros_test_lib.MockTestCase):
# Force a known host name.
self.PatchObject(cros_build_lib, 'MachineDetails', return_value='TestHost')
- lock = gslock.Lock(self.lock_uri)
+ with gs.TemporaryURL('gslock') as lock_uri:
+ lock = gslock.Lock(lock_uri)
- self.assertFalse(self.ctx.Exists(self.lock_uri))
- lock.Acquire()
- self.assertTrue(self.ctx.Exists(self.lock_uri))
+ self.assertFalse(self.ctx.Exists(lock_uri))
+ lock.Acquire()
+ self.assertTrue(self.ctx.Exists(lock_uri))
- lock.Acquire()
- self.assertTrue(self.ctx.Exists(self.lock_uri))
+ lock.Acquire()
+ self.assertTrue(self.ctx.Exists(lock_uri))
- lock.Release()
- self.assertFalse(self.ctx.Exists(self.lock_uri))
+ lock.Release()
+ self.assertFalse(self.ctx.Exists(lock_uri))
- lock.Acquire()
- self.assertTrue(self.ctx.Exists(self.lock_uri))
+ lock.Acquire()
+ self.assertTrue(self.ctx.Exists(lock_uri))
- lock.Release()
- self.assertFalse(self.ctx.Exists(self.lock_uri))
+ lock.Release()
+ self.assertFalse(self.ctx.Exists(lock_uri))
@cros_test_lib.NetworkTest()
def testLockConflict(self):
"""Test lock conflict."""
+ with gs.TemporaryURL('gslock') as lock_uri:
+ lock1 = gslock.Lock(lock_uri)
+ lock2 = gslock.Lock(lock_uri)
- lock1 = gslock.Lock(self.lock_uri)
- lock2 = gslock.Lock(self.lock_uri)
+ # Manually lock 1, and ensure lock2 can't lock.
+ lock1.Acquire()
+ self.assertRaises(gslock.LockNotAcquired, lock2.Acquire)
+ lock1.Release()
- # Manually lock 1, and ensure lock2 can't lock.
- lock1.Acquire()
- self.assertRaises(gslock.LockNotAcquired, lock2.Acquire)
- lock1.Release()
+ # Use a with clause on 2, and ensure 1 can't lock.
+ with lock2:
+ self.assertRaises(gslock.LockNotAcquired, lock1.Acquire)
- # Use a with clause on 2, and ensure 1 can't lock.
- with lock2:
- self.assertRaises(gslock.LockNotAcquired, lock1.Acquire)
+ # Ensure we can renew a given lock.
+ lock1.Acquire()
+ lock1.Renew()
+ lock1.Release()
- # Ensure we can renew a given lock.
- lock1.Acquire()
- lock1.Renew()
- lock1.Release()
-
- # Ensure we get an error renewing a lock we don't hold.
- self.assertRaises(gslock.LockNotAcquired, lock1.Renew)
+ # Ensure we get an error renewing a lock we don't hold.
+ self.assertRaises(gslock.LockNotAcquired, lock1.Renew)
@cros_test_lib.NetworkTest()
def testLockTimeout(self):
"""Test getting a lock when an old timed out one is present."""
+ with gs.TemporaryURL('gslock') as lock_uri:
+ # Both locks are always timed out.
+ lock1 = gslock.Lock(lock_uri, lock_timeout_mins=-1)
+ lock2 = gslock.Lock(lock_uri, lock_timeout_mins=-1)
- # Both locks are always timed out.
- lock1 = gslock.Lock(self.lock_uri, lock_timeout_mins=-1)
- lock2 = gslock.Lock(self.lock_uri, lock_timeout_mins=-1)
-
- lock1.Acquire()
- lock2.Acquire()
-
- self.ctx.Remove(self.lock_uri)
+ lock1.Acquire()
+ lock2.Acquire()
@cros_test_lib.NetworkTest()
def testRaceToAcquire(self):
"""Have lots of processes race to acquire the same lock."""
- count = 20
+ count = self.NUM_THREADS
pool = multiprocessing.Pool(processes=count)
- results = pool.map(_InProcessAcquire, [self.lock_uri] * count)
+ with gs.TemporaryURL('gslock') as lock_uri:
+ results = pool.map(_InProcessAcquire, [lock_uri] * count)
- # Clean up the lock since the processes explicitly only acquire.
- self.ctx.Remove(self.lock_uri)
+ # Clean up the lock since the processes explicitly only acquire.
+ self.ctx.Remove(lock_uri)
- # Ensure that only one of them got the lock.
- self.assertEqual(results.count(True), 1)
+ # Ensure that only one of them got the lock.
+ self.assertEqual(results.count(True), 1)
@cros_test_lib.NetworkTest()
def testRaceToDoubleAcquire(self):
"""Have lots of processes race to double acquire the same lock."""
- count = 20
+ count = self.NUM_THREADS
pool = multiprocessing.Pool(processes=count)
- results = pool.map(_InProcessDoubleAcquire, [self.lock_uri] * count)
+ with gs.TemporaryURL('gslock') as lock_uri:
+ results = pool.map(_InProcessDoubleAcquire, [lock_uri] * count)
- # Clean up the lock sinc the processes explicitly only acquire.
- self.ctx.Remove(self.lock_uri)
+ # Clean up the lock sinc the processes explicitly only acquire.
+ self.ctx.Remove(lock_uri)
- # Ensure that only one of them got the lock (and got it twice).
- self.assertEqual(results.count(0), count - 1)
- self.assertEqual(results.count(2), 1)
+ # Ensure that only one of them got the lock (and got it twice).
+ self.assertEqual(results.count(0), count - 1)
+ self.assertEqual(results.count(2), 1)
@cros_test_lib.NetworkTest()
def testMultiProcessDataUpdate(self):
"""Have lots of processes update a GS file proctected by a lock."""
- count = 20 # To really stress, bump up to 200.
+ count = self.NUM_THREADS
pool = multiprocessing.Pool(processes=count)
- results = pool.map(_InProcessDataUpdate,
- [(self.lock_uri, self.data_uri)] * count)
-
- self.assertEqual(self.ctx.Cat(self.data_uri), str(count))
+ with gs.TemporaryURL('gslock') as lock_uri:
+ data_uri = lock_uri + '.data'
+ results = pool.map(_InProcessDataUpdate,
+ [(lock_uri, data_uri)] * count)
- # Ensure that all report success
- self.assertEqual(results.count(True), count)
+ self.assertEqual(self.ctx.Cat(data_uri), str(count))
- # Clean up the data file.
- self.ctx.Remove(self.data_uri)
+ # Ensure that all report success
+ self.assertEqual(results.count(True), count)
@cros_test_lib.NetworkTest()
def testDryrunLock(self):
"""Ensure that lcok can be obtained and released in dry-run mode."""
- lock = gslock.Lock(self.lock_uri, dry_run=True)
- self.assertIsNone(lock.Acquire())
- self.assertFalse(self.ctx.Exists(self.lock_uri))
- self.assertIsNone(lock.Release())
+ with gs.TemporaryURL('gslock') as lock_uri:
+ lock = gslock.Lock(lock_uri, dry_run=True)
+ self.assertIsNone(lock.Acquire())
+ self.assertFalse(self.ctx.Exists(lock_uri))
+ self.assertIsNone(lock.Release())
+ @cros_test_lib.NetworkTest()
def testDryrunLockRepetition(self):
"""Test aquiring same lock multiple times in dry-run mode."""
- lock = gslock.Lock(self.lock_uri, dry_run=True)
- self.assertIsNone(lock.Acquire())
- self.assertIsNone(lock.Acquire())
- self.assertIsNone(lock.Release())
- self.assertIsNone(lock.Acquire())
- self.assertIsNone(lock.Release())
+ with gs.TemporaryURL('gslock') as lock_uri:
+ lock = gslock.Lock(lock_uri, dry_run=True)
+ self.assertIsNone(lock.Acquire())
+ self.assertIsNone(lock.Acquire())
+ self.assertIsNone(lock.Release())
+ self.assertIsNone(lock.Acquire())
+ self.assertIsNone(lock.Release())