本篇主要粗略介绍程序在启动时,连接mq做了哪些操作,你可能需要先自己阅读一遍源码,再来看本篇文章,或对照源码看本篇文章

1 抛开spring,创建一个简单生产者

1.1 安装windows版activemq

  1. 下载地址:http://activemq.apache.org/download.html
  2. 解压,启动bin\win64\activemq.bat
  3. 访问:http://localhost:8161/

1.2 生产者

  1. pom.xml

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.15.4</version>
    </dependency>
  2. Producter

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    /**
     * Minimal stand-alone JMS producer (no Spring) used to trace what the ActiveMQ
     * client does while it connects to the broker.
     *
     * <p>Usage: call {@link #init()} once, then {@link #sendMessage(String, String)}
     * as often as needed, and finally {@link #close()} to release the connection.
     * The class keeps a single connection and a single transacted session and is
     * not written for concurrent use.
     */
    public class Producter {
        // Default ActiveMQ user name
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        // Default ActiveMQ password
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        // Default ActiveMQ broker URL
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
        // Connection factory
        ConnectionFactory connectionFactory;
        // Connection to the broker
        Connection connection;
        // Transacted session used for sending
        Session session;

        /**
         * Creates the connection factory, opens and starts a connection, and creates
         * a transacted session on it. A {@link JMSException} is printed rather than
         * propagated; in that case {@code session} stays {@code null}.
         */
        public void init() {
            try {
                // Create the connection factory
                connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
                // Create a connection from the factory
                connection = connectionFactory.createConnection();
                // Start the connection
                connection.start();
                // Create a transacted session (the arguments select the transaction/acknowledge mode)
                session = connection.createSession(true, Session.SESSION_TRANSACTED);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        /**
         * Sends {@code content} as a text message to the queue named {@code disname}
         * and commits the session. If anything fails the transaction is rolled back
         * and the failure is printed.
         *
         * @param disname name of the destination queue
         * @param content text payload of the message
         * @throws IllegalStateException if {@link #init()} has not completed successfully
         */
        public void sendMessage(String disname, String content) {
            if (session == null) {
                throw new IllegalStateException("init() must complete successfully before sendMessage()");
            }
            MessageProducer messageProducer = null;
            try {
                // Look up / create the queue
                Queue queue = session.createQueue(disname);
                // Create the producer for that queue
                messageProducer = session.createProducer(queue);
                // Create the message
                TextMessage msg = session.createTextMessage(content);
                System.out.println(content);
                // Send the message
                messageProducer.send(msg);
                // Commit the transaction
                session.commit();
            } catch (JMSException e) {
                // rollback() itself throws the checked JMSException, so it needs its own handler
                try {
                    session.rollback();
                } catch (JMSException rollbackFailure) {
                    rollbackFailure.printStackTrace();
                }
                e.printStackTrace();
            } finally {
                // A producer is created per call; close it so producers do not pile up on the session
                if (messageProducer != null) {
                    try {
                        messageProducer.close();
                    } catch (JMSException closeFailure) {
                        closeFailure.printStackTrace();
                    }
                }
            }
        }

        /**
         * Closes the connection (which, per the JMS contract, also closes its sessions
         * and producers). Safe to call when {@link #init()} failed or was never called.
         */
        public void close() {
            if (connection == null) {
                return;
            }
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                session = null;
                connection = null;
            }
        }
    }
  3. 测试

    1
    2
    3
    4
    5
    /**
     * Smoke test: initialises a producer and sends the text "interest" to the
     * queue named "test".
     */
    public static void main(String[] args) {
        final Producter demoProducer = new Producter();
        demoProducer.init();
        demoProducer.sendMessage("test", "interest");
    }

2 分析init()方法

2.1 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);

    该语句只是将USERNAME, PASSWORD, BROKEN_URL设置到ActiveMQConnectionFactory的变量里面。

2.2 connection = connectionFactory.createConnection();

    createConnection()方法只是单纯调用createActiveMQConnection()方法,createActiveMQConnection的源码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Creates a connection for the configured {@code brokerURL}: builds the transport,
 * wraps it in an {@link ActiveMQConnection}, applies credentials and factory
 * settings, and starts the transport.
 *
 * @param userName user name set on the connection
 * @param password password set on the connection
 * @return the new connection, with its transport started
 * @throws JMSException if {@code brokerURL} is not set, or if any step fails
 *         (non-JMS failures are wrapped with the broker URL in the message)
 */
protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws JMSException {
    if (brokerURL == null) {
        throw new ConfigurationException("brokerURL not set.");
    }
    ActiveMQConnection connection = null;
    try {
        // 2.2.1 Create the Transport
        Transport transport = createTransport();
        // 2.2.2 Create the connection from the transport and the factory statistics
        connection = createActiveMQConnection(transport, factoryStats);

        connection.setUserName(userName);
        connection.setPassword(password);

        // As the name says, this configures the connection: it sets the parameters
        // prepared when the ActiveMQConnectionFactory was constructed onto the connection
        configureConnection(connection);

        // Everything is ready, so the transport can be started
        transport.start();

        if (clientID != null) {
            connection.setDefaultClientID(clientID);
        }
        return connection;
    } catch (JMSException e) {
        // Clean up! connection is still null when createTransport() failed, and a
        // failing close() must not replace the exception that brought us here.
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception ignored) {
                // best-effort cleanup; the original failure is rethrown below
            }
        }
        throw e;
    } catch (Exception e) {
        // Clean up! Same null-safe, best-effort close as above.
        if (connection != null) {
            try {
                connection.close();
            } catch (Exception ignored) {
                // best-effort cleanup; the original failure is rethrown below
            }
        }
        throw JMSExceptionSupport.create("Could not connect to broker URL: " + brokerURL + ". Reason: " + e, e);
    }
}
2.2.1 创建Transport,createTransport()

    粗略介绍:createTransport()方法首先从brokerURL中取出scheme(scheme就是写在brokerUrl中的tcp、auto等关键字),然后根据scheme在一个保存了TransportFactory的ConcurrentMap中查找TransportFactory,没有就new一个TransportFactory并保存到ConcurrentMap中。最后调用TransportFactory.doConnect(),根据brokerUrl中的参数返回一个Transport对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
 * Creates the {@link Transport} for the configured {@code brokerURL}.
 *
 * <p>The "auto" schemes are mapped to concrete ones before connecting:
 * {@code auto -> tcp}, {@code auto+ssl -> ssl}, {@code auto+nio -> nio},
 * {@code auto+nio+ssl -> nio+ssl}. Only the scheme at the start of the URL is
 * rewritten; the rest of the URL (host, path, query) is kept verbatim even if
 * it happens to contain the text "auto".
 *
 * @return the transport returned by {@code TransportFactory.connect}
 * @throws JMSException wrapping any failure, including a URL without a scheme
 */
protected Transport createTransport() throws JMSException {
    try {
        URI connectBrokerUL = brokerURL;
        // Extract the scheme
        String scheme = brokerURL.getScheme();
        if (scheme == null) {
            throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
        }
        String concreteScheme = null;
        if (scheme.equals("auto")) {
            concreteScheme = "tcp";
        } else if (scheme.equals("auto+ssl")) {
            concreteScheme = "ssl";
        } else if (scheme.equals("auto+nio")) {
            concreteScheme = "nio";
        } else if (scheme.equals("auto+nio+ssl")) {
            concreteScheme = "nio+ssl";
        }
        if (concreteScheme != null) {
            // The URI string starts with its scheme, so swap just that prefix instead of
            // String.replace(), which would also rewrite matches later in the URL.
            connectBrokerUL = new URI(concreteScheme + brokerURL.toString().substring(scheme.length()));
        }
        // 2.2.1.1 Look up the TransportFactory for the scheme and obtain the Transport
        return TransportFactory.connect(connectBrokerUL);
    } catch (Exception e) {
        throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
    }
}

2.2.1.1 根据scheme在Map中取出TransportFactory,获得Transport对象

1
2
3
4
5
6
/**
 * Connects to {@code location}: resolves the {@link TransportFactory} for it
 * (step 2.2.1.1.1) and asks that factory for the {@link Transport}
 * (step 2.2.1.1.2).
 *
 * @param location broker URI
 * @return the transport produced by the resolved factory
 * @throws Exception whatever the lookup or the factory's {@code doConnect} throws
 */
