[已完结]CMU数据库(15-445)实验2-B+树索引实现(下)

说说C# 8.0 新增功能Index和Range的^0是什么?

4. Index_Iterator实现

这里就是需要实现迭代器的一些操作,比如begin、end、isend等等

下面是对于IndexIterator的构造函数

template <typename KeyType, typename ValueType, typename KeyComparator>
IndexIterator<KeyType, ValueType, KeyComparator>::
IndexIterator(BPlusTreeLeafPage<KeyType, ValueType, KeyComparator> *leaf,
              int index_, BufferPoolManager *buff_pool_manager):
    leaf_(leaf), index_(index_), buff_pool_manager_(buff_pool_manager) {}

1. 首先我们来看begin函数的实现

  1. 利用key值找到叶子结点
  2. 然后获取当前key值的index就是begin的位置
INDEX_TEMPLATE_ARGUMENTS
INDEXITERATOR_TYPE BPLUSTREE_TYPE::Begin(const KeyType &key) {
  auto leaf = reinterpret_cast<BPlusTreeLeafPage<KeyType, ValueType,KeyComparator> *>(FindLeafPage(key, false));
  int index = 0;
  if (leaf != nullptr) {
    index = leaf->KeyIndex(key, comparator_);
  }
  return IndexIterator<KeyType, ValueType, KeyComparator>(leaf, index, buffer_pool_manager_);
}

2. end函数的实现

  1. 找到最开始的结点
  2. 然后一直向后遍历直到nextPageId=-1结束
  3. 这里注意需要重载!===

end函数

INDEX_TEMPLATE_ARGUMENTS
INDEXITERATOR_TYPE BPLUSTREE_TYPE::end() {
  KeyType key{};
  auto leaf= reinterpret_cast<BPlusTreeLeafPage<KeyType, ValueType,KeyComparator> *>( FindLeafPage(key, true));
  page_id_t new_page;
  while(leaf->GetNextPageId()!=INVALID_PAGE_ID){
    new_page=leaf->GetNextPageId();
    leaf=reinterpret_cast<BPlusTreeLeafPage<KeyType, ValueType,KeyComparator> *>(buffer_pool_manager_->FetchPage(new_page));
  }
  buffer_pool_manager_->UnpinPage(new_page,false);
  return IndexIterator<KeyType, ValueType, KeyComparator>(leaf, leaf->GetSize(), buffer_pool_manager_);
}

==和 !=函数

bool operator==(const IndexIterator &itr) const {
  return this->index_==itr.index_&&this->leaf_==itr.leaf_;
}

bool operator!=(const IndexIterator &itr) const {
  return !this->operator==(itr);
}

3. 重载++和*(解引用符号)

  1. 重载++

简单的index++然后设置nextPageId即可

template <typename KeyType, typename ValueType, typename KeyComparator>
IndexIterator<KeyType, ValueType, KeyComparator> &IndexIterator<KeyType, ValueType, KeyComparator>::
operator++() {
//
 // std::cout<<"++"<<std::endl;
  ++index_;
  if (index_ == leaf_->GetSize() && leaf_->GetNextPageId() != INVALID_PAGE_ID) {
    // first unpin leaf_, then get the next leaf
    page_id_t next_page_id = leaf_->GetNextPageId();

    auto *page = buff_pool_manager_->FetchPage(next_page_id);
    if (page == nullptr) {
      throw Exception("all page are pinned while IndexIterator(operator++)");
    }
    // first acquire next page, then release previous page
    page->RLatch();

    buff_pool_manager_->FetchPage(leaf_->GetPageId())->RUnlatch();
    buff_pool_manager_->UnpinPage(leaf_->GetPageId(), false);
    buff_pool_manager_->UnpinPage(leaf_->GetPageId(), false);

    auto next_leaf =reinterpret_cast<BPlusTreeLeafPage<KeyType, ValueType,KeyComparator> *>(page->GetData());
    assert(next_leaf->IsLeafPage());
    index_ = 0;
    leaf_ = next_leaf;
  }
  return *this;
};
  1. 重载*

return array[index]即可

template <typename KeyType, typename ValueType, typename KeyComparator>
const MappingType &IndexIterator<KeyType, ValueType, KeyComparator>::
operator*() {
  if (isEnd()) {
    throw "IndexIterator: out of range";
  }
  return leaf_->GetItem(index_);
}

5. 并发机制的实现

0. 首先复习一下读写机制

  1. 读操作是可以多个进程之间共享latch的而写操作则必须互斥
  2. 加入MaxReader数就是为了防止等待的⌛️写进程饥饿

首先来看如果没有机制多线程会发生什么问题

  1. 线程T1想要删除44。
  2. 线程T2 想要查找41
  1. 假设T2在执行到D位置的时候又切换到线程T1
  2. 这个时候T1进行重新分配,会把41借到I结点上
  3. T1执行完成切换回T2这时候T2再去原来的执行寻找41就会找不到

就会出现下面的情况。

由此我们需要读写的存在

  1. 对于find操作

由于我们是只读操作,所以我们到下一个结点的时候就可以释放上一个结点的Latch

剩下的操作都是一样的

  1. 对于delete则不一样

因为我们需要写操作

这里我们不能释放结点A的Latch。因为我们的删除操作可能会合并根节点。

到D的时候。我们会发现D中的38删除之后不需要进行合并,所以对于A和B的写Write是可以安全释放了

  1. 对于Insert操作

这里我们就可以安全的释放掉A的锁。因为B中还有空位,我们插入是不会对A造成影响的

