转载自: https://www.cnblogs.com/wudimanong/p/10558710.html

导读

在之前的文章中我们介绍了如何基于RocketMQ搭建生产级消息集群,以及2PC、3PC和TCC等与分布式事务相关的基本概念(没有读过的读者详见👇推荐阅读)。在这篇文章中我们将介绍RocketMQ的事务消息相关的内容,并通过一些实践和大家一起来探索下事务消息如何解决分布式系统中的分布式事务问题。

事务消息原理

事务消息特性可以看作是两阶段协议的消息实现方式,用以确保在以消息中间件解耦的分布式系统中本地事务的执行和消息的发送,可以以原子的方式进行

举个例子,以某互联网公司的用户余额充值为例,因为有充返活动(充值100元赠送20元),优惠比较大,用户Joe禁不住诱惑用支付宝向自己的余额账户充值了100元,支付成功后Joe的余额账户有了120元钱。

而该公司的关于用户余额充值的系统设计是这样的:

原理

在这个设计流程中,该公司通过自建支付系统完成用户Joe的支付宝扣款操作,成功后需要更新支付流水的状态,因为用户的余额账户系统与支付系统之间通过MQ解耦了,所以支付系统在完成支付流水状态更新后需要通过发送MQ消息到消息中间件服务,然后用户余额系统作为消费者通过消息消费的方式完成用户余额的增加操作。

这里有个问题:“支付系统如何确保这笔余额充值消息一定会成功发送到MQ,并且用户余额系统一定能处理成功呢”?如果支付系统在完成支付订单状态更新后,MQ消息发送失败或者用户余额系统消息处理失败的话,都会导致Joe支付扣款成功,而自己的余额账户却没到账的情况发生。

为了解决这个问题,按照目前的系统设计是需要“支付系统-MQ服务-用户余额系统”三者的处理满足数据的一致性要求。例如,如果支付系统感知到消息发送失败后还可以进行重新投递,从而确保支付系统与用户余额数据的最终一致性。

而上述问题就是事务消息要解决的问题,在具体了解RocketMQ提供的事务消息机制之前,我们先来看下在RocketMQ的早期版本不支持事务消息,或者因为历史原因选择的消息中间件本身就不支持事务消息的情况下,一些大公司是怎么解决这个问题的?

早期为了实现基于MQ异步调用的多个服务间,业务逻辑执行要么一起成功、要么一起失败,具备事务特点,通常会采用可靠消息最终一致性方案,来实现分布式事务。还是以Joe充值这件事来举例,可靠消息方案实现过程如下:

原理

在可靠消息最终一致性方案中,为了实现分布式事务,需要确保上游服务本地事务的处理与MQ消息的投递具有原子性,也就是说上游服务本地事务处理成功后要确保消息一定要成功投递到MQ服务,否则消息就不应该被投递到MQ服务;同样,被成功投递到MQ服务的消息,也一定要被下游服务成功处理,否则就需要重新投递MQ消息。

为了实现双向的原子性,可靠消息服务需要对消息进行状态标记,与此同时还需要对消息进行状态检查,从而实现重新投递及消息状态的最终一致性。核心流程说明如下:

1、上游服务(支付系统)如何确保完成自身支付成功状态更新后消息100%的能够投递到下游服务(用户余额系统)指定的Topic中?

在这个流程中上游服务在进行本地数据库事务操作前,会先发送一个状态为“待确认”的消息至可靠消息服务,而不是直接将消息投递到MQ服务的指定Topic。可靠消息服务此时会将该消息记录到自身服务的消息数据库中(消息状态为->待确认),完成后可靠消息服务会回调上游服务表示收到了消息,你们可以进行本地事务的操作了。

之后上游服务就会开启本地数据库事务执行业务逻辑操作,这里支付系统就会将该笔支付订单状态更新为“已成功”。(注意,这里只是举个示例场景,在真正的实践中一般是不会把支付订单本身的状态与业务端回调放在一个事务流程中的,关于这部分的详细说明我们在下面的场景说明中再讨论)。

如果上游服务本地数据库事务执行成功,则继续向可靠消息服务发送消息确认消息,此时可靠消息服务就会正式将消息投递到MQ服务,并且同时更新消息数据库中的消息状态为“已发送”。(注意,这里可靠消息服务更新消息状态与投递消息至MQ也必须是在一个原子操作中,即消息投递成功则一定要将消息状态更新为“已发送”,所以在编程的细节中,可靠消息服务一般会先更新消息状态,然后再进行消息投递,这样即使消息投递失败,也可以对消息状态进行回滚->“待确认”,相反如果先进行消息投递再更新消息状态,可能就不好控制了)。

相反,如果上游本地数据库事务执行失败,则需要向可靠消息服务发送消息删除消息,可靠消息服务此时就会将消息删除,这样就意味着事务在上游消息投递过程中就被回滚了,而流程也就此结束了,此时上游服务可以需要通过业务逻辑的设计进行重发,这个就不再分布式事务的讨论范畴了。