public static Transport connect(URI location) throws Exception {
    return findTransportFactory(location).doConnect(location);
}

2.2.1.1.1 从ConcurrentMap中获取TransportFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Returns the {@link TransportFactory} registered for the scheme of
 * {@code location}, creating and caching one on first use.
 *
 * @param location broker URI; its scheme selects the factory
 * @return the cached or newly created factory
 * @throws IOException if the URI has no scheme, or no factory can be created for it
 */
public static TransportFactory findTransportFactory(URI location) throws IOException {
    final String transportScheme = location.getScheme();
    if (transportScheme == null) {
        throw new IOException("Transport not scheme specified: [" + location + "]");
    }

    // TRANSPORT_FACTORYS is the map of already-created factories, keyed by scheme
    final TransportFactory cached = TRANSPORT_FACTORYS.get(transportScheme);
    if (cached != null) {
        return cached;
    }

    // Not cached yet: try to load it from a META-INF property, then remember it
    try {
        final TransportFactory created = (TransportFactory) TRANSPORT_FACTORY_FINDER.newInstance(transportScheme);
        TRANSPORT_FACTORYS.put(transportScheme, created);
        return created;
    } catch (Throwable e) {
        throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + transportScheme + "]", e);
    }
}

2.2.1.1.2 从TransportFactory返回Transport对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Builds a configured {@link Transport} for {@code location}.
 *
 * <p>The URI parameters are copied into a mutable map ("wireFormat.host"
 * defaults to the URI host), which is handed in turn to the wire-format
 * creation and to {@code configure}. After the "auto."-prefixed entries have
 * been extracted, any entry still left in the map is rejected.
 *
 * @param location broker URI including its connect parameters
 * @return the transport returned by {@code configure}
 * @throws IllegalArgumentException if parameters remain in the map at the end
 * @throws Exception on any other failure; a {@link URISyntaxException} is
 *         converted through {@code IOExceptionSupport.create}
 */
public Transport doConnect(URI location) throws Exception {
    try {
        final Map<String, String> remaining = new HashMap<String, String>(URISupport.parseParameters(location));
        if (!remaining.containsKey("wireFormat.host")) {
            remaining.put("wireFormat.host", location.getHost());
        }

        final WireFormat wireFormat = createWireFormat(remaining);
        // 2.2.1.1.2.1 Create the Transport, then configure it
        final Transport configured = configure(createTransport(location, wireFormat), wireFormat, remaining);

        // remove auto
        IntrospectionSupport.extractProperties(remaining, "auto.");

        if (remaining.isEmpty()) {
            return configured;
        }
        throw new IllegalArgumentException("Invalid connect parameters: " + remaining);
    } catch (URISyntaxException e) {
        throw IOExceptionSupport.create(e);
    }
}

2.2.1.1.2.1 创建Transport对象

1
2
3
4
/**
 * Hook for creating the actual transport. This base version always fails;
 * it is meant to be implemented by subclasses.
 *
 * @param location broker URI
 * @param wf wire format for the transport
 * @return never returns normally in this base version
 * @throws IOException always, stating that the method is not implemented
 */
protected Transport createTransport(URI location, WireFormat wf) throws MalformedURLException, UnknownHostException, IOException {
    final String reason = "createTransport() method not implemented!";
    throw new IOException(reason);
}

2.2.1.1.2.2 createTransport()由子类实现,TcpTransportFactory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * Creates a TCP transport for {@code location}.
 *
 * <p>If the URI has a non-empty path, the text after its first ':' (or the whole
 * path when there is none) must parse as an integer; the path is then turned
 * into a second URI, {@code scheme + ":/" + path}, and passed on as the local
 * location. If that fails the problem is logged and the local location stays
 * {@code null}.
 *
 * @param location broker URI
 * @param wf wire format stored in the created transport
 * @return the transport built by {@code createTcpTransport}
 * @throws UnknownHostException declared by the overridden method
 * @throws IOException declared by the overridden method
 */
