6.824:Lab 1 Lock Server

Prologue

本文是基于MIT 6.824 Distributed Systems 2012年实验,实现一个多服务器的文件系统 YFS(Yet Another File System)。以下是实验指导+报告,实验代码以上传至github,可做交流。

Lab 1:Lock Server

Introduction

在这一系列的实验中,你将要实现一个功能齐全的分布式文件系统。为了能够正常的工作,yfs服务器需要一个锁服务来完成文件系统操作。在这个实验中,将要完成一个简单的锁服务。

锁服务的核心逻辑由两个模块组成,锁服务客户端和服务端。他们之间通过RPC(Remote procedure call 远程调用)进行通信。客户端通过发送acquire请求给锁服务器获取一个锁。锁服务保证每个锁每次只发送给同一个客户端。当客户端完成临界资源区操作后,发送release请求给服务器,释放该锁。此时服务器再将此锁给其他的客户端。

另外为了实现锁服务,需要RPC保证每一次请求至多传一次(at-more-once)来消除RPC的重复传输。重复传输之所以会存在是因为RPC系统必须重传丢失的RPC报文,如果原先的请求没有丢弃,这种重传会导致RPC重复传递。

如果重复的RPC被传递,并且处理不当的话,经常会使应用出现问题。比如当A客户发送acquire给服务器索要锁x,服务器给了A,当A通过release释放这个锁的时候,原先acquire请求才到达服务器,服务器又把这个锁给了A,但是A永远不会释放这个锁了(因为他没有acquire这个锁)。这种行为明显是有问题的。

Your Job

首要任务就是在一个网络状态良好的情况下实现一个功能正确的锁服务器。功能正确即:在任何一个时间点,一个锁至多有一个用户持有。

实验将使用lock_tester进行测试以检验锁服务的正确性。比如检验服务器是否在任意时间一个锁至多分配给一个用户。

第二个任务是去报RPC服务每次请求至多请求一次(at-most-once,以后为了简便使用at-most-once)你可以设置环境变量RPC_LOSSY请求来模拟网络数据丢失的情况。需要修改的文件为rpc.{cc,h}, lock_client.{cc,h}, lock_server.{cc,h}, lock_smain.{cc,h}。

在这一实验中,你不用考虑服务器或者客户端的失败或者失效,但是需要注意一些有问题的应用。

Detailed Guidance

原则上你可以使用任何的方法来保证在“your job”提到的任务,并通过测试工具的测试。但你可以参考Detailed Guidance中的提示更方便的完成。

Step One 在网络状态良好的情况下实现锁服务器

