某源码thread,socket研究3

发布时间:2019-07-05 10:30:37编辑:auto阅读(2005)

    ///
    /// @file Worker.h
    /// @brief 用户接口类
    /// @author guozhiming
    /// @date 2007-05-16
    ///
    #ifndef __WORKER__
    #define __WORKER__
    #include "ThreadPool.h"
    /// @brief 抽象类
    class G_Worker
    {
        public:
            /// @brief 构造函数
            G_Worker(unsigned int num);
            /// @brief 析构函数
            ~G_Worker();
            /// @brief 服务器帮定端口
            ///
            /// @param nPort 帮定端口
            ///
            /// @return true表示成功 , false表示失败
            bool Bind(unsigned int nPort);
            /// @brief 存虚函数子类继承并实现逻辑
            ///
            /// @param pStr 客户端发送的字符串
            virtual void recvMessage(void *pStr , int nSocket) = 0;
            /// @brief 发送数据到客户端
            ///
            /// @param pStr 数据
            /// @param nSocket 发送到客户端的套接字
            //
            /// @return
            int sendMessage(int nSocket , const void *pStr);
        protected:
                                                
        private:
            G_ThreadPool *g_threadPool;
    };
    #endif



    #include "Worker.h"
    #include "Log.h"
    G_Worker::G_Worker(unsigned int num)
    {
        g_threadPool = new G_ThreadPool(num , this);  //开num个线程
    }
    G_Worker::~G_Worker()
    {
        if(g_threadPool)
        {
            delete g_threadPool;
            g_threadPool = NULL;
        }
    }
    bool G_Worker::Bind(unsigned int nPort)
    {
        return g_threadPool->Bind(nPort);//整个线程池,绑定一个端口。。
    }
    int G_Worker::sendMessage(int nSocket , const void *pStr)
    {
        return g_threadPool->sendMessage(nSocket , pStr);//这是tcp还是udp发送呢。
    }



    ///
    /// @file ThreadPool.h
    /// @brief 线程池的实现 , 是个管理线程 , 负责调用每个线程之间的互相调用的关系
    /// @author guozhiming
    /// @date 2007-05-16
    ///
    #ifndef __G_THREADPOOL__
    #define __G_THREADPOOL__
    #include "ListenThread.h"
    #include "SendMessThread.h"
    #include "Queue.h"
    #include "RecvMessThread.h"
    #include "Worker.h"
    class G_ListenThread;
    class G_SendMessThread;
    class G_RecvMessThread;
    class G_Worker;
    class G_ThreadPool : public G_Thread
    {
        public:
            /// @brief 构造函数
            G_ThreadPool(unsigned int num , G_Worker *g_work);
            /// @brief 析构函数
            ~G_ThreadPool();
            /// @brief 服务器帮定端口
            ///
            /// @param nPort 帮定端口
            ///
            /// @return true表示成功 , false表示失败
            bool Bind(unsigned int nPort);
            /// @brief 主线程
            void Run();
            /// @brief 填加socket到队列中
            ///
            /// @param nSocket 套接口
            ///
            /// @return true 表示成功 , false 表示失败
            bool pushSocket(unsigned int nSocket);
            /// @brief 从队列中取套接字
            ///
            /// @param nSocket 取出的套接字存放在nSocket中
            ///
            /// @return true 表示成功 , false 表示失败
            bool popSocket(int &nSocket);
            /// @brief 从G_Data->G_RecvMessThread->G_ThreadPool->G_Worker 回掉
            ///
            /// @param pStr 客户发的字符串
            /// @param nSocket 接受客户连接的套接字
            void recvMessage(void *pStr , int nSocket);
            /// @brief 发送数据 从testPool->G_Worker->G_ThreadPool->G_SendMessThread->G_Data
            ///
            /// @param pStr 数据
            /// @param nSocket 套接口
            /// @return
            //
            int sendMessage(int nSocket , const void *pStr);
        private:
            G_Worker *g_worker;
            /// @brief 监听线程
            G_ListenThread *g_listenThread;
            /// @brief 发送消息线程
            G_SendMessThread *g_sendMessThread;
            /// @brief 存放socket队列
            G_Queue<int> g_sockQueue;
            /// @brief 存放空闲工作线程队列
            G_Queue<G_RecvMessThread*> g_idleRecvMessThreadQueue;
            /// @brief 存放忙碌工作线程队列
            G_Queue<G_RecvMessThread*> g_busyRecvMessThreadQueue;
            /// @brief 每个RecvMessThread线程中最大用户数
            static const int maxCounter = 2000;
            /// @brief 如果线程不够用新增加的线程
            static const int addTaskThread = 2;
    };
    #endif



    #include "ThreadPool.h"
    #include "Log.h"
    G_ThreadPool::G_ThreadPool(unsigned int num , G_Worker *g_work) : g_worker(g_work)
    {
        g_listenThread = new G_ListenThread(this);
        g_listenThread->Start();  ///启动监听线程
        g_sendMessThread = new G_SendMessThread();
        g_sendMessThread->Start();   ///发送消息线程
        for(int i=0; i<num; i++)
        {
            /// 启动处理client发送信息线程 , 收消息线程
            //明明只启动了接受线程!!!开了num个,为什么开这么多。网络的都交给epoll就好
            G_RecvMessThread *g_recvMessThread = new G_RecvMessThread(this);
            g_idleRecvMessThreadQueue.push(g_recvMessThread);//空闲列,busy列呢,没看到,暂时没放入值
            g_recvMessThread->Start();       
        }
        Start();   ///线程池自己启动
    }
    G_ThreadPool::~G_ThreadPool()
    {
        if(g_listenThread)
        {
            delete g_listenThread;
            g_listenThread = NULL;
        }
        if(g_sendMessThread)
        {
            delete g_sendMessThread;
            g_sendMessThread = NULL;
        }
                        
        g_sockQueue.clear();
        g_idleRecvMessThreadQueue.clear();
        g_busyRecvMessThreadQueue.clear();
    }
    bool G_ThreadPool::Bind(unsigned int nPort)
    {
        return g_listenThread->Bind(nPort);
    }
    int G_ThreadPool::sendMessage(int nSocket , const void *pStr)
    {
        g_sendMessThread->sendMessage(nSocket , pStr);
    }
    void G_ThreadPool::Run()
    {
        int nSocket; 
        G_RecvMessThread *g_recvMessThread;
        while(1)
        {
            pause();   ///等待ListenThread 发信号,这种事情,用epoll的epoll_wait就好了嘛
            while(popSocket(nSocket))     ///必须把存放socket队列中的套接口全部取出
            {
                g_sendMessThread->addEpoll(nSocket);//这里也是用epoll的啊
                while(1)//各种循环
                {
                    ///从空闲队列中获得对首TaskThread
                    if(g_idleRecvMessThreadQueue.getFront(g_recvMessThread))
                    {
                        ///如果TaskThread线程中客户大于maxCounter , 从空闲队列中pop并放到忙碌队列中
                        if(g_recvMessThread->getCounter() >= maxCounter)  //接受线程里面,不止一个mess?还要count?
                        {
                            if(g_idleRecvMessThreadQueue.pop(g_recvMessThread))
                            {
                                g_busyRecvMessThreadQueue.push(g_recvMessThread);
                            }
                            else
                            {
                                ///表示空闲队列中再没有TaskThread可以用 , 创建addTaskThread个TaskThread线程 , 并且把busy队列中的TaskThread放到idle队列中这样可以防止busy队列中的用户数减少但是他还在busy队列中
                                for(int i=0; i<addTaskThread; i++)
                                {
                                    G_RecvMessThread *g_recvMessThread = new G_RecvMessThread(this);
                                    g_idleRecvMessThreadQueue.push(g_recvMessThread);
                                    g_recvMessThread->Start();
                                }
                                while(g_busyRecvMessThreadQueue.pop(g_recvMessThread))//感觉这么麻烦。。。
                                {
                                    g_idleRecvMessThreadQueue.push(g_recvMessThread);
                                }
                            }
                        }
                        else
                        {
                            /// 填加到TaskThread 线程中
                            g_recvMessThread->addSocket(nSocket);
                            g_recvMessThread->continues();   /// 唤醒TaskThread 线程
                            break;
                        }
                    }
                    else
                    {
                        /// 空闲队列中没有任何线程 , 应该没有这种情况
                        debug_output("idleRecvMessThreadQueue is not g_recvMessThread\n");
                    }
                }
            }
        }
    }
    bool G_ThreadPool::pushSocket(unsigned int nSocket)
    {
        return g_sockQueue.push(nSocket);
    }
    bool G_ThreadPool::popSocket(int &nSocket)
    {
        return g_sockQueue.pop(nSocket);
    }
    void G_ThreadPool::recvMessage(void *pStr , int nSocket)
    {
        g_worker->recvMessage(pStr , nSocket);
    }



    ///
    /// @file TaskThread.h
    /// @brief 任务类 , 接受client发的消息进行处理
    /// @author guozhiming
    /// @date 2007-05-17
    ///
    #ifndef __TASKTHREAD__
    #define __TASKTHREAD__
    #include "def.h"
    #include "Thread.h"
    #include "ThreadPool.h"
    #include "Queue.h"
    #include "Data.h"
    class G_ThreadPool;
    class G_Data;
    class G_RecvMessThread : public G_Thread
    {
        public:
            /// @brief 构造函数
            G_RecvMessThread(G_ThreadPool *pool);
            /// @brief 析构函数
            ~G_RecvMessThread();
            /// @brief 主线程运行
            void Run();
            /// @brief 填加套接字
            ///
            /// @param nSocket 套接字
            void addSocket(int nSocket);
            /// @brief 获得连接的客户端数目
            ///
            /// @return 数目
            unsigned int getCounter();
            /// @brief      往队列中存放数据 ,,哪个队列?下面定义了一个queue
            ///
            /// @param pStr  数据
            ///
            /// @return true 成功 , false 失败
            bool pushData(std::string pStr);
        private:
                    
            /// @brief 设置套接口非阻塞模式
            ///
            /// @param sockfd 套接口
            ///
            /// @return true 成功 , false 失败
            bool setNonBlock(int sockfd);
            /// @brief epoll_create 返回文件描述符
            int epfd;
            struct epoll_event events[100];   //才100个。。封装了epoll啊。在recv里封了,难道send里,也封了一个
            /// @brief 记录接受客户端数目
            unsigned int counter;
            /// @brief 线程池对象
            G_ThreadPool *g_threadPool;
            /// @brief 存放数据的队列
            G_Queue<std::string> g_dataBufferQueue;
            G_Data *g_data;
    };
    #endif



    #include "RecvMessThread.h"
    #include "Log.h"
    G_RecvMessThread::G_RecvMessThread(G_ThreadPool *pool) : g_threadPool(pool)
    {
        counter = 0;
        epfd = epoll_create(256); //最多同时监视256个
        g_data = new G_Data(this); //这个数据结构要看看,有啥稀奇
    }
    G_RecvMessThread::~G_RecvMessThread()
    {
        close(epfd);
    }
    unsigned int G_RecvMessThread::getCounter()
    {
        return counter;
    }
    bool G_RecvMessThread::setNonBlock(int sockfd)
    {
        int opts = fcntl(sockfd , F_GETFL);
        if(-1 == opts)
        {
            debug_output("%s\n" , "fcntl F_GETFL is faild");
            return false;
        }
        opts = opts | O_NONBLOCK;
        if(fcntl(sockfd , F_SETFL , opts) < 0)
        {
            debug_output("%s\n" , "fcntl F_SETFL is faild");
            return false;
        }
        return true;
    }
    void G_RecvMessThread::addSocket(int nSocket)
    {
        struct epoll_event ev;
        bzero(&ev , sizeof(ev));
        setNonBlock(nSocket);
        ev.data.fd = nSocket;
        ev.events = EPOLLIN | EPOLLET;
        epoll_ctl(epfd , EPOLL_CTL_ADD , nSocket , &ev);
        counter++;
    }
    bool G_RecvMessThread::pushData(std::string pStr)
    {
        return g_dataBufferQueue.push(pStr);
    }
    void G_RecvMessThread::Run()
    {
        pause();    /// 暂停线程  //都用这招。。。
        int nfds , sock;
        struct epoll_event ev;
        bool nRet;
        char line[1024]; //一次最多1024个。。感觉这个模型蛮奇怪。。
        while(1)
        {
            nfds = epoll_wait(epfd,events,100,50);
            for(int i=0; i<nfds; i++)
            {
                if(events[i].events&EPOLLIN)
                {
                    if((sock = events[i].data.fd) < 0)
                        continue;
                    if(!(nRet=g_data->recvData(sock)))//这是最底层的收数据的啊
                    {
                        debug_output("client is quit\n");
                        ev.data.fd= sock;
                        epoll_ctl(epfd , EPOLL_CTL_DEL , sock , &ev);//收到了直接关掉了
                        close(sock);
                        events[i].data.fd = -1;
                        counter --;
                    }
                    else
                    {
                        std::string pBuffer;
                        while(g_dataBufferQueue.size())
                        {
                            g_dataBufferQueue.pop(pBuffer);
                            g_threadPool->recvMessage((void*)pBuffer.c_str() , sock); //这又是一个收数据的。
                        }
                    }
                    usleep(100);//还要休眠。。这么坑爹。。。
                }
            }
        }
    }


关键字

上一篇: Spokesmen, How to ma

下一篇: 3D moblin UI on IDF