原文地址:http://www.ibm.com/developerworks/cn/java/j-lo-activeobject/

本文主要从以下两个方面进行阐述:

  • 使用 C++ 语言,来描述 Active Object 设计模式。

    Java 类库对于这样一个典型的模式做了很好的类库层面的封装,因此对于 Java 的开发者来说,很多关于该设计模式本身的东西被屏蔽掉了。本文试图使用 Native C++ 语言,帮助读者从本质上对 Active Object 设计模式有一个更全面的认识。

  • 结合 C++ 版本的 Active Object 设计模式,引领读者对于 Active Object 设计模式在 Java 类库中的支持,有一个更深刻的认识,帮助读者正确并且有效地使用这些类库。

预备知识

并发对象 (Concurrent Object)

在这里,我们先定义一下,什么是并发对象。不同于一般的对象,并发对象指的是该对象方法的调用与方法的执行不在同一个线程內,也即:该对象方法被异步执行。这其实是在多线程编程环境下典型的计算特征,线程引入了异步。从另一个角度来看,并发对象其实是面向对象的概念应用于多线程计算环境下的产物。

 

回页首

Active Object 设计模式 C++ 描述

我们将从以下几个方面来讨论 Active Object 模式。

问题描述

我们都知道,基于多线程机制的并发编程模式可以极大提高应用的 QoS(Quality of Service)。典型的例子如,我们在开发服务器端的应用时,通行的做法就是通过多线程机制并发地服务客户端提交上来的请求,以期提高服务器对客户端的反应度 (Responsiveness)。同时,相比于单线程的应用,并发的多线程应用相对要复杂得多。在多线程的计算环境里,并发对象被所有的调用者线程所共享。一般来说,并发对象的设计实现需要考虑下面的几个重要因素:

  • 并发对象的任何一次的方法执行,不允许无限地或者长时间阻止其它方法的调用执行,从而影响应用的 QoS。
  • 由于并发对象被调用者线程所共享,其内部状态必须保证是线程安全的,必须受限于某些线程同步约束,并且这些约束对调用者来说是透明的,不可见的。从调用者的角度来看,并发对象与普通对象没有多大区别。
  • 并发对象的设计实现,应该能够透明地利用底层平台所提供的并发机制。这样做的好处是,当并发对象运行在诸如多核处理器这类底层硬件平台上时,我们的应用能够充分挖掘底层平台带来的并发优势,以获得足够好的应用性能。

我们使用 Active Object 设计模式来解决这些问题。

Active Object 设计模式的本质是解耦合方法的调用 (Method invocation) 与方法的执行 (Method execution),方法调用发生在调用者线程上下文中,而方法的执行发生在独立于调用者线程的 Active Object 线程上下文中。并且重要的一点是,该方法与其它普通的对象成员方法对于调用者来说,没有什么特别的不同。从运行时的角度来看,这里涉及到两类线程,一个是调用者线程,另外一个是 Active Object 线程,我们会在下面更详细地谈到。

结构

在 Active Object 模式中,主要有以下几种类型的参与者:

  • 代理 (Proxy) :代理是 Active Object 所定义的对于调用者的公共接口。运行时,代理运行在调用者线程的上下文中,负责把调用者的方法调用转换成相应的方法请求 (Method Request),并将其插入相应的 Activation List,最后返回给调用者 Future 对象。
  • 方法请求:方法请求定义了方法执行所需要的上下文信息,诸如调用参数等。
  • Activation List:负责存储所有由代理创建的,等待执行的方法请求。从运行时来看,Activation List 会被包括调用者线程及其 Active Object 线程并发存取访问,所以,Activation List 实现应当是线程安全的。
  • 调度者 (Scheduler):调度者运行在 Active Object 线程中,调度者来决定下一个执行的方法请求,而调度策略可以基于很多种标准,比如根据方法请求被插入的顺序 FIFO 或者 LIFO,比如根据方法请求的优先级等等。
  • Servant: Servant 定义了 Active Object 的行为和状态,它是 Proxy 所定义的接口的事实实现。
  • Future: 调用者调用 Proxy 所定义的方法,获得 Future 对象。调用者可以从该 Future 对象获得方法执行的最终结果。在真实的实现里,Future 对象会保留一个私有的空间,用来存放 Servant 方法执行的结果。

