6.824:Lab4-5 Cache

Lab4-5 Caching Locks and Extents

Introduction(Lab4)

在这两个实验中,主要建立了锁和存储服务的缓存,以减少服务器负载并提高客户端的性能。

例如在Lab3中的测试,在一个YFS文件夹中建立100个文件,需要像这个文件夹(directory)的锁请求100次。这次的实验就是修改锁服务,让锁客户端只需要发送一次acquire RPC,把这个锁保存在缓存中,直到有其他的yfs_client需要再释放。

这次的挑战需要修改客户端和服务器端的协议。例如当client2 需要某个被client1缓存在其本地的锁,需要服务通过revoke RPC revoke(找不出比较合适的词翻译) client1 的那个锁,返还给server。client2 才能够得到。

Getting Started(Lab4)

需要改动的文件有:

  • lock_client_cache.{cc,h}:这两个文件替换以前的lock_client文件,主要实现客户端缓存。
  • lock_server_cache.{cc,h}:同样的,代替以前的lock_server文件,实现对应的锁缓存服务
  • handle.{cc,h}:这个类主要包含了缓存节点之间的通信RPC,使用revoke和retry时使用。
  • tprintf.h:这个文件包含了一个宏,用来打印很多debug的信息,尤其是在分布式锁的调试中起到了很关键的作用。

Step One: Design the Protocol(Lab4)

锁客户端需要对每一个锁记录其状态,并要有一个协议来表示他们目前的状态。设计这套协议,并思考协议怎样促使这些状态转换。 这是推荐的一套客户端协议:

  • none :客户端不知道这个锁的存在
  • free :客户端拥有此锁并且也没有线程使用此锁
  • locked :客户端拥有此锁,并且有线程在使用此锁
  • aquiring :客户端正在请求此锁
  • releasing :客户端正在释放此锁

状态转移图如图[1]所示:

image

图[1] 整体系统结构。灰色圆圈中是每一个锁的状态。箭头上的时状态转移的条件。比如acq请求,release释放,revoke,retry 的远程调用等

为了满足这个记录客户端的这些信息,每个锁的结构需要重新规划。如下所示:

class ClientLock{
  public:
	pthread_cond_t wait_acq_;
	pthread_cond_t wait_free_;
	bool is_retried;
	bool is_revoked;
	bool is_finished;
	AcqRet::lock_status status;
	ClientLock():
	    is_retried(false),
	    is_revoked(false),
	    is_finished(false),
	    status(AcqRet::NONE){
	    pthread_cond_init(&wait_acq_, NULL);
	    pthread_cond_init(&wait_free_, NULL);
	}
};
class AcqRet {
public:
	enum status {OK = 0, RETRY};
	enum lock_status {NONE, FREE, LOCK, RELEASING, ACQING};
};

lock_protocol::status
lock_client_cache::acquire(lock_protocol::lockid_t lid)
{
  int ret = lock_protocol::OK;
  ScopeLock map_lock(&lock_map_mutex_);
  ClientLock &lock_item = m_lock_map_[lid];
  bool  is_used = false;
  while(!is_used){
    tprintf("lock_item.status is %d\n", lock_item.status);
	switch(lock_item.status)
	{
	  case AcqRet::NONE:
	      DealWithStatusNONE(lock_item, lid);
		  break;
	  case AcqRet::FREE:
	      DealWithStatusFREE(lock_item, lid);
	      is_used = true;
		  break;
	  case AcqRet::LOCK:
	      DealWithStatusLOCK(lock_item, lid);
		  break;
	  case AcqRet::ACQING:
	      DealWithStatusACQ(lock_item, lid);
		  break;
	  default:
		VERIFY(false);
	}
  }
  return ret;
}

