一、概念:

Reactor与Proactor两种模式的区别。这里我们只关注read操作,因为write操作也是差不多的。下面是Reactor的做法:

  1. 某个事件处理器宣称它对某个socket上的读事件很感兴趣;

  2. 事件分离者等着这个事件的发生;

  3. 当事件发生了,事件分离器被唤醒,这负责通知先前那个事件处理器;

  4. 事件处理器收到消息,于是去那个socket上读数据了. 如果需要,它再次宣称对这个socket上的读事件感兴趣,一直重复上面的步骤;

下面再来看看真正意义的异步模式Proactor是如何做的:

  1. 事件处理器直接投递发一个读操作(当然,操作系统必须支持这个异步操作)。这个时候,事件处理器根本不关心读事件,它只管发这么个请求,它魂牵梦萦的是这个读操作的完成事件。这个事件处理器很拽,发个命令就不管具体的事情了,只等着别人(系统)帮他搞定的时候给他回个话。

  2. 事件分离器等着这个读事件的完成(比较下与Reactor的不同);

  3. 当事件分离器默默等待完成事情到来的同时,操作系统已经在一边开始干活了,它从目标读取数据,放入用户提供的缓存区中,最后通知事件分离器,这个事情我搞完了;

  4. 事件分离器通知之前的事件处理器: 你吩咐的事情搞定了;

  5. 事件处理器这时会发现想要读的数据已经乖乖地放在他提供的缓存区中,想怎么处理都行了。如果有需要,事件处理器还像之前一样发起另外一个读操作,和上面的几个步骤一样。

二、代码示例:

ACE_Proactor::run_event_loop();  循环启动
ACE_Proactor::end_event_loop();  循环停止

-----------------------------------

Reactor:

-----------------------------------

-----------------------------------

Proactor:

 #include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h" class HA_Proactive_Service : public ACE_Service_Handler
{
public:
HA_Proactive_Service()
{
ACE_OS::printf("Service_Handler constructed for accepter \n");
}
~HA_Proactive_Service ()
{
if (this->handle () != ACE_INVALID_HANDLE)
{
ACE_OS::closesocket (this->handle ());
}
} virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
//在OPEN函数中完成读写操作
this->handle (h);
if (this->reader_.open (*this) != ||
this->writer_.open (*this) != )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("HA_Proactive_Service open"))); return;
}
ACE_OS::printf("ready!\n"); //异步发送
send_to_remote();
//异步读取
reveive_from_remote(); // mb is now controlled by Proactor framework.
return;
} //异步读完成后会调用此函数
virtual void handle_read_stream
(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == )
{
mb.release (); return;
}
//else
//输出读取内容
ACE_OS::printf("received:%s\n",mb.rd_ptr());
mb.release();
//继续下一次异步读取
reveive_from_remote(); return;
}
//异步写完成后会调用此函数
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
result.message_block ().release();
ACE_OS::sleep();
//上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
//handle_write_stream,所以会一直发送
send_to_remote();
return;
}
//remote
void reveive_from_remote(void)
{
ACE_Message_Block *mb;
ACE_NEW_NORETURN (mb, ACE_Message_Block ());
if (this->reader_.read (*mb, mb->space ()) != )
{
ACE_OS::printf("Begin read fail\n"); return;
}
}
//把当前时间发送到远端
void send_to_remote(void)
{
std::string book = "S: ";
time_t now = ACE_OS::gettimeofday().sec();
book = book+ ctime(&now);
ACE_Message_Block *mb = new ACE_Message_Block();
//获取当前时间的字符串格式
mb->copy(book.c_str() );
//send message to accepter
if (this->writer_.write(*mb,mb->length()) !=)
{
ACE_OS::printf("Begin write fail\n"); return;
}
else
{
ACE_OS::printf("sended %s\n",mb->rd_ptr());
}
} // Listing 3
private:
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
}; int main(int argc, char *argv[])
{
int port=;
ACE_Asynch_Acceptor<HA_Proactive_Service> acceptor; if (acceptor.open (ACE_INET_Addr (port)) == -)
return -; while(true)
ACE_Proactor::instance ()->handle_events (); return ;
}

