aboutsummaryrefslogtreecommitdiff
path: root/db/db_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'db/db_impl.cc')
-rw-r--r--db/db_impl.cc98
1 files changed, 68 insertions, 30 deletions
diff --git a/db/db_impl.cc b/db/db_impl.cc
index 0ca6386..56182a0 100644
--- a/db/db_impl.cc
+++ b/db/db_impl.cc
@@ -454,13 +454,8 @@ Status DBImpl::WriteLevel0Table(MemTable* mem, VersionEdit* edit,
if (s.ok() && meta.file_size > 0) {
const Slice min_user_key = meta.smallest.user_key();
const Slice max_user_key = meta.largest.user_key();
- if (base != NULL && !base->OverlapInLevel(0, min_user_key, max_user_key)) {
- // Push the new sstable to a higher level if possible to reduce
- // expensive manifest file ops.
- while (level < config::kMaxMemCompactLevel &&
- !base->OverlapInLevel(level + 1, min_user_key, max_user_key)) {
- level++;
- }
+ if (base != NULL) {
+ level = base->PickLevelForMemTableOutput(min_user_key, max_user_key);
}
edit->AddFile(level, meta.number, meta.file_size,
meta.smallest, meta.largest);
@@ -506,25 +501,55 @@ Status DBImpl::CompactMemTable() {
return s;
}
-void DBImpl::TEST_CompactRange(
- int level,
- const std::string& begin,
- const std::string& end) {
+void DBImpl::CompactRange(const Slice* begin, const Slice* end) {
+ int max_level_with_files = 1;
+ {
+ MutexLock l(&mutex_);
+ Version* base = versions_->current();
+ for (int level = 1; level < config::kNumLevels; level++) {
+ if (base->OverlapInLevel(level, begin, end)) {
+ max_level_with_files = level;
+ }
+ }
+ }
+ TEST_CompactMemTable(); // TODO(sanjay): Skip if memtable does not overlap
+ for (int level = 0; level < max_level_with_files; level++) {
+ TEST_CompactRange(level, begin, end);
+ }
+}
+
+void DBImpl::TEST_CompactRange(int level, const Slice* begin,const Slice* end) {
assert(level >= 0);
assert(level + 1 < config::kNumLevels);
- MutexLock l(&mutex_);
- while (manual_compaction_ != NULL) {
- bg_cv_.Wait();
- }
+ InternalKey begin_storage, end_storage;
+
ManualCompaction manual;
manual.level = level;
- manual.begin = begin;
- manual.end = end;
- manual_compaction_ = &manual;
- MaybeScheduleCompaction();
- while (manual_compaction_ == &manual) {
- bg_cv_.Wait();
+ manual.done = false;
+ if (begin == NULL) {
+ manual.begin = NULL;
+ } else {
+ begin_storage = InternalKey(*begin, kMaxSequenceNumber, kValueTypeForSeek);
+ manual.begin = &begin_storage;
+ }
+ if (end == NULL) {
+ manual.end = NULL;
+ } else {
+ end_storage = InternalKey(*end, 0, static_cast<ValueType>(0));
+ manual.end = &end_storage;
+ }
+
+ MutexLock l(&mutex_);
+ while (!manual.done) {
+ while (manual_compaction_ != NULL) {
+ bg_cv_.Wait();
+ }
+ manual_compaction_ = &manual;
+ MaybeScheduleCompaction();
+ while (manual_compaction_ == &manual) {
+ bg_cv_.Wait();
+ }
}
}
@@ -590,12 +615,20 @@ void DBImpl::BackgroundCompaction() {
Compaction* c;
bool is_manual = (manual_compaction_ != NULL);
+ InternalKey manual_end;
if (is_manual) {
- const ManualCompaction* m = manual_compaction_;
- c = versions_->CompactRange(
+ ManualCompaction* m = manual_compaction_;
+ c = versions_->CompactRange(m->level, m->begin, m->end);
+ m->done = (c == NULL);
+ if (c != NULL) {
+ manual_end = c->input(0, c->num_input_files(0) - 1)->largest;
+ }
+ Log(options_.info_log,
+ "Manual compaction at level-%d from %s .. %s; will stop at %s\n",
m->level,
- InternalKey(m->begin, kMaxSequenceNumber, kValueTypeForSeek),
- InternalKey(m->end, 0, static_cast<ValueType>(0)));
+ (m->begin ? m->begin->DebugString().c_str() : "(begin)"),
+ (m->end ? m->end->DebugString().c_str() : "(end)"),
+ (m->done ? "(end)" : manual_end.DebugString().c_str()));
} else {
c = versions_->PickCompaction();
}
@@ -638,7 +671,13 @@ void DBImpl::BackgroundCompaction() {
}
if (is_manual) {
- // Mark it as done
+ ManualCompaction* m = manual_compaction_;
+ if (!m->done) {
+ // We only compacted part of the requested range. Update *m
+ // to the range that is left to be compacted.
+ m->tmp_storage = manual_end;
+ m->begin = &m->tmp_storage;
+ }
manual_compaction_ = NULL;
}
}
@@ -1109,10 +1148,6 @@ Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
versions_->SetLastSequence(last_sequence);
}
- if (options.post_write_snapshot != NULL) {
- *options.post_write_snapshot =
- status.ok() ? snapshots_.New(last_sequence) : NULL;
- }
ReleaseLoggingResponsibility(&self);
return status;
}
@@ -1225,6 +1260,9 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) {
}
}
return true;
+ } else if (in == "sstables") {
+ *value = versions_->current()->DebugString();
+ return true;
}
return false;