执行序列图

在 Active Object 设计模式中,在参与者之间将发生如下的协作过程:

  1. 方法请求的构造与调度。调用者调用 Proxy 的方法 method(),Proxy 创建相应的方法请求,把它传给调度者 (Scheduler),调度者负责把该方法请求放入 Activation List 中。如果 method() 需要返回执行结果,Proxy 返回一个 Future 对象给调用者(图 1 中步骤 1 到步骤 6)。
  2. 方法请求执行。调度者负责从 Activation List 队列里按照预先定义的规则拿出下一个可执行的方法请求,并把该请求绑定到相应 Servant 所定义的方法(图 1 中步骤 7 到步骤 11)。
  3. 完成阶段。保存任何 Servant 方法执行的结果到 Future 对象中去(图 1 中步骤 12)。重复第二步,调度者继续轮询 Activation List 队列,看是否有下一个可执行的方法请求。
图 1. Active Object Sequence Diagram.

从图 1 我们可以看到,步骤 1 到步骤 6 运行在调用者线程中,而步骤 7 到步骤 12 运行在 Active Object 的线程中。

实现

在本节中,我们给出 Active Object 的 C++ 示例实现。

调用者调用 Proxy 的 get() 方法,从 Active Object 获得 Message。我们可以假定,在真实的应用中, get() 方法的实现受制于某些慢速的 IO 操作,比如需要通过 TCP Socket 从远端的机器获得 Message, 然后返回给调用者。所以我们使用 Active Object 来实现该应用,通过线程的并发达到提高应用的 QoS。

  1. 实现 Servant,如清单 1 所示:

    清单 1. MQ_Servant
    class MQ_Servant {
    public:
    // Constructor and destructor.
    MQ_Servant (size_t mq_size);
    virtual ~MQ_Servant (); // Message queue implementation operations.
    void put (const Message &msg);
    Message get (); // Predicates.
    bool empty () const;
    bool full () const;
    private:
    // Internal queue representation, e.g., a circular
    // array or a linked list, that does not use any
    // internal synchronization mechanism.
    };

    MQ_Servant 是真正的服务提供者,实现了 Proxy 中定义的方法。put() 和 get() 方法用来操作底层的队列。另外,Servant 的实现是纯粹的应用逻辑实现,或者称为商业逻辑实现,没有混合任何的线程同步机制 , 这有利于我们进行应用逻辑的重用,而不需要考虑不同的线程同步机制。

  2. 实现 Proxy,如清单 2 所示:
    清单 2. MQ_Proxy
    class MQ_Proxy {
    public:
    // Bound the message queue size.
    enum { MQ_MAX_SIZE = 100 };
    MQ_Proxy (size_t size = MQ_MAX_SIZE)
    :scheduler_ (size),
    servant_ (size) {
    } // Schedule <put> to execute on the active object.
    void put (const Message &msg) {
    Method_Request *mr = new Put(servant_,msg);
    scheduler_.insert (mr);
    } // Return a <Message_Future> as the "future" result of
    // an asynchronous <get> method on the active object.
    Message_Future get () {
    Message_Future result;
    Method_Request *mr = new Get (&servant_,result);
    scheduler_.insert (mr);
    return result;
    } // empty() and full() predicate implementations ...
    private:
    // The servant that implements the active object
    // methods and a scheduler for the message queue.
    MQ_Servant servant_;
    MQ_Scheduler scheduler_;
    };

    同一个进程中的多个调用者线程可以共享同一个 Proxy。

  3. 实现 Method Request,如清单 3 所示:
    清单 3. Method_Request
    class Method_Request {
    public:
    // Evaluate the synchronization constraint.
    virtual bool can_run () const = 0
    // Execute the method.
    virtual void call () = 0;
    };
    // Inherites from Method_Request
    class Get : public Method_Request {
    public:
    Get (MQ_Servant *rep, const Message_Future &f)
    :servant_ (rep),
    result_ (f)
    {
    }
    virtual bool can_run () const {
    // Synchronization constraint: cannot call a
    // <get> method until queue is not empty.
    return !servant_->empty ();
    } virtual void call () {
    // Bind dequeued message to the future result.
    result_ = servant_->get ();
    }
    private:
    MQ_Servant *servant_;
    Message_Future result_;
    };
  4. 实现 Activation List,如清单 4 所示:
    清单 4. Activation_List
    class Activation_List {
    public:
    // Block for an "infinite" amount of time waiting
    // for <insert> and <remove> methods to complete.
    enum { INFINITE = -1 }; // Define a "trait".
    typedef Activation_List_Iterator iterator; Activation_List (); // Insert <method_request> into the list, waiting up
    // to <timeout> amount of time for space to become
    // available in the queue. Throws the <System_Ex>
    // exception if <timeout> expires.
    void insert (Method_Request *method_request,Time_Value *timeout = 0); // Remove <method_request> from the list, waiting up
    // to <timeout> amount of time for a <method_request>
    // to be inserted into the list. Throws the
    // <System_Ex> exception if <timeout> expires.
    void remove (Method_Request *&method_request, Time_Value *timeout = 0); private:
    // Synchronization mechanisms, e.g., condition
    // variables and mutexes, and the queue implementation,
    // e.g., an array or a linked list, go here.
    };

    Activation List 的实际上就是一个线程同步机制保护下的 Method Request 队列,对该队列的所有操作 (insert/remove) 都应该是线程安全的。从本质上讲,Activation List 所基于的就是典型的生产者 / 消费者并发编程模型,调用者线程作为生产者把 Method Request 放入该队列,Active Object 线程作为消费者从该队列拿出 Method Request, 并执行。

  5. 实现 Scheduler,如清单 5 所示:
    清单 5. MQ_Scheduler
    class MQ_Scheduler {
    public:
    // Initialize the <Activation_List> and make <MQ_Scheduler>
    // run in its own thread of control.
    // we call this thread as Active Object thread.
    MQ_Scheduler ()
    : act_list_() {
    // Spawn separate thread to dispatch method requests.
    // The following call is leveraging the parallelism available on native OS
    // transparently
    Thread_Manager::instance ()->spawn (&svc_run,this);
    }
    // ... Other constructors/destructors, etc. // Put <Method_Request> into <Activation_List>. This
    // method runs in the thread of its client,i.e.
    // in the proxy's thread.
    void insert (Method_Request *mr) {
    act_list_.insert (mr);
    } // Dispatch the method requests on their servant
    // in its scheduler's thread of control.
    virtual void dispatch () {
    // Iterate continuously in a separate thread(Active Object thread).
    for (;;) {
    Activation_List::iterator request;
    // The iterator's <begin> method blocks
    // when the <Activation_List> is empty.
    for(request = act_list_.begin (); request != act_list_.end ();++request){
    // Select a method request whose
    // guard evaluates to true.
    if ((*request).can_run ()) {
    // Take <request> off the list.
    act_list_.remove (*request);
    (*request).call () ;
    delete *request;
    } // Other scheduling activities can go here,
    // e.g., to handle when no <Method_Request>s
    // in the <Activation_List> have <can_run>
    // methods that evaluate to true. } }
    } private:
    // List of pending Method_Requests.
    Activation_List act_list_; // Entry point into the new thread.
    static void *svc_run (void *arg) {
    MQ_Scheduler *this_obj = static_cast<MQ_Scheduler *> (args);
    this_obj->dispatch ();
    }
    };
  6. 实现 Future,如清单 6 所示:
    清单 6. Message_Future
    class Message_Future {
    public:
    // Initializes <Message_Future> to
    // point to <message> immediately.
    Message_Future (const Message &message); //Other implementatio…… // Block upto <timeout> time waiting to obtain result
    // of an asynchronous method invocation. Throws
    // <System_Ex> exception if <timeout> expires.
    Message result (Time_Value *timeout = 0) const;
    private:
    //members definition here……
    };

    事实上,对于调用者来说,可以通过以下的方式从 Future 对象获得真实的执行结果 Message:

    • 同步等待。调用者调用 Future 对象的 result() 方法同步等待,直到后端的 Servant 相应方法执行结束,并把结果存储到了 Future 对象中来,result 返回,调用者获得 Message。
    • 同步超时等待。调用者调用 Future 对象的 result(timeout) 方法。如果过了 timeout 时间之后,后端的 Servant 相应方法执行仍未结束,则调用失败,否则,调用者线程被唤醒,result 方法返回,调用者获得 Message。
    • 异步查询。调用者可以通过调用 Future 对象定义的查询方法 ( 清单 6 没有提供相应的定义 ),查看真实的结果是否准备好了,如果准备好了,调用 result 方法,直接获得 Message。

    清单 7 是使用该 Active Object 的示例。

    清单 7. Active Object 使用
    MQ_Proxy message_queue;
    
    //Optioin 1. Obtain future and block thread until message arrives.
    Message_Future future = message_queue.get();
    Message msg = future.result();
    //Handle received message here
    handle(msg); //2. Obtain a future (does not block the client).
    Message_Future future = message_queue.get (); //The current thread is not blocked, do something else here...
    //Evaluate future and block if result is not available.
    Message msg = future.result ();
    //Handle received message here
    handle(msg);

    从清单 7 可以看到,MQ_Proxy 对于调用者而言,和一个普通的 C++ 定义的对象并没有区别,并发的实现细节已经被隐藏。

 

回页首

Java 对 Active Object 支持

Java JDK 1.3 引入了 java.util.Timer 和 java.util.TimerTask,提供了对 timer-based 并发任务支持,Timer 和 TimerTask 可以看作是 Active Object 设计模式在 Java 中的实现。不过,在这里我们不打算过多讨论 Timer 及其 TimerTask。由于 Timer 和 TimerTask 的缺陷性,例如 Timer 使用单线程执行 TimerTask 导致的 Task 调度时间的不精确性等问题。从 Java1.5 开始,Java 建议使用 ScheduledThreadPoolExecutor 作为 Timer 的替代。

在这里,我们讨论一下自 Java1.5 引入的 Executor Framework。Java1.5 的 Executor Framework 可以看作是 Active Object 设计模式在 Java 中的体现。不过 Java 的 Executor Framework 极大地简化了我们前面所讨论的 Active Object 所定义的模式。

Java 的 Executor Framework 是一套灵活强大的异步任务执行框架,它提供了标准的方式解耦合任务的提交与任务的执行。Java Executor 框架中的任务指的是实现了 Runnable 或者 Callable 接口的对象。Executor 的示例用法如清单 8 所示:

清单 8. Java Executor 示例代码
public class TaskExecutionTcpServer {
private static final int NTHREADS = 100;
private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
public void handleRequest(Socket connection) {
// Handle the incoming socket connection from
//individual client.
}
};
exec.execute(task);
}
}
}

