Transactional outbox pattern

1. Transactional outbox pattern
    Helsinki Gophers meetup, 22 Jan 2025
    Nikolay Kuznetsov, @nikolayk812
2. About me
    Senior software engineer @Zalando Oy
    C → Java → Kotlin → Go
    Conference speaker in 2019/20
    Ice-skating, kayaking, hiking, chess
    Learning Finnish, Swedish, Italian
3. About Zalando
    Go-to destination for fashion and lifestyle in Europe
    25 countries, ~50 million active customers
    9 tech hubs, including Helsinki with ~150 employees
4. Dual write problem
5. Dual write problem
    ServiceA avoids direct sync HTTP/gRPC calls to ServiceB
    Data consistency challenge if one of the writes fails
    Messages could be events: notifications and state transfers
    Databases: PostgreSQL, MySQL, MariaDB, etc.
    Message brokers: Kafka, SNS, NATS, etc.
6. Naive solutions
    1. Write to DB first, then write to broker
    2. Write to broker first, then write to DB
    3. Write to broker from a DB transaction, then commit
7. Database first
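
A minimal sketch of how the database-first variant can leave the two systems inconsistent. The `fakeDB` and `fakeBroker` types are hypothetical in-memory stand-ins, not part of any real driver or broker client; the point is only that the row is already durable when the publish fails.

```go
package main

import (
	"errors"
	"fmt"
)

// fakeDB is a hypothetical in-memory stand-in for a database.
type fakeDB struct{ rows []string }

func (d *fakeDB) Insert(row string) error {
	d.rows = append(d.rows, row)
	return nil
}

// fakeBroker is a hypothetical in-memory stand-in for a message broker.
type fakeBroker struct {
	down     bool
	messages []string
}

var errBrokerDown = errors.New("broker unavailable")

func (b *fakeBroker) Publish(msg string) error {
	if b.down {
		return errBrokerDown
	}
	b.messages = append(b.messages, msg)
	return nil
}

// createUserDBFirst is naive solution 1: write to the DB, then to the broker.
func createUserDBFirst(db *fakeDB, broker *fakeBroker, name string) error {
	if err := db.Insert(name); err != nil {
		return fmt.Errorf("insert: %w", err)
	}
	// The row is already stored here; a failure below cannot undo it.
	if err := broker.Publish("user_created:" + name); err != nil {
		return fmt.Errorf("publish: %w", err)
	}
	return nil
}

func main() {
	db, broker := &fakeDB{}, &fakeBroker{down: true}
	err := createUserDBFirst(db, broker, "alice")
	fmt.Println("error:", err)
	fmt.Println("rows:", len(db.rows), "messages:", len(broker.messages))
}
```

With the broker down, the user row exists but no message was ever sent, and nothing in this flow will retry the publish later.
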
8. Message broker first
9. Message from transaction
10. Proper solutions
    Transactional outbox pattern
    Change data capture (CDC)
    Mix of them
11. Transactional outbox pattern
12. Transactional outbox pattern
    + Atomicity: domain entity vs outbox message
    + At-least-once delivery to message broker
    - Boilerplate code, deployment unit (cronjob)
    - Message consumer might need to deduplicate
    - Outbox table polling introduces delay
    - Autovacuum settings tuning due to MVCC
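
Because delivery is at least once, a consumer may see the same message twice. One possible way to deduplicate is sketched below, assuming each delivery carries a stable ID (for example the outbox row id). `Delivery` and `Deduplicator` are hypothetical names, and the in-memory map stands in for whatever durable store a real consumer would use; the sketch is not safe for concurrent use.

```go
package main

import "fmt"

// Delivery is a hypothetical consumed message with a stable ID.
type Delivery struct {
	ID      int64
	Payload string
}

// Deduplicator remembers processed IDs in memory (assumption: a real
// consumer would persist them, ideally in the same transaction as its work).
type Deduplicator struct {
	seen map[int64]struct{}
}

func NewDeduplicator() *Deduplicator {
	return &Deduplicator{seen: make(map[int64]struct{})}
}

// Handle runs process at most once per successfully processed ID
// and reports whether it ran this time.
func (d *Deduplicator) Handle(m Delivery, process func(Delivery) error) (bool, error) {
	if _, ok := d.seen[m.ID]; ok {
		return false, nil // duplicate delivery, skip
	}
	if err := process(m); err != nil {
		return false, err // not marked as seen, so a redelivery is retried
	}
	d.seen[m.ID] = struct{}{}
	return true, nil
}

func main() {
	d := NewDeduplicator()
	processed := 0
	deliveries := []Delivery{{ID: 1, Payload: "a"}, {ID: 2, Payload: "b"}, {ID: 1, Payload: "a"}}
	for _, m := range deliveries {
		ran, _ := d.Handle(m, func(Delivery) error { processed++; return nil })
		fmt.Println("id:", m.ID, "ran:", ran)
	}
	fmt.Println("processed:", processed)
}
```
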
13. Change data capture intro
    Keywords: write-ahead log (WAL), logical replication, Debezium
    + Works on business and/or outbox tables
    + Near real time
    + No need for explicit SELECT/UPDATE
    - Complex low-level protocol
    - Not many mature products in Go ecosystem, AFAIK
15. pgx-outbox
16. pgx
    Driver for Postgres, high performance
    11K stars on GitHub
    Different interface from database/sql
    Can be adapted to database/sql
    lib/pq is in maintenance mode
17. Outbox message
    type Message struct {
        ID       int64             // generated by Postgres
        Broker   string            `validate:"required"`
        Topic    string            `validate:"required"`
        Metadata map[string]string // optional
        Payload  []byte            `validate:"required,json"`
    }
18. Outbox table
    -- table name is customizable
    CREATE TABLE IF NOT EXISTS outbox_messages (
        id           BIGINT PRIMARY KEY GENERATED ALWAYS AS IDENTITY,
        broker       TEXT NOT NULL,
        topic        TEXT NOT NULL,
        metadata     JSONB,
        payload      JSONB NOT NULL,
        created_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
        published_at TIMESTAMP
    );
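
With this schema, a polling relay needs two statements: one to read rows where `published_at` is still NULL, and one to stamp them after publishing. The sketch below builds both for a customizable table name. The function names, the identifier check, and the exact SQL are assumptions for illustration and may differ from what pgx-outbox actually runs.

```go
package main

import (
	"fmt"
	"regexp"
)

// identRe is a deliberately strict allow-list for table names (assumption),
// since identifiers cannot be passed as query parameters.
var identRe = regexp.MustCompile(`^[a-z_][a-z0-9_]*$`)

func checkTable(table string) error {
	if !identRe.MatchString(table) {
		return fmt.Errorf("invalid table name %q", table)
	}
	return nil
}

// readUnpublishedQuery selects the oldest unpublished messages; $1 is the limit.
func readUnpublishedQuery(table string) (string, error) {
	if err := checkTable(table); err != nil {
		return "", err
	}
	return fmt.Sprintf("SELECT id, broker, topic, metadata, payload FROM %s "+
		"WHERE published_at IS NULL ORDER BY id LIMIT $1", table), nil
}

// markPublishedQuery stamps published_at; $1 is an array of message ids.
func markPublishedQuery(table string) (string, error) {
	if err := checkTable(table); err != nil {
		return "", err
	}
	return fmt.Sprintf("UPDATE %s SET published_at = CURRENT_TIMESTAMP "+
		"WHERE id = ANY($1) AND published_at IS NULL", table), nil
}

func main() {
	for _, build := range []func(string) (string, error){readUnpublishedQuery, markPublishedQuery} {
		q, err := build("outbox_messages")
		if err != nil {
			panic(err)
		}
		fmt.Println(q)
	}
}
```
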
19. Outbox writer
    type Writer interface {
        // Tx is an empty interface to support both pgx.Tx and *sql.Tx
        Write(ctx context.Context, tx Tx, message Message) (int64, error)

        // pgx transaction only, to invoke SendBatch and Prepare methods
        WriteBatch(ctx context.Context, tx pgx.Tx, messages []Message) ([]int64, error)
    }
20. Add outbox writer
    type repo struct {
        pool *pgxpool.Pool

        // new fields to use pgx-outbox
        writer        outbox.Writer
        messageMapper ToMessageFunc[User] // can be a param instead
    }
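
A sketch of what a `messageMapper` for `User` might look like. The `Message` fields follow the struct shown earlier in the deck; the `User` fields, the `ToMessageFunc` definition (inferred from how the mapper is called), and the broker, topic, and metadata values are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the outbox message struct from the slides.
type Message struct {
	ID       int64
	Broker   string `validate:"required"`
	Topic    string `validate:"required"`
	Metadata map[string]string
	Payload  []byte `validate:"required,json"`
}

// User is a hypothetical domain entity.
type User struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// ToMessageFunc is an assumed definition of the mapper type.
type ToMessageFunc[T any] func(entity T) (Message, error)

// userToMessage serializes the user as the JSON payload of an outbox message.
func userToMessage(u User) (Message, error) {
	payload, err := json.Marshal(u)
	if err != nil {
		return Message{}, fmt.Errorf("json.Marshal: %w", err)
	}
	return Message{
		Broker:   "sns",   // hypothetical broker name
		Topic:    "users", // hypothetical topic
		Metadata: map[string]string{"event_type": "user_created"},
		Payload:  payload,
	}, nil
}

func main() {
	var mapper ToMessageFunc[User] = userToMessage
	msg, err := mapper(User{ID: "42", Name: "Alice"})
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.Broker, msg.Topic, string(msg.Payload))
}
```
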
21. Use outbox writer
    func (r *repo) CreateUser(ctx context.Context, user User) (u User, txErr error) {
        // create a transaction, commit/rollback in defer()

        user, err = r.createUser(ctx, tx, user) // INSERT INTO users
        if err != nil {
            return u, fmt.Errorf("createUser: %w", err)
        }

        message, err := r.messageMapper(user)
        // if err != nil {

        _, err = r.writer.Write(ctx, tx, message) // INSERT INTO outbox_messages
        // if err != nil {
22. Transaction handling
    func (r *repo) CreateUser(ctx context.Context, user User) (u User, txErr error) {
        tx, commitFunc, err := r.beginTx(ctx) // pool.Begin(ctx)
        // if err != nil {

        defer func() {
            // commit or rollback depending on txErr
            if cErr := commitFunc(txErr); cErr != nil {
                txErr = fmt.Errorf("commitFunc: %w", cErr)
            }
        }()

        user, err = r.createUser(ctx, tx, user)
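
The `commitFunc` returned by `beginTx` is not shown on the slide. One plausible shape is sketched below, assuming it rolls back when the work failed and commits otherwise. `Tx` here is a local interface with the `Commit` and `Rollback` methods that `pgx.Tx` also has; `newCommitFunc`, `inTx`, and `fakeTx` are hypothetical helpers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Tx is the subset of transaction methods this sketch needs.
type Tx interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

// newCommitFunc returns a function that rolls back if execErr is non-nil
// and commits otherwise. It only reports failures of the commit or rollback itself.
func newCommitFunc(ctx context.Context, tx Tx) func(execErr error) error {
	return func(execErr error) error {
		if execErr != nil {
			if err := tx.Rollback(ctx); err != nil {
				return fmt.Errorf("tx.Rollback: %w", err)
			}
			return nil
		}
		if err := tx.Commit(ctx); err != nil {
			return fmt.Errorf("tx.Commit: %w", err)
		}
		return nil
	}
}

// inTx mirrors the deferred commit-or-rollback from the slide.
func inTx(ctx context.Context, tx Tx, work func() error) (txErr error) {
	commitFunc := newCommitFunc(ctx, tx)
	defer func() {
		if cErr := commitFunc(txErr); cErr != nil {
			txErr = fmt.Errorf("commitFunc: %w", cErr)
		}
	}()
	return work()
}

// fakeTx records which method was called (test double, not a real transaction).
type fakeTx struct{ committed, rolledBack bool }

func (t *fakeTx) Commit(context.Context) error   { t.committed = true; return nil }
func (t *fakeTx) Rollback(context.Context) error { t.rolledBack = true; return nil }

var errWork = errors.New("work failed")

func main() {
	ctx := context.Background()
	ok := &fakeTx{}
	fmt.Println(inTx(ctx, ok, func() error { return nil }), ok.committed, ok.rolledBack)
	failed := &fakeTx{}
	fmt.Println(inTx(ctx, failed, func() error { return errWork }), failed.committed, failed.rolledBack)
}
```

The named return value `txErr` is what lets the deferred function see the outcome of the work and, if the commit itself fails, replace the result.
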
23. Demo outbox writer
24. Demo results
25. Message relay
26. Outbox forwarder
    type Forwarder interface {
        Forward(ctx context.Context, limit int) (ForwardOutput, error)
    }

    type forwarder struct {
        reader    Reader
        publisher Publisher
        filter    MessageFilter // optional
    }
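
The forwarding step can be sketched as read, publish, then acknowledge. The method sets of `Reader` and `Publisher` below are assumptions (the slide names the types but not their methods), the optional filter is left out, and the in-memory fakes stand in for Postgres and the broker.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

type Message struct {
	ID    int64
	Topic string
}

// Reader and Publisher use assumed method sets for this sketch.
type Reader interface {
	Read(ctx context.Context, limit int) ([]Message, error)
	Ack(ctx context.Context, ids []int64) error
}

type Publisher interface {
	Publish(ctx context.Context, m Message) error
}

// forward publishes up to limit unpublished messages in order and acks the
// ones that were sent. It stops at the first publish error to keep ordering.
func forward(ctx context.Context, r Reader, p Publisher, limit int) (int, error) {
	msgs, err := r.Read(ctx, limit)
	if err != nil {
		return 0, fmt.Errorf("read: %w", err)
	}
	var sent []int64
	var pubErr error
	for _, m := range msgs {
		if err := p.Publish(ctx, m); err != nil {
			pubErr = fmt.Errorf("publish id=%d: %w", m.ID, err)
			break
		}
		sent = append(sent, m.ID)
	}
	if len(sent) > 0 {
		// If Ack fails, the sent messages stay unpublished in the table and
		// are sent again on the next run: at-least-once delivery.
		if err := r.Ack(ctx, sent); err != nil {
			return 0, fmt.Errorf("ack: %w", err)
		}
	}
	return len(sent), pubErr
}

// memOutbox is an in-memory stand-in for the outbox table.
type memOutbox struct {
	rows      []Message
	published map[int64]bool
}

func (o *memOutbox) Read(_ context.Context, limit int) ([]Message, error) {
	var out []Message
	for _, m := range o.rows {
		if len(out) == limit {
			break
		}
		if !o.published[m.ID] {
			out = append(out, m)
		}
	}
	return out, nil
}

func (o *memOutbox) Ack(_ context.Context, ids []int64) error {
	for _, id := range ids {
		o.published[id] = true
	}
	return nil
}

// memPublisher fails for one chosen message ID to simulate a broker error.
type memPublisher struct {
	failID int64
	sent   []int64
}

func (p *memPublisher) Publish(_ context.Context, m Message) error {
	if m.ID == p.failID {
		return errors.New("broker unavailable")
	}
	p.sent = append(p.sent, m.ID)
	return nil
}

func main() {
	ctx := context.Background()
	box := &memOutbox{rows: []Message{{ID: 1}, {ID: 2}, {ID: 3}}, published: map[int64]bool{}}
	pub := &memPublisher{failID: 3}
	fmt.Println(forward(ctx, box, pub, 10))
	pub.failID = 0 // broker recovers
	fmt.Println(forward(ctx, box, pub, 10))
}
```

In the demo run, the first pass forwards two messages and reports the failure for the third; the second pass picks up the remaining one.
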
27. Demo outbox forwarder
28. Demo results
29. Demo SQS reader
30. Demo results
31. Demo recap
32. pgx-outbox recap
    Simple, generic, extensible
    Writer supports pgx and database/sql tx
    Reader, Forwarder use pgx
    Test coverage 80%
33. Alternatives
    watermill-sql: SQL Pub/Sub on top of Watermill library
    dataddo/pgq: general queue on top of Postgres
    jackc/pglogrepl: Postgres logical replication, low-level
    Trendyol/go-pq-cdc: CDC for Postgres
    PeerDB: streaming from Postgres to data warehouses, queues
34. Testing in pgx-outbox
    Testcontainers: Postgres, LocalStack modules
    Mocks: vektra/mockery
    Suite: stretchr/testify/suite
    Linters: testifylint
35. Future plans
    Add support for CDC/WAL-based Reader
    Explore capabilities of PeerDB
    Add support for Kafka, NATS publishers
36. Q & A
    Thank you!
    @nikolayk812