Six Stages of Achieving Eventual Consistency in a Money Transfer Service
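Stage 1: SELECT & UPDATE
https://github.com/taowen/colorfour/blob/master/example/take1/Transfer.go
SELECT the balance, check that it is sufficient, then UPDATE it. Because the check and the write are separate statements, this is only correct under a very strong database isolation level.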
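To illustrate why the isolation level matters here, the following is a minimal in-memory sketch, not the code from the linked repository. The function name `interleavedWithdrawals` is hypothetical, and the interleaving of the two "transactions" is written out sequentially by hand so that the result is deterministic.

```go
package main

import "fmt"

// interleavedWithdrawals simulates two transactions that both SELECT the
// balance before either of them UPDATEs it, which a weak isolation level
// can allow. It returns the balance after both withdrawals have run.
func interleavedWithdrawals(start, delta int64) int64 {
	balance := start

	// Both transactions read the balance first.
	seenByA := balance
	seenByB := balance

	// Each transaction checks the value it read, then writes a new balance
	// computed from that (possibly stale) value.
	if seenByA >= delta {
		balance = seenByA - delta
	}
	if seenByB >= delta {
		balance = seenByB - delta
	}
	return balance
}

func main() {
	// Two withdrawals of 80 from a balance of 100: both checks pass.
	fmt.Println(interleavedWithdrawals(100, 80)) // prints 20
}
```

Both withdrawals succeed, so 160 has been paid out of an account that held 100, yet the stored balance is 20. A sufficiently strong isolation level, or a row lock taken at SELECT time, rules out this interleaving.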
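Stage 2: UPDATE WHERE
https://github.com/taowen/colorfour/blob/master/example/take2/Transfer.go
var updateAmountSql = sqlxx.Translate(
`UPDATE account SET amount=amount+:delta
WHERE account_id=:account_id AND amount+:delta > 0`)
Put the balance check into the WHERE clause of the UPDATE itself. The check and the write then happen in one statement, so this relies only on the atomicity of a single SQL statement.
The drawback is that it is not idempotent: if the same transfer is submitted twice, the money moves twice.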
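The idempotency gap can be shown with a small in-memory sketch. The `accounts` type and its `updateAmount` method are hypothetical stand-ins for the table and the SQL statement above; the mutex plays the role of the atomicity of a single statement, and the boolean return value plays the role of "at least one row was affected".

```go
package main

import (
	"fmt"
	"sync"
)

// accounts is a hypothetical in-memory stand-in for the account table.
type accounts struct {
	mu     sync.Mutex
	amount map[string]int64
}

// updateAmount mirrors UPDATE ... WHERE amount+:delta > 0: the balance check
// and the write happen atomically, and false means no row was updated.
func (a *accounts) updateAmount(accountID string, delta int64) bool {
	a.mu.Lock()
	defer a.mu.Unlock()
	current, ok := a.amount[accountID]
	if !ok || current+delta <= 0 {
		return false
	}
	a.amount[accountID] = current + delta
	return true
}

func main() {
	a := &accounts{amount: map[string]int64{"acc1": 100}}
	fmt.Println(a.updateAmount("acc1", -30)) // true: first attempt
	fmt.Println(a.updateAmount("acc1", -30)) // true: a retry of the same request debits again
	fmt.Println(a.amount["acc1"])            // 40
}
```

Nothing in the statement identifies the request, so the second call cannot tell that it is a duplicate of the first.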
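Stage 3: INSERT EVENT & UPDATE
https://github.com/taowen/colorfour/tree/master/example/take3
Besides updating the balance, also INSERT an event row. Because the database rejects a duplicate primary key, a second attempt at the same operation can tell that the money has already been deducted and does not apply it again. A database transaction ensures that the INSERT of the event and the UPDATE of the balance succeed or fail together.
This solves the idempotency problem. The drawback is that the payer's and the payee's accounts must live on the same database instance.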
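A minimal in-memory sketch of the idea, under stated assumptions: the `ledger` type, the `outcome` values and this `updateBalance` signature are hypothetical and are not taken from the take3 code. The `events` map stands in for the primary key of the event table, and the mutex stands in for the database transaction that makes the event insert and the balance update atomic.

```go
package main

import (
	"fmt"
	"sync"
)

type outcome int

const (
	applied   outcome = iota // event recorded and balance changed
	duplicate                // event ID already present, nothing changed
	rejected                 // balance check failed, nothing changed
)

// ledger is a hypothetical in-memory stand-in for the balance and event tables.
type ledger struct {
	mu      sync.Mutex
	balance map[string]int64
	events  map[string]bool // keyed by event ID, like a primary key
}

func newLedger(balance map[string]int64) *ledger {
	return &ledger{balance: balance, events: map[string]bool{}}
}

// updateBalance records the event and changes the balance as one unit.
func (l *ledger) updateBalance(eventID, accountID string, delta int64) outcome {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.events[eventID] {
		return duplicate
	}
	current, ok := l.balance[accountID]
	if !ok || current+delta <= 0 {
		return rejected
	}
	l.events[eventID] = true
	l.balance[accountID] = current + delta
	return applied
}

func main() {
	l := newLedger(map[string]int64{"acc1": 100})
	fmt.Println(l.updateBalance("ref1_acc1", "acc1", -30) == applied)   // true
	fmt.Println(l.updateBalance("ref1_acc1", "acc1", -30) == duplicate) // true
	fmt.Println(l.balance["acc1"])                                      // 70
}
```

A rejected update does not record the event in this sketch, so the same request can be retried later; whether that is desirable depends on the business rules.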
Stage 4: Split into Two Independent Transactions
https://github.com/taowen/colorfour/tree/master/example/take4
package take4

import (
	"github.com/taowen/colorfour/tristate"
	"github.com/taowen/sqlxx"
)

func Transfer(conn *sqlxx.Conn, referenceNumber, from, to string, amount int) *tristate.TriState {
	// Transaction 1: debit the payer.
	result := updateBalance(conn, referenceNumber+"_"+from, from, -int64(amount))
	if result.IsFailure() || result.IsUnknown() {
		return result
	}
	// Transaction 2: credit the payee.
	result = updateBalance(conn, referenceNumber+"_"+to, to, int64(amount))
	if result.IsSuccess() || result.IsUnknown() {
		return result
	}
	// The credit failed: compensate by refunding the payer.
	rollbackResult := updateBalance(conn, referenceNumber+"_"+from+"_rollback", from, int64(amount))
	if rollbackResult.IsSuccess() {
		// The refund went through, so report the original failure.
		return result
	}
	return rollbackResult
}
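Deducting the money from the from account and adding it to the to account are now two independent database transactions, and the application layer is responsible for the eventual consistency of the data. When the result is the unknown value of the tristate, the data is in an inconsistent state, and Transfer has to be retried to bring it back to a stable state (success or failure). Retrying is safe because every updateBalance call carries its own event key, so a step that has already been applied is not applied a second time. If the transfer still fails after several retries, hand it over to manual reconciliation.
The demo does not implement automatic retry. One option is to schedule a delayed job in disque that calls Transfer again.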
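The retry policy described above could look roughly like the sketch below. It is an assumption-laden illustration, not part of the demo: the `state` type is a hypothetical stand-in for the tristate package, `retryUntilKnown` is a made-up helper, and the loop retries in-process without any delay, whereas a real system would more likely re-enqueue a delayed job.

```go
package main

import "fmt"

// state is a hypothetical three-valued result, standing in for tristate.
type state int

const (
	success state = iota
	failure
	unknown
)

// retryUntilKnown calls op until it returns success or failure, or until
// maxAttempts calls have been made. It returns the last result and the number
// of attempts used. A result of unknown means: hand over to manual reconciliation.
func retryUntilKnown(maxAttempts int, op func() state) (state, int) {
	result := unknown
	attempts := 0
	for attempts < maxAttempts && result == unknown {
		attempts++
		result = op()
	}
	return result, attempts
}

func main() {
	calls := 0
	// flaky stands in for an idempotent Transfer call that times out twice.
	flaky := func() state {
		calls++
		if calls < 3 {
			return unknown
		}
		return success
	}
	result, attempts := retryUntilKnown(5, flaky)
	fmt.Println(result == success, attempts) // true 3
}
```

The helper is only safe to use with an operation that is idempotent, which is what the event keys from stage 3 provide.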
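Stage 5: Two-Phase Commit
https://github.com/taowen/colorfour/tree/master/example/take5
The flaw in the previous implementation is that the money is credited directly to the spendable account. If that money is spent immediately and the transfer is then rolled back, the account balance can go negative. To keep money that is still in flight from being spent by other transactions, it has to be frozen in a separate account.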
package take5

import (
	"fmt"

	"github.com/taowen/colorfour/tristate"
	"github.com/taowen/sqlxx"
)

type Action func() *tristate.TriState

// Step pairs an action with the action that compensates for it.
type Step struct {
	Description string
	Apply       Action
	Rollback    Action
}

// executeStepsEventually applies the steps in order. On a failure it rolls
// back the steps that already succeeded, in reverse order. On an unknown
// result it stops immediately and leaves the recovery to a later retry.
func executeStepsEventually(steps []Step) *tristate.TriState {
	executedSteps := []Step{}
	var result *tristate.TriState
	for _, step := range steps {
		fmt.Printf("apply %s\n", step.Description)
		result = step.Apply()
		if result.IsUnknown() {
			fmt.Println("unknown failure: quit")
			return result
		}
		if result.IsFailure() {
			fmt.Println("faield: start to rollback")
			break
		}
		executedSteps = append(executedSteps, step)
	}
	if result.IsSuccess() {
		fmt.Println("all success")
		return result
	}
	for i := len(executedSteps) - 1; i >= 0; i-- {
		fmt.Printf("rollback %s\n", executedSteps[i].Description)
		rollbackResult := executedSteps[i].Rollback()
		if rollbackResult.IsFailure() || rollbackResult.IsUnknown() {
			fmt.Println("failed to rollback: quit")
			return rollbackResult
		}
	}
	fmt.Println("all rolled back")
	return result
}

// Transfer moves the money in three hops: from -> from_staging ->
// to_staging -> to. Each hop is itself a two-step directTransfer.
func Transfer(conn *sqlxx.Conn, referenceNumber, from, to string, amount int) *tristate.TriState {
	return executeStepsEventually([]Step{
		{fmt.Sprintf("transfer %v from %v to %v", amount, from, from+"_staging"),
			func() *tristate.TriState {
				return directTransfer(conn, referenceNumber, from, from+"_staging", amount)
			}, func() *tristate.TriState {
				return directTransfer(conn, referenceNumber+"_rollback", from+"_staging", from, amount)
			}},
		{fmt.Sprintf("transfer %v from %v to %v", amount, from+"_staging", to+"_staging"),
			func() *tristate.TriState {
				return directTransfer(conn, referenceNumber, from+"_staging", to+"_staging", amount)
			}, func() *tristate.TriState {
				return directTransfer(conn, referenceNumber+"_rollback", to+"_staging", from+"_staging", amount)
			}},
		{fmt.Sprintf("transfer %v from %v to %v", amount, to+"_staging", to),
			func() *tristate.TriState {
				return directTransfer(conn, referenceNumber, to+"_staging", to, amount)
			}, func() *tristate.TriState {
				return directTransfer(conn, referenceNumber+"_rollback", to, to+"_staging", amount)
			}},
	})
}

// directTransfer debits one account and credits another as two steps,
// each with its own compensating update.
func directTransfer(conn *sqlxx.Conn, referenceNumber, from, to string, amount int) *tristate.TriState {
	return executeStepsEventually([]Step{
		{fmt.Sprintf("subtract %v from %v", amount, from),
			func() *tristate.TriState {
				return updateBalance(conn, referenceNumber+"_"+from, from, -int64(amount))
			}, func() *tristate.TriState {
				return updateBalance(conn, referenceNumber+"_"+from+"_rollback", from, int64(amount))
			}},
		{fmt.Sprintf("add %v to %v", amount, to),
			func() *tristate.TriState {
				return updateBalance(conn, referenceNumber+"_"+to, to, int64(amount))
			}, func() *tristate.TriState {
				return updateBalance(conn, referenceNumber+"_"+to+"_rollback", to, -int64(amount))
			}},
	})
}
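Freezing is implemented by transferring from acc1 to the special account acc1_staging. As a result, neither balance nor balance_update_event had to be changed: no frozen_amount field and no field recording the 2PC status was introduced.
When a transfer fails, the money is returned along the same path it came: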
apply transfer 100 from acc1 to acc1_staging
apply subtract 100 from acc1
apply add 100 to acc1_staging
all success
apply transfer 100 from acc1_staging to acc2_staging
apply subtract 100 from acc1_staging
apply add 100 to acc2_staging
all success
apply transfer 100 from acc2_staging to acc2
apply subtract 100 from acc2_staging
apply add 100 to acc2
faield: start to rollback
rollback subtract 100 from acc2_staging
all rolled back
faield: start to rollback
rollback transfer 100 from acc1_staging to acc2_staging
apply subtract 100 from acc2_staging
apply add 100 to acc1_staging
all success
rollback transfer 100 from acc1 to acc1_staging
apply subtract 100 from acc1_staging
apply add 100 to acc1
all success
all rolled back
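Stage 6: Hot Accounts
When an account sees a very high volume of transactions, its amount field is read and written constantly, and the row lock becomes a point of contention. For such a hot account, the balance and the transaction records need to be spread across multiple sub-accounts so that the transactions do not all compete for a single lock.
- Query the sub-account balances and compute a transfer plan (which sub-accounts to move money from, and which to move it to)
- Execute the transfer plan
- If it fails, query the sub-account balances again and compute a new transfer plan
- Execute the new transfer plan
- If it fails again, give up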
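The steps above can be sketched for the debit side as follows. Everything here is an illustrative assumption rather than the author's implementation: the `leg` type, the greedy `planDebit` strategy, the `debitWithReplan` helper, and sub-account names such as `acc1_0` are all hypothetical, and `query` and `execute` are injected so that the sketch does not need a database. The amount is assumed to be non-negative.

```go
package main

import "fmt"

// leg is one entry of a transfer plan: take amount from one sub-account.
type leg struct {
	subAccount string
	amount     int64
}

// planDebit spreads a debit across the sub-accounts greedily, in the given
// order. ok is false when the sub-accounts together cannot cover the amount.
func planDebit(ids []string, balances map[string]int64, amount int64) (plan []leg, ok bool) {
	remaining := amount
	for _, id := range ids {
		if remaining == 0 {
			break
		}
		take := balances[id]
		if take > remaining {
			take = remaining
		}
		if take > 0 {
			plan = append(plan, leg{id, take})
			remaining -= take
		}
	}
	if remaining > 0 {
		return nil, false
	}
	return plan, true
}

// debitWithReplan plans and executes, re-plans once from fresh balances if
// the execution fails, and gives up after the second failure.
func debitWithReplan(ids []string, query func() map[string]int64, execute func([]leg) bool, amount int64) bool {
	for attempt := 0; attempt < 2; attempt++ {
		plan, ok := planDebit(ids, query(), amount)
		if !ok {
			return false
		}
		if execute(plan) {
			return true
		}
	}
	return false
}

func main() {
	ids := []string{"acc1_0", "acc1_1", "acc1_2"}
	balances := map[string]int64{"acc1_0": 30, "acc1_1": 50, "acc1_2": 40}
	plan, ok := planDebit(ids, balances, 70)
	fmt.Println(plan, ok) // [{acc1_0 30} {acc1_1 40}] true
}
```

A greedy plan in a fixed order sends most traffic to the first sub-accounts, so a real system would probably randomize or rotate the order to actually spread the lock contention.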