在示例 8 中,我们创建了一个基于线程池的 Java Executor, 每当新的 TCP 连接进来的时候,我们就分配一个独立的实现了 Runnable 任务来处理该连接,所有这些任务运行在我们创建的有 100 个线程的线程池上。

我们可以从 Active Object 设计模式的角度来审视一下 Java Executor 框架。Java Executor 框架以任务 (Task) 为中心,简化了 Active Object 中的角色分工。可以看到,实现 Runnable 或者 Callable 接口的 Java Executor 任务整合了 Method Request 和 Servant 的角色 , 通过实现 run() 或者 call() 方法实现应用逻辑。Java Executor 框架并没有显式地定义 Proxy 接口,而是直接调用 Executor 提交任务,这里的 Executor 相当于 Active Object 中调度者角色。从调用者的角度来看,这看起来并不像是在调用一个普通对象方法,而是向 Executor 提交了一个任务。所以,在这个层面上说,并发的底层细节已经暴露给了调用者。对于 Java 的开发者来说,如果你不担心这样的底层并发细节直接暴露给调用者,或者说你的应用并不需要像对待普通对象一样对待并发对象,Java 的 Executor 框架是一个很好的选择。相反,如果你希望隐藏这样的并发细节,希望像操纵普通对象一样操纵并发对象,那你就需要如本文上节所描述的那样,遵循 Active Object 设计原则 , 清晰地定义各个角色,实现自己的 Active Object 模式。