void lock_client_cache::DealRelease(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
    if (lock_item.status == AcqRet::NONE) return;
    if (lock_item.is_revoked){
        int r;
        lock_item.status = AcqRet::NONE;
        lock_item.is_finished = false;
        lock_item.is_revoked = false;
        pthread_mutex_unlock(&lock_map_mutex_);
        lu->dorelease(lid);
        lock_protocol::status ret = cl->call(lock_protocol::release, lid, id, r);
        tprintf("I send it %llu back to server\n", lid);
        VERIFY(ret == lock_protocol::OK);
        pthread_mutex_lock(&lock_map_mutex_);
    }else{
        lock_item.status = AcqRet::FREE;
        lock_item.is_finished = true;
    }
    pthread_cond_broadcast(&lock_item.wait_free_);
    pthread_cond_broadcast(&lock_item.wait_acq_);
}
void lock_client_cache::DealWithStatusACQ(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
    if (lock_item.is_retried){
        SendAcqToSvr(lock_item, lid);
        tprintf("I am here retried 1\n");
    }
    while(lock_item.status == AcqRet::ACQING && !lock_item.is_retried){
        tprintf("I am waiting here\n");
        pthread_cond_wait(&lock_item.wait_acq_, &lock_map_mutex_);
    }
    if (lock_item.is_retried && lock_item.status == AcqRet::ACQING){
        SendAcqToSvr(lock_item, lid);
        tprintf("I am here retried 2\n");
    }
}
void lock_client_cache::DealWithStatusFREE(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
    lock_item.is_finished = false;
     lock_item.status = AcqRet::LOCK;
}
void lock_client_cache::DealWithStatusLOCK(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
    while(lock_item.status == AcqRet::LOCK){
        pthread_cond_wait(&lock_item.wait_free_, &lock_map_mutex_);
    }
}
void lock_client_cache::DealWithStatusNONE(ClientLock & lock_item, lock_protocol::lockid_t lid)
{

    SendAcqToSvr(lock_item, lid);
}

int lock_client_cache::SendAcqToSvr(ClientLock & lock_item, lock_protocol::lockid_t lid){
    int r;
    lock_item.is_retried  = false;
    pthread_mutex_unlock(&lock_map_mutex_);
    int ret = cl->call(lock_protocol::acquire, lid, id, r);
    pthread_mutex_lock(&lock_map_mutex_);
    tprintf("I am send a msg to svr, ret=%d\n", ret);
    if (ret == AcqRet::OK){
        lock_item.status = AcqRet::FREE;
    } else if (ret) {
        lock_item.status = AcqRet::ACQING;
    } else {
        tprintf("unknow svr return %d\n", ret);
    }
    return ret;

}

一个单独的客户端可能有多个线程等待相同的锁,但是每个客户端只有一个线程需要与server互动(interacting)。一旦某个有锁的线程释放掉此锁,那么就会唤醒那些等待这个所的其余线程。在等待的这些锁中,会有一个获得此锁。

当一个client通过acquire RPC请求server时,如果此锁没有被任何client占有时,server返回 OK ,如果此锁被某个client占有时,返回RETRY。同时,server发送一个revoke给占有此锁的client,让他交出这个锁。当这个锁归还给server时,再向请求这个锁的client发出retry RPC,让它重新请求这个锁。如下图所示:

一旦某个client获得了这个锁,client将会缓存此锁(cache,占有)。当他使用晚此锁时,不用release给server,如果这个client再需要acquire此锁时,就不需要向server请求了,从而减少了网络请求,提高了效率。知道有其他的client进行锁请求时,此client才(并且必须)交还此锁给server。

服务器端应该思考怎样保存锁信息,出了是否locked这种信息,还需要保存比如每一个锁现在被谁占(cached),哪些clients正在等待此锁的释放等。

服务端处理如图[2]所示:

image

图[2] 黑色为锁服务器在有缓存下的处理方式,红蓝为客户端,此图模拟了当锁遇到竞争时的状况,A先得锁,B等锁释放

struct CondLockCache : public CondLock{
	CondLockCache() {
	}
	~CondLockCache() {
	}
	std::string user_id_;       //lab4 add
};

//record which clints are waiting for this lock releasing
std::map<lock_protocol::lockid_t, std::queue<std::string> > m_retry_queue_; 

