一、AvticeMQ

    ActiveMQ 是Apache出品的开源消息总线。完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现

二、JMS

    JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

三、JMS支持的两种消息传递模型

1. Queue(点对点 point-to-point,简称PTP)

    通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。消息目的地类型是队列(即由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。

    消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得

2. Topic(发布/订阅 publish/subscribe,简称pub/sub)

    通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。

    消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息

3. Queue 和 Topic 具体区别
Tables Topic Queue
概要 Publish Subscribe messaging 发布订阅消息 Point-to-Point 点对点
有无状态 topic数据默认不落地,是无状态的。 Queue数据默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在$AMQ_HOME\data\kr-store\data下面。也可以配置成DB存储。
完整性保障 并不保证publisher发布的每条数据,Subscriber都能接受到。 Queue保证每条数据都能被receiver接收。
消息是否会丢失 一般来说publisher发布消息到某一个topic时,只有正在监听该topic地址的sub能够接收到消息;如果没有sub在监听,该topic就丢失了。 Sender发送消息到目标Queue,receiver可以异步接收这个Queue上的消息。Queue上的消息如果暂时没有receiver来取,也不会丢失。
消息发布接收策略 一对多的消息发布接收策略,监听同一个topic地址的多个sub都能收到publisher发送的消息。Sub接收完通知mq服务器 一对一的消息发布接收策略,一个sender发送的消息,只能有一个receiver接收。receiver接收完后,通知mq服务器已接收,mq服务器对queue里的消息采取删除或其他操作。

四、AvticeMQ 配置

failover
1
2
3
4
5
failover:(tcp://192.168.1.118:61616?connectionTimeout=3000)
?timeout=5000
&initialReconnectDelay=1000
&maxReconnectDelay=1000
&nested.wireFormat.maxInactivityDuration=30000

initialReconnectDelay:默认为10,单位毫秒,表示第一次尝试重连之前等待的时间。
maxReconnectDelay:默认30000,单位毫秒,表示两次重连之间的最大时间间隔。
nested.wireFormat.maxInactivityDuration 检测死连接时间
其它相关配置

五、ActiveMQ 使用

    一种典型的JMS 程序需要经过下列几个步骤:

  1. 通过 JNDI 查找 ConnectionFactory。
  2. 用 ConnectionFactory 创建一个 Connection。
  3. 用 Connection 创建一个或多个 Session。
  4. 用 Session 和 Destination 创建所需的 MessageProducer 和 MessageConsumer。
  5. 启动 Connection。 (下面是对应的截图说明)
1. 连接工厂 (ConnectionFactory)

    连接工厂是客户用来创建连接的对象.

    例如: ActiveMQ 提供的ActiveMQConnectionFactory。注意(要初始化 JMS,则需要使用连接工厂。客户端通过创建ConnectionFactory建立到 ActveMQ的连接,一个连接工厂封装了一组连接配置参数,这组参数在配置ActiveMQ时已经定义,例如brokerURL参数,此参数传入的是ActiveMQ服务地址和端口,支持openwire协议的默认连接为 tcp://localhost:61616,支持 stomp协议的默认连接为tcp://localhost:61613。

ActiveMQConnectionFactory构造方法:
1
2
3
4
5
ActiveMQConnectionFactory();
ActiveMQConnectionFactory(String brokerURL);
ActiveMQConnectionFactory(String userName, String password, String b rokerURL) ;
ActiveMQConnectionFactory(String userName, String password, URI brok erURL) ;
ActiveMQConnectionFactory(URI brokerURL);

其中 brokerURL为ActiveMQ服务地址和端口。

spring封装的:
  • SingleConnectionFactory:对于建立JMS服务器链接的请求会一直返回同一个链接,并且会忽略Connection的close方法调用。(
    org.springframework.jms.connection.SingleConnectionFactory)

  • CachingConnectionFactory:继承了SingleConnectionFactory,所以它拥有SingleConnectionFactory的所有功能,同时它还新增了缓存功能,它可以缓存Session、MessageProducer和MessageConsumer。我们使用CachingConnectionFactory来作为示例。(
    org.springframework.jms.connection.CachingConnectionFactory)

  • PooledConnectionFactory:线程池(org.apache.activemq.pool.PooledConnectionFactory)

2. 连接 (Connection)

    Connection 封装了客户与 JMS 提供者之间的一个虚拟的连接。

    当一个Connection被创建时,它的传输默认是关闭的,必须使用start方法开启。一个Connection可以建立一个或多个的Session。当一个程序执行完成后,必须关闭之前创建的Connection,否则 ActiveMQ不能释放资源,关闭一个Connection同样也关闭了 Session,MessageProducer和MessageConsumer。

3. 会话(Seesion)

    Session 是生产和消费消息的一个单线程上下文。

    会话用于创建消息生产者(producer)、消息消费者(consumer)和消息(message)等。会话提供了一个事务性的上下文,在这个上下文中,一组发送和接收被组合到了一个原子操作中。

六、ActiveMQ的多种部署方式

具体介绍请查看文章后面部分:查看

1. Master-Slave部署方式
shared filesystem Master-Slave部署方式
shared database Master-Slave方式
Replicated LevelDB Store方式
2. Broker-Cluster部署方式
static Broker-Cluster部署
Dynamic Broker-Cluster部署
3. Master-Slave与Broker-Cluster相结合的部署方式