ThreadPool, Coroutine, Promise与Future

ThreadPool, Coroutine, Promise与Future

ThreadPool

线程池,顾名思义,就是放着很多线程的大“池子”。主要用作并发量大,但每个任务需要处理的时间不是很长。比如接受或者发送网络请求的任务。之所以使用线程池,原因就是在线程的传创建和销毁在高并发先开销十分大,如果将线程先创建好放入“池子”中待命,随用随取。降低不必要的开销十分划算。

实现也非常的简单,创建队列,使用生产消费者模型(向队列中增加任务即生产者,将任务执行完成即消费者)。主要实现逻辑如下:

ThreadPool::ThreadPool(int thread_size) 
{
    is_stop = false;
    thread_size_ = thread_size;
    pthread_mutex_init(&mutex_, NULL);
    pthread_cond_init(&empty_cond_, NULL);
    for(int i = thread_size_; i > 0; i--) {
        thread *t = new thread([=](){
            while(true){
                pthread_mutex_lock(&mutex_);
                while(task_list_.empty() && !is_stop) {
                    pthread_cond_wait(&empty_cond_, &mutex_);
                }
                if (is_stop) {
                    pthread_mutex_unlock(&mutex_);
                    return;
                }
                Task task = task_list_.front();
                task_list_.pop();
                pthread_mutex_unlock(&mutex_);
                
                task.f(task.arg);
            }
        });
        thread_vec_.push_back(move(t));
    }
}

Coroutine

协程,又称微线程。这个也名字起的也是很通俗易懂的。就是比线程规模还要小的调度单位。主要用在多并发且I/O开销较高的场景,并且能够减少callback逻辑的使用。在某些不需要并行要求不高的场景下,减少了锁的使用。

在操作系统中,最小的调度单位就已经是线程了。协程其实就是用户态的线程,即何时调度怎样调度由用户自己控制,而不是由系统控制。操作系统分配给用户一个时间片,用户又将这个时间片再次拆分,更加精细的规划利用。由于用户自己拆分利用,开销就要比系统调度的线程小,操作系统不需要在用户与内核态之间切换,不用更换页表。试想在一个高并发多I/O的任务中,线程遇到的I/O阻塞,被切换调度,另个一线程运行一会也阻塞,这样系统需要维护很多线程,很吃系统的资源。但是在协程中,一旦协程任务遇到了I/O,切换到另一个协程任务,开销只是切换了寄存器和栈,而且还是用户态下的。系统在维护少量线程的情况下能够执行更多的任务,将计算资源高效利用。而且调度是由用户控制,大多数情况下在并发下引起的竞争就不存在了,也就减少了锁的使用。

协程的实现要比线程池要麻烦一些,主要是因为用户需要在切换时需要自己保存每个任务的状态。在*nix下系统提供了ucontext库,将任务切换时寄存器与栈的切换封装好的。实现的核心如下列代码所示:


CoroutineBase::CoroutineBase(ucontext_t* main_ctx, void* arg):
		status_(CorStatus::suspend), stack_ptr_(NULL), main_ctx_(main_ctx),
		arg_(arg), yeild_(NULL)
{
	if (main_ctx_ == NULL) return;
	stack_ptr_ = new char[1024 * 32];
	getcontext(&ctx_);
	ctx_.uc_stack.ss_sp = stack_ptr_;
	ctx_.uc_stack.ss_size = 1024 * 32;
	ctx_.uc_stack.ss_flags = 0;
	ctx_.uc_link = main_ctx;
	makecontext(&ctx_, (void (*) (void)) &CoroutineBase::WrapperFunc, 2, this, arg_);
}

void* CoroutineBase::Yeild(void * yeild) {
	if (yeild != NULL) yeild = yeild_;
	status_ = CorStatus::suspend;
	swapcontext(&ctx_, main_ctx_);
	status_ = CorStatus::running;
	return yeild_;
}


