最新消息:yaf表单扩展中新增加了浮点数、日期和集合的校验。php yaf框架扩展实践三——表单

Dive into goroutine internal

Golang 210浏览 0评论

Goroutine的实现中有几个基本概念。

M:对应一个OS线程,执行goroutine封装成的任务(这个任务有自己的代码逻辑、栈、程序计数器)

G:对应一个goroutine,相当于一个个的任务

P:调度器,维护任务(goroutine)的运行队列,并交给M执行。更确切地说,是M从P的运行队列中取任务来执行。

go关键字创建一个goroutine,实际上会转化为 runtime.newproc 的调用:

go f(args)

可以看作

runtime.newproc(size, f, args)

每个goroutine都需要有一个自己的栈,G结构的sched字段维护了栈地址以及程序计数器等代码运行所需的基本信息。也就是说这个goroutine放弃cpu的时候需要保存这些信息,待下次重新获得cpu的时候,需要将这些信息装载到对应的cpu寄存器中。

函数到G的转换:

// Gobuf is the saved execution state kept for a goroutine (the article
// refers to it as the G structure's sched field).  Field order must not be
// changed: see the note on offsets below.
struct	Gobuf
{
  // The offsets of sp, pc, and g are known to (hard-coded in) libmach.
  uintptr	sp;	// saved stack pointer
  uintptr	pc;	// saved program counter
  G*	g;	// the G this saved state belongs to
  void*	ctxt;	// context value stored alongside pc (set by gostartcall)
  uintreg	ret;	// saved return-value slot
  uintptr	lr;	// NOTE(review): presumably a saved link register for LR-based architectures - confirm
};

// Rewrite gobuf so that it looks as if it had executed a call to fn with
// context ctxt and then performed an immediate gosave.
//
// gobuf: saved state to modify in place.
// fn:    function the saved pc is redirected to.
// ctxt:  context value stored in gobuf->ctxt.
void
runtime·gostartcall(Gobuf *gobuf, void (*fn)(void), void *ctxt)
{
  uintptr *slot;

  slot = (uintptr*)gobuf->sp;

  // When a register is wider than a pointer, push a zero word first.
  if(sizeof(uintreg) > sizeof(uintptr)) {
    slot--;
    *slot = 0;
  }

  // Push the previously saved pc onto the stack before pc is overwritten.
  slot--;
  *slot = (uintptr)gobuf->pc;

  // Publish the adjusted stack pointer and redirect execution to fn.
  gobuf->sp = (uintptr)slot;
  gobuf->pc = (uintptr)fn;
  gobuf->ctxt = ctxt;
}

创建G

runtime·newproc创建一个G,然后放到P的运行队列。

// Create a new goroutine running fn.  Computes the address of the argument
// area that follows fn and forwards everything to runtime·newproc1.
//
// siz: size passed on to newproc1 as narg.
// fn:  function value to run; the variadic arguments follow it.
void
runtime·newproc(int32 siz, FuncVal* fn, ...)
{
  byte *argp;
  int32 skip;

  // Arguments start one word past fn; on '5' an extra word is skipped
  // because the caller's saved LR sits in between.
  skip = 1;
  if(thechar == '5')
    skip = 2;
  argp = (byte*)(&fn + skip);

  runtime·newproc1(fn, argp, siz, 0, runtime·getcallerpc(&siz));
}

