Hunter的大杂烩 技术学习笔记

September 10, 2006

从setjmp到getcontext,最后遇到了libpth

Filed under: Linux,技术话题 — hunter @ 3:13 am

最近在准备开始把系统重构一次,底层还是用app platform平台,但是
代码需要全部OO重新设计和分析一次,乘此机会,顺便把原来的设计和代码
中不合理的地方来个大扫除。
在设计的时候,遇到最大的问题,就是app platform只是一个事件驱动
的tcp server framework,单进程架构,在处理异步通信的时候,代码结构
会比较恶心:
void excute(command)
{
switch(state)
{
case start:
int iRetcode= GetUserIdRequest();
…..
state = wait_recv;
break;
case wait_recv:
int iUserId = GetUserIdResponse();
….
break;
}
}


完美的流程应该如下:
void excute(command)
{
int iUserId = Obj.GetUserId();
}

class OBJ
{
int GetUserId()
{
SendGetUserIdRequest();
sceduler(); //主动释放时间片
RecvGetUserIdRequest();
}
}

一开始的想法就是,要实现这种形式恐怕要用到多线程,最坏的结果就是平台
使用多线程结构,但是只有一个处理线程,处理线程在调用远程接口时,
将控制权转给main thread,但是对于每个task,它应该以为自己还是在
串行化执行,不用考虑到函数重入和线程安全之类的问题。
但是这种模式还是无法解决如何从远程接口函数中跳出来的问题
sceduler(); //如何让远程接口getuserid()把控制权转移给main thread?
//同时可以让处理线程继续处理下一个task的请求?
除非每个task都是一个thread吧,但是在slk linux 8.1下,没有NPTL的支
持,如果每个进程内有100 个task,那就是要 102个 thread在跑,这种
系统结构一点也没降低系统负担。
对系统级thread实现死心之后,有一段时间放弃了继续研究的想法,直到
某天突然想到了setjmp这个调用,它好像可以保留一个现场,然后从sub
func的调用中跳转出来的,虽然其内部不了解实现,但是不妨碍我做了一个
测试程序(伪代码):

main()
{
while(1)
{
int dwEvent = rand()%13;
if (dwEvent >=10)
create_new_task();
else
{
if (setjmp() != 1)
process_task();
}
}
}
void process_task()
{
// do some thing
longjmp(1);
}
但是我的想法太天真了,经过反复几次调用之后,栈空间就乱套了,然后
程序就core了起来,在网上搜索之后,发现了一篇文章
setjmp构建简单协作式多任务系统),发现它在创建task的时候,为
每个task都保留了一个不重复的栈地址(用汇编移动寄存器sp的值),猜测
setjmp只是保存了一些寄存器的上下文,而没有保存栈的数据,导致函数
重入几次之后,前一个函数的栈数据就给后一个函数覆盖了,不过文中没有
提及linux下栈地址是如何更改的….

又郁闷了几天,然后又是无意中想到--用户级的thread是如何调度
task的?每个线程都以为自己是唯一正在执行的task,每个操作都是顺序
执行的,这不正式我要达到的目的(让task内的代码看上去就象是顺序执行,
把异步的通信隐藏在平台容器内实现)吗?立马下载了libpth,仔细研究
对方的代码(比较痛苦,因为是跨平台的,要发现到底哪个函数是为linux而
写,花了不少时间),最终发现里面有一个对getcontext调用的
封装:pth_uctx,这个就是libpth如何做到让每个函数都以为自己是一个
独立的“线程”的关键,在此基础上,对原来的例子做了一些改动,
实现一个测试程序,基本达到了我预想中的效果:
void schedule(TASK &stTask)
{
assert(stTask.bIsPending == false);
stTask.bIsPending = true;
if (pth_uctx_switch(stTask.stUCTX, gstTaskList.astTaskList[0].stUCTX) != 1)
{
fprintf(stderr,”pth_uctx_switch error %s\n”, strerror(errno));
exit(0);
}
assert(stTask.bIsPending);
stTask.bIsPending = false;
}

class CUserInfo
{
TASK *m_pstTask;
public:
CUserInfo(TASK *pstTask)
{
m_pstTask = pstTask;
};
string GetUserNickName()
{
volatile int dwGuard = 20060907;
SocketContext *pstSocketContext = &m_pstTask->stContext;
char *ptr = 0;
while(1)
{
if (pstSocketContext->shBytesReceived > 0)
{
if ((ptr = strstr(pstSocketContext->sRecvBuffer, “\n”) ) != NULL)
{
break;
}
}
schedule(*m_pstTask);
assert(dwGuard == 20060907);
}
*ptr = 0;
string sName(pstSocketContext->sRecvBuffer);
PopToNull(pstSocketContext, pstSocketContext->shBytesReceived, RECV_QUEUE);
return sName;
};
void EchoName(const string &sName)
{
volatile int dwGuard = 20060907;
SocketContext *pstSocketContext = &m_pstTask->stContext;
string sName2 =sName + “\n”;
while(1)
{
if (Push(pstSocketContext, sName2.c_str(), sName2.length(), SEND_QUEUE) == sName2.length())
{
break;
}
schedule(*m_pstTask);
assert(dwGuard == 20060907);
}
};
};

void do_process(void *ctx)
{
volatile int dwEvent = (int)ctx;
assert(gstTaskList.astTaskList[dwEvent].bHasDone == false);

DEBUGLOG(“do_process an event at %d\n”, dwEvent);

CUserInfo oUserInfo(&gstTaskList.astTaskList[dwEvent]);
string sName = oUserInfo.GetUserNickName();
oUserInfo.EchoName(sName);

DEBUGLOG(“got name:%s\n”, sName.c_str());
gstTaskList.astTaskList[dwEvent].bHasDone = true;
gdwTotalProcessCount++;
}
……

int process_task(TASK &stTask)
{
if (stTask.bHasDone) // 长连接test
{
if (pth_uctx_make(stTask.stUCTX, NULL, 1024*32, NULL,
do_process, (void *)stTask.stContext.iSocket,
gstTaskList.astTaskList[0].stUCTX) != 1)
{
fprintf(stderr,”pth_uctx_make error %s\n”, strerror(errno));
exit(0);
}
stTask.bHasDone = false;
}

DEBUGLOG(“process an event at %d %d\n”, stTask.stContext.iSocket, stTask.bIsPending);
if (pth_uctx_switch(gstTaskList.astTaskList[0].stUCTX, stTask.stUCTX) != 1)
{
fprintf(stderr,”pth_uctx_switch error %s\n”, strerror(errno));
exit(0);
}

DEBUGLOG(“process an event at %d %d end\n”, stTask.stContext.iSocket, stTask.bIsPending);
return 0;
}
….

 

过程是有趣的,结果更是令人振奋的!如果把这个研究结果和我们的平台

结合,那岂不是一个小型中间件?呵呵

为什么用单进程+多task架构?

1. 用户级的调用开销是非常小的,因此可以在一个进程内开启成百上千

个task来支持大并发

2. 多进程+进程内task调度可以在性能和可靠性上达到一个较好的平衡

(类似NPTL的N:M的线程模型)

3. 开发人员的知识可以平滑的从单进程开发模式逐渐向多线程上迁移,避免

学习代沟。

 

No Comments

No comments yet.

RSS feed for comments on this post.

Sorry, the comment form is closed at this time.

Powered by WordPress