@Override
protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException, IOException {
    URI localLocation = null;
    String path = location.getPath();
    // see if the path is a local URI location
    if (path != null && path.length() > 0) {
        int localPortIndex = path.indexOf(':');
        try {
            // Only checks that the part after ':' is numeric; the parsed value is not used
            Integer.parseInt(path.substring(localPortIndex + 1, path.length()));
            String localString = location.getScheme() + ":/" + path;
            localLocation = new URI(localString);
        } catch (Exception e) {
            // The "{}" placeholder is required, otherwise the reason argument is
            // silently dropped (NOTE(review): assumes LOG is an SLF4J logger — confirm).
            LOG.warn("path isn't a valid local location for TcpTransport to use: {}", e.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Failure detail", e);
            }
        }
    }
    // Everything above only inspects/validates the URL.
    // Obtain the socket factory (the article notes this returns a new
    // DefaultSocketFactory — not shown here, confirm against the source).
    SocketFactory socketFactory = createSocketFactory();

    // Create the TcpTransport, handing it wf and socketFactory
    return createTcpTransport(wf, socketFactory, location, localLocation);
}
2.2.2 由transport和状态管理器创建连接

    粗略介绍:createActiveMQConnection()字面意思就是创建一个ActiveMQ的连接。这个连接是根据transport和一个连接状态管理器JMSStatsImpl创建的,源码如下。

1
2
3
4
5
6
7
8
9
/**
 * Wraps {@code transport} in a new {@link ActiveMQConnection} (see 2.2.2.2 for
 * the constructor).
 *
 * @param transport the Transport created in the previous step
 * @param stats the factory-level {@code JMSStatsImpl} handed to the connection
 * @return the newly constructed connection
 * @throws Exception if the connection constructor fails
 */
protected ActiveMQConnection createActiveMQConnection(Transport transport, JMSStatsImpl stats) throws Exception {
    // The two generator getters supply the IdGenerator arguments for the
    // client id and the connection id, in that order.
    return new ActiveMQConnection(transport, getClientIdGenerator(), getConnectionIdGenerator(), stats);
}

2.2.2.1 JMSStatsImpl

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class JMSStatsImpl extends StatsImpl
{
private List connections;
public JMSStatsImpl()
{
//CopyOnWriteArrayList,一开是所有线程共享内容,当你需要修改时将内容复制出来再修改,然后把旧的地址指向新地址,这样可以达到并发的读,而不需要加锁
connections = new CopyOnWriteArrayList();
}
public JMSConnectionStatsImpl[] getConnections()
{
Object connectionArray[] = connections.toArray();
int size = connectionArray.length;
JMSConnectionStatsImpl answer[] = new JMSConnectionStatsImpl[size];
for(int i = 0; i < size; i++)
{
ActiveMQConnection connection = (ActiveMQConnection)connectionArray[i];
answer[i] = connection.getConnectionStats();
}

return answer;
}
public void addConnection(ActiveMQConnection connection)
{
connections.add(connection);
}
public void removeConnection(ActiveMQConnection connection)
{
connections.remove(connection);
}
public void dump(IndentPrinter out)
{
out.printIndent();
out.println("factory {");
out.incrementIndent();
JMSConnectionStatsImpl array[] = getConnections();
for(int i = 0; i < array.length; i++)
{
JMSConnectionStatsImpl connectionStat = array[i];
connectionStat.dump(out);
}

out.decrementIndent();
out.printIndent();
out.println("}");
out.flush();
}
public void setEnabled(boolean enabled)
{
super.setEnabled(enabled);
JMSConnectionStatsImpl stats[] = getConnections();
int size = stats.length;
for(int i = 0; i < size; i++)
stats[i].setEnabled(enabled);
}
}

2.2.2.2 ActiveMQConnection的构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
 * Constructs a connection on top of an already created transport.
 *
 * Stores the collaborators, creates the single-threaded executor, builds the
 * ConnectionInfo from a freshly generated connection id, registers this
 * connection as the transport's listener and with the factory statistics, and
 * records the creation time.
 *
 * Note: {@code this} is handed to the transport and to factoryStats before the
 * constructor has finished, so the order of the statements below matters.
 *
 * @param transport transport this connection communicates over
 * @param clientIdGenerator generator stored for later use
 * @param connectionIdGenerator generator used here to create the connection id
 * @param factoryStats factory-level statistics this connection registers with
 * @throws Exception declared by the constructor; no statement visible here throws it explicitly
 */
