diff options
Diffstat (limited to 'pw_tokenizer/py/tokens_test.py')
-rwxr-xr-x | pw_tokenizer/py/tokens_test.py | 668 |
1 files changed, 470 insertions, 198 deletions
diff --git a/pw_tokenizer/py/tokens_test.py b/pw_tokenizer/py/tokens_test.py index c20576276..ee3431b07 100755 --- a/pw_tokenizer/py/tokens_test.py +++ b/pw_tokenizer/py/tokens_test.py @@ -14,16 +14,17 @@ # the License. """Tests for the tokens module.""" -import datetime +from datetime import datetime import io import logging from pathlib import Path +import shutil import tempfile from typing import Iterator import unittest from pw_tokenizer import tokens -from pw_tokenizer.tokens import default_hash, _LOG +from pw_tokenizer.tokens import c_hash, DIR_DB_SUFFIX, _LOG CSV_DATABASE = '''\ 00000000,2019-06-10,"" @@ -78,17 +79,80 @@ BINARY_DATABASE = ( b'%x%lld%1.2f%s\x00' b'Jello?\x00' b'%llu\x00' - b'Won\'t fit : %s%d\x00') + b'Won\'t fit : %s%d\x00' +) INVALID_CSV = """\ 1,,"Whoa there!" 2,this is totally invalid,"Whoa there!" 3,,"This one's OK" ,,"Also broken" -5,1845-2-2,"I'm %s fine" +5,1845-02-02,"I'm %s fine" 6,"Missing fields" """ +CSV_DATABASE_2 = '''\ +00000000, ,"" +141c35d5, ,"The answer: ""%s""" +29aef586, ,"1234" +2b78825f, ,"[:-)" +2e668cd6, ,"Jello, world!" +31631781, ,"%d" +61fd1e26, ,"%ld" +68ab92da, ,"%s there are %x (%.2f) of them%c" +7b940e2a, ,"Hello %s! %hd %e" +7da55d52, ,">:-[]" +7f35a9a5, ,"TestName" +851beeb6, ,"%u %d" +881436a0, ,"The answer is: %s" +88808930, ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c" +92723f44, ,"???" +a09d6698, ,"won-won-won-wonderful" +aa9ffa66, ,"void pw::tokenizer::{anonymous}::TestName()" +ad002c97, ,"%llx" +b3653e13, ,"Jello!" +cc6d3131, ,"Jello?" +e13b0f94, ,"%llu" +e65aefef, ,"Won't fit : %s%d" +''' + +CSV_DATABASE_3 = """\ +17fa86d3, ,"hello" +18c5017c, ,"yes" +59b2701c, ,"The answer was: %s" +881436a0, ,"The answer is: %s" +d18ada0f, ,"something" +""" + +CSV_DATABASE_4 = '''\ +00000000, ,"" +141c35d5, ,"The answer: ""%s""" +17fa86d3, ,"hello" +18c5017c, ,"yes" +29aef586, ,"1234" +2b78825f, ,"[:-)" +2e668cd6, ,"Jello, world!" +31631781, ,"%d" +59b2701c, ,"The answer was: %s" +61fd1e26, ,"%ld" +68ab92da, ,"%s there are %x (%.2f) of them%c" +7b940e2a, ,"Hello %s! %hd %e" +7da55d52, ,">:-[]" +7f35a9a5, ,"TestName" +851beeb6, ,"%u %d" +881436a0, ,"The answer is: %s" +88808930, ,"%u%d%02x%X%hu%hhd%d%ld%lu%lld%llu%c%c%c" +92723f44, ,"???" +a09d6698, ,"won-won-won-wonderful" +aa9ffa66, ,"void pw::tokenizer::{anonymous}::TestName()" +ad002c97, ,"%llx" +b3653e13, ,"Jello!" +cc6d3131, ,"Jello?" +d18ada0f, ,"something" +e13b0f94, ,"%llu" +e65aefef, ,"Won't fit : %s%d" +''' + def read_db_from_csv(csv_str: str) -> tokens.Database: with io.StringIO(csv_str) as csv_db: @@ -97,31 +161,38 @@ def read_db_from_csv(csv_str: str) -> tokens.Database: def _entries(*strings: str) -> Iterator[tokens.TokenizedStringEntry]: for string in strings: - yield tokens.TokenizedStringEntry(default_hash(string), string) + yield tokens.TokenizedStringEntry(c_hash(string), string) class TokenDatabaseTest(unittest.TestCase): """Tests the token database class.""" - def test_csv(self): + + def test_csv(self) -> None: db = read_db_from_csv(CSV_DATABASE) self.assertEqual(str(db), CSV_DATABASE) db = read_db_from_csv('') self.assertEqual(str(db), '') - def test_csv_formatting(self): + def test_csv_formatting(self) -> None: db = read_db_from_csv('') self.assertEqual(str(db), '') - db = read_db_from_csv('abc123,2048-4-1,Fake string\n') + db = read_db_from_csv('abc123,2048-04-01,Fake string\n') self.assertEqual(str(db), '00abc123,2048-04-01,"Fake string"\n') - db = read_db_from_csv('1,1990-01-01,"Quotes"""\n' - '0,1990-02-01,"Commas,"",,"\n') - self.assertEqual(str(db), ('00000000,1990-02-01,"Commas,"",,"\n' - '00000001,1990-01-01,"Quotes"""\n')) - - def test_bad_csv(self): + db = read_db_from_csv( + '1,1990-01-01,"Quotes"""\n' '0,1990-02-01,"Commas,"",,"\n' + ) + self.assertEqual( + str(db), + ( + '00000000,1990-02-01,"Commas,"",,"\n' + '00000001,1990-01-01,"Quotes"""\n' + ), + ) + + def test_bad_csv(self) -> None: with self.assertLogs(_LOG, logging.ERROR) as logs: db = read_db_from_csv(INVALID_CSV) @@ -135,31 +206,31 @@ class TokenDatabaseTest(unittest.TestCase): self.assertEqual(db.token_to_entries[5][0].string, "I'm %s fine") self.assertFalse(db.token_to_entries[6]) - def test_lookup(self): + def test_lookup(self) -> None: db = read_db_from_csv(CSV_DATABASE) self.assertEqual(db.token_to_entries[0x9999], []) - matches = db.token_to_entries[0x2e668cd6] + matches = db.token_to_entries[0x2E668CD6] self.assertEqual(len(matches), 1) jello = matches[0] - self.assertEqual(jello.token, 0x2e668cd6) + self.assertEqual(jello.token, 0x2E668CD6) self.assertEqual(jello.string, 'Jello, world!') - self.assertEqual(jello.date_removed, datetime.datetime(2019, 6, 11)) + self.assertEqual(jello.date_removed, datetime(2019, 6, 11)) - matches = db.token_to_entries[0xe13b0f94] + matches = db.token_to_entries[0xE13B0F94] self.assertEqual(len(matches), 1) llu = matches[0] - self.assertEqual(llu.token, 0xe13b0f94) + self.assertEqual(llu.token, 0xE13B0F94) self.assertEqual(llu.string, '%llu') self.assertIsNone(llu.date_removed) - answer, = db.token_to_entries[0x141c35d5] + (answer,) = db.token_to_entries[0x141C35D5] self.assertEqual(answer.string, 'The answer: "%s"') - def test_collisions(self): - hash_1 = tokens.pw_tokenizer_65599_hash('o000', 96) - hash_2 = tokens.pw_tokenizer_65599_hash('0Q1Q', 96) + def test_collisions(self) -> None: + hash_1 = tokens.c_hash('o000', 96) + hash_2 = tokens.c_hash('0Q1Q', 96) self.assertEqual(hash_1, hash_2) db = tokens.Database.from_strings(['o000', '0Q1Q']) @@ -167,151 +238,188 @@ class TokenDatabaseTest(unittest.TestCase): self.assertEqual(len(db.token_to_entries[hash_1]), 2) self.assertCountEqual( [entry.string for entry in db.token_to_entries[hash_1]], - ['o000', '0Q1Q']) + ['o000', '0Q1Q'], + ) - def test_purge(self): + def test_purge(self) -> None: db = read_db_from_csv(CSV_DATABASE) original_length = len(db.token_to_entries) self.assertEqual(db.token_to_entries[0][0].string, '') self.assertEqual(db.token_to_entries[0x31631781][0].string, '%d') - self.assertEqual(db.token_to_entries[0x2e668cd6][0].string, - 'Jello, world!') - self.assertEqual(db.token_to_entries[0xb3653e13][0].string, 'Jello!') - self.assertEqual(db.token_to_entries[0xcc6d3131][0].string, 'Jello?') - self.assertEqual(db.token_to_entries[0xe65aefef][0].string, - "Won't fit : %s%d") - - db.purge(datetime.datetime(2019, 6, 11)) + self.assertEqual( + db.token_to_entries[0x2E668CD6][0].string, 'Jello, world!' + ) + self.assertEqual(db.token_to_entries[0xB3653E13][0].string, 'Jello!') + self.assertEqual(db.token_to_entries[0xCC6D3131][0].string, 'Jello?') + self.assertEqual( + db.token_to_entries[0xE65AEFEF][0].string, "Won't fit : %s%d" + ) + + db.purge(datetime(2019, 6, 11)) self.assertLess(len(db.token_to_entries), original_length) self.assertFalse(db.token_to_entries[0]) self.assertEqual(db.token_to_entries[0x31631781][0].string, '%d') - self.assertFalse(db.token_to_entries[0x2e668cd6]) - self.assertEqual(db.token_to_entries[0xb3653e13][0].string, 'Jello!') - self.assertEqual(db.token_to_entries[0xcc6d3131][0].string, 'Jello?') - self.assertFalse(db.token_to_entries[0xe65aefef]) + self.assertFalse(db.token_to_entries[0x2E668CD6]) + self.assertEqual(db.token_to_entries[0xB3653E13][0].string, 'Jello!') + self.assertEqual(db.token_to_entries[0xCC6D3131][0].string, 'Jello?') + self.assertFalse(db.token_to_entries[0xE65AEFEF]) - def test_merge(self): + def test_merge(self) -> None: """Tests the tokens.Database merge method.""" db = tokens.Database() # Test basic merging into an empty database. db.merge( - tokens.Database([ - tokens.TokenizedStringEntry( - 1, 'one', date_removed=datetime.datetime.min), - tokens.TokenizedStringEntry( - 2, 'two', date_removed=datetime.datetime.min), - ])) + tokens.Database( + [ + tokens.TokenizedStringEntry( + 1, 'one', date_removed=datetime.min + ), + tokens.TokenizedStringEntry( + 2, 'two', date_removed=datetime.min + ), + ] + ) + ) self.assertEqual({str(e) for e in db.entries()}, {'one', 'two'}) - self.assertEqual(db.token_to_entries[1][0].date_removed, - datetime.datetime.min) - self.assertEqual(db.token_to_entries[2][0].date_removed, - datetime.datetime.min) + self.assertEqual(db.token_to_entries[1][0].date_removed, datetime.min) + self.assertEqual(db.token_to_entries[2][0].date_removed, datetime.min) # Test merging in an entry with a removal date. db.merge( - tokens.Database([ - tokens.TokenizedStringEntry(3, 'three'), - tokens.TokenizedStringEntry( - 4, 'four', date_removed=datetime.datetime.min), - ])) - self.assertEqual({str(e) - for e in db.entries()}, - {'one', 'two', 'three', 'four'}) + tokens.Database( + [ + tokens.TokenizedStringEntry(3, 'three'), + tokens.TokenizedStringEntry( + 4, 'four', date_removed=datetime.min + ), + ] + ) + ) + self.assertEqual( + {str(e) for e in db.entries()}, {'one', 'two', 'three', 'four'} + ) self.assertIsNone(db.token_to_entries[3][0].date_removed) - self.assertEqual(db.token_to_entries[4][0].date_removed, - datetime.datetime.min) + self.assertEqual(db.token_to_entries[4][0].date_removed, datetime.min) # Test merging in one entry. - db.merge(tokens.Database([ - tokens.TokenizedStringEntry(5, 'five'), - ])) - self.assertEqual({str(e) - for e in db.entries()}, - {'one', 'two', 'three', 'four', 'five'}) - self.assertEqual(db.token_to_entries[4][0].date_removed, - datetime.datetime.min) + db.merge( + tokens.Database( + [ + tokens.TokenizedStringEntry(5, 'five'), + ] + ) + ) + self.assertEqual( + {str(e) for e in db.entries()}, + {'one', 'two', 'three', 'four', 'five'}, + ) + self.assertEqual(db.token_to_entries[4][0].date_removed, datetime.min) self.assertIsNone(db.token_to_entries[5][0].date_removed) # Merge in repeated entries different removal dates. db.merge( - tokens.Database([ - tokens.TokenizedStringEntry( - 4, 'four', date_removed=datetime.datetime.max), - tokens.TokenizedStringEntry( - 5, 'five', date_removed=datetime.datetime.max), - ])) + tokens.Database( + [ + tokens.TokenizedStringEntry( + 4, 'four', date_removed=datetime.max + ), + tokens.TokenizedStringEntry( + 5, 'five', date_removed=datetime.max + ), + ] + ) + ) self.assertEqual(len(db.entries()), 5) - self.assertEqual({str(e) - for e in db.entries()}, - {'one', 'two', 'three', 'four', 'five'}) - self.assertEqual(db.token_to_entries[4][0].date_removed, - datetime.datetime.max) + self.assertEqual( + {str(e) for e in db.entries()}, + {'one', 'two', 'three', 'four', 'five'}, + ) + self.assertEqual(db.token_to_entries[4][0].date_removed, datetime.max) self.assertIsNone(db.token_to_entries[5][0].date_removed) # Merge in the same repeated entries now without removal dates. db.merge( - tokens.Database([ - tokens.TokenizedStringEntry(4, 'four'), - tokens.TokenizedStringEntry(5, 'five') - ])) + tokens.Database( + [ + tokens.TokenizedStringEntry(4, 'four'), + tokens.TokenizedStringEntry(5, 'five'), + ] + ) + ) self.assertEqual(len(db.entries()), 5) - self.assertEqual({str(e) - for e in db.entries()}, - {'one', 'two', 'three', 'four', 'five'}) + self.assertEqual( + {str(e) for e in db.entries()}, + {'one', 'two', 'three', 'four', 'five'}, + ) self.assertIsNone(db.token_to_entries[4][0].date_removed) self.assertIsNone(db.token_to_entries[5][0].date_removed) # Merge in an empty databsse. db.merge(tokens.Database([])) - self.assertEqual({str(e) - for e in db.entries()}, - {'one', 'two', 'three', 'four', 'five'}) + self.assertEqual( + {str(e) for e in db.entries()}, + {'one', 'two', 'three', 'four', 'five'}, + ) - def test_merge_multiple_datbases_in_one_call(self): + def test_merge_multiple_datbases_in_one_call(self) -> None: """Tests the merge and merged methods with multiple databases.""" db = tokens.Database.merged( - tokens.Database([ - tokens.TokenizedStringEntry(1, - 'one', - date_removed=datetime.datetime.max) - ]), - tokens.Database([ - tokens.TokenizedStringEntry(2, - 'two', - date_removed=datetime.datetime.min) - ]), - tokens.Database([ - tokens.TokenizedStringEntry(1, - 'one', - date_removed=datetime.datetime.min) - ])) + tokens.Database( + [ + tokens.TokenizedStringEntry( + 1, 'one', date_removed=datetime.max + ) + ] + ), + tokens.Database( + [ + tokens.TokenizedStringEntry( + 2, 'two', date_removed=datetime.min + ) + ] + ), + tokens.Database( + [ + tokens.TokenizedStringEntry( + 1, 'one', date_removed=datetime.min + ) + ] + ), + ) self.assertEqual({str(e) for e in db.entries()}, {'one', 'two'}) db.merge( - tokens.Database([ - tokens.TokenizedStringEntry(4, - 'four', - date_removed=datetime.datetime.max) - ]), - tokens.Database([ - tokens.TokenizedStringEntry(2, - 'two', - date_removed=datetime.datetime.max) - ]), - tokens.Database([ - tokens.TokenizedStringEntry(3, - 'three', - date_removed=datetime.datetime.min) - ])) - self.assertEqual({str(e) - for e in db.entries()}, - {'one', 'two', 'three', 'four'}) - - def test_entry_counts(self): + tokens.Database( + [ + tokens.TokenizedStringEntry( + 4, 'four', date_removed=datetime.max + ) + ] + ), + tokens.Database( + [ + tokens.TokenizedStringEntry( + 2, 'two', date_removed=datetime.max + ) + ] + ), + tokens.Database( + [ + tokens.TokenizedStringEntry( + 3, 'three', date_removed=datetime.min + ) + ] + ), + ) + self.assertEqual( + {str(e) for e in db.entries()}, {'one', 'two', 'three', 'four'} + ) + + def test_entry_counts(self) -> None: self.assertEqual(len(CSV_DATABASE.splitlines()), 16) db = read_db_from_csv(CSV_DATABASE) @@ -324,42 +432,49 @@ class TokenDatabaseTest(unittest.TestCase): self.assertEqual(len(db.entries()), 18) self.assertEqual(len(db.token_to_entries), 17) - def test_mark_removed(self): + def test_mark_removed(self) -> None: """Tests that date_removed field is set by mark_removed.""" db = tokens.Database.from_strings( - ['MILK', 'apples', 'oranges', 'CHEESE', 'pears']) + ['MILK', 'apples', 'oranges', 'CHEESE', 'pears'] + ) self.assertTrue( - all(entry.date_removed is None for entry in db.entries())) - date_1 = datetime.datetime(1, 2, 3) + all(entry.date_removed is None for entry in db.entries()) + ) + date_1 = datetime(1, 2, 3) db.mark_removed(_entries('apples', 'oranges', 'pears'), date_1) self.assertEqual( - db.token_to_entries[default_hash('MILK')][0].date_removed, date_1) + db.token_to_entries[c_hash('MILK')][0].date_removed, date_1 + ) self.assertEqual( - db.token_to_entries[default_hash('CHEESE')][0].date_removed, - date_1) + db.token_to_entries[c_hash('CHEESE')][0].date_removed, date_1 + ) - now = datetime.datetime.now() + now = datetime.now() db.mark_removed(_entries('MILK', 'CHEESE', 'pears')) # New strings are not added or re-added in mark_removed(). - self.assertGreaterEqual( - db.token_to_entries[default_hash('MILK')][0].date_removed, date_1) - self.assertGreaterEqual( - db.token_to_entries[default_hash('CHEESE')][0].date_removed, - date_1) + milk_date = db.token_to_entries[c_hash('MILK')][0].date_removed + assert milk_date is not None + self.assertGreaterEqual(milk_date, date_1) + + cheese_date = db.token_to_entries[c_hash('CHEESE')][0].date_removed + assert cheese_date is not None + self.assertGreaterEqual(cheese_date, date_1) # These strings were removed. - self.assertGreaterEqual( - db.token_to_entries[default_hash('apples')][0].date_removed, now) - self.assertGreaterEqual( - db.token_to_entries[default_hash('oranges')][0].date_removed, now) - self.assertIsNone( - db.token_to_entries[default_hash('pears')][0].date_removed) - - def test_add(self): + apples_date = db.token_to_entries[c_hash('apples')][0].date_removed + assert apples_date is not None + self.assertGreaterEqual(apples_date, now) + + oranges_date = db.token_to_entries[c_hash('oranges')][0].date_removed + assert oranges_date is not None + self.assertGreaterEqual(oranges_date, now) + self.assertIsNone(db.token_to_entries[c_hash('pears')][0].date_removed) + + def test_add(self) -> None: db = tokens.Database() db.add(_entries('MILK', 'apples')) self.assertEqual({e.string for e in db.entries()}, {'MILK', 'apples'}) @@ -371,13 +486,62 @@ class TokenDatabaseTest(unittest.TestCase): self.assertEqual(len(db.entries()), 6) db.add(_entries('MILK')) - self.assertEqual({e.string - for e in db.entries()}, { - 'MILK', 'apples', 'oranges', 'CHEESE', 'pears', - 'only this one is new' - }) + self.assertEqual( + {e.string for e in db.entries()}, + { + 'MILK', + 'apples', + 'oranges', + 'CHEESE', + 'pears', + 'only this one is new', + }, + ) + + def test_add_duplicate_entries_keeps_none_as_removal_date(self) -> None: + db = tokens.Database() + db.add( + [ + tokens.TokenizedStringEntry(1, 'Spam', '', datetime.now()), + tokens.TokenizedStringEntry(1, 'Spam', ''), + tokens.TokenizedStringEntry(1, 'Spam', '', datetime.min), + ] + ) + self.assertEqual(len(db), 1) + self.assertIsNone(db.token_to_entries[1][0].date_removed) + + def test_add_duplicate_entries_keeps_newest_removal_date(self) -> None: + db = tokens.Database() + db.add( + [ + tokens.TokenizedStringEntry(1, 'Spam', '', datetime.now()), + tokens.TokenizedStringEntry(1, 'Spam', '', datetime.max), + tokens.TokenizedStringEntry(1, 'Spam', '', datetime.now()), + tokens.TokenizedStringEntry(1, 'Spam', '', datetime.min), + ] + ) + self.assertEqual(len(db), 1) + self.assertEqual(db.token_to_entries[1][0].date_removed, datetime.max) + + def test_difference(self) -> None: + first = tokens.Database( + [ + tokens.TokenizedStringEntry(1, 'one'), + tokens.TokenizedStringEntry(2, 'two'), + tokens.TokenizedStringEntry(3, 'three'), + ] + ) + second = tokens.Database( + [ + tokens.TokenizedStringEntry(1, 'one'), + tokens.TokenizedStringEntry(3, 'three'), + tokens.TokenizedStringEntry(4, 'four'), + ] + ) + difference = first.difference(second) + self.assertEqual({e.string for e in difference.entries()}, {'two'}) - def test_binary_format_write(self): + def test_binary_format_write(self) -> None: db = read_db_from_csv(CSV_DATABASE) with io.BytesIO() as fd: @@ -386,7 +550,7 @@ class TokenDatabaseTest(unittest.TestCase): self.assertEqual(BINARY_DATABASE, binary_db) - def test_binary_format_parse(self): + def test_binary_format_parse(self) -> None: with io.BytesIO(BINARY_DATABASE) as binary_db: db = tokens.Database(tokens.parse_binary(binary_db)) @@ -395,100 +559,208 @@ class TokenDatabaseTest(unittest.TestCase): class TestDatabaseFile(unittest.TestCase): """Tests the DatabaseFile class.""" - def setUp(self): + + def setUp(self) -> None: file = tempfile.NamedTemporaryFile(delete=False) file.close() self._path = Path(file.name) - def tearDown(self): + def tearDown(self) -> None: self._path.unlink() - def test_update_csv_file(self): + def test_update_csv_file(self) -> None: self._path.write_text(CSV_DATABASE) - db = tokens.DatabaseFile(self._path) + db = tokens.DatabaseFile.load(self._path) self.assertEqual(str(db), CSV_DATABASE) - db.add([tokens.TokenizedStringEntry(0xffffffff, 'New entry!')]) + db.add([tokens.TokenizedStringEntry(0xFFFFFFFF, 'New entry!')]) db.write_to_file() - self.assertEqual(self._path.read_text(), - CSV_DATABASE + 'ffffffff, ,"New entry!"\n') + self.assertEqual( + self._path.read_text(), + CSV_DATABASE + 'ffffffff, ,"New entry!"\n', + ) - def test_csv_file_too_short_raises_exception(self): + def test_csv_file_too_short_raises_exception(self) -> None: self._path.write_text('1234') with self.assertRaises(tokens.DatabaseFormatError): - tokens.DatabaseFile(self._path) + tokens.DatabaseFile.load(self._path) - def test_csv_invalid_format_raises_exception(self): + def test_csv_invalid_format_raises_exception(self) -> None: self._path.write_text('MK34567890') with self.assertRaises(tokens.DatabaseFormatError): - tokens.DatabaseFile(self._path) + tokens.DatabaseFile.load(self._path) - def test_csv_not_utf8(self): + def test_csv_not_utf8(self) -> None: self._path.write_bytes(b'\x80' * 20) with self.assertRaises(tokens.DatabaseFormatError): - tokens.DatabaseFile(self._path) + tokens.DatabaseFile.load(self._path) class TestFilter(unittest.TestCase): """Tests the filtering functionality.""" - def setUp(self): - self.db = tokens.Database([ - tokens.TokenizedStringEntry(1, 'Luke'), - tokens.TokenizedStringEntry(2, 'Leia'), - tokens.TokenizedStringEntry(2, 'Darth Vader'), - tokens.TokenizedStringEntry(2, 'Emperor Palpatine'), - tokens.TokenizedStringEntry(3, 'Han'), - tokens.TokenizedStringEntry(4, 'Chewbacca'), - tokens.TokenizedStringEntry(5, 'Darth Maul'), - tokens.TokenizedStringEntry(6, 'Han Solo'), - ]) - - def test_filter_include_single_regex(self): + + def setUp(self) -> None: + self.db = tokens.Database( + [ + tokens.TokenizedStringEntry(1, 'Luke'), + tokens.TokenizedStringEntry(2, 'Leia'), + tokens.TokenizedStringEntry(2, 'Darth Vader'), + tokens.TokenizedStringEntry(2, 'Emperor Palpatine'), + tokens.TokenizedStringEntry(3, 'Han'), + tokens.TokenizedStringEntry(4, 'Chewbacca'), + tokens.TokenizedStringEntry(5, 'Darth Maul'), + tokens.TokenizedStringEntry(6, 'Han Solo'), + ] + ) + + def test_filter_include_single_regex(self) -> None: self.db.filter(include=[' ']) # anything with a space self.assertEqual( set(e.string for e in self.db.entries()), - {'Darth Vader', 'Emperor Palpatine', 'Darth Maul', 'Han Solo'}) + {'Darth Vader', 'Emperor Palpatine', 'Darth Maul', 'Han Solo'}, + ) - def test_filter_include_multiple_regexes(self): + def test_filter_include_multiple_regexes(self) -> None: self.db.filter(include=['Darth', 'cc', '^Han$']) - self.assertEqual(set(e.string for e in self.db.entries()), - {'Darth Vader', 'Darth Maul', 'Han', 'Chewbacca'}) + self.assertEqual( + set(e.string for e in self.db.entries()), + {'Darth Vader', 'Darth Maul', 'Han', 'Chewbacca'}, + ) - def test_filter_include_no_matches(self): + def test_filter_include_no_matches(self) -> None: self.db.filter(include=['Gandalf']) self.assertFalse(self.db.entries()) - def test_filter_exclude_single_regex(self): + def test_filter_exclude_single_regex(self) -> None: self.db.filter(exclude=['^[^L]']) - self.assertEqual(set(e.string for e in self.db.entries()), - {'Luke', 'Leia'}) + self.assertEqual( + set(e.string for e in self.db.entries()), {'Luke', 'Leia'} + ) - def test_filter_exclude_multiple_regexes(self): + def test_filter_exclude_multiple_regexes(self) -> None: self.db.filter(exclude=[' ', 'Han', 'Chewbacca']) - self.assertEqual(set(e.string for e in self.db.entries()), - {'Luke', 'Leia'}) + self.assertEqual( + set(e.string for e in self.db.entries()), {'Luke', 'Leia'} + ) - def test_filter_exclude_no_matches(self): + def test_filter_exclude_no_matches(self) -> None: self.db.filter(exclude=['.*']) self.assertFalse(self.db.entries()) - def test_filter_include_and_exclude(self): + def test_filter_include_and_exclude(self) -> None: self.db.filter(include=[' '], exclude=['Darth', 'Emperor']) - self.assertEqual(set(e.string for e in self.db.entries()), - {'Han Solo'}) + self.assertEqual(set(e.string for e in self.db.entries()), {'Han Solo'}) - def test_filter_neither_include_nor_exclude(self): + def test_filter_neither_include_nor_exclude(self) -> None: self.db.filter() self.assertEqual( - set(e.string for e in self.db.entries()), { - 'Luke', 'Leia', 'Darth Vader', 'Emperor Palpatine', 'Han', - 'Chewbacca', 'Darth Maul', 'Han Solo' - }) + set(e.string for e in self.db.entries()), + { + 'Luke', + 'Leia', + 'Darth Vader', + 'Emperor Palpatine', + 'Han', + 'Chewbacca', + 'Darth Maul', + 'Han Solo', + }, + ) + + +class TestDirectoryDatabase(unittest.TestCase): + """Test DirectoryDatabase class is properly loaded.""" + + def setUp(self) -> None: + self._dir = Path(tempfile.mkdtemp('_pw_tokenizer_test')) + self._db_dir = self._dir / '_dir_database_test' + self._db_dir.mkdir(exist_ok=True) + self._db_csv = self._db_dir / f'first{DIR_DB_SUFFIX}' + + def tearDown(self) -> None: + shutil.rmtree(self._dir) + + def test_loading_empty_directory(self) -> None: + self.assertFalse(tokens.DatabaseFile.load(self._db_dir).entries()) + + def test_loading_a_single_file(self) -> None: + self._db_csv.write_text(CSV_DATABASE) + csv = tokens.DatabaseFile.load(self._db_csv) + directory_db = tokens.DatabaseFile.load(self._db_dir) + self.assertEqual(1, len(list(self._db_dir.iterdir()))) + self.assertEqual(str(csv), str(directory_db)) + + def test_loading_multiples_files(self) -> None: + self._db_csv.write_text(CSV_DATABASE_3) + first_csv = tokens.DatabaseFile.load(self._db_csv) + + path_to_second_csv = self._db_dir / f'second{DIR_DB_SUFFIX}' + path_to_second_csv.write_text(CSV_DATABASE_2) + second_csv = tokens.DatabaseFile.load(path_to_second_csv) + + path_to_third_csv = self._db_dir / f'third{DIR_DB_SUFFIX}' + path_to_third_csv.write_text(CSV_DATABASE_4) + third_csv = tokens.DatabaseFile.load(path_to_third_csv) + + all_databases_merged = tokens.Database.merged( + first_csv, second_csv, third_csv + ) + directory_db = tokens.DatabaseFile.load(self._db_dir) + self.assertEqual(3, len(list(self._db_dir.iterdir()))) + self.assertEqual(str(all_databases_merged), str(directory_db)) + + def test_loading_multiples_files_with_removal_dates(self) -> None: + self._db_csv.write_text(CSV_DATABASE) + first_csv = tokens.DatabaseFile.load(self._db_csv) + + path_to_second_csv = self._db_dir / f'second{DIR_DB_SUFFIX}' + path_to_second_csv.write_text(CSV_DATABASE_2) + second_csv = tokens.DatabaseFile.load(path_to_second_csv) + + path_to_third_csv = self._db_dir / f'third{DIR_DB_SUFFIX}' + path_to_third_csv.write_text(CSV_DATABASE_3) + third_csv = tokens.DatabaseFile.load(path_to_third_csv) + + all_databases_merged = tokens.Database.merged( + first_csv, second_csv, third_csv + ) + directory_db = tokens.DatabaseFile.load(self._db_dir) + self.assertEqual(3, len(list(self._db_dir.iterdir()))) + self.assertEqual(str(all_databases_merged), str(directory_db)) + + def test_rewrite(self) -> None: + self._db_dir.joinpath('junk_file').write_text('should be ignored') + + self._db_csv.write_text(CSV_DATABASE_3) + first_csv = tokens.DatabaseFile.load(self._db_csv) + + path_to_second_csv = self._db_dir / f'second{DIR_DB_SUFFIX}' + path_to_second_csv.write_text(CSV_DATABASE_2) + second_csv = tokens.DatabaseFile.load(path_to_second_csv) + + path_to_third_csv = self._db_dir / f'third{DIR_DB_SUFFIX}' + path_to_third_csv.write_text(CSV_DATABASE_4) + third_csv = tokens.DatabaseFile.load(path_to_third_csv) + + all_databases_merged = tokens.Database.merged( + first_csv, second_csv, third_csv + ) + + directory_db = tokens.DatabaseFile.load(self._db_dir) + directory_db.write_to_file(rewrite=True) + + self.assertEqual(1, len(list(self._db_dir.glob(f'*{DIR_DB_SUFFIX}')))) + self.assertEqual( + self._db_dir.joinpath('junk_file').read_text(), 'should be ignored' + ) + + directory_db = tokens.DatabaseFile.load(self._db_dir) + self.assertEqual(str(all_databases_merged), str(directory_db)) if __name__ == '__main__': |