JonsenElizee
ACE TAO Programming Chapter 12. Basic Multithreaded Programming


Chapter 12. Basic Multithreaded Programming

12.1 Getting Started

By default, a process is created with a single thread, which we call the main thread. This thread starts executing in the main() function of your program and ends when main() completes. Any extra threads that your process may need have to be explicitly created. To create your own thread with ACE, all you have to do is create a subclass of the ACE_Task_Base class and override the implementation of the virtual svc() method. The svc() method serves as the entry point for your new thread; that is, your thread starts in the svc() method and ends when the svc() method returns, in a fashion similar to the main thread.

You will often find yourself using extra threads to help process incoming messages for your network servers. This prevents clients that do not require responses from blocking on the network server, waiting for long-running requests to complete. In our first example, we create a home automation command handler class, HA_CommandHandler , that is responsible for applying long-running command sequences to the various devices that are connected on our home network. For now, we simulate the long-running processing with a sleep call. We print out the thread identifier for both the main thread and the command handler thread, using the ACE_DEBUG() macro's %t format specifier, so that we can see the two threads running in our debug log:

#include "ace/Task.h"

class HA_CommandHandler : public ACE_Task_Base
{
public:
  virtual int svc (void)
  {
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Handler Thread running\n")));
    ACE_OS::sleep (4);
    return 0;
  }
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Main Thread running\n")));
  HA_CommandHandler handler;
  int result = handler.activate ();
  ACE_ASSERT (result == 0);
  handler.wait ();
  return 0;
}

To start the thread, you must create an instance of HA_CommandHandler and call activate() on it. Before doing this, we print out the main thread's identifier so that we can compare both child and main identifiers in our output debug log.

After activating the child thread, the main thread calls wait() on the handler object, waiting for its threads to complete before continuing and falling out of the main() function. Once the child thread completes the svc() method and exits, the wait() call in the main thread will complete, control will fall out of the main() function, and the process will exit. Why does the main thread have to wait for the child thread to complete? On many platforms, once the main thread returns from the main() function, the C runtime sees this as an indication that the process is ready to exit and destroys the entire running process, including the child thread. If we allowed this to happen, the program might exit before the child thread ever got scheduled and got a chance to execute.

The output shows the two threads—the main thread and the child command handler thread—running:



(496) Main Thread running

(3648) Handler Thread running
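
The need to wait for a child thread before leaving main() is not specific to ACE. As a rough sketch of the same activate-then-wait pattern, the following uses std::thread from the C++ standard library instead of ACE_Task_Base. The class CommandHandler, its activate(), wait(), and finished() members, and the function run_handler_and_wait() are hypothetical names chosen to echo the example above; they are not part of ACE.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for a task class; this is not ACE_Task_Base.
class CommandHandler
{
public:
  // Start one thread that runs svc(). Returns 0 on success.
  int activate ()
  {
    if (thread_.joinable ())
      return -1;                      // already running
    thread_ = std::thread (&CommandHandler::svc, this);
    return 0;
  }

  // Block until the thread started by activate() has returned from svc().
  // Must be called before the object is destroyed.
  int wait ()
  {
    if (thread_.joinable ())
      thread_.join ();
    return 0;
  }

  bool finished () const { return finished_; }

private:
  // Thread entry point; the sleep simulates long-running work.
  int svc ()
  {
    std::this_thread::sleep_for (std::chrono::milliseconds (50));
    finished_ = true;
    return 0;
  }

  std::thread thread_;
  std::atomic<bool> finished_ {false};
};

// Mirrors the main() above: activate, then wait before returning.
bool run_handler_and_wait ()
{
  CommandHandler handler;
  int result = handler.activate ();
  assert (result == 0);
  handler.wait ();
  return handler.finished ();
}
```

Calling run_handler_and_wait() from main() returns true only because wait() joins the thread first; in this sketch, skipping wait() would destroy a still-joinable std::thread, which terminates the program.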

12.2 Basic Thread Safety

One of the most difficult problems you deal with when writing multithreaded programs is maintaining consistency of all globally available data. Because you have multiple threads accessing the same objects and structures, you must make sure that any updates made to these objects are safe. What safety means in this context is that all state information remains in a consistent state.

ACE provides a rich array of primitives to help you to achieve this goal. We cover a few of the most useful and commonly used primitives in the next few sections and continue coverage on the rest of these components in Chapter 14.

12.2.1 Using Mutexes

Mutexes, the simplest protection primitive available, provide a simple acquire(), release() interface. If it succeeds in getting the mutex, the thread that called acquire() continues forward; otherwise, it blocks until the holder of the mutex releases it by using release().

As shown in Table 14.1, ACE provides several mutex classes. ACE_Mutex can be used as a lightweight synchronization primitive for threads and as a heavyweight cross-process synchronization primitive.

In the next example, we add a device repository to our home automation example. This repository contains references to all the devices connected to our home network, as well as the interface to apply command sequences to the various devices connected to our home network. Let us suppose that only one thread can make updates in the repository at a time, without causing consistency problems.

The repository creates and manages an ACE_Thread_Mutex object as a data member that it uses to ensure the consistency constraint. This is a common idiom that you will find yourself using on a regular basis. Whenever it calls the update_device() method, a thread first has to acquire the mutex before continuing forward, as only one thread can have the mutex at a time; at no point will two threads simultaneously update the state of the repository. It is important that release() be called on the mutex so that other threads can acquire the repository mutex and update the repository after the first thread is done. When the repository is destroyed, the destructor of the mutex will ensure that it properly releases all resources that it holds:

class HA_Device_Repository
{
public:
  HA_Device_Repository ()
  { }

  void update_device (int device_id)
  {
    mutex_.acquire ();
    ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Updating device %d\n"),
                device_id));
    ACE_OS::sleep (1);
    mutex_.release ();
  }

private:
  ACE_Thread_Mutex mutex_;
};

To illustrate the mutex in action, we modify HA_CommandHandler to call update_device() on the repository and then create two handler tasks that compete with each other, trying to update devices in the repository at the same time:

class HA_CommandHandler : public ACE_Task_Base
{
public:
  enum {NUM_USES = 10};

  HA_CommandHandler (HA_Device_Repository& rep) : rep_(rep)
  { }

  virtual int svc (void)
  {
    ACE_DEBUG
      ((LM_DEBUG, ACE_TEXT ("(%t) Handler Thread running\n")));
    for (int i=0; i < NUM_USES; i++)
      this->rep_.update_device (i);
    return 0;
  }

private:
  HA_Device_Repository & rep_;
};

int ACE_TMAIN (int, ACE_TCHAR *[])
{
  HA_Device_Repository rep;
  HA_CommandHandler handler1 (rep);
  HA_CommandHandler handler2 (rep);
  handler1.activate ();
  handler2.activate ();

  handler1.wait ();
  handler2.wait ();
  return 0;
}

The output from this program shows the two handler threads competing to update devices in the repository:

(3768) Handler Thread running
(3768) Updating device 0
(1184) Handler Thread running
(1184) Updating device 0
(3768) Updating device 1
(1184) Updating device 1
(3768) Updating device 2
(1184) Updating device 2
(3768) Updating device 3
(1184) Updating device 3
(3768) Updating device 4

You may notice that on your platform, one thread may hang onto the repository until it is done before it lets go or that the threads run amok among one another, with no particular order as to which thread uses the repository. You can ensure strict ordering—if that is what you need—by using an ACE_Token , which is discussed in Section 14.1.4 . But be aware that although tokens support strict ordering and are recursive, they are slower and heavier than mutexes.
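
To see why the mutex matters, it can help to count updates rather than print them. The sketch below is an analogue written with std::mutex and std::thread from the C++ standard library, not with ACE_Thread_Mutex; DeviceRepository, updates(), and run_two_handlers() are hypothetical names. Two threads each perform the same number of updates, and the lock keeps the shared counter consistent regardless of how the threads interleave.

```cpp
#include <cassert>
#include <mutex>
#include <thread>

// Hypothetical analogue of the repository; not the ACE-based class above.
class DeviceRepository
{
public:
  void update_device (int /* device_id */)
  {
    mutex_.lock ();      // plays the role of acquire()
    ++updates_;          // the protected state change
    mutex_.unlock ();    // plays the role of release()
  }

  int updates ()
  {
    std::lock_guard<std::mutex> lock (mutex_);
    return updates_;
  }

private:
  std::mutex mutex_;
  int updates_ = 0;
};

// Run two competing handler threads and report the total update count.
int run_two_handlers (int uses_per_handler)
{
  assert (uses_per_handler >= 0);
  DeviceRepository rep;
  auto handler = [&rep, uses_per_handler]
    {
      for (int i = 0; i < uses_per_handler; ++i)
        rep.update_device (i);
    };

  std::thread t1 (handler);
  std::thread t2 (handler);
  t1.join ();
  t2.join ();
  return rep.updates ();
}
```

With the lock in place, run_two_handlers (1000) always reports 2000 updates; the order in which the two threads get the mutex is still up to the scheduler, as the text notes.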

12.2.2 Using Guards

In many cases, exceptional conditions cause deadlock in otherwise perfectly working code. This usually happens when we overlook an exceptional path and forget to unlock a mutex. Let's illustrate this with a piece of code:



int
HA_Device_Repository::update_device (int device_id)
{
  this->mutex_.acquire ();
  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%t) Updating device %d\n"), device_id));

  // Allocate a new object.
  Object *object = 0;
  ACE_NEW_RETURN (object, Object, -1);
  // ...
  // Use the object

  this->mutex_.release ();
  return 0;
}

You can spot the problem here pretty easily: The ACE_NEW_RETURN() macro returns when an error occurs, thereby preventing the lock from being released, and we have a nasty deadlock. In real code, it becomes more difficult to spot such returns, and having multiple release() calls all over your code can quickly become very ugly.

ACE provides a convenience class that solves this problem. The ACE_Guard set of classes and macros help simplify your code and prevent deadlock situations. The guard is based on the familiar C++ idiom of using the constructor and destructor calls for resource acquisition and release. The guard classes acquire a specified lock when they are constructed, and release the lock when they are destroyed. Using a guard on your stack, you are always assured that the lock will be released, no matter what pathological path your code may wind through.

Using a guard instead of explicit calls to acquire and release the mutex makes the previous snippet of code much easier to read:



int
HA_Device_Repository::update_device (int device_id)
{
  // Construct a guard specifying the type of the mutex as
  // a template parameter and passing in the mutex to hold
  // as a parameter.
  ACE_Guard<ACE_Thread_Mutex> guard (this->mutex_);

  // On allocation failure this returns -1 early; the guard's
  // destructor still releases the lock.
  Object *object = 0;
  ACE_NEW_RETURN (object, Object, -1);
  // ..
  // Use the object.
  // ..
  // Guard is destroyed, automatically releasing the lock.
  return 0;
}

All we do here is create an ACE_Guard object on the stack. This automatically acquires and releases the mutex on function entry and exit: just what we want. The ACE_Guard template class takes the lock type as its template parameter and also requires you to pass in a lock object that the guard will operate on.
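
The idiom behind the guard can be sketched in a few lines of standard C++. The names below (ScopedGuard, TracingLock, and this free-standing update_device()) are hypothetical and only illustrate the constructor-acquires, destructor-releases pattern; ACE_Guard itself offers more than this. TracingLock is a single-threaded test double that records whether it is held, so the effect of an early return is easy to observe.

```cpp
#include <cassert>

// Hypothetical lock that only records whether it is currently held.
struct TracingLock
{
  bool held = false;
  void lock ()   { assert (!held); held = true; }
  void unlock () { assert (held);  held = false; }
};

// Hypothetical minimal guard: acquire in the constructor,
// release in the destructor.
template <typename Lock>
class ScopedGuard
{
public:
  explicit ScopedGuard (Lock &lock) : lock_ (lock) { lock_.lock (); }
  ~ScopedGuard () { lock_.unlock (); }

  ScopedGuard (const ScopedGuard &) = delete;
  ScopedGuard &operator= (const ScopedGuard &) = delete;

private:
  Lock &lock_;
};

int update_device (TracingLock &mutex, int device_id)
{
  ScopedGuard<TracingLock> guard (mutex);

  if (device_id < 0)
    return -1;   // early exit: the guard's destructor still unlocks

  // ... use the protected state ...
  return 0;
}
```

Whether update_device() returns -1 or 0, the lock's held flag is false again afterwards, which is exactly the property the explicit acquire()/release() version lost on its error path.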

Table 12.1 lists the rich variety of guards that ACE provides. Most of these guards are self-explanatory; we talk about them and use them in several examples in Chapter 14. For further details on each of the guards, consult the ACE reference documentation.

ACE also provides a set of convenient macros that you can use to allocate guards on the stack. These macros expand to use the guard classes listed in Table 12.1 and perform error checking on the underlying acquire() and release() calls. They return on an error, optionally with a return value. In the lists of macros below, LockType is used as T in the guard class template, GuardName is the name of the guard object that's created, and LockObject is the lock object referenced by the guard.

Table 12.1. ACE Guard Classes

Guard | Description
ACE_Guard<T> | Uses the acquire() and release() methods of lock class T during guard creation and destruction. Thus, you get the semantics of acquire() and release() methods for the specified type T.
ACE_Read_Guard<T> | Uses acquire_read() for acquisition instead of the regular acquire().
ACE_Write_Guard<T> | Uses acquire_write() for acquisition instead of the regular acquire().
ACE_TSS_Guard<T> | Allocates the guard on the heap and keeps a reference to it in thread-specific storage. This ensures that the lock is always released even if the thread exits explicitly, using ACE_Thread::exit().
ACE_TSS_Read_Guard<T> | Read version of a thread-specific guard.
ACE_TSS_Write_Guard<T> | Write version of a thread-specific guard.

The following guard macros do not return values:

· ACE_GUARD (LockType, GuardName, LockObject)
· ACE_WRITE_GUARD (LockType, GuardName, LockObject)
· ACE_READ_GUARD (LockType, GuardName, LockObject)

These guard macros return ReturnValue on an error:

· ACE_GUARD_RETURN (LockType, GuardName, LockObject, ReturnValue)
· ACE_WRITE_GUARD_RETURN (LockType, GuardName, LockObject, ReturnValue)
· ACE_READ_GUARD_RETURN (LockType, GuardName, LockObject, ReturnValue)

In the following code snippet, the guard return macro is used with the device repository:



int
HA_Device_Repository::update_device (int device_id)
{
  ACE_GUARD_RETURN (ACE_Thread_Mutex, mon, mutex_, -1);

  Object *object = 0;
  ACE_NEW_RETURN (object, Object, -1);
  // Use the object.
  // ...
  return 0;
}

If there is an error, the macro returns -1 from the method; otherwise, it creates an ACE_Guard<ACE_Thread_Mutex> instance called mon on the stack. If the mutex-protected method does not return a value, use the guard macros that do not return a value, in conjunction with the errno facility.
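
To make the behavior of a returning guard macro concrete, here is a sketch of how such a macro could be put together. Everything in it is hypothetical: DEMO_GUARD_RETURN, DemoGuard, and FlakyLock are illustration-only names, and the real ACE macros may differ in detail. The assumption, taken from the description above, is that the macro declares a guard and returns the given value if the lock was not acquired.

```cpp
#include <cassert>

// Hypothetical lock whose acquire() can be forced to fail.
struct FlakyLock
{
  bool fail = false;
  bool held = false;
  int acquire () { if (fail) return -1; held = true; return 0; }
  int release () { held = false; return 0; }
};

// Hypothetical guard that remembers whether acquisition succeeded.
template <typename Lock>
class DemoGuard
{
public:
  explicit DemoGuard (Lock &l) : lock_ (l), owner_ (l.acquire () == 0) {}
  ~DemoGuard () { if (owner_) lock_.release (); }
  bool locked () const { return owner_; }

private:
  Lock &lock_;
  bool owner_;
};

// Declare a guard named GuardName; leave the enclosing function with
// ReturnValue if the lock could not be acquired.
#define DEMO_GUARD_RETURN(LockType, GuardName, LockObject, ReturnValue) \
  DemoGuard<LockType> GuardName (LockObject);                           \
  if (!GuardName.locked ()) return ReturnValue

int update_device (FlakyLock &mutex, int device_id)
{
  DEMO_GUARD_RETURN (FlakyLock, mon, mutex, -1);
  assert (mutex.held);   // only reached while holding the lock
  return device_id;
}
```

When acquisition succeeds, the function body runs under the lock and the guard releases it on exit; when it fails, the function returns -1 before touching the protected state.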

12.3 Intertask Communication

When writing multithreaded programs, you will often feel the need for your tasks to communicate with one another. This communication may take the form of something simple, such as one thread informing another that it is time to exit, or something more complicated, such as communicating threads passing data back and forth.

In general, intertask communication can be divided into two broad categories:

1. State change or event notifications, whereby only the event occurrence needs to be communicated, but no data is passed between the two threads

2. Message passing, whereby data is passed between the two threads, possibly forming a work chain, in which the first thread processes the data and then passes it along to the next for further processing

12.3.1 Using Condition Variables

A thread can use condition variables to communicate a state change, an event arrival, or the satisfaction of another condition to other interested threads. (A condition variable carries no data of its own; passing data between threads requires another mechanism, such as the message passing described in Section 12.3.2.)

A condition variable is always used in conjunction with a mutex. These variables also have a special characteristic in that you can do a timed block on the variable. This makes it easy to use a condition variable to manage a simple event loop. We show an example of this when we talk about timers in Chapter 20.

We can easily change our command handler example to use a condition variable to coordinate access to the device repository instead of using a mutex for protection. We start by modifying the repository so that it can record which task currently owns it:


class HA_Device_Repository
{
public:
  HA_Device_Repository() : owner_(0)
  { }

  int is_free (void)
  {
    return (this->owner_ == 0);
  }

  int is_owner (ACE_Task_Base* tb)
  {
    return (this->owner_ == tb);
  }

  ACE_Task_Base *get_owner (void)
  {
    return this->owner_;
  }

  void set_owner (ACE_Task_Base *owner)
  {
    this->owner_ = owner;
  }

  int update_device (int device_id);

private:
  ACE_Task_Base * owner_;
};

Next, we modify the command handler such that it uses a condition variable (waitCond_ ) and mutex (mutex_ ), to coordinate access to the repository. Both the condition variable and the mutex are created on the main() thread stack and are passed to the command handlers during construction.

To use a condition variable, you must first acquire the mutex and check whether the system is in the required state, that is, whether the required condition is true. If so, perform the required action and then release the mutex. If the system is not in the required state, you must call wait() on the condition variable, waiting for the system state to change. Once the system state changes, the thread that is making the change signals the condition variable, waking up one or more of the threads that are waiting for the change. The waiting threads wake up, check the system state again, and, if the state is still amenable, perform the required action; otherwise, they wait again.

In the command handler, the handler thread is waiting for the is_free() condition to become true on the repository. If this happens to be the case, the handler thread successfully acquires the repository and marks itself as the owner, after which it frees the mutex. If any other competing handler tries to acquire the repository for update at this time, the is_free() method will return 0, and the thread will block by calling wait() on the condition variable.

Once the successful thread is done updating, it removes itself as the owner of the repository and calls signal() on the condition variable. This causes the blocked thread to wake up, check whether the repository is free, and, if so, go on its merry way acquiring the repository for update.

You may notice that the blocking thread does not release the mutex before it falls asleep on wait() ; nor does it try to acquire it once it wakes up. The reason is that the condition variable ensures the automatic release of the mutex right before falling asleep and acquisition of the mutex just before waking up:

int
HA_CommandHandler::svc (void)
{
  ACE_DEBUG ((LM_DEBUG,
              ACE_TEXT ("(%t) Handler Thread running\n")));

  for (int i = 0; i < NUM_USES; i++)
    {
      this->mutex_.acquire ();

      // Block until the repository is free. wait() releases the
      // mutex while sleeping and reacquires it before returning.
      while (!this->rep_.is_free ())
        this->waitCond_.wait ();

      this->rep_.set_owner (this);
      this->mutex_.release ();

      this->rep_.update_device (i);

      ACE_ASSERT (this->rep_.is_owner (this));

      // Give up ownership while holding the mutex so that a waiter
      // cannot miss the change between its is_free() check and wait().
      this->mutex_.acquire ();
      this->rep_.set_owner (0);
      this->mutex_.release ();

      // Wake one waiting thread; broadcast() would wake all of them.
      this->waitCond_.signal ();
    }

  return 0;
}

As usual, we create two handlers that compete with each other to acquire the repository. Both are passed the condition variable, mutex, and repository by reference when the handler is constructed.

The ACE_Condition class is a template and requires the type of mutex being used as its argument. Because we are coordinating access between threads, we use ACE_Thread_Mutex as the mutex type. The condition variable instance also keeps a reference to the mutex, so that it can automatically acquire and release it on the wait() call, as described earlier. This reference is passed to the condition variable during construction:


int ACE_TMAIN (int, ACE_TCHAR *[])
{
  HA_Device_Repository rep;
  ACE_Thread_Mutex rep_mutex;
  ACE_Condition<ACE_Thread_Mutex> wait (rep_mutex);

  HA_CommandHandler handler1 (rep, wait, rep_mutex);
  HA_CommandHandler handler2 (rep, wait, rep_mutex);

  handler1.activate ();
  handler2.activate ();

  handler1.wait ();
  handler2.wait ();

  return 0;
}
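
The acquire, check, wait, act, signal protocol described in this section can also be tried out with the C++ standard library. The following is a sketch under that substitution: std::condition_variable and std::mutex stand in for ACE_Condition and ACE_Thread_Mutex, and Repository, handler(), and run_handlers() are hypothetical names. Ownership is cleared while the mutex is held, so a waiting thread cannot miss the change.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>

// Hypothetical repository that records its current owner.
struct Repository
{
  const void *owner = nullptr;
  int updates = 0;
  bool is_free () const { return owner == nullptr; }
};

void handler (Repository &rep, std::mutex &m,
              std::condition_variable &cond, int uses)
{
  int self = 0;   // the address of this local identifies the thread
  for (int i = 0; i < uses; ++i)
    {
      {
        std::unique_lock<std::mutex> lock (m);
        // wait() releases the mutex while blocked, reacquires it on
        // wake-up, and rechecks the predicate each time.
        cond.wait (lock, [&rep] { return rep.is_free (); });
        rep.owner = &self;
      }   // mutex released; the repository now belongs to this thread

      ++rep.updates;   // only the current owner gets here

      {
        std::lock_guard<std::mutex> lock (m);
        assert (rep.owner == &self);
        rep.owner = nullptr;
      }
      cond.notify_one ();   // wake one waiting thread
    }
}

// Run two competing handlers; returns the total number of updates.
int run_handlers (int uses)
{
  Repository rep;
  std::mutex m;
  std::condition_variable cond;

  std::thread t1 (handler, std::ref (rep), std::ref (m), std::ref (cond), uses);
  std::thread t2 (handler, std::ref (rep), std::ref (m), std::ref (cond), uses);
  t1.join ();
  t2.join ();
  return rep.updates;
}
```

Because the update itself happens outside the mutex, what protects it is the ownership flag: each thread's update is ordered before it clears the flag, and the next owner only proceeds after seeing the flag cleared under the same mutex.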

12.3.2 Message Passing

As mentioned earlier, message passing is often used to communicate data and event occurrences between threads. The sender creates a message that it enqueues on a message queue for the receiver to pick up. The receiver is either blocked or polling the queue, waiting for new data to arrive. Once the data is in the queue, the receiver dequeues the data, uses it, and then goes back to waiting for new data on the queue.

The queue acts as a shared resource between the two threads, and thus the enqueue and dequeue operations must be protected. (See Figure 12.1.) It also would be handy if the queue supports a blocking dequeue call that unblocks when new data arrives. Finally, it would be convenient if each task object that we created comes out of the box with a message queue attached to it. That way, we wouldn't have to have a global queue. Instead, if a task has a reference to any other task, it can send it messages.

Figure 12.1. Using a queue for communication



Fortunately, all these features come out of the box with ACE. Up to this point, we have been using ACE_Task_Base as the base class for all our example threads. ACE also provides a facility to queue messages between threads that are derived from the ACE_Task template. By deriving your class from ACE_Task , you automatically inherit a message queue of type ACE_Message_Queue , which you can use in your new class. ACE_Message_Queue is modeled after the queueing facilities available with System V streams. However, unlike their System V counterparts, the ACE facility allows for efficient intertask communication within a single process and does not provide for interprocess communication.

The message queue provides a type-safe interface, allowing you to enqueue messages that are instances of ACE_Message_Block .
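
The queue requirements listed above (protected enqueue and dequeue, plus a dequeue that blocks until data arrives) can be sketched with a mutex and a condition variable. This is not ACE_Message_Queue: BlockingQueue is a hypothetical class built on the C++ standard library, its putq() and getq() merely borrow the ACE method names, and it carries std::string messages rather than message blocks.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical queue shared between a sender and a receiver thread.
class BlockingQueue
{
public:
  // Enqueue a message and wake one blocked receiver, if any.
  void putq (std::string msg)
  {
    {
      std::lock_guard<std::mutex> lock (mutex_);
      items_.push_back (std::move (msg));
    }
    not_empty_.notify_one ();
  }

  // Dequeue the oldest message, blocking while the queue is empty.
  std::string getq ()
  {
    std::unique_lock<std::mutex> lock (mutex_);
    not_empty_.wait (lock, [this] { return !items_.empty (); });
    assert (!items_.empty ());
    std::string msg = std::move (items_.front ());
    items_.pop_front ();
    return msg;
  }

private:
  std::mutex mutex_;
  std::condition_variable not_empty_;
  std::deque<std::string> items_;
};

// Hand one message from the calling thread to a receiver thread.
std::string send_and_receive (const std::string &text)
{
  BlockingQueue queue;
  std::string received;
  std::thread receiver ([&] { received = queue.getq (); });
  queue.putq (text);
  receiver.join ();
  return received;
}
```

The receiver may start before or after the sender enqueues; either way getq() returns the message, because the predicate is checked under the mutex before blocking.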

Message Blocks

The ACE_Message_Block is an efficient data container for storing and sharing messages. You can think of the message block as an advanced data buffer that supports such nice features as reference counting and data sharing. Each message block contains two pointers: a rd_ptr(), which points to the next byte to be read, and a wr_ptr(), which points to the next available empty byte. You can use these pointers to copy data into and get data out of the message block.

You can use the copy() method to copy data into the message block:

ACE_Message_Block *mb;
ACE_NEW_RETURN (mb, ACE_Message_Block (128), -1);
const char *deviceAddr= "Dev#12";
mb->copy (deviceAddr, ACE_OS::strlen (deviceAddr)+1);

Or, you can use the wr_ptr() directly. When doing so, you must move the wr_ptr() forward manually, so that the next write is at the end of the buffer:

ACE_Message_Block *mb;
ACE_NEW_RETURN (mb, ACE_Message_Block (128), -1);
const char *commandSeq= "CommandSeq#14";
ACE_OS::sprintf (mb->wr_ptr (), "%s", commandSeq);
// Move the wr_ptr() forward in the buffer by the
// amount of data we just put in: the string length
// plus 1 for the terminating null.
mb->wr_ptr (ACE_OS::strlen (commandSeq) +1);

The rd_ptr() is similar to the write pointer. You can use it directly to get the data, but you must be careful to move it forward by the number of bytes you have already read so that you don't read the same data over and over. Once you are done working with the message block, release it using the release() method, causing the reference count to be decremented. When the count reaches 0, ACE will automatically release the memory that was allocated for the block:

ACE_DEBUG((LM_DEBUG,
ACE_TEXT ("Command Sequence --> %s\n"),
mb->rd_ptr ()));
mb->rd_ptr (ACE_OS::strlen (mb->rd_ptr ())+1);
mb->release ();
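
The interplay of the read and write pointers is easier to follow on a stripped-down buffer. The sketch below is a hypothetical SimpleBlock class in standard C++; it borrows the names copy(), rd_ptr(), and length() from the discussion above but has none of the reference counting or sharing of ACE_Message_Block. It stores two null-terminated strings back to back and then advances the read position past the first one.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical buffer with separate read and write positions.
class SimpleBlock
{
public:
  explicit SimpleBlock (std::size_t size) : data_ (size), rd_ (0), wr_ (0) {}

  // Append n bytes at the write position; returns -1 if they do not fit.
  int copy (const char *buf, std::size_t n)
  {
    if (n > data_.size () - wr_)
      return -1;
    if (n > 0)
      std::memcpy (data_.data () + wr_, buf, n);
    wr_ += n;   // the write position always moves past what was written
    return 0;
  }

  // Next unread byte.
  const char *rd_ptr () const { return data_.data () + rd_; }

  // Advance the read position by n bytes that have been consumed.
  void rd_ptr (std::size_t n)
  {
    assert (rd_ + n <= wr_);
    rd_ += n;
  }

  // Number of bytes written but not yet read.
  std::size_t length () const { return wr_ - rd_; }

private:
  std::vector<char> data_;
  std::size_t rd_;
  std::size_t wr_;
};

// Store two strings, consume the first, and return what is read next.
std::string second_string ()
{
  SimpleBlock mb (128);
  const char *deviceAddr = "Dev#12";
  const char *commandSeq = "CommandSeq#14";
  mb.copy (deviceAddr, std::strlen (deviceAddr) + 1);
  mb.copy (commandSeq, std::strlen (commandSeq) + 1);

  // Skip the first string, including its terminating null.
  mb.rd_ptr (std::strlen (mb.rd_ptr ()) + 1);
  return std::string (mb.rd_ptr ());
}
```

If the read position were not advanced, every read would return "Dev#12" again, which is the repeated-read mistake the text warns about.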

Message blocks also include a type field, which can be set during construction or through the msg_type() modifier. The message-type field comes in handy when you want to distinguish processing of the message based on its type or to send a simple command notification. An example of the latter is the use of the ACE_Message_Block::MB_HANGUP message type to inform the message receiver that the source has shut down:



// Send a hangup notification to the receiver.
ACE_NEW_RETURN
  (mb, ACE_Message_Block (128, ACE_Message_Block::MB_HANGUP), -1);

// Send an error notification to the receiver.
mb->msg_type (ACE_Message_Block::MB_ERROR);

ACE_Message_Block also offers the methods duplicate() , to create a new reference to the block's data, incrementing the reference count, and clone() , to create a deep copy of the message block.

Using the Message Queue

To illustrate the use of ACE_Task and its underlying message queue, we extend our previous automation handler example to include a handler derived from ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_MT_SYNCH>, called Message_Receiver, that receives command messages for the devices on the network from TCP-connected remote clients. (For more on this, see Chapter 6.) On receipt of the commands, Message_Receiver first encapsulates them in message blocks and then enqueues them on the command handler's message queue. The HA_CommandHandler derives from ACE_Task instead of ACE_Task_Base, thus inheriting the required message queue functionality. The command handler thread spends its time waiting for command message blocks to arrive on its message queue; on receiving these messages, the handler proceeds to process the commands.

The remote command messages have a simple header, DeviceCommandHeader , followed by a payload that consists of a null-terminated command string:


struct DeviceCommandHeader
{
  int length_;
  int deviceId_;
};

The Message_Receiver service handler uses the length_ field of the header to figure out the size of the payload. Once it knows the length of the payload, the service handler can create an ACE_Message_Block of the exact size: the length of the payload plus the length of the header. The handler then copies the header and payload into the message block and enqueues it on the command handler task's message queue, using the ACE_Task::putq() method. (A reference to the HA_CommandHandler is kept by the Message_Receiver, which it receives on construction.) If the device ID read in from the header is negative, we use it as an indication that the system needs to shut down; to do this, we send a hangup message to HA_CommandHandler:


int
Message_Receiver::handle_input (ACE_HANDLE)
{
  // Read the fixed-size header first.
  DeviceCommandHeader dch;
  if (this->read_header (&dch) < 0)
    return -1;

  if (dch.deviceId_ < 0)
    {
      // Handle shutdown: send a hangup message to the command handler.
      this->handler_->putq (shut_down_message ());
      return -1;
    }

  // Otherwise, allocate a block large enough for header plus payload.
  ACE_Message_Block *mb;
  ACE_NEW_RETURN (mb, ACE_Message_Block (dch.length_ + sizeof dch), -1);

  // Copy the header.
  mb->copy ((const char*)&dch, sizeof dch);

  // Copy the payload.
  if (this->copy_payload (mb, dch.length_) < 0)
    ACE_ERROR_RETURN ((LM_ERROR, ACE_TEXT ("%p\n"),
                       ACE_TEXT ("Receive Failure")), -1);

  // Pass it off to the handler thread.
  this->handler_->putq (mb);
  return 0;
}
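
The message layout used by handle_input(), a fixed-size header followed immediately by a null-terminated payload, can be exercised without any sockets. In the sketch below only DeviceCommandHeader comes from the text; pack() and unpack() are hypothetical helpers, and a std::vector<char> stands in for the message block. It assumes that both ends share the same struct layout and byte order, which a real wire protocol would have to guarantee explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <string>
#include <vector>

struct DeviceCommandHeader
{
  int length_;     // payload size in bytes, including the null
  int deviceId_;
};

// Build one buffer holding the header followed by the payload.
std::vector<char> pack (int deviceId, const std::string &command)
{
  DeviceCommandHeader dch;
  dch.length_ = static_cast<int> (command.size () + 1);
  dch.deviceId_ = deviceId;

  const std::size_t payload = static_cast<std::size_t> (dch.length_);
  std::vector<char> block (sizeof dch + payload);
  std::memcpy (block.data (), &dch, sizeof dch);
  std::memcpy (block.data () + sizeof dch, command.c_str (), payload);
  return block;
}

// Recover the device ID and command string from a packed buffer.
std::string unpack (const std::vector<char> &block, int &deviceId)
{
  DeviceCommandHeader dch;
  assert (block.size () >= sizeof dch);
  std::memcpy (&dch, block.data (), sizeof dch);
  assert (dch.length_ > 0);
  assert (block.size () == sizeof dch + static_cast<std::size_t> (dch.length_));

  deviceId = dch.deviceId_;
  return std::string (block.data () + sizeof dch);
}
```

Reading the header first is what lets the receiver size its buffer exactly: length_ says how many payload bytes follow, so nothing has to be guessed or over-allocated.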

It is instructive to look at how the payload is copied into the provided message block. We pass the wr_ptr() directly to ACE_SOCK_Stream::recv_n(), which copies the data directly into the block. After the data is copied into the block, we advance the wr_ptr() by the size of the payload. Recall that we had moved the wr_ptr() for the message block forward by the size of the header before we made this call; therefore, the message block now contains the header, followed immediately by the payload:


int
Message_Receiver::copy_payload (ACE_Message_Block *mb, int payload_length)
{
  int result = this->peer ().recv_n (mb->wr_ptr (), payload_length);

  if (result <= 0)
    {
      // Error or closed connection: release the block and report
      // failure, so that the caller never enqueues a released block.
      mb->release ();
      return -1;
    }

  mb->wr_ptr (payload_length);
  return 0;
}

When it gets a command to shut down the system, the message receiver creates a new message block that has no data in it but has the MB_HANGUP type set. When it receives a message, the HA_CommandHandler first checks the type; if it is a hang-up message, it shuts down the system:

ACE_Message_Block *
Message_Receiver::shut_down_message (void)
{
  ACE_Message_Block *mb;
  ACE_NEW_RETURN
    (mb, ACE_Message_Block (0, ACE_Message_Block::MB_HANGUP), 0);
  return mb;
}

On the other side of the fence, the HA_CommandHandler thread blocks, waiting for messages to arrive on its queue by calling getq() on itself. Once a message arrives, getq() will unblock; the handler then reads the messages and applies the received command to the device repository. Finally, it releases the message block, which will deallocate the used memory as the block's reference count drops to 0.

As we said earlier, the system uses a message of type MB_HANGUP to inform the server to shut down. On receiving a message of this type, the handler stops waiting for incoming messages and shuts down the reactor, using the ACE_Reactor::end_reactor_event_loop() method. This causes the command handler process to shut down.
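
The type check at the heart of the handler loop can be sketched on its own. The code below is single-threaded and hypothetical throughout: MsgType, Message, and process_until_hangup() are illustration-only names, a std::deque stands in for the task's message queue, and nothing blocks. It shows only the control flow described above: apply each data message, stop at the first hangup.

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical message with a type tag, loosely echoing MB_HANGUP.
enum class MsgType { Data, Hangup };

struct Message
{
  MsgType type;
  std::string command;
};

// Dequeue messages until a hangup arrives or the queue runs dry.
// Returns the number of commands applied.
int process_until_hangup (std::deque<Message> &queue)
{
  int applied = 0;
  while (!queue.empty ())
    {
      Message msg = queue.front ();
      queue.pop_front ();

      if (msg.type == MsgType::Hangup)
        break;   // stop; anything queued after the hangup is ignored

      assert (msg.type == MsgType::Data);
      ++applied;   // a real handler would apply msg.command here
    }
  return applied;
}
```

A threaded version would block on the queue instead of testing for emptiness, but the decision made for each dequeued message would be the same.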

Other Queue Types

ACE provides various ACE_Message_Queue subclasses that provide more than the vanilla FIFO queueing available with ACE_Message_Queue . The ACE_Dynamic_Message_Queue offers priority queues, which include dynamic priority adjustments based on various algorithms. ACE also offers several platform-specific queues that incorporate OS-specific characteristics.

You can specify the queue type used by an ACE_Task during construction or by using the msg_queue() modifier. The queue types are listed in Table 12.2.

Table 12.2. Various Queue Types

Name | Description
ACE_Dynamic_Message_Queue | A priority-queue implementation that dynamically readjusts the priority of a message, using variants of the earliest-deadline-first scheme and laxity (time to deadline minus worst-case execution time) schemes. For details, see the ACE reference documentation.
ACE_Message_Queue_Vx | Wrapper around the Wind River VxWorks message queue facility.
ACE_Message_Queue_Ex | An even more type-safe version of ACE_Message_Queue.
ACE_Message_Queue_NT | Implementation that is built on Windows NT's I/O completion port features.[a]

[a] ACE_Message_Queue_NT does not derive from ACE_Message_Queue and therefore cannot be used with ACE_Task.

12.4 Summary
