Mirror of https://github.com/rqlite/rqlite.git (synced 2026-01-25 04:16:26 +00:00)

Improve CDC stats
@@ -29,6 +29,7 @@
 - [PR #2315](https://github.com/rqlite/rqlite/pull/2315): Include "before" and "after" in CDC events.
 - [PR #2318](https://github.com/rqlite/rqlite/pull/2318): Add ColumnNames() to DB object.
 - [PR #2319](https://github.com/rqlite/rqlite/pull/2319): CDC "before" and "after" now structured as maps.
+- [PR #2321](https://github.com/rqlite/rqlite/pull/2321): Add metrics to CDC Service.

 ## v8.43.4 (August 27th 2025)
 ### Implementation changes and bug fixes
@@ -25,12 +25,12 @@ const (

 const (
     DefaultMaxBatchSz = 10
-    DefaultMaxBatchDelay = 100 * time.Millisecond
+    DefaultMaxBatchDelay = 200 * time.Millisecond
     DefaultHighWatermarkInterval = 1 * time.Second
     DefaultTransmitTimeout = 5 * time.Second
     DefaultTransmitRetryPolicy = LinearRetryPolicy
     DefaultTransmitMinBackoff = time.Second
-    DefaultTransmitMaxBackoff = time.Minute
+    DefaultTransmitMaxBackoff = 30 * time.Second
 )

 // TLSConfiguration holds the TLS configuration for the CDC service.
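The retry-related defaults above (linear policy, 1s minimum backoff, 30s maximum backoff) imply a transmitter that backs off between failed sends. Below is a minimal sketch of one way such a linear backoff loop could look using those default values; the function name and structure are illustrative only, not the service's actual transmit code.

package main

import (
    "errors"
    "fmt"
    "time"
)

// Defaults mirrored from the hunk above.
const (
    transmitMinBackoff = time.Second
    transmitMaxBackoff = 30 * time.Second
)

// transmitWithRetry is a hypothetical helper showing how a linear retry
// policy bounded by min/max backoff could drive a send function.
func transmitWithRetry(send func() error, maxAttempts int) error {
    backoff := transmitMinBackoff
    var err error
    for attempt := 0; attempt < maxAttempts; attempt++ {
        if err = send(); err == nil {
            return nil
        }
        time.Sleep(backoff)
        // Linear policy: grow by the minimum step, capped at the maximum.
        backoff += transmitMinBackoff
        if backoff > transmitMaxBackoff {
            backoff = transmitMaxBackoff
        }
    }
    return fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
    calls := 0
    err := transmitWithRetry(func() error {
        calls++
        if calls < 3 {
            return errors.New("endpoint unavailable")
        }
        return nil
    }, 5)
    fmt.Println(err) // <nil> once the third attempt succeeds
}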
cdc/fifo.go
@@ -3,6 +3,7 @@ package cdc
 import (
     "encoding/binary"
     "errors"
+    "expvar"
     "fmt"
     "sync"
     "time"
@@ -90,6 +91,9 @@ func NewQueue(path string) (*Queue, error) {
     if innerErr != nil {
         return fmt.Errorf("failed to get highest key: %w", innerErr)
     }
+    u := new(expvar.Int)
+    u.Set(int64(bucketSize(tx, bucketName)))
+    stats.Set(fifoSize, u)
     return nil
 }); err != nil {
     db.Close()
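NewQueue now seeds a fifo_size gauge from the on-disk bucket size, and the enqueue and delete paths adjust it incrementally (see the later hunks). Below is a self-contained sketch of that expvar pattern; the map name and starting value are illustrative, not taken from the package.

package main

import (
    "expvar"
    "fmt"
)

// stats is an illustrative expvar map; the real package registers its own.
var stats = expvar.NewMap("fifo_example")

const fifoSize = "fifo_size"

func main() {
    // Seed the gauge with the current queue length, as NewQueue does above.
    u := new(expvar.Int)
    u.Set(42) // pretend 42 entries already exist on disk
    stats.Set(fifoSize, u)

    // The enqueue/delete paths then adjust it incrementally.
    stats.Add(fifoSize, 1)  // one item enqueued
    stats.Add(fifoSize, -3) // three items pruned

    fmt.Println(stats.Get(fifoSize).(*expvar.Int).Value()) // 40
}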
@@ -285,7 +289,7 @@ func (q *Queue) run(highestKey uint64) {
 case req := <-q.enqueueChan:
     if req.idx <= highestKey {
         req.respChan <- enqueueResp{err: nil}
-        stats.Add(numFIFOIgnored, 1)
+        stats.Add(numFIFOEnqueueIgnored, 1)
         continue
     }
     key := uint64tob(req.idx)
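This hunk renames the counter bumped when an enqueue is ignored; the guard itself drops any event group whose index is at or below the highest key ever written to the FIFO. A toy sketch of that dedupe rule in isolation follows (the type and names here are made up for illustration, not the queue's real implementation).

package main

import "fmt"

// dedupe is a toy stand-in for the FIFO's highest-key check: anything at or
// below the highest index already written is acknowledged but not stored.
type dedupe struct {
    highestKey uint64
    ignored    int
    stored     []uint64
}

func (d *dedupe) enqueue(idx uint64) {
    if idx <= d.highestKey {
        d.ignored++ // counted as fifo_enqueue_ignored in the service
        return
    }
    d.stored = append(d.stored, idx)
    d.highestKey = idx
}

func main() {
    var d dedupe
    for _, idx := range []uint64{1, 2, 3, 2, 3, 4} {
        d.enqueue(idx)
    }
    fmt.Println(d.stored, d.ignored) // [1 2 3 4] 2
}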
@@ -293,16 +297,15 @@ func (q *Queue) run(highestKey uint64) {
     if err := tx.Bucket(bucketName).Put(key, req.item); err != nil {
         return err
     }
+    stats.Add(fifoSize, 1)
     if req.idx > highestKey {
         if err := setHighestKey(tx, req.idx); err != nil {
             return err
         }
+        highestKey = req.idx
     }
     return nil
 })
-if err == nil && req.idx > highestKey {
-    highestKey = req.idx
-}
 req.respChan <- enqueueResp{err: err}
 if err == nil && nextEv == nil {
     if err := loadHead(); err != nil {
@@ -333,6 +336,7 @@ func (q *Queue) run(highestKey uint64) {
             return derr
         }
     }
+    stats.Add(fifoSize, -int64(len(keysToDelete)))
     return nil
 })
 // Ensure cursor moves past deleted range
@@ -357,8 +361,7 @@ func (q *Queue) run(highestKey uint64) {
 firstKey := uint64(0)
 err := q.db.View(func(tx *bbolt.Tx) error {
     bucket := tx.Bucket(bucketName)
-    st := bucket.Stats()
-    l = st.KeyN
+    l = bucketSize(tx, bucketName)
     isEmpty = (l == 0)
     if !isEmpty {
         c := bucket.Cursor()
@@ -424,6 +427,14 @@ type queryResp struct {
     err error
 }

+func bucketSize(tx *bbolt.Tx, name []byte) int {
+    b := tx.Bucket(name)
+    if b == nil {
+        return 0
+    }
+    return b.Stats().KeyN
+}
+
 func btouint64(b []byte) uint64 {
     if len(b) != 8 {
         panic(fmt.Sprintf("expected 8 bytes, got %d", len(b)))
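The new bucketSize helper reads bbolt's per-bucket statistics rather than iterating keys: Stats().KeyN is the bucket's key count. A standalone sketch exercising the same call follows; the database path and bucket name are throwaway values chosen for illustration.

package main

import (
    "fmt"
    "log"
    "os"
    "path/filepath"

    "go.etcd.io/bbolt"
)

func main() {
    // Throwaway database; path and bucket name are illustrative only.
    path := filepath.Join(os.TempDir(), "bucketsize_example.db")
    defer os.Remove(path)

    db, err := bbolt.Open(path, 0o600, nil)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    name := []byte("events")
    if err := db.Update(func(tx *bbolt.Tx) error {
        b, err := tx.CreateBucketIfNotExists(name)
        if err != nil {
            return err
        }
        return b.Put([]byte("k1"), []byte("v1"))
    }); err != nil {
        log.Fatal(err)
    }

    // Same idea as bucketSize above: KeyN is the number of keys in the bucket.
    if err := db.View(func(tx *bbolt.Tx) error {
        fmt.Println(tx.Bucket(name).Stats().KeyN) // prints 1
        return nil
    }); err != nil {
        log.Fatal(err)
    }
}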
@@ -29,14 +29,17 @@ const (
 )

 const (
     numDroppedNotLeader = "dropped_not_leader"
     numDroppedFailedToSend = "dropped_failed_to_send"
     numRetries = "retries"
     numSent = "sent_events"
     numSnapshotSync = "snapshot_sync"
-    numFIFOIgnored = "fifo_ignored"
-    cdcNumBatcherWriteIgnored = "cdc_batcher_write_ignored"
-    cdcNumFIFOReadIgnored = "cdc_fifo_read_ignored"
+    numBatcherWrites = "batcher_writes"
+    numBatcherEventsWrite = "batcher_events_write"
+    numBatcherReads = "batcher_reads"
+    numBatcherWriteIgnored = "batcher_write_ignored"
+    numFIFOEnqueueIgnored = "fifo_enqueue_ignored"
+    numHWMIgnored = "hwm_ignored"
+    fifoSize = "fifo_size"
 )

 // stats captures stats for the CDC Service.
@@ -50,14 +53,17 @@ func init() {
 // ResetStats resets the expvar stats for this module. Mostly for test purposes.
 func ResetStats() {
     stats.Init()
     stats.Add(numDroppedNotLeader, 0)
     stats.Add(numDroppedFailedToSend, 0)
     stats.Add(numRetries, 0)
     stats.Add(numSent, 0)
     stats.Add(numSnapshotSync, 0)
-    stats.Add(numFIFOIgnored, 0)
-    stats.Add(cdcNumBatcherWriteIgnored, 0)
-    stats.Add(cdcNumFIFOReadIgnored, 0)
+    stats.Add(numBatcherWrites, 0)
+    stats.Add(numBatcherEventsWrite, 0)
+    stats.Add(numBatcherReads, 0)
+    stats.Add(numBatcherWriteIgnored, 0)
+    stats.Add(numFIFOEnqueueIgnored, 0)
+    stats.Add(numHWMIgnored, 0)
+    stats.Add(fifoSize, 0)
 }

 // Cluster is an interface that defines methods for cluster management and communication.
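ResetStats re-registers every counter at zero after clearing the map, and the tests further down read values back with a type assertion on Get. Below is a minimal sketch of that reset-and-read pattern on an illustrative expvar map (the map name and key are not the service's own).

package main

import (
    "expvar"
    "fmt"
)

// Illustrative map and key name; the service registers its own.
var stats = expvar.NewMap("cdc_example")

const numSent = "sent_events"

// resetStats mirrors the ResetStats pattern above: Init clears the map and
// each key is re-added at zero so it always shows up in /debug/vars.
func resetStats() {
    stats.Init()
    stats.Add(numSent, 0)
}

func main() {
    resetStats()
    stats.Add(numSent, 3)

    // Tests read a counter back with a type assertion, as in the hunks below.
    got := stats.Get(numSent).(*expvar.Int).Value()
    fmt.Println(got) // 3
}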
@@ -374,6 +380,8 @@ func (s *Service) mainLoop() {
     }

 case req := <-s.batcher.C:
+    stats.Add(numBatcherReads, 1)
+
     for i := range req.Objects {
         if req.Objects[i].Flush {
             s.flushRx.Add(1)
@@ -415,12 +423,16 @@ func (s *Service) writeToBatcher() {
         // This could happen on followers if the Leader has advanced the HWM
         // but this node hasn't even had the event generated by its underlying
         // database yet.
-        stats.Add(cdcNumBatcherWriteIgnored, 1)
+        stats.Add(numBatcherWriteIgnored, 1)
         continue
     }
     if _, err := s.batcher.WriteOne(o, nil); err != nil {
         s.logger.Printf("error writing CDC events to batcher: %v", err)
+    } else if err == nil {
+        stats.Add(numBatcherWrites, 1)
+        stats.Add(numBatcherEventsWrite, int64(len(o.Events)))
     }

 case ch := <-s.snapshotCh:
     stats.Add(numSnapshotSync, 1)
     evg := &proto.CDCIndexedEventGroup{
@@ -474,7 +486,7 @@ func (s *Service) leaderLoop() (chan struct{}, chan struct{}) {
         // This could happen on followers if the Leader has advanced the HWM
         // but this node hasn't even had the event generated by its underlying
         // database yet.
-        stats.Add(cdcNumFIFOReadIgnored, 1)
+        stats.Add(numHWMIgnored, 1)
         continue
     }

@@ -243,11 +243,6 @@ func Test_ServiceRestart_NoDupes(t *testing.T) {
         t.Fatalf("timeout waiting for HTTP POST")
     }

-    // Peek into the FIFO, ensure it is behaving correctly.
-    if got, exp := stats.Get(numFIFOIgnored).(*expvar.Int).Value(), int64(0); exp != got {
-        t.Fatalf("expected %d FIFO ignored events, got %d", exp, got)
-    }
-
     // Wait until the svc has performed a FIFO pruning.
     testPoll(t, func() bool {
         return svc.hwmLeaderUpdated.Load() == 1
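The test waits for conditions with testPoll; its body is not part of this commit, but a helper compatible with the testPoll(t, fn, timeout) call shape used here could look like the following sketch. This is an assumption for illustration, not the repository's actual implementation.

package cdc_example

import (
    "testing"
    "time"
)

// testPoll repeatedly evaluates cond until it returns true or the timeout
// elapses, failing the test on timeout. Hypothetical; the real helper in the
// rqlite test suite may differ.
func testPoll(t *testing.T, cond func() bool, timeout time.Duration) {
    t.Helper()
    deadline := time.Now().Add(timeout)
    for time.Now().Before(deadline) {
        if cond() {
            return
        }
        time.Sleep(10 * time.Millisecond)
    }
    t.Fatalf("timeout (%s) waiting for condition", timeout)
}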
@@ -271,12 +266,12 @@ func Test_ServiceRestart_NoDupes(t *testing.T) {
     defer svc2.Stop()
     cl.SetLeader(0)

-    // Send the same event, ensure it is not forwarded.
+    // Send the same event, ensure it is not forwarded because the
+    // FIFO's highest-key-written logic will drop it.
+    currentDrop := stats.Get(numFIFOEnqueueIgnored).(*expvar.Int).Value()
     svc2.C() <- evs

     // Peek into the CDC FIFO.
     testPoll(t, func() bool {
-        return stats.Get(numFIFOIgnored).(*expvar.Int).Value() == 1
+        return stats.Get(numFIFOEnqueueIgnored).(*expvar.Int).Value() == currentDrop+1
     }, 2*time.Second)
 }