int lock_server_cache::acquire(lock_protocol::lockid_t lid, std::string id, 
                               int &r)
{
      AcqRet::status ret;
	  ScopeLock map_lock(&m_lock_mutex_);
	  CondLockCache& lock_item =  m_map_lock_[lid];
	  if (lock_item.is_locked == false){
		  lock_item.is_locked = true;
		  lock_item.user_id_ = id;
		  ret = AcqRet::OK;
	  }else{
		  VERIFY (!lock_item.user_id_.empty());
		  pthread_mutex_lock(&m_retry_mutex_);
		  m_retry_queue_[lid].push(id);
		  pthread_mutex_unlock(&m_retry_mutex_);
		  //ConnectToClient conn_c(lock_item.user_id_);
		  ret = AcqRet::RETRY;
		  pthread_mutex_unlock(&m_lock_mutex_);
		  int ret_r;
		  ret_r = handle(lock_item.user_id_).safebind()->call(rlock_protocol::revoke, lid, r);
		  if (ret_r != rlock_protocol::OK) tprintf("rlock_protocol::revoke failed!!\n");
	  }
  return ret;
}

在锁服务的acquire大致逻辑是,先判断此锁是否被锁,如果没有,直接返回OK,如果有,记录此客户端ID(IP:PORT)放入retry等待队列。随后像拥有此锁的客户端发送revoke,让其归还此锁。

lock_protocol::status
lock_server_cache::release(lock_protocol::lockid_t lid, std::string id, 
         int &r)
{
  tprintf("%s send %llu to server\n", id.c_str(), lid);
  lock_protocol::status ret = lock_protocol::OK;
  ScopeLock map_lock(&m_lock_mutex_);
  if (m_map_lock_.count(lid) <= 0){
	  tprintf("lid is not acquired by anyone, why do you release?\n");
  }
  CondLockCache& lock_item =  m_map_lock_[lid];
  lock_item.is_locked = false;
  ScopeLock retry_lock(&m_retry_mutex_);
  std::queue<std::string> &rty_q = m_retry_queue_[lid];
  tprintf("now the retry queue size is %zu\n", rty_q.size());
  if (!rty_q.empty()){
      int ret_r;
	  std::string user_id = rty_q.front();
	  rty_q.pop();
	  //ConnectToClient conn_c(user_id);
	  pthread_mutex_unlock(&m_lock_mutex_);
	  pthread_mutex_unlock(&m_retry_mutex_);
	  ret_r = handle(user_id).safebind()->call(rlock_protocol::retry, lid, r);
	  if (ret_r != rlock_protocol::OK) tprintf("rlock_protocol::retry failed!!\n");
	  pthread_mutex_lock(&m_lock_mutex_);
	  pthread_mutex_lock(&m_retry_mutex_);
	  if(!rty_q.empty()){
	    pthread_mutex_unlock(&m_lock_mutex_);
        pthread_mutex_unlock(&m_retry_mutex_);
        ret_r = handle(user_id).safebind()->call(rlock_protocol::revoke, lid, r);
        if (ret_r != rlock_protocol::OK) tprintf("rlock_protocol::revoke failed!!\n");
        pthread_mutex_lock(&m_lock_mutex_);
        pthread_mutex_lock(&m_retry_mutex_);
	  }
  }
  return ret;
}

还锁时,查看等待队列,如果有等待发送retry给等待Client_X。这里需要注意:⚠️如果等待队列里还有等待此锁的客户端,要事先发送revoke给 Client_X,虽然它还没有拿到此锁,因为如果不这样,就再没有客户会请求此锁,也不会有revoke发送给Client_X

这里的user_id 直接记录了当前锁持有client的IP和port 同样在m_retry_queue_这个map中 queue中的string也是IP:PORT的形式,使用这样的方式第一能够保证唯一性,第二方便。 提示:当发送RPC时,要释放目前占有的锁,一个RPC可能维持的时间较长,如果你不希望其他的线程跟着一起等的话,就将它们释放掉。而且不释放还会导致分布式死锁。

下列两个问题能够帮助你思考和设计:

  • 当一个client的线程保持一个锁,另一个线程发出acquire请求时,将会发生什么?这时时不会发送rpc请求的。
  • 当一个客户端持有锁时,revoke请求应当怎样处理?当一个client在收到acquire的反馈之前收到了retry,该怎么办?

