6.824:Lab4-5 Cache
Lab4-5 Caching Locks and Extents
Introduction(Lab4)
在这两个实验中,主要建立了锁和存储服务的缓存,以减少服务器负载并提高客户端的性能。
例如在Lab3中的测试,在一个YFS文件夹中建立100个文件,需要像这个文件夹(directory)的锁请求100次。这次的实验就是修改锁服务,让锁客户端只需要发送一次acquire RPC,把这个锁保存在缓存中,直到有其他的yfs_client需要再释放。
这次的挑战需要修改客户端和服务器端的协议。例如当client2 需要某个被client1缓存在其本地的锁,需要服务通过revoke RPC revoke(找不出比较合适的词翻译) client1 的那个锁,返还给server。client2 才能够得到。
Getting Started(Lab4)
需要改动的文件有:
- lock_client_cache.{cc,h}:这两个文件替换以前的lock_client文件,主要实现客户端缓存。
- lock_server_cache.{cc,h}:同样的,代替以前的lock_server文件,实现对应的锁缓存服务
- handle.{cc,h}:这个类主要包含了缓存节点之间的通信RPC,使用revoke和retry时使用。
- tprintf.h:这个文件包含了一个宏,用来打印很多debug的信息,尤其是在分布式锁的调试中起到了很关键的作用。
Step One: Design the Protocol(Lab4)
锁客户端需要对每一个锁记录其状态,并要有一个协议来表示他们目前的状态。设计这套协议,并思考协议怎样促使这些状态转换。 这是推荐的一套客户端协议:
- none :客户端不知道这个锁的存在
- free :客户端拥有此锁并且也没有线程使用此锁
- locked :客户端拥有此锁,并且有线程在使用此锁
- aquiring :客户端正在请求此锁
- releasing :客户端正在释放此锁
状态转移图如图[1]所示:
图[1] 整体系统结构。灰色圆圈中是每一个锁的状态。箭头上的时状态转移的条件。比如acq请求,release释放,revoke,retry 的远程调用等
为了满足这个记录客户端的这些信息,每个锁的结构需要重新规划。如下所示:
class ClientLock{
public:
pthread_cond_t wait_acq_;
pthread_cond_t wait_free_;
bool is_retried;
bool is_revoked;
bool is_finished;
AcqRet::lock_status status;
ClientLock():
is_retried(false),
is_revoked(false),
is_finished(false),
status(AcqRet::NONE){
pthread_cond_init(&wait_acq_, NULL);
pthread_cond_init(&wait_free_, NULL);
}
};
class AcqRet {
public:
enum status {OK = 0, RETRY};
enum lock_status {NONE, FREE, LOCK, RELEASING, ACQING};
};
lock_protocol::status
lock_client_cache::acquire(lock_protocol::lockid_t lid)
{
int ret = lock_protocol::OK;
ScopeLock map_lock(&lock_map_mutex_);
ClientLock &lock_item = m_lock_map_[lid];
bool is_used = false;
while(!is_used){
tprintf("lock_item.status is %d\n", lock_item.status);
switch(lock_item.status)
{
case AcqRet::NONE:
DealWithStatusNONE(lock_item, lid);
break;
case AcqRet::FREE:
DealWithStatusFREE(lock_item, lid);
is_used = true;
break;
case AcqRet::LOCK:
DealWithStatusLOCK(lock_item, lid);
break;
case AcqRet::ACQING:
DealWithStatusACQ(lock_item, lid);
break;
default:
VERIFY(false);
}
}
return ret;
}
void lock_client_cache::DealRelease(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
if (lock_item.status == AcqRet::NONE) return;
if (lock_item.is_revoked){
int r;
lock_item.status = AcqRet::NONE;
lock_item.is_finished = false;
lock_item.is_revoked = false;
pthread_mutex_unlock(&lock_map_mutex_);
lu->dorelease(lid);
lock_protocol::status ret = cl->call(lock_protocol::release, lid, id, r);
tprintf("I send it %llu back to server\n", lid);
VERIFY(ret == lock_protocol::OK);
pthread_mutex_lock(&lock_map_mutex_);
}else{
lock_item.status = AcqRet::FREE;
lock_item.is_finished = true;
}
pthread_cond_broadcast(&lock_item.wait_free_);
pthread_cond_broadcast(&lock_item.wait_acq_);
}
void lock_client_cache::DealWithStatusACQ(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
if (lock_item.is_retried){
SendAcqToSvr(lock_item, lid);
tprintf("I am here retried 1\n");
}
while(lock_item.status == AcqRet::ACQING && !lock_item.is_retried){
tprintf("I am waiting here\n");
pthread_cond_wait(&lock_item.wait_acq_, &lock_map_mutex_);
}
if (lock_item.is_retried && lock_item.status == AcqRet::ACQING){
SendAcqToSvr(lock_item, lid);
tprintf("I am here retried 2\n");
}
}
void lock_client_cache::DealWithStatusFREE(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
lock_item.is_finished = false;
lock_item.status = AcqRet::LOCK;
}
void lock_client_cache::DealWithStatusLOCK(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
while(lock_item.status == AcqRet::LOCK){
pthread_cond_wait(&lock_item.wait_free_, &lock_map_mutex_);
}
}
void lock_client_cache::DealWithStatusNONE(ClientLock & lock_item, lock_protocol::lockid_t lid)
{
SendAcqToSvr(lock_item, lid);
}
int lock_client_cache::SendAcqToSvr(ClientLock & lock_item, lock_protocol::lockid_t lid){
int r;
lock_item.is_retried = false;
pthread_mutex_unlock(&lock_map_mutex_);
int ret = cl->call(lock_protocol::acquire, lid, id, r);
pthread_mutex_lock(&lock_map_mutex_);
tprintf("I am send a msg to svr, ret=%d\n", ret);
if (ret == AcqRet::OK){
lock_item.status = AcqRet::FREE;
} else if (ret) {
lock_item.status = AcqRet::ACQING;
} else {
tprintf("unknow svr return %d\n", ret);
}
return ret;
}
一个单独的客户端可能有多个线程等待相同的锁,但是每个客户端只有一个线程需要与server互动(interacting)。一旦某个有锁的线程释放掉此锁,那么就会唤醒那些等待这个所的其余线程。在等待的这些锁中,会有一个获得此锁。
当一个client通过acquire RPC请求server时,如果此锁没有被任何client占有时,server返回 OK ,如果此锁被某个client占有时,返回RETRY。同时,server发送一个revoke给占有此锁的client,让他交出这个锁。当这个锁归还给server时,再向请求这个锁的client发出retry RPC,让它重新请求这个锁。如下图所示:
一旦某个client获得了这个锁,client将会缓存此锁(cache,占有)。当他使用晚此锁时,不用release给server,如果这个client再需要acquire此锁时,就不需要向server请求了,从而减少了网络请求,提高了效率。知道有其他的client进行锁请求时,此client才(并且必须)交还此锁给server。
服务器端应该思考怎样保存锁信息,出了是否locked这种信息,还需要保存比如每一个锁现在被谁占(cached),哪些clients正在等待此锁的释放等。
服务端处理如图[2]所示:
图[2] 黑色为锁服务器在有缓存下的处理方式,红蓝为客户端,此图模拟了当锁遇到竞争时的状况,A先得锁,B等锁释放
struct CondLockCache : public CondLock{
CondLockCache() {
}
~CondLockCache() {
}
std::string user_id_; //lab4 add
};
//record which clints are waiting for this lock releasing
std::map<lock_protocol::lockid_t, std::queue<std::string> > m_retry_queue_;
int lock_server_cache::acquire(lock_protocol::lockid_t lid, std::string id,
int &r)
{
AcqRet::status ret;
ScopeLock map_lock(&m_lock_mutex_);
CondLockCache& lock_item = m_map_lock_[lid];
if (lock_item.is_locked == false){
lock_item.is_locked = true;
lock_item.user_id_ = id;
ret = AcqRet::OK;
}else{
VERIFY (!lock_item.user_id_.empty());
pthread_mutex_lock(&m_retry_mutex_);
m_retry_queue_[lid].push(id);
pthread_mutex_unlock(&m_retry_mutex_);
//ConnectToClient conn_c(lock_item.user_id_);
ret = AcqRet::RETRY;
pthread_mutex_unlock(&m_lock_mutex_);
int ret_r;
ret_r = handle(lock_item.user_id_).safebind()->call(rlock_protocol::revoke, lid, r);
if (ret_r != rlock_protocol::OK) tprintf("rlock_protocol::revoke failed!!\n");
}
return ret;
}
在锁服务的acquire大致逻辑是,先判断此锁是否被锁,如果没有,直接返回OK,如果有,记录此客户端ID(IP:PORT)放入retry等待队列。随后像拥有此锁的客户端发送revoke,让其归还此锁。
lock_protocol::status
lock_server_cache::release(lock_protocol::lockid_t lid, std::string id,
int &r)
{
tprintf("%s send %llu to server\n", id.c_str(), lid);
lock_protocol::status ret = lock_protocol::OK;
ScopeLock map_lock(&m_lock_mutex_);
if (m_map_lock_.count(lid) <= 0){
tprintf("lid is not acquired by anyone, why do you release?\n");
}
CondLockCache& lock_item = m_map_lock_[lid];
lock_item.is_locked = false;
ScopeLock retry_lock(&m_retry_mutex_);
std::queue<std::string> &rty_q = m_retry_queue_[lid];
tprintf("now the retry queue size is %zu\n", rty_q.size());
if (!rty_q.empty()){
int ret_r;
std::string user_id = rty_q.front();
rty_q.pop();
//ConnectToClient conn_c(user_id);
pthread_mutex_unlock(&m_lock_mutex_);
pthread_mutex_unlock(&m_retry_mutex_);
ret_r = handle(user_id).safebind()->call(rlock_protocol::retry, lid, r);
if (ret_r != rlock_protocol::OK) tprintf("rlock_protocol::retry failed!!\n");
pthread_mutex_lock(&m_lock_mutex_);
pthread_mutex_lock(&m_retry_mutex_);
if(!rty_q.empty()){
pthread_mutex_unlock(&m_lock_mutex_);
pthread_mutex_unlock(&m_retry_mutex_);
ret_r = handle(user_id).safebind()->call(rlock_protocol::revoke, lid, r);
if (ret_r != rlock_protocol::OK) tprintf("rlock_protocol::revoke failed!!\n");
pthread_mutex_lock(&m_lock_mutex_);
pthread_mutex_lock(&m_retry_mutex_);
}
}
return ret;
}
还锁时,查看等待队列,如果有等待发送retry给等待Client_X。这里需要注意:⚠️如果等待队列里还有等待此锁的客户端,要事先发送revoke给 Client_X,虽然它还没有拿到此锁,因为如果不这样,就再没有客户会请求此锁,也不会有revoke发送给Client_X
这里的user_id 直接记录了当前锁持有client的IP和port 同样在m_retry_queue_这个map中 queue中的string也是IP:PORT的形式,使用这样的方式第一能够保证唯一性,第二方便。 提示:当发送RPC时,要释放目前占有的锁,一个RPC可能维持的时间较长,如果你不希望其他的线程跟着一起等的话,就将它们释放掉。而且不释放还会导致分布式死锁。
下列两个问题能够帮助你思考和设计:
- 当一个client的线程保持一个锁,另一个线程发出acquire请求时,将会发生什么?这时时不会发送rpc请求的。
- 当一个客户端持有锁时,revoke请求应当怎样处理?当一个client在收到acquire的反馈之前收到了retry,该怎么办?
提示:当一个client在收到acquire的反馈之前收到了retry,client应当记下这个请求,如果忽视了这个retry,如果你的acquire收到的是RETRY的话,client将永远陷入等待之中。(因为server不会再发送retry)
- 当一个client在收到acquire的反馈之前收到了revoke,该怎么办?记录起来(跟记录retry一样),返回OK,使用完这个锁后立即归还给server,不要做缓存。(因为server同样不会再发送revoke,此client以为没有其他clients需要,一直不肯归还,其他clients一直饥饿)
Introduction(Lab5)
接下来要做的是对extents,也就是存储数据的服务实现缓存。原因与第一个锁缓存服务一样,都是为了减小开销,增加性能。最主要的任务就是要确保extents缓存下,某个client最终处理的某个数据,是上一个client在处理完后的结果。(不能在处理相同的数据时,各处理各的数据)。
首先需要在client端添加一个本地的extent存储做cache。extent 客户端将再这个基础上对cache进行操作,同时客户端只有在没有某个extent并获取extent或者属性(attr)时访问extent服务器,并在其他extent客户端需要访问修改的时候将脏数据回写入extent服务器。与锁缓存服务有异曲同工之妙!
Step One:Extent Cache(Lab5)
在第一步中,需要做一个带缓存的extent 客户端,不用考虑数据的一致性。首先新的extent_client::get应该检查客户端是否已经在本地的cache中,如果不在,访问extent服务器,将数据取回来,缓存在自己的数据库中。如果在,直接返回。put()函数中,直接替换cached中的数据,没有必要将已经修改的数据(脏数据)传给服务器,保存在缓存中。remove()应该删除本地的extent服务器。同样extent客户端应当记录一个数据的属性。
class FileData {
public:
FileContent file_content_;
bool is_present;
bool is_dirty;
FileData():
is_present(false),
is_dirty(false)
{}
};
这是extent_client中extent的数据结构,除了以前的file content结构之外,添加了is_present
,is_dirty
两个标记。一个表示是否在本地cache中,一个表示已经修改,需要将脏数据回写入extent服务器。
Step Two:Cache Consistency(Lab5)
在第二步中,要确保每一个get()请求请求到的数据时最近的put()改动过的数据,即使get与put的调用方时两个不同的extent客户端。extent客户端和锁服务相互合作以确保每一个inum(indoe)中的数据是一致的。当你释放锁的时候,要flush一下extent对应的inum数据,即回写入extent脏数据以更新extent服务器数据。(如果remove了,直接删了extent服务中对应的数据就好)
假设clientA请求了锁inum,从extent服务器上get了对应的文件数据并在客户端自己的cache中修改了数据,当前clientA即缓存着锁服务的锁,也缓存着extent服务的数据。那么当clientB想要访问这份数据时,首先要拿到client缓存的锁,所以clientA要现将脏数据写入extent服务器中(返还这份数据),再释放对应的锁。这样clientB就能够看到clientA修改完后的数据了。
这里还需要注意使用锁的两个目的:
- 确保文件系统中的每一个操作的原子性
- 完成了extent缓存的一致性
确保每一个yfs_client对extent的操作包含在acquire和release请求中。
YFS服务应当在释放锁到锁服务器上的时候再调用flush,而不是释放锁的时候。不要每一次调用lock_client::release()flush,而是要在lock_client调用cl->call(lock_protocol::release, lid, id, r);
即真正返还锁给锁服务器时(锁服务向该client调用revoke时)再调用。否则就有些矫枉过正了。
在这里,实验环境提供了lock_release_user类,这是一个虚类仅仅支持dorelease的成员方法。我们需要实现一个通过调用dorelease能够调用flush的子类。
extent_protocol::status
extent_client_cache::get(extent_protocol::extentid_t eid, std::string &buf)
{
extent_protocol::status ret = extent_protocol::OK;
if (extent_file_cache_[eid].is_present){
time_t now = time(0);
extent_file_cache_[eid].file_content_.first.atime = static_cast<int>(now);
buf = extent_file_cache_[eid].file_content_.second;
}else{
extent_protocol::attr &attr = extent_file_cache_[eid].file_content_.first;
ret = cl->call(extent_protocol::get, eid, buf);
ret = cl->call(extent_protocol::getattr, eid, attr);
extent_file_cache_[eid].is_present = true;
extent_file_cache_[eid].file_content_.second = buf;
}
return ret;
}
extent_protocol::status
extent_client_cache::getattr(extent_protocol::extentid_t eid,
extent_protocol::attr &attr)
{
extent_protocol::status ret = extent_protocol::OK;
if (extent_file_cache_[eid].is_present){
attr = extent_file_cache_[eid].file_content_.first;
} else {
ret = cl->call(extent_protocol::getattr, eid, attr);
}
return ret;
}
extent_protocol::status
extent_client_cache::put(extent_protocol::extentid_t eid, std::string buf)
{
extent_protocol::status ret = extent_protocol::OK;
time_t now = time(0);
std::map<uint64_t, FileData>::iterator file_cache_it = extent_file_cache_.find(eid);
if (file_cache_it == extent_file_cache_.end()){
extent_file_cache_[eid].file_content_.first.atime = static_cast<int>(now);
}
extent_file_cache_[eid].file_content_.first.mtime = static_cast<int>(now);
extent_file_cache_[eid].file_content_.first.ctime = static_cast<int>(now);
//int r;
//ret = cl->call(extent_protocol::put, eid, buf, r);
extent_file_cache_[eid].file_content_.first.size = buf.size();
extent_file_cache_[eid].file_content_.second = buf;
extent_file_cache_[eid].is_dirty = true;
extent_file_cache_[eid].is_present = true;
return ret;
}
extent_protocol::status
extent_client_cache::remove(extent_protocol::extentid_t eid)
{
extent_protocol::status ret = extent_protocol::OK;
std::map<uint64_t, FileData>::iterator file_cache_it = extent_file_cache_.find(eid);
if (file_cache_it != extent_file_cache_.end()){
extent_file_cache_.erase(file_cache_it);
}else{
int r;
ret = cl->call(extent_protocol::remove, eid, r);
}
return ret;
}
extent_protocol::status extent_client_cache::flush(extent_protocol::extentid_t eid)
{
extent_protocol::status ret = extent_protocol::OK;
std::map<uint64_t, FileData>::iterator file_cache_it = extent_file_cache_.find(eid);
if (file_cache_it == extent_file_cache_.end()){
int r;
ret = cl->call(extent_protocol::remove, eid, r);
return ret;
}
if (file_cache_it->second.is_dirty){
int r;
std::string &buf = file_cache_it->second.file_content_.second;
ret = cl->call(extent_protocol::put, eid, buf, r);
tprintf("put %d back to server, ret=%d\n", eid, ret);
}
extent_file_cache_[eid].is_dirty = false;
extent_file_cache_[eid].is_present = false;
return ret;
}
在缓存实验后,我们可以打开rpc的记录日志(在环境中设置
export RPC_COUNT=25
),它会记录所有rpc请求的数量。在通过执行test-lab-3-c,可以观察在加缓存和没有缓存情况下,客户端请求服务器的次数。实验手册上说如果设计得当,锁服务的请求会降低10倍(很惭愧,只降低了3倍)。其他的请求次数也会有大幅度下降。图[3]为实验对比:
图[3]acq没有试验手册下降的多,release到是下降了10倍左右,在extent的试验中,Put和remove下降最大,Get与GetAttr下降明显。GetInum因为没有做缓存,而且在我做实验中设计的不好,随机生成,不太方便做缓存,所以也就没下降
Hints
- 确保使用线程mutex来保护客户端的extent cache,防止多线程资源竞争。
- 如果你对extent中的某个data只是做了只读(read-only)的操作,久不要将这一数据flush入extent服务器了,应该使用一个标记位来记录数据是否被修改(
is_dirty
) - 这个实现可以不怎么修改yfs_client.cc这一文件,可以通过继承client_extent的方式,实现extent_client_cache,重新定义put get等方式。(因为yfs_client调用extent的方式是指针,这一点实验设计的很巧妙)
22 May 2018 by John Brown