总而言之,Java Executor 框架简化了 Active Object 所定义的模式,模糊了 Active Object 中角色的分工,其基于生产者 / 消费者模式,生产者和消费者基于任务相互协作。

 

回页首

总结

最后,我们讨论一下 Active Object 设计模式的优缺点。

Active Object 给我们的应用带来的好处:

  • 极大提高了应用的并发性以及简化了线程同步带来的复杂性。并发性的提高得益于调用者线程与 Active Object 线程的并发执行。简化的线程同步复杂性主要表现在所有线程同步细节封装在调度者内 ( 也就是 Java 的 Executor 对象 ),Active Object 调用者并不需要关心。
  • 在 Active Object 中,方法的执行顺序可以不同于方法的调用顺序。用 Java 的话说,也就是任务执行的顺序可以不同于任务提交的顺序。在一定情况下,这可以帮助优化我们应用的性能,提高应用的 QoS 及其 Responsiveness。在 Java Executor 框架下,你可以根据当前的计算资源,确定优化的执行策略 (Execution Policy),该执行策略的内容包括:任务将分配在多少线程上执行,以什么顺序执行,多少任务可以同时执行等等。

当然,Active Object 也有缺点:

  • 额外的性能开销。这涉及到从调用者线程到 Active Object 线程的上下文切换,线程同步,额外的内存拷贝等。
  • 难于调试。Active Object 引入了方法的异步执行,从调试者的角度看,调试这样的方法调用不像普通方法那样直截了当,并且这其中涉及到了线程的调度,同步等。