// Excerpt of runtime·newproc1 ("..." marks lines the article elided; the
// end of the function is not shown).  The visible part obtains a G, fills
// in its saved scheduling state, marks it Grunnable and puts it on the
// current P's run queue.
G*
runtime·newproc1(FuncVal *fn, byte *argp, int32 narg, int32 nret, void *callerpc)
{
  /// get G
...
  p = m->p;
  // Try gfget(p) first (presumably a cache of free Gs - confirm); fall back
  // to runtime·malg(StackMin) and register the new G with allgadd.
  if((newg = gfget(p)) != nil) {
    // Sanity check on the stack bounds of the G returned by gfget.
    if(newg->stackguard - StackGuard != newg->stack0)
      runtime·throw("invalid stack in newg");
  } else {
    newg = runtime·malg(StackMin);
    allgadd(newg);
  }
...
  /// set up the relevant fields
  newg->sched.sp = (uintptr)sp;
  // pc starts out as runtime·goexit; gostartcallfn then adjusts the Gobuf
  // (presumably via gostartcall, which pushes this pc and redirects to fn - confirm).
  newg->sched.pc = (uintptr)runtime·goexit;
  newg->sched.g = newg;
  runtime·gostartcallfn(&newg->sched, fn);
  newg->gopc = (uintptr)callerpc;	// pc of the caller that created this G
  newg->status = Grunnable;
...
  ///Try to put g on local runnable queue
  runqput(p, newg);
...

创建M

当G太多,M太少,还有空闲的P的时候,就创建一些新的M,去执行G:

// Create a new m.  It will start off with a call to fn, or else the scheduler.
// (Excerpt: "..." marks lines the article elided.)
//
// fn: stored in mp->mstartfn; runtime·mstart calls it when non-nil.
// p:  stored in mp->nextp; runtime·mstart acquires it for the new M.
static void
newm(void(*fn)(void), P *p){
  M *mp;

  mp = runtime·allocm(p);
  mp->nextp = p;
  mp->mstartfn = fn;	
...
  // Start the OS thread, passing g0's stackbase as its stack.
  runtime·newosproc(mp, (byte*)mp->g0->stackbase);
}

//os_linux.c
// Start a new OS thread for mp via runtime·clone, passing stk as its stack
// and runtime·mstart as the function to run.  Throws if clone fails.
//
// mp:  the M the new thread will serve.
// stk: stack pointer handed to clone.
void
runtime·newosproc(M *mp, void *stk)
{
  Sigset saved;
  int32 cloneflags;
  int32 rc;

  /*
   * note: strace gets confused if we use CLONE_PTRACE here.
   *
   * CLONE_VM      share memory
   * CLONE_FS      share cwd, etc
   * CLONE_FILES   share fd table
   * CLONE_SIGHAND share sig handler table
   * CLONE_THREAD  revisit - okay for now
   */
  cloneflags = CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND | CLONE_THREAD;

  // so 386 asm can find it
  mp->tls[0] = mp->id;

  // Debug trace, compiled out by the constant-false condition.
  if(0){
    runtime·printf("newosproc stk=%p m=%p g=%p clone=%p id=%d/%d ostk=%p\n",
      stk, mp, mp->g0, runtime·clone, mp->id, (int32)mp->tls[0], &mp);
  }

  // Block all signals around clone so the new thread starts with signals
  // disabled (it enables them in minit), then restore the previous mask.
  runtime·rtsigprocmask(SIG_SETMASK, &sigset_all, &saved, sizeof saved);
  rc = runtime·clone(cloneflags, stk, mp, mp->g0, runtime·mstart);
  runtime·rtsigprocmask(SIG_SETMASK, &saved, nil, sizeof saved);

  if(rc >= 0)
    return;

  // A negative result is reported as errno = -rc.
  runtime·printf("runtime: failed to create new OS thread (have %d already; errno=%d)\n", runtime·mcount(), -rc);
  runtime·throw("runtime.newosproc");
}

可以看到,新建的OS线程实际执行的是runtime·mstart函数:

// Called to start an M.
// As shown here: installs signal handlers on m0, runs the optional
// mstartfn, then either parks (helpgc) or acquires the P stored in nextp,
// and finally calls schedule().
void
runtime·mstart(void)
{
  // Install signal handlers; after minit so that minit can
  // prepare the thread to be able to handle the signals.
  // NOTE(review): the minit call itself is not visible in this excerpt.
  if(m == &runtime·m0)
    runtime·initsig();
  
  // Run the start function recorded for this M, if any.
  if(m->mstartfn)
    m->mstartfn();
  if(m->helpgc) {
    // Clear the helpgc flag and stop this M.
    m->helpgc = 0;
    stopm();
  } else if(m != &runtime·m0) {
    // Take ownership of the P recorded in nextp and clear the field.
    acquirep(m->nextp);
    m->nextp = nil;
  }
  schedule(); /// enter the scheduler
}

schedule()进行任务(goroutine)调度。

调度M

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
// (Excerpt: "..." marks lines the article elided, including whatever
// initialises gp before the checks below.)
static void
schedule(void)
{
  G *gp;
  uint32 tick;
...
  // by constantly respawning each other.
  tick = m->p->schedtick;
  // This is a fancy way to say tick%61==0,
  // it uses 2 MUL instructions instead of a single DIV and so is faster on modern processors.
  // Step 1: on such ticks, if the global queue is non-empty, take one G
  // from it while holding the scheduler lock.
  if(tick - (((uint64)tick*0x4325c53fu)>>36)*61 == 0 && runtime·sched.runqsize > 0) {
    runtime·lock(&runtime·sched);
    gp = globrunqget(m->p, 1);
    runtime·unlock(&runtime·sched);
    if(gp)
      resetspinning();
  }
  // Step 2: otherwise try this P's local run queue.
  if(gp == nil) {
    gp = runqget(m->p);
    if(gp && m->spinning)
      runtime·throw("schedule: spinning with local work");
  }
  // Step 3: still nothing, so wait in findrunnable.
  if(gp == nil) {
    gp = findrunnable();  // blocks until work is available
    resetspinning();
  }

...
  execute(gp);  /// run G
}

运行G

// Schedules gp to run on the current M.
// Never returns.
//
// gp: goroutine to run; must be in state Grunnable, otherwise the runtime throws.
static void
execute(G *gp)
{
  int32 hz;

  if(gp->status != Grunnable) {
    runtime·printf("execute: bad g status %d\n", gp->status);
    runtime·throw("execute: bad g status");
  }
  // Mark gp as running and reset its per-run bookkeeping fields.
  gp->status = Grunning;
  gp->waitsince = 0;
  gp->preempt = false;
  gp->stackguard0 = gp->stackguard;
  // Count this scheduling round on the P (schedule() reads schedtick).
  m->p->schedtick++;
  // Link the M and the G to each other.
  m->curg = gp;
  gp->m = m;

  // Check whether the profiler needs to be turned on or off.
  hz = runtime·sched.profilehz;
  if(m->profilehz != hz)
    runtime·resetcpuprofiler(hz);

  // Switch to gp's saved context.
  runtime·gogo(&gp->sched);
}
///asm_amd64.s

// void gogo(Gobuf*)
// restore state from Gobuf; longjmp
// Loads g, SP, ret and ctxt from the Gobuf, zeroes the saved sp/ret/ctxt
// slots, and finishes with a jump to the saved pc rather than a return.
TEXT runtime·gogo(SB), NOSPLIT, $0-8
  MOVQ	8(SP), BX		// gobuf
  MOVQ	gobuf_g(BX), DX		// DX = gobuf->g
  MOVQ	0(DX), CX		// make sure g != nil
  get_tls(CX)			// NOTE(review): macro defined elsewhere; presumably loads the TLS base into CX
  MOVQ	DX, g(CX)		// store g into the g(CX) slot
  MOVQ	gobuf_sp(BX), SP	// restore SP
  MOVQ	gobuf_ret(BX), AX	// AX = gobuf->ret
  MOVQ	gobuf_ctxt(BX), DX	// DX = gobuf->ctxt
  MOVQ	$0, gobuf_sp(BX)	// clear to help garbage collector
  MOVQ	$0, gobuf_ret(BX)
  MOVQ	$0, gobuf_ctxt(BX)
  MOVQ	gobuf_pc(BX), BX	// BX = saved pc
  JMP	BX			// continue execution at the saved pc

转载请注明:快乐编程 » Dive into goroutine internal

发表我的评论
取消评论

表情

Hi,您需要填写昵称和邮箱!

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址