Acceptor.cpp

 #include "ace/Proactor.h"
#include "ace/Asynch_Connector.h" class HA_Proactive_Service : public ACE_Service_Handler
{
public:
HA_Proactive_Service()
{
ACE_OS::printf("Service_Handler constructed for connector \n");
}
~HA_Proactive_Service ()
{
if (this->handle () != ACE_INVALID_HANDLE)
{
ACE_OS::closesocket (this->handle ());
}
} virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
//在OPEN函数中完成读写操作
this->handle (h); if (this->reader_.open (*this) != ||
this->writer_.open (*this) != )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("HA_Proactive_Service open"))); return;
} ACE_OS::printf("connceted!\n");
//异步发送
send_to_remote();
//异步读取
reveive_from_remote(); return;
} //异步读完成后会调用此函数
virtual void handle_read_stream
(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == )
{
mb.release (); return;
}
//else
//输出读取内容
ACE_OS::printf("received:%s\n",mb.rd_ptr());
mb.release();
//继续下一次异步读取
reveive_from_remote(); return;
}
//异步写完成后会调用此函数
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
result.message_block ().release();
ACE_OS::sleep();
//上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
//handle_write_stream,所以会一直发送
send_to_remote();
return;
}
//remote
void reveive_from_remote(void)
{
ACE_Message_Block *mb;
ACE_NEW_NORETURN (mb, ACE_Message_Block ());
if (this->reader_.read (*mb, mb->space ()) != )
{
ACE_OS::printf("Begin read fail\n"); return;
}
}
//把当前时间发送到远端
void send_to_remote(void)
{
std::string book = "C: ";
time_t now = ACE_OS::gettimeofday().sec();
book = book+ ctime(&now);
ACE_Message_Block *mb = new ACE_Message_Block();
//获取当前时间的字符串格式
mb->copy(book.c_str() );
//send message to accepter
if (this->writer_.write(*mb,mb->length()) !=)
{
ACE_OS::printf("Begin write fail\n"); return;
}
else
{
ACE_OS::printf("sended %s\n",mb->rd_ptr());
}
} // Listing 3
private:
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
}; int main(int argc, char *argv[])
{ ACE_INET_Addr addr(,"127.0.0.1"); ACE_Asynch_Connector<HA_Proactive_Service> connector; connector.open();
if (connector.connect(addr) == -)
return -; while(true)
ACE_Proactor::instance ()->handle_events (); return ;
}

Connector.cpp

ACE_Proactor::run_event_loop(); <==>
while(true)
    ACE_Proactor::instance ()->handle_events ();

//增加线程池

 #include "ace/Proactor.h"
