Transactional outbox pattern
如果无法正常显示,请先停止浏览器的去广告插件。
        
                相关话题:
                                    #zalando
                            
                        
                1. Transactional outbox pattern
Helsinki Gophers meetup
22 Jan 2025
Nikolay Kuznetsov
@nikolayk812
1            
                        
                2. About me
Senior software engineer @Zalando Oy
C → Java → Kotlin → Go
Conference speaker in 2019/20
ice-skating, kayaking, hiking, chess
learning Finnish, Swedish, Italian
2            
                        
                3. About Zalando
Go-to-destination for fashion and lifestyle in Europe
25 countries, ~50 millions active customers
9 tech hubs, including Helsinki with ~150 employees
3            
                        
                4. Dual write problem
4            
                        
                5. Dual write problem
ServiceA avoids direct sync HTTP/gRPC calls to ServiceB
Data consistency challenge if one of writes fails
Messages could be events notifications and state transfers
Databases: PostgreSQL, MySQL, MariaDB, etc
Message brokers: Kafka, SNS, NATS, etc
5            
                        
                6. Naive solutions
1. Write to DB first, then write to broker
2. Write to broker first, then write to DB
3. Write to broker from a DB transaction, then commit
6            
                        
                7. Database first
7            
                        
                8. Message broker first
8            
                        
                9. Message from transaction
9            
                        
                10. Proper solutions
Transactional outbox pattern
Change data capture (CDC)
Mix of them
10            
                        
                11. Transactional outbox pattern
11            
                        
                12. Transactional outbox pattern
+ Atomicity: domain entity vs outbox message
+ At least once delivery to message broker
- Boilerplate code, deployment unit (cronjob)
- Message consumer might need to deduplicate
- Outbox table polling introduces delay
- Autovacuum settings tuning due to MVCC
12            
                        
                13. Change data capture intro
keywords: write ahead log (WAL), logical replication, Debezium
+ Works on business and/or outbox tables
+ Near real time
+ No need for explicit SELECT/UPDATE
- Complex low-level protocol
- Not many mature products in Go ecosystem, AFAIK
13            
                        
                14. 14            
                        
                15. pgx-outbox
15            
                        
                16. pgx driver
for Postgres, high performance
11K stars at GitHub
different interface from database/sql
can be adapted to database/sql
lib/pq is in maintenance mode
16            
                        
                17. Outbox message
type Message struct {
ID int64 // generated by Postgres
Broker string `validate:"required"`
Topic string `validate:"required"`
Metadata map[string]string // optional
Payload []byte `validate:"required,json"`
}
17            
                        
                18. Outbox table
--table name is customizable
CREATE TABLE IF NOT EXISTS outbox_messages
(
id
BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
broker
topic
metadata
payload
TEXT
TEXT
JSONB,
JSONB
NOT NULL,
NOT NULL,
NOT NULL,
created_at
TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
published_at TIMESTAMP
);
18            
                        
                19. Outbox writer
type Writer interface {
// Tx is empty interface to support both pgx.Tx and *sql.Tx
Write(ctx, tx Tx, message Message) (int64, error)
// pgx transaction only to invoke SendBatch and Prepare methods
WriteBatch(ctx, tx pgx.Tx, messages []Message) ([]int64, error)
}
19            
                        
                20. Add outbox writer
type repo struct {
pool *pgxpool.Pool
// new fields to use pgx-outbox
writer outbox.Writer
messageMapper ToMessageFunc[User] // can be a param instead
}
20            
                        
                21. Use outbox writer
func (r *repo) CreateUser(ctx, user User) (u User, txErr error) {
// create a transaction, commit/rollback in defer()
user, err = r.createUser(ctx, tx, user) // INSERT INTO users
if err != nil {
return u, fmt.Errorf("createUser: %w", err)
}
message, err := r.messageMapper(user)
// if err != nil {
_, err = r.writer.Write(ctx, tx, message) // INSERT INTO outbox_message
// if err != nil {
21            
                        
                22. Transaction handling
func (r *repo) CreateUser(ctx, user User) (u User, txErr error) {
tx, commitFunc, err := r.beginTx(ctx) // pool.Begin(ctx)
// if err != nil {
defer func() {
// commit or rollback depending on txErr
if cErr := commitFunc(txErr); cErr != nil {
txErr = fmt.Errorf("commitFunc: %w", cErr)
}
}()
user, err = r.createUser(ctx, tx, user)
22            
                        
                23. Demo outbox writer
23            
                        
                24. Demo results
24            
                        
                25. Message relay
25            
                        
                26. Outbox forwarder
type Forwarder interface {
Forward(ctx, limit int) (ForwardOutput, error)
}
type forwarder struct {
reader
Reader
publisher Publisher
filter
MessageFilter // optional
}
26            
                        
                27. Demo outbox forwarder
27            
                        
                28. Demo results
28            
                        
                29. Demo SQS reader
29            
                        
                30. Demo results
30            
                        
                31. Demo recap
31            
                        
                32. pgx-outbox recap
Simple, generic, extensible
Writer supports pgx and database/sql tx
Reader, Forwarder use pgx
Test coverage 80%
32            
                        
                33. Alternatives
watermill-sql: SQL Pub/Sub on top of Watermill library
dataddo/pgq: general queue on top of Postgres
jackc/pglogrepl: Postgres logical replication, low-level
Trendyol/go-pq-cdc: CDC for Postgres
PeerDB: streaming from Postgres to data warehouses, queues
33            
                        
                34. Testing in pgx-outbox
Testcontainers: Postgres, LocalStack modules
Mocks: vektra/mockery
Suite: stretchr/testify/suite
Linters: testifylint
34            
                        
                35. Future plans
Add support for CDC/WAL-based Reader
Explore capabilities of PeerDB
Add support for Kafka, NATS publishers
35            
                        
                36. Q & A
Thank you!
@nikolayk812
36            
                        
                37.