说到这里,大家可能会有疑问了!因为在上述描述中,即使上游服务本地数据库事务执行成功了,但是在发送确认消息至可靠消息服务的过程中,以及可靠消息服务在投递消息至MQ服务的过程中,还是会存在失败的风险,这样的话还是会导致支付服务更新了状态,但是用户余额系统连消息都没有收到的情况发生?

实际上,实现数据一致性是一个复杂的活。在这个方案中可靠消息服务作为基础性的服务除了执行正常的逻辑外,还得处理复杂的异常场景。在实现过程中可靠消息服务需要启动相应的后台线程,不断轮训消息的状态,这里会轮训消息状态为“待确认”的消息,并判断该消息的状态的持续时间是否超过了规定的时间,如果超过规定时间的消息还处于“待确认”的状态,就会触发上游服务状态询问机制

可靠消息服务就会调用上游服务提供的相关借口,询问这笔消息的处理情况,如果这笔消息在上游服务处理成功,则后台线程就会继续触发上图中的步骤5,更新消息状态为“已发送”并投递消息至MQ服务;反之如果这笔消息上游服务处理失败,可靠消息服务则会进行消息删除。通过这样以上机制就确保了“上游服务本地事务成功处理+消息成功投递”处于一个原子操作了。

2、下游服务(用户余额系统)如何确保对MQ服务Topic消息的消费100%都能处理成功?

在1的过程中,确保了上游服务逻辑处理与MQ消息的投递具备原子性,那么当消息被成功投递到了MQ服务的指定Topic后,下游服务如何才能确保消息的消费一定能被成功处理呢?

在正常的流程中,下游服务等待消费Topic的消息并进行自身本地数据库事务的处理,如果处理成功则会主动通知可靠消息服务,可靠消息服务此时就会将消息的状态更新为“已完成”;反之,处理失败下游服务就无法再主动向可靠消息服务发送通知消息了。

此时,与消息投递过程中的异常逻辑一样,可靠消息服务也会启动相应的后台线程,轮询一直处于“已发送”状态的消息,判断状态持续时间是否超过了规定时间,如果超时,可靠消息服务就会再次向MQ服务投递此消息,从而确保消息能被再次消费处理。(注意,也可能出现下游服务处理成功,但是通知消息发送失败的情况,所以为了确保幂等,下游服务也需要在业务逻辑上做好相应的防重处理)。

RocketMQ事务消息机制

在👆面第2小节的内容中,我们演示了一个自编写的中间服务+MQ来实现事务消息的示例。但是在现实的工作场景中,开发和维护一套可靠消息服务是一件很耗费资源和成本的事情,实际上,RocketMQ的最新版本(4.3.0+)中已经实现了可靠消息服务的所有功能,并且在保证高并发、高可用、高性能方面做了更为优秀的架构实现。

从设计逻辑上看RocketMQ所支持的分布式事务特性与上节中阐述的可靠消息服务基本上是一致的。只是RocketMQ在实现上相比较于可靠消息服务而言做了更为复杂的设计,并且因为天然与MQ服务本身紧密结合,所以在高可用、可靠性、性能等方面直接继承了MQ服务本身的架构优势。

下面我们就结合流程并通过示例代码的分析来和大家一起理解下利用RocketMQ是如何实现分布式事务操作的?

原理

在应用场景中分布式服务通过MQ通信的过程中,发送消息的一方我们称之为Producer,接收消费消息的一方我们称之为Consumer。如果Producer自身业务逻辑本地事务执行成功与否希望和消息的发送保持一个原子性(也就是说如果Producer本地事务执行成功,那么这笔消息就一定要被成功的发送到RocketMQ服务的指定Topic,并且Consumer一定要被消费成功;反之,如果Producer本地事务执行失败,那么这笔消息就应该被RocketMQ服务器丢弃)的话,RocketMQ是怎么做的呢?

  1. Producer选择使用RockerMQ提供的事务消息方法向RocketMQ服务发送事务消息(设置消息属性TRAN_MSG=TRUE);

  2. RocketMQ服务端在收到消息后会判断消息的属性是否为事务消息,如果是普通消息就直接Push给Consumer;如果是事务消息就会对该消息进行特殊处理设置事务ID,并暂时设置该消息对Consumer不可见,之后向Producer返回Pre消息发送状态(SEND_OK)。

  3. 之后Producer就会开始执行本地事务逻辑,并设置本地事务处理状态后向RocketMQ服务器发送该事务消息的确认/回滚消息(COMMIT_MESSAGE/ROLLBACK_MESSAGE)。

  4. RocketMQ服务器根据该笔事务消息的本地事务执行状态决定是否将消息Push给Consumer还是删除该消息。

  5. 之后Consumer就会消费该消息,执行Consumer的本地事务逻辑,如果执行成功则向RocketMQ返回“CONSUME_SUCCESS”;反之出现异常则需要返回“RECONSUME_LATER”,以便RocketMQ再次Push该消息,这一点在实际编程中需要控制好