提示:当一个client在收到acquire的反馈之前收到了retry,client应当记下这个请求,如果忽视了这个retry,如果你的acquire收到的是RETRY的话,client将永远陷入等待之中。(因为server不会再发送retry)

  • 当一个client在收到acquire的反馈之前收到了revoke,该怎么办?记录起来(跟记录retry一样),返回OK,使用完这个锁后立即归还给server,不要做缓存。(因为server同样不会再发送revoke,此client以为没有其他clients需要,一直不肯归还,其他clients一直饥饿)

Introduction(Lab5)

接下来要做的是对extents,也就是存储数据的服务实现缓存。原因与第一个锁缓存服务一样,都是为了减小开销,增加性能。最主要的任务就是要确保extents缓存下,某个client最终处理的某个数据,是上一个client在处理完后的结果。(不能在处理相同的数据时,各处理各的数据)。

首先需要在client端添加一个本地的extent存储做cache。extent 客户端将再这个基础上对cache进行操作,同时客户端只有在没有某个extent并获取extent或者属性(attr)时访问extent服务器,并在其他extent客户端需要访问修改的时候将脏数据回写入extent服务器。与锁缓存服务有异曲同工之妙!

Step One:Extent Cache(Lab5)

在第一步中,需要做一个带缓存的extent 客户端,不用考虑数据的一致性。首先新的extent_client::get应该检查客户端是否已经在本地的cache中,如果不在,访问extent服务器,将数据取回来,缓存在自己的数据库中。如果在,直接返回。put()函数中,直接替换cached中的数据,没有必要将已经修改的数据(脏数据)传给服务器,保存在缓存中。remove()应该删除本地的extent服务器。同样extent客户端应当记录一个数据的属性。

class FileData {
 public:
    FileContent file_content_;
    bool is_present;
    bool is_dirty;

    FileData():
        is_present(false),
        is_dirty(false)
    {}
};

这是extent_client中extent的数据结构,除了以前的file content结构之外,添加了is_present,is_dirty两个标记。一个表示是否在本地cache中,一个表示已经修改,需要将脏数据回写入extent服务器。

Step Two:Cache Consistency(Lab5)

在第二步中,要确保每一个get()请求请求到的数据时最近的put()改动过的数据,即使get与put的调用方时两个不同的extent客户端。extent客户端和锁服务相互合作以确保每一个inum(indoe)中的数据是一致的。当你释放锁的时候,要flush一下extent对应的inum数据,即回写入extent脏数据以更新extent服务器数据。(如果remove了,直接删了extent服务中对应的数据就好)

假设clientA请求了锁inum,从extent服务器上get了对应的文件数据并在客户端自己的cache中修改了数据,当前clientA即缓存着锁服务的锁,也缓存着extent服务的数据。那么当clientB想要访问这份数据时,首先要拿到client缓存的锁,所以clientA要现将脏数据写入extent服务器中(返还这份数据),再释放对应的锁。这样clientB就能够看到clientA修改完后的数据了。

这里还需要注意使用锁的两个目的:

  1. 确保文件系统中的每一个操作的原子性
  2. 完成了extent缓存的一致性

确保每一个yfs_client对extent的操作包含在acquire和release请求中。

YFS服务应当在释放锁到锁服务器上的时候再调用flush,而不是释放锁的时候。不要每一次调用lock_client::release()flush,而是要在lock_client调用cl->call(lock_protocol::release, lid, id, r);即真正返还锁给锁服务器时(锁服务向该client调用revoke时)再调用。否则就有些矫枉过正了。

在这里,实验环境提供了lock_release_user类,这是一个虚类仅仅支持dorelease的成员方法。我们需要实现一个通过调用dorelease能够调用flush的子类。

extent_protocol::status
extent_client_cache::get(extent_protocol::extentid_t eid, std::string &buf)
{
  extent_protocol::status ret = extent_protocol::OK;
  if (extent_file_cache_[eid].is_present){
      time_t now = time(0);
      extent_file_cache_[eid].file_content_.first.atime = static_cast<int>(now);
      buf = extent_file_cache_[eid].file_content_.second;
  }else{
    extent_protocol::attr &attr = extent_file_cache_[eid].file_content_.first;
    ret = cl->call(extent_protocol::get, eid, buf);
    ret = cl->call(extent_protocol::getattr, eid, attr);
    extent_file_cache_[eid].is_present = true;
    extent_file_cache_[eid].file_content_.second = buf;
  }
  return ret;
}

