ACE之Proactor模式使用实例

【ACE之Proactor模式使用实例】关山初度尘未洗,策马扬鞭再奋蹄!这篇文章主要讲述ACE之Proactor模式使用实例相关的知识,希望能为你提供帮助。

// ACE_Proactor_Client.cpp : 定义控制台应用程序的入口点。
//

#include "stdafx.h"

#include "ace/Message_Queue.h"
#include "ace/Asynch_IO.h"
#include "ace/OS.h"
#include "ace/Proactor.h"
#include "ace/Asynch_Connector.h"
#include "ace/SOCK_SEQPACK_Association.h"

#pragma comment(lib,"ACEd.lib")

class Service_Handler : public ACE_Service_Handler

public:
Service_Handler()

//ACE_OS::printf("Service_Handler constructed for connector \\n");

~Service_Handler ()

if (this-> handle () != ACE_INVALID_HANDLE)
ACE_OS::closesocket (this-> handle ());
//ACE_OS::printf("one Service_Handler for connecter destructed");


void post_send(void)

do

time_t now = ACE_OS::gettimeofday().sec();
ACE_Message_Block *mb = new ACE_Message_Block(128);

char buff[64];
ACE_INET_Addr addr;
ACE_SOCK_SEQPACK_Association ass=ACE_SOCK_SEQPACK_Association(this-> handle());
size_t addr_size=sizeof ACE_INET_Addr;
ass.get_local_addrs(& addr,addr_size);
//ACE_OS::printf("fd:%d ip:%d port:%d\\n",(int)this-> handle(), addr.get_ip_address(), addr.get_port_number());
sprintf(buff,"%d",addr.get_port_number());
mb-> copy(buff/*ctime(now)*/);

if (this-> writer_.write(*mb,mb-> length()) !=0)

ACE_OS::printf("Begin write fail in open\\n");
delete this;
break;

else

ACE_OS::printf("sended:%s\\n",mb-> rd_ptr());

while (0);


void post_recv(void)

do

ACE_Message_Block *mb = new ACE_Message_Block(buffer,128);
if (this-> reader_.read (*mb, mb-> space ()) != 0)

ACE_OS::printf("Begin read fail\\n");
delete this;
break;

while (0);


virtual void open (ACE_HANDLE h, ACE_Message_Block& )

do

this-> handle (h);
if (this-> writer_.open (*this) != 0 )

ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\\n"),ACE_TEXT ("Service_Handler open")));
delete this;
break;

post_send();

if (this-> reader_.open (*this) != 0 )

ACE_ERROR ((LM_ERROR, ACE_TEXT ("%p\\n"),ACE_TEXT ("Service_Handler open")));
delete this;
break;

post_recv();

while (0);


virtual void handle_write_stream(const ACE_Asynch_Write_Stream::Result & result)

result.message_block ().release();
//ACE_OS::sleep(1);
post_send();


virtual void handle_read_stream(const ACE_Asynch_Read_Stream::Result & result)

do

ACE_Message_Block & mb = result.message_block ();
if (!result.success () || result.bytes_transferred () == 0)

mb.release ();
delete this;
break;

ACE_OS::printf("received:%s\\n",mb.rd_ptr());
mb.release();

post_recv();
while (0);

private:
ACE_Asynch_Write_Stream writer_;
ACE_Asynch_Read_Stream reader_;
char buffer[128];
;

#include < ace/OS.h>
#include < ace/Task.h>

class TTcpNetThread : public ACE_Task_Base

public:
/// 运行
int open();

/// 停止运行
int close();
protected:
/// 线程函数
virtual int svc();
;

int TTcpNetThread::open()return this-> activate();

int TTcpNetThread::close()

ACE_Proactor::instance()-> proactor_end_event_loop(); // 终止ACE_Proactor循环

this-> wait(); // 等待清理现场

return 0;


int TTcpNetThread::svc()

/*
ACE_INET_Addr listenAddr(4567); // 默认监听地址
TTcpAcceptor tcpAcceptor; // 接收器

// 演出开始
if (tcpAcceptor.open(listenAddr, 0, 1, 5, 1, 0, 0) != 0)
ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("%p\\n"), ACE_TEXT("failed to open TcpAcceptor errno=%i\\n"), errno), -1);
*/
// Proactor的事件循环开始
ACE_Proactor::instance()-> proactor_run_event_loop();

ACE_DEBUG((LM_DEBUG, ACE_TEXT("Network fin\\n")));

return 0;


#define TCP_CLIENT_THREAD_SEND0x777

const int CLIENT_CONNECTION_NUM_OF_PER_THREAD = 1; //< 客户端每个线程的连接数

#include "ace/SOCK_Connector.h"
#include "ace/SOCK_Stream.h"
/**
* @class TTcpClientThread
* @brief TCP客户端测试线程
*/
class TTcpClientThread : public ACE_Task< ACE_MT_SYNCH>

ACE_SOCK_Connector connector[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 连接器
ACE_SOCK_Stream peerStream[CLIENT_CONNECTION_NUM_OF_PER_THREAD]; //< 流对象

public:
/// ctor
~TTcpClientThread();

/// 运行
int open();

/// 停止运行
int close();
private:
/// 线程函数
virtual int svc(); <

    推荐阅读