正常情况下以上就是RocketMQ事务消息的基本运行流程了,但是从异常情况考虑,理论上也是存在Producer迟迟不发送确认或回滚消息的情况。与可靠消息服务一样,RocketMQ服务端也会设置后台线程去扫描消息状态,之后会调用Producer的本地checkLocalTransaction函数获取本地事务状态后继续进行第3步操作。

RocketMQ事务消息实现

相信看到这里,大家对于RocketMQ的分布式事务消息的理解应该有了一个相对清晰的概念了,那么在代码中如何编写呢?

在开发中使用RocketMQ的分布式事务消息Consumer的代码不需要有什么特别的变化与普通消息Consumer代码一致就可以。

Consumer示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static void main(String[] args) throws InterruptedException, MQClientException {

// Instantiate with specified consumer group name.
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_PAY_ACCOUNT");

// Specify name server addresses.
consumer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876");

// Subscribe one more more topics to consume.
consumer.subscribe("PAY_ACCOUNT", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});

//Launch the consumer instance.
consumer.start();
System.out.printf("Consumer Started.%n");
}

主要的改变是在Producer代码,我们需要额外编写一个实现执行本地事务逻辑,以及检查本地事务状态的类。示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class TransactionListenerImpl implements TransactionListener {

private AtomicInteger transactionIndex = new AtomicInteger(0);

private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.COMMIT_MESSAGE;
}

@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}

Producer示例代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class TransactionProducerTest {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("CID_PAY_ACCOUNT");
producer.setNamesrvAddr("10.211.55.4:9876;10.211.55.5:9876;10.211.55.6:9876");

ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});

producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();

String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};

try {
Map<String, String> paramMap = new HashMap<>();
paramMap.put("type", "6");
paramMap.put("bizOrderId", "15414012438257823");
paramMap.put("payOrderId", "15414012438257823");
paramMap.put("amount", "10");
paramMap.put("userId", "200001");
paramMap.put("tradeType", "charge");
paramMap.put("financeStatus", "0");//财务状态,应收
paramMap.put("channel", "a");//余额
paramMap.put("tradeTime", "20190101202022");
paramMap.put("nonce_str", "xkdkskskdksk");

//拼凑消息体
Message msg = new Message("PAY_ACCOUNT", "pre",paramMap.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);

Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}

Thread.sleep(10*1000);
producer.shutdown();
}
}

与非事务消息直接调用RocketMQ Client的send方法不同,事务消息发送需要设置事务监听器类,并调用sendMessageInTransaction方法,而这个方法的具体逻辑也就是上述流程中描述的那样,具体大家可以看下。

场景说明

目前RocketMQ消息中间件的使用场景比较广泛,对于需要通过MQ进行异步解耦的分布式应用系统来说,RocketMQ无疑是一个不错的技术选择。接下来,我们就以对数据一致性要求非常高的分布式支付系统为例,来看看基于RocketMQ的事务消息适用于哪些特定场景,从而实现支付系统数据的高度一致性。

事实上,支付系统的数据一致性是一个复杂的问题,原因在于支付流程的各个环节都存在异步的不确定性,例如支付系统需要跟第三方渠道进行交互,不同的支付渠道交互流程存在差异,并且有异步支付结果回调的情况。

除此以外,支付系统内部本身又是由多个不同子系统组成,除核心支付系统外,还有账务系统、商户通知系统等等,而核心支付系统本身也会被拆分为多个不同的服务模块,如风控、路由等用以实现不同的功能逻辑。某些场景我们无法通过分布式事务来实现数据一致性,只能通过额外的业务补偿手段,如二次轮训、支付对账等来实现数据最终一致性

综上所述,支付系统是一个复杂的系统,要完全实现数据的一致性单靠某一种手段是无法实现的,大部分情况下我们可以通过额外的业务补偿逻辑来实现数据最终一致性,只是这样补偿逻辑需要以更多的业务开发逻辑为代价,并且在时效性上会存在延迟的问题。

举个例子,支付核心系统支付成功后会更新自己的订单状态为支付成功,整个核心交易流程是一个比较实时同步的场景,如果出现数据不一致,会有额外的补偿逻辑如二次支付订单状态轮询、T+1日对账等用以确保支付状态数据的最终一致性。但是除了核心支付外,支付成功的结果是需要通知到支付账务系统、以及业务端系统,而为了确保性能,一般后续的通知就不会与主流程一样设计成实时同步,而是通过MQ异步解耦发送消息给独立的“通知响应模块”,而“通知响应模块”此时就可以通过分布式事务消息来与支付账户系统、业务端等系统实现数据一致性,从而减少需要补偿手段处理的范围,提高系统的数据一致性等级和灵敏度