CoroutineMain::CoroutineMain() {
	stack_ptr_ = new char[1024 * 32];
	getcontext(&ctx_main_);
	ctx_main_.uc_stack.ss_sp = stack_ptr_;
	ctx_main_.uc_stack.ss_size = 1024 * 32;
	ctx_main_.uc_stack.ss_flags = 0;
	makecontext(&ctx_main_, (void (*) (void)) &CoroutineMain::Run, 1, this);
}
void CoroutineMain::Run() {
	static int x = 8;
	auto list_it = list_ctx_.begin();
	while(!list_ctx_.empty()) {
		if ((*list_it)->get_status() == CorStatus::stop) {
			list_it = list_ctx_.erase(list_it);
		} else if ((*list_it)->get_status() == CorStatus::suspend) {
			ucontext_t* ctx = &(*list_it)->ctx_;
			(*list_it)->yeild_ = (void *)&x;
			swapcontext(&ctx_main_, ctx);
			list_it++;
			if (list_it == list_ctx_.end()) list_it = list_ctx_.begin();
		}
	}
}
void CoroutineMain::CreateCoroutine(CoroutineBase *cor) {
	if (cor != NULL) list_ctx_.push_back(cor);
}

在实现中我模仿python的yield,在协程中可以传入穿出参数。调试反编译swap可以看到其过程其实就是将现有协程的寄存器的信息保存(保留其上下文),在把要切换的寄存器信息恢复,跟操作系统进程间切换是很相似的。

(gdb) disassemble
Dump of assembler code for function swapcontext:
=> 0x00007ffff753e400 <+0>:	mov    %rbx,0x80(%rdi)
   0x00007ffff753e407 <+7>:	mov    %rbp,0x78(%rdi)
   0x00007ffff753e40b <+11>:	mov    %r12,0x48(%rdi)
   0x00007ffff753e40f <+15>:	mov    %r13,0x50(%rdi)
   0x00007ffff753e413 <+19>:	mov    %r14,0x58(%rdi)
   0x00007ffff753e417 <+23>:	mov    %r15,0x60(%rdi)
   0x00007ffff753e41b <+27>:	mov    %rdi,0x68(%rdi)
   0x00007ffff753e41f <+31>:	mov    %rsi,0x70(%rdi)
   0x00007ffff753e423 <+35>:	mov    %rdx,0x88(%rdi)
   0x00007ffff753e42a <+42>:	mov    %rcx,0x98(%rdi)
   0x00007ffff753e431 <+49>:	mov    %r8,0x28(%rdi)
   0x00007ffff753e435 <+53>:	mov    %r9,0x30(%rdi)
   0x00007ffff753e439 <+57>:	mov    (%rsp),%rcx
   0x00007ffff753e43d <+61>:	mov    %rcx,0xa8(%rdi)
   0x00007ffff753e444 <+68>:	lea    0x8(%rsp),%rcx
   0x00007ffff753e449 <+73>:	mov    %rcx,0xa0(%rdi)
   0x00007ffff753e450 <+80>:	lea    0x1a8(%rdi),%rcx
   0x00007ffff753e457 <+87>:	mov    %rcx,0xe0(%rdi)
   0x00007ffff753e45e <+94>:	fnstenv (%rcx)
   0x00007ffff753e460 <+96>:	stmxcsr 0x1c0(%rdi)
   0x00007ffff753e467 <+103>:	mov    %rsi,%r12
   0x00007ffff753e46a <+106>:	lea    0x128(%rdi),%rdx
   0x00007ffff753e471 <+113>:	lea    0x128(%rsi),%rsi
   0x00007ffff753e478 <+120>:	mov    $0x2,%edi
   0x00007ffff753e47d <+125>:	mov    $0x8,%r10d
   0x00007ffff753e483 <+131>:	mov    $0xe,%eax
   0x00007ffff753e488 <+136>:	syscall
   0x00007ffff753e48a <+138>:	cmp    $0xfffffffffffff001,%rax
   0x00007ffff753e490 <+144>:	jae    0x7ffff753e4f0 <swapcontext+240>
   0x00007ffff753e492 <+146>:	mov    %r12,%rsi
   0x00007ffff753e495 <+149>:	mov    0xe0(%rsi),%rcx
   0x00007ffff753e49c <+156>:	fldenv (%rcx)
   0x00007ffff753e49e <+158>:	ldmxcsr 0x1c0(%rsi)
   0x00007ffff753e4a5 <+165>:	mov    0xa0(%rsi),%rsp
   0x00007ffff753e4ac <+172>:	mov    0x80(%rsi),%rbx
   0x00007ffff753e4b3 <+179>:	mov    0x78(%rsi),%rbp
   0x00007ffff753e4b7 <+183>:	mov    0x48(%rsi),%r12
   0x00007ffff753e4bb <+187>:	mov    0x50(%rsi),%r13
   0x00007ffff753e4bf <+191>:	mov    0x58(%rsi),%r14
   0x00007ffff753e4c3 <+195>:	mov    0x60(%rsi),%r15
   0x00007ffff753e4c7 <+199>:	mov    0xa8(%rsi),%rcx
   0x00007ffff753e4ce <+206>:	push   %rcx
   0x00007ffff753e4cf <+207>:	mov    0x68(%rsi),%rdi
   0x00007ffff753e4d3 <+211>:	mov    0x88(%rsi),%rdx
   0x00007ffff753e4da <+218>:	mov    0x98(%rsi),%rcx
   0x00007ffff753e4e1 <+225>:	mov    0x28(%rsi),%r8
   0x00007ffff753e4e5 <+229>:	mov    0x30(%rsi),%r9
   0x00007ffff753e4e9 <+233>:	mov    0x70(%rsi),%rsi
   0x00007ffff753e4ed <+237>:	xor    %eax,%eax
   0x00007ffff753e4ef <+239>:	retq
   0x00007ffff753e4f0 <+240>:	mov    0x37a971(%rip),%rcx        # 0x7ffff78b8e68
   0x00007ffff753e4f7 <+247>:	neg    %eax
   0x00007ffff753e4f9 <+249>:	mov    %eax,%fs:(%rcx)
   0x00007ffff753e4fc <+252>:	or     $0xffffffffffffffff,%rax
   0x00007ffff753e500 <+256>:	retq
