First published in taowen

Six Stages of Implementing Eventual Consistency for Money Transfers

Stage 1: SELECT & UPDATE

https://github.com/taowen/colorfour/blob/master/example/take1/Transfer.go

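SELECT the balance, check it in the application, then UPDATE. This needs a strong database isolation level: under a weaker one, two concurrent transfers can both read the same balance, both pass the check, and together overdraw the account.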
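To make the risk concrete, here is a toy in-memory Go sketch, not the author's take1 code (the account type and both functions are hypothetical). It replays the interleaving a weak isolation level allows, where both transactions read the balance before either writes, and contrasts it with holding a lock across the check and the update, which is roughly what SELECT ... FOR UPDATE or SERIALIZABLE isolation gives you.

```go
package main

import (
	"fmt"
	"sync"
)

// account is a toy stand-in for one row of the account table.
// It is not a database and has no real isolation levels.
type account struct {
	mu     sync.Mutex
	amount int64
}

// interleavedNaiveDebits replays, step by step, what a weak isolation level
// permits: both transactions SELECT before either one UPDATEs.
func interleavedNaiveDebits(acc *account, d1, d2 int64) {
	seen1 := acc.amount // tx1: SELECT amount
	seen2 := acc.amount // tx2: SELECT amount (the same, soon stale, value)
	if seen1 >= d1 {
		acc.amount -= d1 // tx1: UPDATE amount = amount - d1
	}
	if seen2 >= d2 {
		acc.amount -= d2 // tx2: passed its check on stale data
	}
}

// lockedDebit holds the row lock across the check and the update,
// roughly what SELECT ... FOR UPDATE (or SERIALIZABLE) buys you.
func lockedDebit(acc *account, d int64) bool {
	acc.mu.Lock()
	defer acc.mu.Unlock()
	if acc.amount < d {
		return false
	}
	acc.amount -= d
	return true
}

func main() {
	naive := &account{amount: 100}
	interleavedNaiveDebits(naive, 80, 80)
	fmt.Println("naive:", naive.amount) // naive: -60

	safe := &account{amount: 100}
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lockedDebit(safe, 80) // one of the two concurrent debits is rejected
		}()
	}
	wg.Wait()
	fmt.Println("locked:", safe.amount) // locked: 20
}
```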

Stage 2: UPDATE WHERE

github.com/taowen/color
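// Conditional update: the balance check lives in the WHERE clause, so the
// check and the write form one atomic statement.
var updateAmountSql = sqlxx.Translate(
	`UPDATE account SET amount=amount+:delta
	WHERE account_id=:account_id AND amount+:delta > 0`)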

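Move the balance check into the WHERE clause of the UPDATE, so the check and the write happen in one statement. This relies only on the atomicity of a single SQL statement: if no row is updated, the balance was insufficient (or the account does not exist).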

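The drawback is that it is not idempotent: if the same transfer request is submitted twice (for example, a retry after a timeout), the money is moved twice.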
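A hypothetical in-memory Go sketch of both the conditional update and its weakness (store and updateAmount are illustrative names; a mutex stands in for the atomicity of one SQL statement, and the sketch assumes the rule is simply that the balance must not go negative). The check cannot be bypassed, but nothing stops the same request from being applied twice.

```go
package main

import (
	"fmt"
	"sync"
)

// store is a hypothetical in-memory stand-in for the account table.
type store struct {
	mu       sync.Mutex
	balances map[string]int64
}

// updateAmount models one conditional UPDATE: the balance check and the write
// happen atomically, and the return value plays the role of "rows affected".
// Assumed rule for this sketch: the balance must never become negative.
func (s *store) updateAmount(accountID string, delta int64) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur, ok := s.balances[accountID]
	if !ok || cur+delta < 0 {
		return 0 // no row matched the WHERE clause
	}
	s.balances[accountID] = cur + delta
	return 1
}

func main() {
	s := &store{balances: map[string]int64{"acc1": 100}}
	// The same debit delivered twice, e.g. a client retry after a timeout.
	first := s.updateAmount("acc1", -30)
	retry := s.updateAmount("acc1", -30)
	fmt.Println(first, retry, s.balances["acc1"]) // 1 1 40
}
```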

Stage 3: INSERT EVENT & UPDATE

github.com/taowen/color

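Besides updating the balance, also INSERT an event row. Because the database rejects duplicate primary keys, a second attempt with the same event ID fails on the INSERT, which tells us the money has already been deducted and must not be deducted again. A single database transaction makes the INSERT EVENT and the UPDATE succeed or fail together.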

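This solves idempotency. The drawback is that the payer's and the payee's accounts must live on the same database instance, because both updates have to run inside one transaction.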
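A hypothetical in-memory Go sketch of the idea (the ledger type, its transfer method and the outcome values are illustrative, not the repository's code). A mutex stands in for the database transaction, a set of event IDs stands in for the primary key of balance_update_event, and this sketch records one event per transfer.

```go
package main

import (
	"fmt"
	"sync"
)

type outcome string

const (
	applied   outcome = "applied"
	duplicate outcome = "duplicate"
	rejected  outcome = "rejected"
)

// ledger stands in for the balance table plus the event table;
// holding mu models one database transaction.
type ledger struct {
	mu       sync.Mutex
	balances map[string]int64
	events   map[string]bool // event IDs already inserted (the "primary key")
}

// transfer inserts the event and moves the money in one "transaction".
// A repeated reference number behaves like a primary-key violation: it is
// reported as a duplicate and no money moves a second time. Both accounts
// are touched inside the same transaction, hence one database instance.
func (l *ledger) transfer(ref, from, to string, amount int64) outcome {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.events[ref] {
		return duplicate
	}
	if l.balances[from] < amount {
		return rejected // rolled back: neither the event nor the balances change
	}
	l.events[ref] = true
	l.balances[from] -= amount
	l.balances[to] += amount
	return applied
}

func main() {
	l := &ledger{balances: map[string]int64{"acc1": 100, "acc2": 0}, events: map[string]bool{}}
	first := l.transfer("ref-1", "acc1", "acc2", 30)
	retry := l.transfer("ref-1", "acc1", "acc2", 30) // same reference number
	fmt.Println(first, retry, l.balances["acc1"], l.balances["acc2"]) // applied duplicate 70 30
}
```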

Stage 4: Split into two independent transactions

github.com/taowen/color
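package take4

import (
	"github.com/taowen/colorfour/tristate"
	"github.com/taowen/sqlxx"
)

// Transfer moves amount from one account to another as two independent local
// transactions. updateBalance (defined elsewhere in the repository) is assumed
// to be the idempotent INSERT EVENT & UPDATE from stage three, keyed by the
// event ID passed as its second argument.
func Transfer(conn *sqlxx.Conn, referenceNumber, from, to string, amount int) *tristate.TriState {
	// Step 1: debit the payer.
	result := updateBalance(conn, referenceNumber+"_"+from, from, -int64(amount))
	if result.IsFailure() || result.IsUnknown() {
		// Failure: nothing changed. Unknown: the caller must retry.
		return result
	}
	// Step 2: credit the payee.
	result = updateBalance(conn, referenceNumber+"_"+to, to, int64(amount))
	if result.IsSuccess() || result.IsUnknown() {
		return result
	}
	// The credit definitely failed: compensate by refunding the payer
	// under its own event ID.
	rollbackResult := updateBalance(conn, referenceNumber+"_"+from+"_rollback", from, int64(amount))
	if rollbackResult.IsSuccess() {
		return result // overall Failure, and the data is consistent again
	}
	return rollbackResult
}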

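Debiting the from account and crediting the to account become two independent database transactions, and the application layer is responsible for eventual consistency. When the result is unknown, the third value of the tristate (three-state) result, the data may be inconsistent, and Transfer must be retried to drive it into a stable state (success or failure). Retrying is safe because each updateBalance call carries its own event ID, so a step that has already been applied is not applied again. If repeated retries still fail, the case is handed over to manual reconciliation.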
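The following Go sketch runs the same control flow against an in-memory store so the unknown state and the retry can be observed. Everything here is hypothetical: TriState only mimics the idea of the colorfour tristate package, not its API, and an "unavailable" account simulates a database that times out.

```go
package main

import (
	"fmt"
	"sync"
)

// TriState is an illustrative three-valued result, not the colorfour API.
type TriState int

const (
	Success TriState = iota
	Failure
	Unknown // e.g. a timeout: we cannot tell whether the write happened
)

// bank stands in for accounts that may live on different databases; every
// updateBalance call models one independent, idempotent local transaction.
type bank struct {
	mu          sync.Mutex
	balances    map[string]int64
	events      map[string]bool
	unavailable map[string]bool // accounts whose database is "unreachable"
}

func (b *bank) updateBalance(eventID, account string, delta int64) TriState {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.unavailable[account] {
		return Unknown
	}
	if b.events[eventID] {
		return Success // applied by an earlier attempt: do not apply again
	}
	cur, ok := b.balances[account]
	if !ok || cur+delta < 0 {
		return Failure
	}
	b.events[eventID] = true
	b.balances[account] = cur + delta
	return Success
}

// transfer mirrors the control flow of the take4 Transfer function.
func transfer(b *bank, ref, from, to string, amount int64) TriState {
	if r := b.updateBalance(ref+"_"+from, from, -amount); r != Success {
		return r // Failure: nothing moved; Unknown: retry later
	}
	r := b.updateBalance(ref+"_"+to, to, amount)
	if r != Failure {
		return r // Success, or Unknown: retry later
	}
	// The credit definitely failed: refund the payer under its own event ID.
	if rb := b.updateBalance(ref+"_"+from+"_rollback", from, amount); rb != Success {
		return rb
	}
	return Failure
}

func main() {
	b := &bank{
		balances:    map[string]int64{"acc1": 100, "acc2": 0},
		events:      map[string]bool{},
		unavailable: map[string]bool{"acc2": true},
	}
	fmt.Println(transfer(b, "t1", "acc1", "acc2", 30) == Unknown) // true: acc1 debited, acc2 not yet credited
	b.unavailable["acc2"] = false
	fmt.Println(transfer(b, "t1", "acc1", "acc2", 30) == Success) // true: the retry completes without a second debit
	fmt.Println(b.balances["acc1"], b.balances["acc2"])           // 70 30
}
```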

The demo does not implement automatic retry. One option is to have Transfer re-invoke itself through a delayed job in disque.
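As a simpler stand-in for the disque delayed-job idea, here is a hypothetical in-process retry loop in Go: it re-invokes an idempotent operation while the outcome is unknown, doubles the delay each time, and gives up after a fixed number of attempts so the case can go to manual reconciliation. Unlike a job queue, it does not survive a process restart; the function name, attempt limit and backoff policy are all assumptions.

```go
package main

import (
	"fmt"
	"time"
)

type TriState int

const (
	Success TriState = iota
	Failure
	Unknown
)

// retryUntilSettled calls op until it returns Success or Failure, or until
// maxAttempts calls have been made. It returns the last result and the number
// of attempts. op must be idempotent, because an Unknown call may have
// partially or fully taken effect.
func retryUntilSettled(op func() TriState, maxAttempts int, baseDelay time.Duration) (TriState, int) {
	delay := baseDelay
	for attempt := 1; ; attempt++ {
		r := op()
		if r != Unknown || attempt >= maxAttempts {
			return r, attempt
		}
		time.Sleep(delay)
		delay *= 2 // exponential backoff
	}
}

func main() {
	calls := 0
	flaky := func() TriState {
		calls++
		if calls < 3 {
			return Unknown // e.g. the second database timed out
		}
		return Success
	}
	r, n := retryUntilSettled(flaky, 5, 10*time.Millisecond)
	fmt.Println(r == Success, n) // true 3

	stuck := func() TriState { return Unknown }
	r, n = retryUntilSettled(stuck, 3, time.Millisecond)
	if r == Unknown {
		fmt.Println("still unknown after", n, "attempts: hand over to manual reconciliation")
	}
}
```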

Stage 5: Two-phase commit

github.com/taowen/color

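The flaw in the previous implementation is that the money is credited straight to the spendable account. If the payee spends it immediately and the transfer is then rolled back, the payee's balance goes negative. To keep in-flight money from being used by other transactions, it must be frozen in a separate account.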

package take5

import (
	"fmt"

	"github.com/taowen/colorfour/tristate"
	"github.com/taowen/sqlxx"
)

// Action is one local operation with a three-state outcome.
type Action func() *tristate.TriState

// Step pairs an action with the compensating action that undoes it.
type Step struct {
	Description string
	Apply       Action
	Rollback    Action
}

// executeStepsEventually applies the steps in order. On Failure it rolls back
// the steps already applied, newest first; on Unknown it stops at once and
// leaves the rest to a later retry.
func executeStepsEventually(steps []Step) *tristate.TriState {
	executedSteps := []Step{}
	var result *tristate.TriState
	for _, step := range steps {
		fmt.Printf("apply %s\n", step.Description)
		result = step.Apply()
		if result.IsUnknown() {
			fmt.Println("unknown failure: quit")
			return result
		}
		if result.IsFailure() {
			fmt.Println("failed: start to rollback")
			break
		}
		executedSteps = append(executedSteps, step)
	}
	if result.IsSuccess() {
		fmt.Println("all success")
		return result
	}
	// Compensate in reverse order. A failed or unknown rollback is returned
	// as-is, to be retried or reconciled manually.
	for i := len(executedSteps) - 1; i >= 0; i-- {
		fmt.Printf("rollback %s\n", executedSteps[i].Description)
		rollbackResult := executedSteps[i].Rollback()
		if rollbackResult.IsFailure() || rollbackResult.IsUnknown() {
			fmt.Println("failed to rollback: quit")
			return rollbackResult
		}
	}
	fmt.Println("all rolled back")
	return result
}

// Transfer freezes the money in from+"_staging", moves it to to+"_staging",
// and only then releases it into to. Each hop gets its own reference number
// (suffix _1, _2, _3): directTransfer derives event IDs as reference+"_"+account,
// so with a shared reference the credit to from+"_staging" in hop 1 and the
// debit from from+"_staging" in hop 2 would share an event ID, and an event
// table keyed on that ID alone would skip the second as a duplicate.
func Transfer(conn *sqlxx.Conn, referenceNumber, from, to string, amount int) *tristate.TriState {
	fromStaging, toStaging := from+"_staging", to+"_staging"
	// hop builds one step that moves amount from src to dst, plus its compensation.
	hop := func(n, src, dst string) Step {
		ref := referenceNumber + "_" + n
		return Step{
			Description: fmt.Sprintf("transfer %v from %v to %v", amount, src, dst),
			Apply: func() *tristate.TriState {
				return directTransfer(conn, ref, src, dst, amount)
			},
			Rollback: func() *tristate.TriState {
				return directTransfer(conn, ref+"_rollback", dst, src, amount)
			},
		}
	}
	return executeStepsEventually([]Step{
		hop("1", from, fromStaging),
		hop("2", fromStaging, toStaging),
		hop("3", toStaging, to),
	})
}

// directTransfer moves amount between two accounts as two local updates
// (debit, then credit), undoing the debit if the credit fails.
func directTransfer(conn *sqlxx.Conn, referenceNumber, from, to string, amount int) *tristate.TriState {
	return executeStepsEventually([]Step{
		{
			Description: fmt.Sprintf("subtract %v from %v", amount, from),
			Apply: func() *tristate.TriState {
				return updateBalance(conn, referenceNumber+"_"+from, from, -int64(amount))
			},
			Rollback: func() *tristate.TriState {
				return updateBalance(conn, referenceNumber+"_"+from+"_rollback", from, int64(amount))
			},
		},
		{
			Description: fmt.Sprintf("add %v to %v", amount, to),
			Apply: func() *tristate.TriState {
				return updateBalance(conn, referenceNumber+"_"+to, to, int64(amount))
			},
			Rollback: func() *tristate.TriState {
				return updateBalance(conn, referenceNumber+"_"+to+"_rollback", to, -int64(amount))
			},
		},
	})
}

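Freezing is implemented by transferring the money from acc1 to a special account, acc1_staging. As a result, neither the balance table nor balance_update_event needed any change (no frozen_amount column, no field recording 2PC status).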

If the transfer fails, the money goes back the way it came. In the run below, the final credit to acc2 fails:

apply transfer 100 from acc1 to acc1_staging
apply subtract 100 from acc1
apply add 100 to acc1_staging
all success
apply transfer 100 from acc1_staging to acc2_staging
apply subtract 100 from acc1_staging
apply add 100 to acc2_staging
all success
apply transfer 100 from acc2_staging to acc2
apply subtract 100 from acc2_staging
apply add 100 to acc2
failed: start to rollback
rollback subtract 100 from acc2_staging
all rolled back
failed: start to rollback
rollback transfer 100 from acc1_staging to acc2_staging
apply subtract 100 from acc2_staging
apply add 100 to acc1_staging
all success
rollback transfer 100 from acc1 to acc1_staging
apply subtract 100 from acc1_staging
apply add 100 to acc1
all success
all rolled back
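The log shows which steps ran but not the resulting balances. A compact in-memory Go sketch makes "the money goes back the way it came" checkable. It is a simplification under stated assumptions: runSaga, ledger and move are hypothetical, each hop is a single atomic move rather than the separate debit and credit of directTransfer, event-ID idempotency is omitted, and a blocked account is used to make the last hop fail.

```go
package main

import "fmt"

type TriState int

const (
	Success TriState = iota
	Failure
	Unknown
)

// Step pairs an action with its compensation, like the Step type above.
type Step struct {
	Apply, Rollback func() TriState
}

// runSaga applies the steps in order. On Failure it compensates the steps that
// already succeeded, newest first; on Unknown it stops so a retry can resume.
func runSaga(steps []Step) TriState {
	for done, s := range steps {
		switch s.Apply() {
		case Success:
			continue
		case Unknown:
			return Unknown
		}
		for i := done - 1; i >= 0; i-- {
			if r := steps[i].Rollback(); r != Success {
				return r
			}
		}
		return Failure
	}
	return Success
}

// ledger is a hypothetical in-memory account table; accounts in blocked
// refuse credits, which is how this sketch injects a failure.
type ledger struct {
	balances map[string]int64
	blocked  map[string]bool
}

// move is one atomic hop between two accounts.
func (l *ledger) move(from, to string, amount int64) TriState {
	if l.balances[from] < amount || l.blocked[to] {
		return Failure
	}
	l.balances[from] -= amount
	l.balances[to] += amount
	return Success
}

// transfer freezes the money in from_staging, moves it to to_staging,
// and only then releases it into to.
func transfer(l *ledger, from, to string, amount int64) TriState {
	hop := func(src, dst string) Step {
		return Step{
			Apply:    func() TriState { return l.move(src, dst, amount) },
			Rollback: func() TriState { return l.move(dst, src, amount) },
		}
	}
	return runSaga([]Step{
		hop(from, from+"_staging"),
		hop(from+"_staging", to+"_staging"),
		hop(to+"_staging", to),
	})
}

func main() {
	l := &ledger{balances: map[string]int64{"acc1": 100, "acc2": 0}, blocked: map[string]bool{"acc2": true}}
	fmt.Println(transfer(l, "acc1", "acc2", 100) == Failure) // true
	fmt.Println(l.balances)                                  // map[acc1:100 acc1_staging:0 acc2:0 acc2_staging:0]

	l.blocked["acc2"] = false
	ok := transfer(l, "acc1", "acc2", 100) == Success
	fmt.Println(ok, l.balances["acc1"], l.balances["acc2"]) // true 0 100
}
```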

Stage 6: Hot accounts

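When an account is involved in very frequent transactions, its amount field is read and written constantly, and every transaction has to take the same row lock. For such hot accounts, spread the balance and the transaction records across several sub-accounts so that transactions do not all compete for one lock.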
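One way to picture the sub-account idea is a hypothetical in-memory Go sketch: one logical balance split into n sub-accounts, each with its own lock, with each credit routed by hashing its reference number. The routing rule, the type names and the sub-account count are assumptions, not the author's design.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

// subAccount is one slice of a hot account's balance, with its own lock.
type subAccount struct {
	mu     sync.Mutex
	amount int64
}

// hotAccount spreads one logical balance across several sub-accounts.
type hotAccount struct {
	subs []subAccount
}

func newHotAccount(n int) *hotAccount {
	return &hotAccount{subs: make([]subAccount, n)}
}

// credit picks a sub-account by hashing the transfer's reference number, so
// concurrent credits usually lock different sub-accounts, and a retry of the
// same transfer lands on the same one.
func (h *hotAccount) credit(ref string, amount int64) {
	f := fnv.New32a()
	f.Write([]byte(ref))
	s := &h.subs[f.Sum32()%uint32(len(h.subs))]
	s.mu.Lock()
	s.amount += amount
	s.mu.Unlock()
}

// total is the logical balance: the sum of the sub-accounts. It locks them one
// at a time, so under concurrent writes it is not an atomic snapshot.
func (h *hotAccount) total() int64 {
	var sum int64
	for i := range h.subs {
		h.subs[i].mu.Lock()
		sum += h.subs[i].amount
		h.subs[i].mu.Unlock()
	}
	return sum
}

func main() {
	h := newHotAccount(4)
	var wg sync.WaitGroup
	for i := 0; i < 1000; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			h.credit(fmt.Sprintf("ref-%d", i), 1)
		}(i)
	}
	wg.Wait()
	fmt.Println(h.total()) // 1000
}
```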

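  1. Query the sub-account balances and compute a transfer plan (which sub-accounts to move money from, and which to move it to)
  2. Execute the transfer plan
  3. If it fails, query the sub-account balances again and compute a new transfer plan
  4. Execute the new transfer plan
  5. If it fails again, give up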
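The five steps above can be sketched in Go as follows. The assumptions are loud ones: the plan drains the richest sub-accounts first, a plan is executed all-or-nothing (as if all sub-accounts sat in one database and their conditional updates ran in one transaction), and a beforeExecute hook simulates a concurrent transaction changing the balances between planning and execution. None of these names come from the original.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// hotAccount holds the balances of one logical account's sub-accounts.
type hotAccount struct {
	subs map[string]int64
}

var errGiveUp = errors.New("debit failed twice: give up")

// snapshot copies the current sub-account balances (the query in step 1).
func (h *hotAccount) snapshot() map[string]int64 {
	s := make(map[string]int64, len(h.subs))
	for id, v := range h.subs {
		s[id] = v
	}
	return s
}

// planDebit decides how much to take from each sub-account, richest first.
func planDebit(snapshot map[string]int64, amount int64) (map[string]int64, error) {
	ids := make([]string, 0, len(snapshot))
	for id := range snapshot {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool {
		if snapshot[ids[i]] != snapshot[ids[j]] {
			return snapshot[ids[i]] > snapshot[ids[j]]
		}
		return ids[i] < ids[j] // deterministic tie-break
	})
	plan := map[string]int64{}
	remaining := amount
	for _, id := range ids {
		take := snapshot[id]
		if take > remaining {
			take = remaining
		}
		if take > 0 {
			plan[id] = take
			remaining -= take
		}
	}
	if remaining > 0 {
		return nil, errors.New("insufficient balance across sub-accounts")
	}
	return plan, nil
}

// execute applies the plan all-or-nothing; it fails if any sub-account no
// longer holds what the (possibly stale) snapshot promised.
func (h *hotAccount) execute(plan map[string]int64) bool {
	for id, take := range plan {
		if h.subs[id] < take {
			return false
		}
	}
	for id, take := range plan {
		h.subs[id] -= take
	}
	return true
}

// debit: plan, execute, re-plan once on failure, execute again, then give up.
func debit(h *hotAccount, amount int64, beforeExecute func(attempt int)) error {
	for attempt := 1; attempt <= 2; attempt++ {
		plan, err := planDebit(h.snapshot(), amount)
		if err != nil {
			return err
		}
		beforeExecute(attempt)
		if h.execute(plan) {
			return nil
		}
	}
	return errGiveUp
}

func main() {
	h := &hotAccount{subs: map[string]int64{"s1": 50, "s2": 30, "s3": 20}}
	// A concurrent transaction drains s1 right after the first plan is made.
	err := debit(h, 60, func(attempt int) {
		if attempt == 1 {
			h.subs["s1"] -= 40
		}
	})
	fmt.Println(err, h.subs) // <nil> map[s1:0 s2:0 s3:0]
}
```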
Published 2017-05-15 09:51