首先需要在网络状态良好的情况下实现锁服务器,不用考虑重复传输的RPC。

  • 使用RPC系统
    RPC的源代码在rpc的子目录下,一个服务通过申请端口监听方式实现一个RPC对象的方式(rpcs)使用RPC。客户端通过创建客户端对象(rpcc)链接服务端的IP和端口发起RPC。

    每一个RPC通过有一个唯一的辨识ID。通过在lock_protocol.h中对acquire和release的定义了一个RPC辨识ID,如果需要注册其他的RPC,同样也需要注册(见lock_main.cc)。

    你可以通过实现lock_client和lock_server的方式学习RPC系统。RPC的请求参数从1-6个不等,返回一个整数作为状态码。返回0就是成功,返回其他的非0正整数就是存在着各种问题,比如超时等等。

    RPC系统将传入参数序列化为一个stream通过网络传输到另一端,另一端通过反序列化的方式将传入参数还原出来。需要注意的时,RPC反序列化时是不会检查传输信息的类型的。比如你传入一个uint32_t的类型,但是另一端需要把它反序列解成uint64_t,这是可以的。不过会出现各种难以预计的错误。所以调用端和接收端对于参数要统一。

    RPC库提供了一些常用的C++对象的序列化方法,如:std::string, int 和 char(见rpc.cc)。如果你的RPC调用了不同类型的对象作为传参,那么你就需要自己实现它的序列和反序列的方法。

  • 实现锁服务
    锁服务可以处理不同的锁。每一个锁有一个唯一的标识符lock_protocol::lockid_t。如果一个客户请求这个锁没有在锁服务的集合中,那就创建这个锁,并给这个客户。如果有这个锁,那么检查此锁的状态,如果这个锁被别的客户所使用,那么就让此客户等待,直到占有这个锁的客户将此锁归还。

    需要修改lock_server.{cc,h}这个文件接收来自客户端的acquire和release请求。并记录每一个锁的状态。

    一个锁有两种不同的状态

    • free:这个锁没有任何一个客户持有
    • locked:某个客户端正在持有此锁

    服务器端的锁管理可以使用C++ STL中的 std::map来实现每一个锁状态的保管

  • 实现锁客户端
    客户类lock_client时lock_server的客户端侧的接口。这个接口提供两个方法acquire和release接收和发送RPC接口。可以参考lock_dome.cc这个文件来参考接口的使用方法。注意lock_client::acquire必须要等到请求到锁后才能返回

  • 处理多线程并发
    lock_client和lock_ser都使用了多线程并发的策略。在服务端,RPC提供了一个线程池,每次使用一个空闲的线程。在客户端一侧,不同的线程也可以并发地调用acquire和release。

    应当使用线程互斥来保证共享数据的安全性。你应该使用线程条件变量以保证锁服务请求中等待逻辑。(可以参考pthread_cond_t的使用方法)

    线程在一个循环中等待,判断是否可换新。这样可以保护线程在pthread_cond_wait和pthread_cond_timewait()函数唤醒。

    一个简单的互斥使用策略是,对于lock_server使用一个单一的mutex来保护临界区(那个锁状态的map),使用较粗粒度的互斥锁就可以了。

    逻辑如图[1]所示:

    image

    图[1]: clientA向服务器端请求,获取锁。clientB在向服务器请求后,由于A占用着锁,所以需要等待,直到A向服务器释放此锁

lock_protocol::status
lock_server::acquire_lock(int clt, lock_protocol::lockid_t lid, int &r)
{
	lock_protocol::status ret = lock_protocol::OK;
	pthread_mutex_lock(&pcontrol_.pmutex);
	if (lock_map_.count(lid) <= 0) {
		lock_map_[lid].is_locked = true;
	} else {
		while (lock_map_[lid].is_locked == true) {
			pthread_cond_wait(&lock_map_[lid].pcond, &pcontrol_.pmutex);
		}
		lock_map_[lid].is_locked = true;
	}
	ret = lock_protocol::OK;
	pthread_mutex_unlock(&pcontrol_.pmutex);
	return ret;
}

lock_protocol::status
lock_server::release_lock(int clt, lock_protocol::lockid_t lid, int &r) {
	lock_protocol::status ret = lock_protocol::OK;
	pthread_mutex_lock(&pcontrol_.pmutex);
	if (lock_map_.count(lid) <= 0){
		pthread_mutex_unlock(&pcontrol_.pmutex);
		ret = lock_protocol::IOERR;
	}
	if (lock_map_[lid].is_locked) {
		lock_map_[lid].is_locked = false;
		pthread_cond_signal(&lock_map_[lid].pcond);
	} else {
		std::cout << "lid:" << lid << "is not locked, why release?" << std::endl;
	}
	pthread_mutex_unlock(&pcontrol_.pmutex);
	return ret;
}
struct CondLock {
	CondLock() {
		is_locked = false;
		pthread_cond_init(&pcond, NULL);
	}
	virtual ~CondLock() {
		pthread_cond_destroy(&pcond);
	}
	bool is_locked;
	pthread_cond_t pcond;
};

Setp Two:RPC实现at-more-once传输

RPC代码已经完成了客户端的at-most-once:客户端在等待回复超时时,重新发送请求,完成了每次请求时at-most-once服务端所需的信息。但是服务端的at-most-once代码是有部分缺失的。实现的逻辑在rpcs:chechduplicate_and_update和rpcs::add_reply。任务就是完成这两个函数。