#include "ace/Asynch_Acceptor.h"
#include "ace/Task_T.h"
#include "ace/Thread_Semaphore.h" class Receive : public ACE_Service_Handler
{
public:
Receive()
{
ACE_OS::printf("Service_Handler constructed for accepter \n");
}
~Receive ()
{
if (this->handle () != ACE_INVALID_HANDLE)
{
ACE_OS::closesocket (this->handle ());
}
} virtual void open (ACE_HANDLE h, ACE_Message_Block&)
{
//在OPEN函数中完成读写操作
this->handle (h);
if (this->reader_.open (*this) != ||
this->writer_.open (*this) != )
{
ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\n"),
ACE_TEXT ("Receive open"))); return;
}
ACE_OS::printf("ready!\n"); //异步发送
//send_to_remote();
//异步读取
reveive_from_remote(); // mb is now controlled by Proactor framework.
return;
} //异步读完成后会调用此函数
virtual void handle_read_stream
(const ACE_Asynch_Read_Stream::Result &result)
{
ACE_Message_Block &mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == )
{
mb.release (); return;
} //输出读取内容
//ACE_OS::printf("received:%s\n",mb.rd_ptr());
ACE_DEBUG((LM_DEBUG,"(%t)received:%s\n",mb.rd_ptr()));
mb.release(); //继续下一次异步读取
reveive_from_remote(); return;
}
//异步写完成后会调用此函数
virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)
{
result.message_block ().release();
ACE_OS::sleep();
//上次发送完毕之后再接着发送一次,这次发送完成之后又会调用
//handle_write_stream,所以会一直发送
send_to_remote();
return;
}
//remote
void reveive_from_remote(void)
{
ACE_Message_Block *mb;
ACE_NEW_NORETURN (mb, ACE_Message_Block ());
if (this->reader_.read (*mb, mb->space ()) != )
{
ACE_OS::printf("Begin read fail\n"); return;
}
}
//把当前时间发送到远端
void send_to_remote(void)
{
std::string book = "S: ";
time_t now = ACE_OS::gettimeofday().sec();
book = book+ ctime(&now);
ACE_Message_Block *mb = new ACE_Message_Block();
//获取当前时间的字符串格式
mb->copy(book.c_str() );
//send message to accepter
if (this->writer_.write(*mb,mb->length()) !=)
{
ACE_OS::printf("Begin write fail\n"); return;
}
else
{
ACE_OS::printf("sended %s\n",mb->rd_ptr());
}
} // Listing 3
private:
ACE_Asynch_Read_Stream reader_;
ACE_Asynch_Write_Stream writer_;
}; class Accepte : public ACE_Asynch_Acceptor<Receive>
{
public:
virtual Receive* make_handler(void)
{
return new Receive();
} }; class Proactor_Task : public ACE_Task<ACE_MT_SYNCH>
{
public: int star(int nMax)
{
create_proactor();
this->activate (THR_NEW_LWP, nMax);
for (;nMax>;nMax--)
{
sem_.acquire();
}
return ;
}
int stop()
{
ACE_Proactor::end_event_loop();
this->wait();
return ;
} virtual int svc (void)
{
ACE_DEBUG((LM_INFO,ACE_TEXT("svc method is invoked!\n")));
sem_.release(); ACE_Proactor::run_event_loop(); return ;
} int create_proactor()
{
ACE_Proactor::instance (); return ;
} int release_proactor()
{
ACE_Proactor::close_singleton ();
return ;
} ACE_Thread_Semaphore sem_;
}; int ACE_TMAIN(int ,char*[])
{
Proactor_Task task;
task.star(); Accepte accepte;
accepte.open(ACE_INET_Addr (), , ,ACE_DEFAULT_BACKLOG,,ACE_Proactor::instance()); //主函数退出控制
{
int nExit=;
while (nExit==)
scanf("%d",&nExit);
} return ;
} /*
int main(int argc, char *argv[])
{
int port=3000;
ACE_Asynch_Acceptor<Receive> acceptor; if (acceptor.open (ACE_INET_Addr (port)) == -1)
return -1; ACE_Proactor::run_event_loop(); return 0;
}
*/

Proactor_Task.cpp

-----------------------------------

GOOD LUCK!