当我们执行到D这里发现D中已经满了。所以此时我们不会释放B的锁,因为我们会对B进行写操作

上面的算法虽然是正确的但是有瓶颈问题。由于只有一个线程可以获得写Latch。而插入和删除的时候都需要对头结点加写Latch。所以多线程在有许多个插入或者删除操作的时候,性能就会大打折扣

这里要引入乐观

乐观的假设大部分操作是不需要进行合并和分裂的。因此在我们向下的时候都是读Latch而不是写Latch。只有在叶子结点才是write Latch

  1. 从上到下都是读Latch。而且逐步释放
  2. 到叶子结点需要修改的时候才为写Latch。这个删除是安全的所以直接结束
  1. 当我们到最后一步发现不安全的时候。则需要像上面我们没有引入乐观的时候一样。重新执行一遍

B-Link Tree简介

【项目实践】商业计算怎样才能保证精度不丢失

延迟更新父结点

这里用一个来标记这里需要被更新但是还没有执行

这个时候我们执行其他操作也是正确的比如查找31

这里我们执行insert 33

当执行到结点C的时候。因为这个时候有另一个线程持有了write Latch。所以这个时候操作要执行。随后在插入33

最后一点补充关于扫描操作的

  1. 线程1在C结点上持有write Latch
  2. 线程2已经扫描完了结点B想要获得结点C的read Latch

这时候会发生问题,因为线程2无法拿到read Latch

这里有几种解决方法

  1. 可以等到T1的写操作完成
  2. 可以重新执行T2
  3. 可以直接让线程T2停止抢得这个Latch。

注意这里的LatchLock并不一样

1. 辅助函数UnlockUnpinPages的实现

  1. 如果是读操作则释放read锁
  2. 否则释放write锁
INDEX_TEMPLATE_ARGUMENTS
void BPLUSTREE_TYPE::
UnlockUnpinPages(Operation op, Transaction *transaction) {
  if (transaction == nullptr) {
    return;
  }

  for (auto page:*transaction->GetPageSet()) {
    if (op == Operation::READ) {
      page->RUnlatch();
      buffer_pool_manager_->UnpinPage(page->GetPageId(), false);
    } else {
      page->WUnlatch();
      buffer_pool_manager_->UnpinPage(page->GetPageId(), true);
    }
  }
  transaction->GetPageSet()->clear();

  for (const auto &page_id: *transaction->GetDeletedPageSet()) {
    buffer_pool_manager_->DeletePage(page_id);
  }
  transaction->GetDeletedPageSet()->clear();

  // if root is locked, unlock it

  node_mutex_.unlock();
  }

四个自带的解锁和上锁操作

/** Acquire the page write latch. */
inline void WLatch() { rwlatch_.WLock(); }

/** Release the page write latch. */
inline void WUnlatch() { rwlatch_.WUnlock(); }

/** Acquire the page read latch. */
inline void RLatch() { rwlatch_.RLock(); }

/** Release the page read latch. */
inline void RUnlatch() { rwlatch_.RUnlock(); }

这里的rwlatch是自己实现的读写锁类下面来探究一下这个类

由于c++ 并发编程我现在还不太会。。。所以就简单看一下啦后面学完并发编程再补充

  1. WLock函数

    1. 首先获取一个锁
    2. 用一个记号writer_entered表示是否有写操作
    3. 如果之前已经有了现在的操作就需要等(这个线程处于阻塞状态)
    4. 当前如果有其他线程执行读操作。则仍需要阻塞(别人读的时候你不能写)
    void WLock() {
      std::unique_lock<mutex_t> latch(mutex_);
      while (writer_entered_) {
        reader_.wait(latch);
      }
      writer_entered_ = true;
      while (reader_count_ > 0) {
        writer_.wait(latch);
      }
    }
    
  2. WunLock函数

    1. 写标记置为false
    2. 然后通知所有的线程
    void WUnlock() {
      std::lock_guard<mutex_t> guard(mutex_);
      writer_entered_ = false;
      reader_.notify_all();
    }
    
  3. RLock函数

    1. 如果当前有人在写或者已经有最多的人读了则阻塞
    2. 否则只需要让读的计数++

    因为是允许多个线程一起读这样并不会出错

    void RLock() {
      std::unique_lock<mutex_t> latch(mutex_);
      while (writer_entered_ || reader_count_ == MAX_READERS) {
        reader_.wait(latch);
      }
      reader_count_++;
    }
    
  4. RUnLatch函数

    1. 计数--
    2. 如果当前有人在写并且无人读的话需要通知所有其他线程
    3. 如果在计数--之前达到了最大读数,释放这个锁之后需要通知其他线程,现在又可以读了。
    void RUnlock() {
      std::lock_guard<mutex_t> guard(mutex_);
      reader_count_--;
      if (writer_entered_) {
        if (reader_count_ == 0) {
          writer_.notify_one();
        }
      } else {
        if (reader_count_ == MAX_READERS - 1) {
          reader_.notify_one();
        }
      }
    }
    

6. Summary

好了终于磕磕绊绊的写完了Lab2.关于数据库的Lab2应该会停一段时间。这段时间要补一补深度学习(毕竟要毕业)然后赶工一下老师给的活。同时学一下c++并发编程和看一下侯捷老师的课程。

最后附上GitHub的
https://github.com/JayL-zxl/CMU15-445Lab

three.js cannon.js物理引擎之Heightfield

相关推荐

发表评论

路人甲

网友评论(0)