在你的锁服务已经沟通过了lock_tester之前,开启模拟网络丢失的全局变量“export RPC_LOSSY=5”,这时再使用lock_tester会发现很大概率无法成功完成。

在处理这个实验室,先大体上了解RPC的框架结构。rpc.cc已经包含了一些处理重复请求的代码,你的任务就是补全这些代码。

rpcc类时时RPC的客户端类,核心代码在call1这个函数中,他序列化RPC请求并传输给RPC服务器,call1在序列化请求时会填充一些RPC的信息:

// add RPC fields before the RPC request data
   req_header h(ca.xid, proc, clt_nonce_, srv_nonce_, xid_rep_window_.front());
   req.pack_req_header(h);

每一个 req_headre的目的是什么,在call1完成RPC请求的准备后,会在一个while(1)的循环中,等待超时后重传。

rpcs类管理服务端RPC请求,当收到一个RPC的请求后,调用got_pdu将这个请求分配给线程池中的一个线程。线程池包含了几个固定的线程数。头部的header结构包含了足够的信息来消除所有的重复请求。

怎样确保at-most-once传输?一个方法就是让服务端记住每一个接收的RPC,每一个RPC有一个唯一的标识符xid(每一个客户保持唯一),和clt_nonce(所有客户保持唯一)。服务端同样要记住每一个RPC返回的值。这种方法保证了at-most-once,但是这么做内存会随着RPC的id和回复不断增长。有一种很好的替换方式就是RPC使用滑动窗口的策略记录xid,要求客户端严格按照指定的顺序来生成xid,比如0,1,2,3.。。这样RPC如果安全传到,就忘掉这个xid,因为xid是按顺序递增,不会有相同的重复,低的就可以忽略。 首先

  1. 检查每一个请求如果是重复的,就返回记录他的请求
  2. 如果不是重复的请求就将这个请求记住
  3. 缩小已经被记住的请求并回复。 第二,如果一个RPC调用服务器端已经完成,可以使用reply_window_来记录已经RPC的返回值。 完成这两步就可使用RPC_LOSSY进行测试。
rpcs::rpcstate_t 
rpcs::checkduplicate_and_update(unsigned int clt_nonce, unsigned int xid,
		unsigned int xid_rep, char **b, int *sz)
{
	ScopedLock rwl(&reply_window_m_);

        // You fill this in for Lab 1.

	std::list<reply_t> :: iterator lit = reply_window_[clt_nonce].begin();
	while(lit != reply_window_[clt_nonce].end()) {
		if (lit->xid == xid) {
			if (lit->cb_present == false) {
				return INPROGRESS;
			} else {
				*b = lit->buf;
				*sz = lit->sz;
				return DONE;
			}
		}
		if (lit->xid <= xid_rep && lit->cb_present) {
			free(lit->buf);
			lit = reply_window_[clt_nonce].erase(lit);
			continue;
		}

		lit++;
	}

	if (reply_window_[clt_nonce].size() > 0){
		if (reply_window_[clt_nonce].back().xid > xid)
			return FORGOTTEN;
	}

	reply_t now_reply(xid);
	lit = reply_window_[clt_nonce].begin();
	while(lit != reply_window_[clt_nonce].end()){
		if (lit->xid < xid)
			break;
		lit++;
	}
	if (lit != reply_window_[clt_nonce].begin())
		lit--;
	reply_window_[clt_nonce].insert(lit, now_reply);
	return NEW;
}
void
rpcs::add_reply(unsigned int clt_nonce, unsigned int xid,
		char *b, int sz)
{
	ScopedLock rwl(&reply_window_m_);
        // You fill this in for Lab 1.
	std::list<reply_t> :: iterator lit = reply_window_[clt_nonce].begin();
	for (; lit != reply_window_[clt_nonce].end(); lit++) {
		if (lit->xid == xid) {
			lit->buf = b;
			lit->sz = sz;
			lit->cb_present = true;
			break;
		}
	}
}

22 May 2018 by John Brown