extent_protocol::status
extent_client_cache::getattr(extent_protocol::extentid_t eid,
               extent_protocol::attr &attr)
{
  extent_protocol::status ret = extent_protocol::OK;
  if (extent_file_cache_[eid].is_present){
      attr = extent_file_cache_[eid].file_content_.first;
  } else {
      ret = cl->call(extent_protocol::getattr, eid, attr);
  }
  return ret;
}

extent_protocol::status
extent_client_cache::put(extent_protocol::extentid_t eid, std::string buf)
{
  extent_protocol::status ret = extent_protocol::OK;
  time_t now = time(0);

  std::map<uint64_t, FileData>::iterator file_cache_it = extent_file_cache_.find(eid);
  if (file_cache_it == extent_file_cache_.end()){
      extent_file_cache_[eid].file_content_.first.atime = static_cast<int>(now);
  }
  extent_file_cache_[eid].file_content_.first.mtime = static_cast<int>(now);
  extent_file_cache_[eid].file_content_.first.ctime = static_cast<int>(now);
  //int r;
  //ret = cl->call(extent_protocol::put, eid, buf, r);
  extent_file_cache_[eid].file_content_.first.size = buf.size();
  extent_file_cache_[eid].file_content_.second = buf;
  extent_file_cache_[eid].is_dirty = true;
  extent_file_cache_[eid].is_present = true;
  return ret;
}

extent_protocol::status
extent_client_cache::remove(extent_protocol::extentid_t eid)
{
  extent_protocol::status ret = extent_protocol::OK;
  std::map<uint64_t, FileData>::iterator file_cache_it = extent_file_cache_.find(eid);
  if (file_cache_it != extent_file_cache_.end()){
      extent_file_cache_.erase(file_cache_it);
  }else{
      int r;
      ret = cl->call(extent_protocol::remove, eid, r);
  }

  return ret;
}

extent_protocol::status extent_client_cache::flush(extent_protocol::extentid_t eid)
{
    extent_protocol::status ret = extent_protocol::OK;
    std::map<uint64_t, FileData>::iterator file_cache_it = extent_file_cache_.find(eid);
    if (file_cache_it == extent_file_cache_.end()){
        int r;
        ret = cl->call(extent_protocol::remove, eid, r);
        return ret;
    }

    if (file_cache_it->second.is_dirty){
        int r;
        std::string &buf = file_cache_it->second.file_content_.second;
        ret = cl->call(extent_protocol::put, eid, buf, r);
        tprintf("put %d back to server, ret=%d\n", eid, ret);
    }
    extent_file_cache_[eid].is_dirty   = false;
    extent_file_cache_[eid].is_present = false;
    return ret;
}

在缓存实验后,我们可以打开rpc的记录日志(在环境中设置 export RPC_COUNT=25 ),它会记录所有rpc请求的数量。在通过执行test-lab-3-c,可以观察在加缓存和没有缓存情况下,客户端请求服务器的次数。实验手册上说如果设计得当,锁服务的请求会降低10倍(很惭愧,只降低了3倍)。其他的请求次数也会有大幅度下降。图[3]为实验对比:

image

图[3]acq没有试验手册下降的多,release到是下降了10倍左右,在extent的试验中,Put和remove下降最大,Get与GetAttr下降明显。GetInum因为没有做缓存,而且在我做实验中设计的不好,随机生成,不太方便做缓存,所以也就没下降

Hints

  • 确保使用线程mutex来保护客户端的extent cache,防止多线程资源竞争。
  • 如果你对extent中的某个data只是做了只读(read-only)的操作,久不要将这一数据flush入extent服务器了,应该使用一个标记位来记录数据是否被修改(is_dirty
  • 这个实现可以不怎么修改yfs_client.cc这一文件,可以通过继承client_extent的方式,实现extent_client_cache,重新定义put get等方式。(因为yfs_client调用extent的方式是指针,这一点实验设计的很巧妙)

22 May 2018 by John Brown