ACE_linux:Reactor与Proactor两种模式的区别的更多相关文章

  1. 【转】Reactor与Proactor两种模式区别

    转自:http://www.cnblogs.com/cbscan/articles/2107494.html 两种IO多路复用方案:Reactor and Proactor 一般情况下,I/O 复用机 ...

  2. vue-router的两种模式的区别

    众所周知,vue-router有两种模式,hash模式和history模式,这里来谈谈两者的区别. ### hash模式 hash模式背后的原理是`onhashchange`事件,可以在`window ...

  3. 在IDEA中将项目部署到Tomcat的方法及两种模式的区别

    转自:https://www.jianshu.com/p/fb0ed26c35d5 1.添加tomcat服务器 点右上角编辑配置   编辑配置 点击左上角+选择tomcat服务器   添加tomcat ...

  4. MODBUS ASCII和RTU两种模式的区别、优缺点

    转自:http://blog.sina.com.cn/s/blog_89f286ad0102uzju.html 下表是MODBUS ASCII协议和RTU协议的比较: 协议 开始标记 结束标记 校验 ...

  5. vue-router 的两种模式的区别

    Vue Router 是Vue官方的路由管理器.它和 Vue.js 的核心深度集成,让构建单页面应用变得易如反掌.vue-router 默认 hash 模式,还有一种是history模式. hash模 ...

  6. vue-router两种模式的区别

    参考文献:https://blog.csdn.net/lla520/article/details/77894985/ https://segmentfault.com/a/1190000015123 ...

  7. Unity下Iso和Persp两种模式的区别

    Iso模式 平行视野.在Iso模式下,不论物体距离摄像头远近都给人的感觉是一样大的. Persp模式 透视视野.在persp模式下,物体在scene界面上所呈现的画面是给人一种距离摄像头近的物体显示的 ...

  8. Reactor事件驱动的两种设计实现:面向对象 VS 函数式编程

    Reactor事件驱动的两种设计实现:面向对象 VS 函数式编程 这里的函数式编程的设计以muduo为例进行对比说明: Reactor实现架构对比 面向对象的设计类图如下: 函数式编程以muduo为例 ...

  9. Windows2003 IIS6.0支持32位和64位两种模式的设置方法

    IIS 6.0 可支持 32 位和 64 位两种模式.但是,IIS 6.0 不支持在 64 位版本的 Windows 上同时运行这两种模式.ASP.NET 1.1 只在 32 位模式下运行.而 ASP ...

随机推荐

  1. JavaScript中的面向对象

    //简单的面向对象 function 构造函数(){ this.属性; } //写在构造函数里面的属性一般为公共属性,或者通过传值进行改变. 构造函数.原型.方法 = function(){}; // ...

  2. XCode属性面板使用说明

    Xcode 中Interface Builder 工具 是一个功能强大的“所见即所得”开发工具.本文主要介绍属性面板 和  对象库面板 对象库面板: 提供了所有Cocoa Touch 库给我们定义好的 ...

  3. Delphi能通过SSH登录Linux,连接MYSQL取数么?像Navicat一样

    百度随时就能搜,你就懒得搜下.http://tieba.baidu.com/p/671327617 Ssh tunnel通常能实现3种功能1) 加密网络传输2) 绕过防火墙3) 让位于广域网的机器连接 ...

  4. 使用Vitamio打造自己的Android万能播放器(5)——在线播放(播放优酷视频)

    前言 为了保证每周一篇的进度,又由于Vitamio新版本没有发布, 决定推迟本地播放的一些功能(截图.视频时间.尺寸等),跳过直接写在线播放部分的章节.从Vitamio的介绍可以看得出,其支持http ...

  5. 提示text还能输入多少字节

    1.添加jQuery自定义扩展 $(function($){ // tipWrap: 提示消息的容器 // maxNumber: 最大输入字符 $.fn.artTxtCount = function( ...

  6. linux下安装tomcat,并设置自动启动

    在linux系统下,设置某个服务自启动的话,需要在/etc/rcX.d下挂载,还要在/etc/init.d/下写启动脚本的 在/etc/init.d/下新建一个文件tomcat(需要在root权限下操 ...

  7. 安卓高级3 RecyclerView 和cardView使用案例

    cardView: 添加依赖:在Studio搜索cardview即可 在V7包中 或者直接在gradle中添加 compile 'com.android.support:cardview-v7:24. ...

  8. HTML学习笔记03-HTML基础

    <!DOCTYPE HTML> <html> <head> <title> </title> </head> <body& ...

  9. ssh登录慢解决办法

    这两天ssh登录局域网的一台服务器非常慢,严重影响工作效率,怎么办?查了一下网上的解决办法,总结一下: 使用命令ssh -v xxx@x.x.x.x 可以看到debug信息,找到问题出在哪: debu ...

  10. learning ddr state diagram