protected ActiveMQConnection(final Transport transport, IdGenerator clientIdGenerator, IdGenerator connectionIdGenerator, JMSStatsImpl factoryStats) throws Exception {

this.transport = transport;
this.clientIdGenerator = clientIdGenerator;
this.factoryStats = factoryStats;

// Configure a single threaded executor (core = max = 1, 5 second keep-alive,
// unbounded queue). The allowCoreThreadTimeOut call further down is commented
// out, so nothing in this constructor enables core-thread timeout.
executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "ActiveMQ Connection Executor: " + transport);
// setDaemon(true) is deliberately left commented out, so this thread is NOT
// marked as a daemon thread here - see https://issues.apache.org/jira/browse/AMQ-796
//thread.setDaemon(true);
return thread;
}
});
// Populate the connection info
// asyncConnectionThread.allowCoreThreadTimeOut(true);
String uniqueId = connectionIdGenerator.generateId();
this.info = new ConnectionInfo(new ConnectionId(uniqueId));
this.info.setManageable(true);
this.info.setFaultTolerant(transport.isFaultTolerant());
// Session id reserved for the connection itself (uses the value -1)
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);

// From here on the transport reports its events to this connection
this.transport.setTransportListener(this);

// sessions holds this connection's sessions; per the original note it is also a
// CopyOnWriteArrayList and is still empty at start-up (declared outside this block)
this.stats = new JMSConnectionStatsImpl(sessions, this instanceof XAConnection);
this.factoryStats.addConnection(this);
this.timeCreated = System.currentTimeMillis();
// Duplicate checking is switched on only when the transport is fault tolerant
this.connectionAudit.setCheckForDuplicates(transport.isFaultTolerant());
}

2.3 session = connection.createSession(true, Session.SESSION_TRANSACTED);

    该方法返回一个session,该session用于操作activemq。该方法先做保护性校验,检查连接是否已关闭或失败;如果没有,则将ConnectionInfo发送给broker;没有异常的情况下,new一个Session对象返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * Creates a new session on this connection.
 *
 * <p>Runs {@code checkClosedOrFailed()} and {@code ensureConnectionInfoSent()}
 * first. For a transacted session the acknowledge mode argument is ignored and
 * {@code Session.SESSION_TRANSACTED} is used; for a non-transacted session the
 * argument is validated and used as given.
 *
 * @param transacted whether the session is transacted
 * @param acknowledgeMode acknowledge mode for a non-transacted session
 * @return the new {@code ActiveMQSession}
 * @throws JMSException if a non-transacted session is requested with
 *         {@code SESSION_TRANSACTED} or with a mode outside the valid range,
 *         or if one of the preliminary checks fails
 */
@Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
    checkClosedOrFailed();
    ensureConnectionInfoSent();

    final int effectiveAckMode;
    if (transacted) {
        effectiveAckMode = Session.SESSION_TRANSACTED;
    } else {
        if (acknowledgeMode == Session.SESSION_TRANSACTED) {
            throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
        }
        final boolean outOfRange = acknowledgeMode < Session.SESSION_TRANSACTED
                || acknowledgeMode > ActiveMQSession.MAX_ACK_CONSTANT;
        if (outOfRange) {
            throw new JMSException("invalid acknowledgeMode: " + acknowledgeMode + ". Valid values are Session.AUTO_ACKNOWLEDGE (1), " +
                    "Session.CLIENT_ACKNOWLEDGE (2), Session.DUPS_OK_ACKNOWLEDGE (3), ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4) or for transacted sessions Session.SESSION_TRANSACTED (0)");
        }
        effectiveAckMode = acknowledgeMode;
    }

    return new ActiveMQSession(this, getNextSessionId(), effectiveAckMode, isDispatchAsync(), isAlwaysSessionAsync());
}

3 分析sendMessage()方法

    这个方法基本上就属于业务层了,你需要关心的是Queue,Topic。
可参考点击查看