Active Object 并发模式在 Java 中的应用--转载的更多相关文章

  1. 探索 Java 同步机制[Monitor Object 并发模式在 Java 同步机制中的实现]

    探索 Java 同步机制[Monitor Object 并发模式在 Java 同步机制中的实现] https://www.ibm.com/developerworks/cn/java/j-lo-syn ...

  2. 【多线程与并发】:Java中的锁

    锁的概念 锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁可以防止多个线程同时访问共享资源(但有些锁可以允许多个线程并发的访问共享资源,如读写锁). 在JDK1.5之前,Java是通过sync ...

  3. Java并发编程:Java中的锁和线程同步机制

    锁的基础知识 锁的类型 锁从宏观上分类,只分为两种:悲观锁与乐观锁. 乐观锁 乐观锁是一种乐观思想,即认为读多写少,遇到并发写的可能性低,每次去拿数据的时候都认为别人不会修改,所以不会上锁,但是在更新 ...

  4. Java多线程并发03——在Java中线程是如何调度的

    在前两篇文章中,我们已经了解了关于线程的创建与常用方法等相关知识.接下来就来了解下,当你运行线程时,线程是如何调度的.关注我的公众号「Java面典」了解更多 Java 相关知识点. 多任务系统往往需要 ...

  5. 【并发编程】Java中的原子操作

    什么是原子操作 原子操作是指一个或者多个不可再分割的操作.这些操作的执行顺序不能被打乱,这些步骤也不可以被切割而只执行其中的一部分(不可中断性).举个列子: //就是一个原子操作 int i = 1; ...

  6. 分析java中clone()方法 (转载+修改)

    Java中的clone() 方法 java所有的类都是从java.lang.Object类继承而来的,而Object类提供下面的方法对对象进行复制. protected native Object c ...

  7. Builder模式在Java中的应用

    在设计模式中对Builder模式的定义是用于构建复杂对象的一种模式,所构建的对象往往需要多步初始化或赋值才能完成.那么,在实际的开发过程中,我们哪些地方适合用到Builder模式呢?其中使用Build ...

  8. 代理模式与java中的动态代理

    前言    代理模式又分为静态代理与动态代理,其中动态代理是Java各大框架中运用的最为广泛的一种模式之一,下面就用简单的例子来说明静态代理与动态代理. 场景    李雷是一个唱片公司的大老板,很忙, ...

  9. 10分钟了解 代理模式与java中的动态代理

    前言    代理模式又分为静态代理与动态代理,其中动态代理是Java各大框架中运用的最为广泛的一种模式之一,下面就用简单的例子来说明静态代理与动态代理. 场景    李雷是一个唱片公司的大老板,很忙, ...

