Compress data when writing to CDC FIFO

Copilot, 2025-09-14 11:37:30 -04:00 (committed by GitHub)
parent 65665d0899
commit 97cd6fc11c
4 changed files with 198 additions and 3 deletions
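
The change compresses each marshalled CDC batch with zlib before it is enqueued in the FIFO, and decompresses it again when the batch is read back for transmission. As a rough, standalone illustration of why this pays off (not part of the commit; the payload below is made up), the sketch runs a repetitive JSON-like payload through the standard library's compress/zlib, which is what the new helper package wraps:

	// Illustrative only: shows the effect of zlib on a repetitive JSON-like
	// payload, similar in shape to a CDC batch.
	package main

	import (
		"bytes"
		"compress/zlib"
		"fmt"
		"io"
		"log"
		"strings"
	)

	func main() {
		// A made-up, repetitive payload; CDC batches are JSON with many
		// repeated keys, which compress well.
		payload := []byte(strings.Repeat(`{"table":"foo","op":"INSERT","after":{"id":1,"name":"alice"}}`, 100))

		var buf bytes.Buffer
		w := zlib.NewWriter(&buf)
		if _, err := w.Write(payload); err != nil {
			log.Fatal(err)
		}
		if err := w.Close(); err != nil {
			log.Fatal(err)
		}
		fmt.Printf("original=%d bytes, compressed=%d bytes\n", len(payload), buf.Len())

		// Round-trip to confirm the data survives intact.
		r, err := zlib.NewReader(bytes.NewReader(buf.Bytes()))
		if err != nil {
			log.Fatal(err)
		}
		defer r.Close()
		out, err := io.ReadAll(r)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("round-trip ok:", bytes.Equal(payload, out))
	}

Repeated keys and values of this kind typically compress well, so the FIFO holds less data per enqueued batch.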

View File

@@ -30,6 +30,7 @@
- [PR #2318](https://github.com/rqlite/rqlite/pull/2318): Add ColumnNames() to DB object.
- [PR #2319](https://github.com/rqlite/rqlite/pull/2319): CDC "before" and "after" now structured as maps.
- [PR #2321](https://github.com/rqlite/rqlite/pull/2321): Add metrics to CDC Service.
- [PR #2323](https://github.com/rqlite/rqlite/pull/2323): Compress data before storing in FIFO.
## v8.43.4 (August 27th 2025)
### Implementation changes and bug fixes

View File

@@ -16,6 +16,7 @@ import (
cdcjson "github.com/rqlite/rqlite/v8/cdc/json"
"github.com/rqlite/rqlite/v8/command/proto"
httpurl "github.com/rqlite/rqlite/v8/http/url"
"github.com/rqlite/rqlite/v8/internal/rarchive/zlib"
"github.com/rqlite/rqlite/v8/internal/rsync"
"github.com/rqlite/rqlite/v8/queue"
)
@@ -394,8 +395,15 @@ func (s *Service) mainLoop() {
continue
}
// Compress the marshalled data before enqueuing it to the FIFO.
compressedData, err := zlib.Compress(b)
if err != nil {
s.logger.Printf("error compressing batch for FIFO: %v", err)
continue
}
idx := req.Objects[len(req.Objects)-1].Index
if err := s.fifo.Enqueue(&Event{Index: idx, Data: b}); err != nil {
if err := s.fifo.Enqueue(&Event{Index: idx, Data: compressedData}); err != nil {
s.logger.Printf("error writing batch to FIFO: %v", err)
}
req.Close()
@@ -490,7 +498,17 @@ func (s *Service) leaderLoop() (chan struct{}, chan struct{}) {
continue
}
req, err := http.NewRequest("POST", s.endpoint, bytes.NewReader(ev.Data))
// Decompress the data read from the FIFO into a byte slice. We do this so the
// HTTP request can set Content-Length correctly, which makes it easier for
// servers to handle the request. It costs some memory, but CDC events are
// typically small and it simplifies downstream processing.
decompressed, err := zlib.Decompress(ev.Data)
if err != nil {
s.logger.Printf("error decompressing data for batch from FIFO: %v", err)
continue
}
req, err := http.NewRequest("POST", s.endpoint, bytes.NewReader(decompressed))
if err != nil {
s.logger.Printf("error creating HTTP request for endpoint: %v", err)
continue
@@ -503,7 +521,7 @@ func (s *Service) leaderLoop() (chan struct{}, chan struct{}) {
for {
nAttempts++
if s.logOnly {
s.logger.Println(string(ev.Data))
s.logger.Println(string(decompressed))
sentOK = true
break
}
@@ -520,6 +538,7 @@ func (s *Service) leaderLoop() (chan struct{}, chan struct{}) {
break
}
// OK, need to prep for a retry.
if s.transmitRetryPolicy == ExponentialRetryPolicy {
retryDelay *= 2
if retryDelay > s.transmitMaxBackoff {
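
The comment above explains that the batch is fully decompressed into a byte slice before the HTTP request is built so that Content-Length can be set. That follows from net/http's documented behaviour: when NewRequest is given a *bytes.Reader (or *bytes.Buffer / *strings.Reader), it sets the request's ContentLength from the reader's length, whereas an arbitrary streaming body would be sent with chunked transfer encoding. A minimal standalone sketch (the endpoint URL is made up):

	// Standalone sketch, not part of the commit: shows that http.NewRequest
	// sets ContentLength automatically when the body is a *bytes.Reader.
	package main

	import (
		"bytes"
		"fmt"
		"net/http"
	)

	func main() {
		decompressed := []byte(`[{"op":"INSERT"}]`) // illustrative payload

		// Because bytes.NewReader has a known length, req.ContentLength is
		// set and the request goes out with a Content-Length header rather
		// than chunked transfer encoding.
		req, err := http.NewRequest("POST", "http://example.com/cdc", bytes.NewReader(decompressed))
		if err != nil {
			panic(err)
		}
		fmt.Println(req.ContentLength) // 17, the length of the payload
	}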

View File

@@ -0,0 +1,67 @@
package zlib

import (
	"bytes"
	"compress/zlib"
	"io"
)

// Compress compresses the given data using zlib compression.
func Compress(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	w := zlib.NewWriter(&buf)
	_, err := w.Write(data)
	if err != nil {
		w.Close()
		return nil, err
	}
	err = w.Close()
	if err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// Decompress decompresses the given zlib-compressed data.
func Decompress(data []byte) ([]byte, error) {
	reader := bytes.NewReader(data)
	r, err := zlib.NewReader(reader)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

// Reader performs zlib decompression.
type Reader struct {
	io.Reader
}

// NewReader creates a new zlib reader from the given byte slice.
func NewReader(data []byte) (*Reader, error) {
	zr, err := zlib.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	return &Reader{zr}, nil
}

// Close closes the reader.
func (r *Reader) Close() error {
	if closer, ok := r.Reader.(io.Closer); ok {
		return closer.Close()
	}
	return nil
}

// Reset resets the reader with new data.
func (r *Reader) Reset(data []byte) error {
	if resetter, ok := r.Reader.(zlib.Resetter); ok {
		return resetter.Reset(bytes.NewReader(data), nil)
	}
	return nil
}
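
Because Compress simply wraps compress/zlib, its output is a standard zlib (RFC 1950) stream and can also be decoded with the standard library directly. A small sketch of that round trip, written as an example that could sit alongside the tests below (not part of the commit):

	package zlib

	import (
		"bytes"
		"compress/zlib"
		"fmt"
		"io"
	)

	// ExampleCompress shows that Compress emits a standard zlib stream that
	// the standard library's reader can decode without this package.
	func ExampleCompress() {
		compressed, err := Compress([]byte("hello, CDC"))
		if err != nil {
			panic(err)
		}
		r, err := zlib.NewReader(bytes.NewReader(compressed))
		if err != nil {
			panic(err)
		}
		defer r.Close()
		out, err := io.ReadAll(r)
		if err != nil {
			panic(err)
		}
		fmt.Println(string(out))
		// Output: hello, CDC
	}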

View File

@@ -0,0 +1,108 @@
package zlib

import (
	"bytes"
	"io"
	"testing"
)

// Test_CompressDecompress tests basic compression and decompression functionality.
func Test_CompressDecompress(t *testing.T) {
	testData := []byte("Hello, this is a test string for zlib compression!")

	// Compress the data
	compressed, err := Compress(testData)
	if err != nil {
		t.Fatalf("Failed to compress data: %v", err)
	}

	// Compression may not reduce the size of short inputs, so just log the result.
	if len(compressed) >= len(testData) {
		t.Logf("Compression did not reduce size: original=%d, compressed=%d", len(testData), len(compressed))
	}

	// Decompress the data
	decompressed, err := Decompress(compressed)
	if err != nil {
		t.Fatalf("Failed to decompress data: %v", err)
	}

	// Verify the data is identical
	if !bytes.Equal(testData, decompressed) {
		t.Errorf("Decompressed data does not match original. Original: %q, Decompressed: %q", testData, decompressed)
	}
}

// Test_Compress_EmptyData tests compression of empty data.
func Test_Compress_EmptyData(t *testing.T) {
	testData := []byte{}

	compressed, err := Compress(testData)
	if err != nil {
		t.Fatalf("Failed to compress empty data: %v", err)
	}

	decompressed, err := Decompress(compressed)
	if err != nil {
		t.Fatalf("Failed to decompress empty data: %v", err)
	}

	if !bytes.Equal(testData, decompressed) {
		t.Errorf("Decompressed empty data does not match original")
	}
}

// Test_Decompress_InvalidData tests decompression of invalid data.
func Test_Decompress_InvalidData(t *testing.T) {
	invalidData := []byte("this is not zlib compressed data")

	_, err := Decompress(invalidData)
	if err == nil {
		t.Error("Expected an error when decompressing invalid data, but got none")
	}
}

// Test_Reader tests the Reader type, including resetting it with new data.
func Test_Reader(t *testing.T) {
	testData := []byte("Hello, this is a test string for zlib compression!")

	// Compress the data
	compressed, err := Compress(testData)
	if err != nil {
		t.Fatalf("Failed to compress data: %v", err)
	}

	// Create a new zlib reader
	reader, err := NewReader(compressed)
	if err != nil {
		t.Fatalf("Failed to create zlib reader: %v", err)
	}
	defer reader.Close()

	// Read all data from the reader and check it matches original.
	decompressed, err := io.ReadAll(reader)
	if err != nil {
		t.Fatalf("Failed to read from zlib reader: %v", err)
	}
	if !bytes.Equal(testData, decompressed) {
		t.Errorf("Decompressed data does not match original. Original: %q, Decompressed: %q", testData, decompressed)
	}

	// Test resetting the reader with new data.
	testData2 := []byte("Hello, this is another test string for zlib compression!")
	compressed2, err := Compress(testData2)
	if err != nil {
		t.Fatalf("Failed to compress data: %v", err)
	}
	if err := reader.Reset(compressed2); err != nil {
		t.Fatalf("Failed to reset zlib reader: %v", err)
	}

	// Read all data from the reset reader and check it matches new original.
	decompressed2, err := io.ReadAll(reader)
	if err != nil {
		t.Fatalf("Failed to read from zlib reader after reset: %v", err)
	}
	if !bytes.Equal(testData2, decompressed2) {
		t.Errorf("Decompressed data after reset does not match original. Original: %q, Decompressed: %q", testData2, decompressed2)
	}
}