发布时间:2019-07-05 10:30:37编辑:auto阅读(2005)
/// /// @file Worker.h /// @brief 用户接口类 /// @author guozhiming /// @date 2007-05-16 /// #ifndef __WORKER__ #define __WORKER__ #include "ThreadPool.h" /// @brief 抽象类 class G_Worker { public: /// @brief 构造函数 G_Worker(unsigned int num); /// @brief 析构函数 ~G_Worker(); /// @brief 服务器帮定端口 /// /// @param nPort 帮定端口 /// /// @return true表示成功 , false表示失败 bool Bind(unsigned int nPort); /// @brief 存虚函数子类继承并实现逻辑 /// /// @param pStr 客户端发送的字符串 virtual void recvMessage(void *pStr , int nSocket) = 0; /// @brief 发送数据到客户端 /// /// @param pStr 数据 /// @param nSocket 发送到客户端的套接字 // /// @return int sendMessage(int nSocket , const void *pStr); protected: private: G_ThreadPool *g_threadPool; }; #endif
#include "Worker.h" #include "Log.h" G_Worker::G_Worker(unsigned int num) { g_threadPool = new G_ThreadPool(num , this); //开num个线程 } G_Worker::~G_Worker() { if(g_threadPool) { delete g_threadPool; g_threadPool = NULL; } } bool G_Worker::Bind(unsigned int nPort) { return g_threadPool->Bind(nPort);//整个线程池,绑定一个端口。。 } int G_Worker::sendMessage(int nSocket , const void *pStr) { return g_threadPool->sendMessage(nSocket , pStr);//这是tcp还是udp发送呢。 }
/// /// @file ThreadPool.h /// @brief 线程池的实现 , 是个管理线程 , 负责调用每个线程之间的互相调用的关系 /// @author guozhiming /// @date 2007-05-16 /// #ifndef __G_THREADPOOL__ #define __G_THREADPOOL__ #include "ListenThread.h" #include "SendMessThread.h" #include "Queue.h" #include "RecvMessThread.h" #include "Worker.h" class G_ListenThread; class G_SendMessThread; class G_RecvMessThread; class G_Worker; class G_ThreadPool : public G_Thread { public: /// @brief 构造函数 G_ThreadPool(unsigned int num , G_Worker *g_work); /// @brief 析构函数 ~G_ThreadPool(); /// @brief 服务器帮定端口 /// /// @param nPort 帮定端口 /// /// @return true表示成功 , false表示失败 bool Bind(unsigned int nPort); /// @brief 主线程 void Run(); /// @brief 填加socket到队列中 /// /// @param nSocket 套接口 /// /// @return true 表示成功 , false 表示失败 bool pushSocket(unsigned int nSocket); /// @brief 从队列中取套接字 /// /// @param nSocket 取出的套接字存放在nSocket中 /// /// @return true 表示成功 , false 表示失败 bool popSocket(int &nSocket); /// @brief 从G_Data->G_RecvMessThread->G_ThreadPool->G_Worker 回掉 /// /// @param pStr 客户发的字符串 /// @param nSocket 接受客户连接的套接字 void recvMessage(void *pStr , int nSocket); /// @brief 发送数据 从testPool->G_Worker->G_ThreadPool->G_SendMessThread->G_Data /// /// @param pStr 数据 /// @param nSocket 套接口 /// @return // int sendMessage(int nSocket , const void *pStr); private: G_Worker *g_worker; /// @brief 监听线程 G_ListenThread *g_listenThread; /// @brief 发送消息线程 G_SendMessThread *g_sendMessThread; /// @brief 存放socket队列 G_Queue<int> g_sockQueue; /// @brief 存放空闲工作线程队列 G_Queue<G_RecvMessThread*> g_idleRecvMessThreadQueue; /// @brief 存放忙碌工作线程队列 G_Queue<G_RecvMessThread*> g_busyRecvMessThreadQueue; /// @brief 每个RecvMessThread线程中最大用户数 static const int maxCounter = 2000; /// @brief 如果线程不够用新增加的线程 static const int addTaskThread = 2; }; #endif
#include "ThreadPool.h" #include "Log.h" G_ThreadPool::G_ThreadPool(unsigned int num , G_Worker *g_work) : g_worker(g_work) { g_listenThread = new G_ListenThread(this); g_listenThread->Start(); ///启动监听线程 g_sendMessThread = new G_SendMessThread(); g_sendMessThread->Start(); ///发送消息线程 for(int i=0; i<num; i++) { /// 启动处理client发送信息线程 , 收消息线程 //明明只启动了接受线程!!!开了num个,为什么开这么多。网络的都交给epoll就好 G_RecvMessThread *g_recvMessThread = new G_RecvMessThread(this); g_idleRecvMessThreadQueue.push(g_recvMessThread);//空闲列,busy列呢,没看到,暂时没放入值 g_recvMessThread->Start(); } Start(); ///线程池自己启动 } G_ThreadPool::~G_ThreadPool() { if(g_listenThread) { delete g_listenThread; g_listenThread = NULL; } if(g_sendMessThread) { delete g_sendMessThread; g_sendMessThread = NULL; } g_sockQueue.clear(); g_idleRecvMessThreadQueue.clear(); g_busyRecvMessThreadQueue.clear(); } bool G_ThreadPool::Bind(unsigned int nPort) { return g_listenThread->Bind(nPort); } int G_ThreadPool::sendMessage(int nSocket , const void *pStr) { g_sendMessThread->sendMessage(nSocket , pStr); } void G_ThreadPool::Run() { int nSocket; G_RecvMessThread *g_recvMessThread; while(1) { pause(); ///等待ListenThread 发信号,这种事情,用epoll的epoll_wait就好了嘛 while(popSocket(nSocket)) ///必须把存放socket队列中的套接口全部取出 { g_sendMessThread->addEpoll(nSocket);//这里也是用epoll的啊 while(1)//各种循环 { ///从空闲队列中获得对首TaskThread if(g_idleRecvMessThreadQueue.getFront(g_recvMessThread)) { ///如果TaskThread线程中客户大于maxCounter , 从空闲队列中pop并放到忙碌队列中 if(g_recvMessThread->getCounter() >= maxCounter) //接受线程里面,不止一个mess?还要count? { if(g_idleRecvMessThreadQueue.pop(g_recvMessThread)) { g_busyRecvMessThreadQueue.push(g_recvMessThread); } else { ///表示空闲队列中再没有TaskThread可以用 , 创建addTaskThread个TaskThread线程 , 并且把busy队列中的TaskThread放到idle队列中这样可以防止busy队列中的用户数减少但是他还在busy队列中 for(int i=0; i<addTaskThread; i++) { G_RecvMessThread *g_recvMessThread = new G_RecvMessThread(this); g_idleRecvMessThreadQueue.push(g_recvMessThread); g_recvMessThread->Start(); } while(g_busyRecvMessThreadQueue.pop(g_recvMessThread))//感觉这么麻烦。。。 { g_idleRecvMessThreadQueue.push(g_recvMessThread); } } } else { /// 填加到TaskThread 线程中 g_recvMessThread->addSocket(nSocket); g_recvMessThread->continues(); /// 唤醒TaskThread 线程 break; } } else { /// 空闲队列中没有任何线程 , 应该没有这种情况 debug_output("idleRecvMessThreadQueue is not g_recvMessThread\n"); } } } } } bool G_ThreadPool::pushSocket(unsigned int nSocket) { return g_sockQueue.push(nSocket); } bool G_ThreadPool::popSocket(int &nSocket) { return g_sockQueue.pop(nSocket); } void G_ThreadPool::recvMessage(void *pStr , int nSocket) { g_worker->recvMessage(pStr , nSocket); }
/// /// @file TaskThread.h /// @brief 任务类 , 接受client发的消息进行处理 /// @author guozhiming /// @date 2007-05-17 /// #ifndef __TASKTHREAD__ #define __TASKTHREAD__ #include "def.h" #include "Thread.h" #include "ThreadPool.h" #include "Queue.h" #include "Data.h" class G_ThreadPool; class G_Data; class G_RecvMessThread : public G_Thread { public: /// @brief 构造函数 G_RecvMessThread(G_ThreadPool *pool); /// @brief 析构函数 ~G_RecvMessThread(); /// @brief 主线程运行 void Run(); /// @brief 填加套接字 /// /// @param nSocket 套接字 void addSocket(int nSocket); /// @brief 获得连接的客户端数目 /// /// @return 数目 unsigned int getCounter(); /// @brief 往队列中存放数据 ,,哪个队列?下面定义了一个queue /// /// @param pStr 数据 /// /// @return true 成功 , false 失败 bool pushData(std::string pStr); private: /// @brief 设置套接口非阻塞模式 /// /// @param sockfd 套接口 /// /// @return true 成功 , false 失败 bool setNonBlock(int sockfd); /// @brief epoll_create 返回文件描述符 int epfd; struct epoll_event events[100]; //才100个。。封装了epoll啊。在recv里封了,难道send里,也封了一个 /// @brief 记录接受客户端数目 unsigned int counter; /// @brief 线程池对象 G_ThreadPool *g_threadPool; /// @brief 存放数据的队列 G_Queue<std::string> g_dataBufferQueue; G_Data *g_data; }; #endif
#include "RecvMessThread.h" #include "Log.h" G_RecvMessThread::G_RecvMessThread(G_ThreadPool *pool) : g_threadPool(pool) { counter = 0; epfd = epoll_create(256); //最多同时监视256个 g_data = new G_Data(this); //这个数据结构要看看,有啥稀奇 } G_RecvMessThread::~G_RecvMessThread() { close(epfd); } unsigned int G_RecvMessThread::getCounter() { return counter; } bool G_RecvMessThread::setNonBlock(int sockfd) { int opts = fcntl(sockfd , F_GETFL); if(-1 == opts) { debug_output("%s\n" , "fcntl F_GETFL is faild"); return false; } opts = opts | O_NONBLOCK; if(fcntl(sockfd , F_SETFL , opts) < 0) { debug_output("%s\n" , "fcntl F_SETFL is faild"); return false; } return true; } void G_RecvMessThread::addSocket(int nSocket) { struct epoll_event ev; bzero(&ev , sizeof(ev)); setNonBlock(nSocket); ev.data.fd = nSocket; ev.events = EPOLLIN | EPOLLET; epoll_ctl(epfd , EPOLL_CTL_ADD , nSocket , &ev); counter++; } bool G_RecvMessThread::pushData(std::string pStr) { return g_dataBufferQueue.push(pStr); } void G_RecvMessThread::Run() { pause(); /// 暂停线程 //都用这招。。。 int nfds , sock; struct epoll_event ev; bool nRet; char line[1024]; //一次最多1024个。。感觉这个模型蛮奇怪。。 while(1) { nfds = epoll_wait(epfd,events,100,50); for(int i=0; i<nfds; i++) { if(events[i].events&EPOLLIN) { if((sock = events[i].data.fd) < 0) continue; if(!(nRet=g_data->recvData(sock)))//这是最底层的收数据的啊 { debug_output("client is quit\n"); ev.data.fd= sock; epoll_ctl(epfd , EPOLL_CTL_DEL , sock , &ev);//收到了直接关掉了 close(sock); events[i].data.fd = -1; counter --; } else { std::string pBuffer; while(g_dataBufferQueue.size()) { g_dataBufferQueue.pop(pBuffer); g_threadPool->recvMessage((void*)pBuffer.c_str() , sock); //这又是一个收数据的。 } } usleep(100);//还要休眠。。这么坑爹。。。 } } } }
上一篇: Spokesmen, How to ma
下一篇: 3D moblin UI on IDF
48801
47840
38612
35799
30223
26970
26007
20842
20626
19003
411°
489°
519°
527°
514°
496°
566°
629°
746°
786°