End of assembler dump.

Promise & Future

Promise与Future来源于函数式编程,是一种分离计算(Promise)与结果(Future)的范式,在并行化中可以更加灵活的进行计算。在分布式计算中,减少通信往返时引起的延迟,同时这种方式可以使异步程序更直观地表达,而后继传递式(continuation-passing) –wiki

我从字面上理解就是Promise就是一种承诺,保证去计算,在以后(future)会有答案。主要就是将看似并行的代码用线性的思想写出来。比如计算两个数之和, 用传统的方法去写(伪代码):


atomic int i = 0

def callback():
    i += 1

def GetValue( &rst, callback ):
    rst = net_request_api_latency_2s()
    callback()
    return

def main():
    int a  = 0, b = 0
    thread( GetValue(a, callback) )
    thread( GetValue(b, callback) )
    
    while (i < 2) {}
    
    print(a + b)

如果使用Promise与Future 逻辑就不用这么复杂了:

def GetValue():
    rst = net_request_api_latency_2s()
    return rst
    
def main():
    Future a = Promise(GetValue)
    Future b = Promise(GetValue)
    
    print(a.get_value() + b.get_value())

逻辑非常的清晰。

从实现角度来看主要就是封装一下thread,寄存一下结果,由future等待获取结果。

template <class T>
class Promise{
    public:
        explicit Promise(function<T (void*)> func, void *arg) {
            ret_ = new T;
            thread t([=](){*ret_ = func(arg);});
            t_ = move(t);
        }
        Future<T> get_furture() {
            return Future<T>(move(t_), ret_);
        }
        thread t_;
        T *ret_;
};
template <class T>
class Future {
    public:
    Future(thread&& t, T *ret):t_(move(t)), ret_(ret){};
    Future(Future<T>&&) = default;
    T get() {
        t_.join();
        return *ret_;
    }
    ~Future() {
        delete ret_;
    }
    thread t_;
    T* ret_;
};

实验代码在GitHub上可获取

20 Aug 2018 by John Brown