随机推荐

  1. db2数据库新手可能碰到的问题及详解(部分内容来自网络搜索)

    一.db2安装好之后出现乱码,菜单栏呈现方框状,此时选择菜单第五项,点击选择下拉菜单中的最后一项,打开选择标签卡的第三项(字体),如果是无衬线都改为有衬线,如果是有衬线改为无衬线.乱码即可解决(网上一 ...

  2. CSS媒体查询,CSS根据不同的分辨率显示不同的样式

    在写自适应网页的时候,我们需要网页有几种显示方式,我们可以用CSS实现这个功能 使用CSS提供的媒体查询,我们可以根据屏幕分辨率来使用相应的CSS样式 @media screen and (max-w ...

  3. light oj 1140 - How Many Zeroes? 数位DP

    思路:dp[i][j]:表示第i位数,j表示是否有0. 代码如下: #include<iostream> #include<stdio.h> #include<algor ...

  4. H.264编码之IDCT变换原理

    上次讲到了DCT变换的推导过程,这次主要给大家讲下IDCT反变换的推导过程.建议大家先看上次讲的DCT变换公式推导过程.这样在看这篇文章时会容易很多!话不多说,让我们开始IDCT的旅程吧! IDCT反 ...

  5. 编程实现Windows关机、重启、注销

    要想编程使Windows关机.重启或者注销,可以使用ExWindowsEx这个API函数,该函数只有两个参数,第一个表示关机动作的标志,也就是你要让该函数关机呢,还是重启,还是注销等.可以使用EWX_ ...

  6. erlang学习笔记(1)

    提示符erl 注释% comment 表达式123456789 * 123456789. 变量(单一赋值)X = 123456789.X.Y = X * X * X.Y.f(). 整数浮点数X = 5 ...

  7. Hello China操作系统STM32移植指南(一)

    Hello China操作系统移植指南 首先说明一下,为了适应更多的文化背景,对Hello China操作系统的名字做了修改,修改为"Hello X",或者连接在一起,写为&quo ...

  8. 【HDU】4888 Redraw Beautiful Drawings 网络流【推断解是否唯一】

    传送门:pid=4888">[HDU]4888 Redraw Beautiful Drawings 题目分析: 比赛的时候看出是个网络流,可是没有敲出来.各种反面样例推倒自己(究其原因 ...

  9. Spring定时任务解决博客缓存数据更新问题

    最近在做博客系统的时候,由于很多页面都有右边侧边栏,内容包括博客分类信息,归档日志,热门文章,标签列表等,为了不想每次访问页面都去查询数据库,因为本身这些东西相对来说是比较固定的,但是也有可能在网站后 ...

  10. SparkMLlib分类算法之支持向量机

    SparkMLlib分类算法之支持向量机 (一),概念 支持向量机(support vector machine)是一种分类算法,通过寻求结构化风险最小来提高学习机泛化能力,实现经验风险和置信范围的最 ...