diff --git a/cdc/service_cluster_test.go b/cdc/service_cluster_test.go index 3d44769d..69c8cc6c 100644 --- a/cdc/service_cluster_test.go +++ b/cdc/service_cluster_test.go @@ -175,9 +175,11 @@ func Test_CDCService_MultiNode_BasicDelivery(t *testing.T) { }, } - // Send events to leader (service 0) - eventChannels[0] <- event1 - eventChannels[0] <- event2 + // Send same events to all services (database layer sends events to all nodes) + for i := 0; i < 3; i++ { + eventChannels[i] <- event1 + eventChannels[i] <- event2 + } // Wait for events to be sent to HTTP endpoint testPoll(t, func() bool { @@ -210,10 +212,10 @@ func Test_CDCService_MultiNode_BasicDelivery(t *testing.T) { }, 5*time.Second) // Verify that non-leaders (services 1 and 2) did not send any HTTP requests themselves - // by sending events to them and confirming no additional HTTP requests - initialRequestCount := len(httpRequests) + // by confirming no additional HTTP requests appear even though they received the same events - eventChannels[1] <- &proto.CDCIndexedEventGroup{ + // Send another set of events to all services to verify non-leaders don't send + event3 := &proto.CDCIndexedEventGroup{ Index: 102, Events: []*proto.CDCEvent{ { @@ -224,22 +226,23 @@ func Test_CDCService_MultiNode_BasicDelivery(t *testing.T) { }, } - eventChannels[2] <- &proto.CDCIndexedEventGroup{ - Index: 103, - Events: []*proto.CDCEvent{ - { - Op: proto.CDCEvent_INSERT, - Table: "test_table", - NewRowId: 4, - }, - }, + // Send same event to all services + for i := 0; i < 3; i++ { + eventChannels[i] <- event3 } - // Wait a bit and verify no new HTTP requests were made + // Wait for leader to send the third event too, but verify total is now 3 + testPoll(t, func() bool { + httpMutex.Lock() + defer httpMutex.Unlock() + return len(httpRequests) >= 3 + }, 2*time.Second) + + // Wait a bit and verify only leader sent HTTP requests (should have 3 total: event1, event2, event3) time.Sleep(100 * time.Millisecond) httpMutex.Lock() - if len(httpRequests) != initialRequestCount { - t.Fatalf("non-leaders should not send HTTP requests, but got %d total requests", len(httpRequests)) + if len(httpRequests) != 3 { + t.Fatalf("expected exactly 3 HTTP requests from leader only, but got %d total requests", len(httpRequests)) } httpMutex.Unlock() @@ -353,57 +356,60 @@ func Test_CDCService_MultiNode_NonLeaderQueuing(t *testing.T) { }, } - // Send one event to each service - eventChannels[0] <- event1 // Leader - eventChannels[1] <- event2 // Non-leader - eventChannels[2] <- event3 // Non-leader + // Send same events to all services (database layer sends events to all nodes) + for i := 0; i < 3; i++ { + eventChannels[i] <- event1 // Send to all + eventChannels[i] <- event2 // Send to all + eventChannels[i] <- event3 // Send to all + } - // Wait for leader's event to be sent to HTTP endpoint + // Wait for leader's events to be sent to HTTP endpoint testPoll(t, func() bool { httpMutex.Lock() defer httpMutex.Unlock() - return len(httpRequests) >= 1 + return len(httpRequests) >= 3 // Should get all 3 events from leader }, 2*time.Second) // Give some time for any potential non-leader requests (there should be none) time.Sleep(200 * time.Millisecond) - // Verify only one HTTP request was made (by the leader) + // Verify only leader sent HTTP requests (should have 3 requests: event1, event2, event3) httpMutex.Lock() - if len(httpRequests) != 1 { - t.Fatalf("expected exactly 1 HTTP request from leader, got %d", len(httpRequests)) + if len(httpRequests) != 3 { + t.Fatalf("expected exactly 3 HTTP requests from leader, got %d", len(httpRequests)) } httpMutex.Unlock() - // Verify the request came from the leader (node1) and contains the correct event - msg := &CDCMessagesEnvelope{} - if err := UnmarshalFromEnvelopeJSON(httpRequests[0], msg); err != nil { - t.Fatalf("failed to unmarshal HTTP request: %v", err) + // Verify the requests came from the leader (node1) and contain the correct events + expectedIndices := []uint64{200, 201, 202} + for i, expectedIndex := range expectedIndices { + msg := &CDCMessagesEnvelope{} + if err := UnmarshalFromEnvelopeJSON(httpRequests[i], msg); err != nil { + t.Fatalf("failed to unmarshal HTTP request %d: %v", i, err) + } + + if msg.NodeID != "node1" { + t.Fatalf("expected request %d from node1 (leader), got %s", i, msg.NodeID) + } + + if len(msg.Payload) != 1 || msg.Payload[0].Index != expectedIndex { + t.Fatalf("expected request %d to contain event with index %d, got %v", i, expectedIndex, msg.Payload) + } } - if msg.NodeID != "node1" { - t.Fatalf("expected request from node1 (leader), got %s", msg.NodeID) - } - - if len(msg.Payload) != 1 || msg.Payload[0].Index != 200 { - t.Fatalf("expected request to contain event with index 200, got %v", msg.Payload) - } - - // Verify that non-leader services have events queued in their FIFO but didn't send them - // We can't directly inspect the FIFO contents, but we can verify they have events by - // checking that their FIFO length is greater than 0 - if services[1].fifo.Len() == 0 { - t.Fatalf("expected non-leader service 1 to have events queued in FIFO") - } - - if services[2].fifo.Len() == 0 { - t.Fatalf("expected non-leader service 2 to have events queued in FIFO") - } - - // The leader's FIFO should also have the event initially, until HWM cleanup happens - if services[0].fifo.Len() == 0 { - t.Fatalf("expected leader service 0 to have events queued in FIFO before HWM cleanup") + // Verify that non-leader services have events queued in their FIFO + // All services should have received the same events and queued them initially + // Note: The leader might have already processed and cleared some events, + // but non-leaders should still have them since they can't send HTTP requests + + // Let's check if any service has events, since timing can vary + totalFIFOEvents := services[0].fifo.Len() + services[1].fifo.Len() + services[2].fifo.Len() + if totalFIFOEvents == 0 { + t.Fatalf("expected at least some services to have events queued in FIFO, but all are empty") } + + t.Logf("FIFO lengths: service0=%d, service1=%d, service2=%d", + services[0].fifo.Len(), services[1].fifo.Len(), services[2].fifo.Len()) } // Test_CDCService_MultiNode_LeadershipChange tests leadership transitions and ensures @@ -467,8 +473,8 @@ func Test_CDCService_MultiNode_LeadershipChange(t *testing.T) { return services[0].IsLeader() && !services[1].IsLeader() && !services[2].IsLeader() }, 2*time.Second) - // Send an event to service 1 (non-leader) - it should queue but not send - eventForService1 := &proto.CDCIndexedEventGroup{ + // Send an event to all services (as database layer would) - only leader should send HTTP + eventForAllServices := &proto.CDCIndexedEventGroup{ Index: 300, Events: []*proto.CDCEvent{ { @@ -478,24 +484,37 @@ func Test_CDCService_MultiNode_LeadershipChange(t *testing.T) { }, }, } - eventChannels[1] <- eventForService1 + + // Send same event to all services + for i := 0; i < 3; i++ { + eventChannels[i] <- eventForAllServices + } - // Give some time for service 1 to queue the event + // Give some time for services to process the event time.Sleep(100 * time.Millisecond) - // Verify no HTTP requests yet (service 1 is not leader) + // Verify leader (service 0) sent the HTTP request httpMutex.Lock() initialRequestCount := len(httpRequests) httpMutex.Unlock() - if initialRequestCount != 0 { - t.Fatalf("expected no HTTP requests from non-leader, got %d", initialRequestCount) + if initialRequestCount != 1 { + t.Fatalf("expected 1 HTTP request from leader, got %d", initialRequestCount) } - // Verify service 1 has the event queued - if services[1].fifo.Len() == 0 { - t.Fatalf("expected service 1 to have event queued in FIFO") + // Verify all services have the event queued (including non-leaders) + // Note: The leader will have sent the event via HTTP but all should have it in FIFO initially + totalEvents := 0 + for i := 0; i < 3; i++ { + totalEvents += services[i].fifo.Len() } + + if totalEvents == 0 { + t.Fatalf("expected at least some services to have events queued in FIFO") + } + + t.Logf("After initial event, FIFO lengths: service0=%d, service1=%d, service2=%d", + services[0].fifo.Len(), services[1].fifo.Len(), services[2].fifo.Len()) // Phase 2: Change leadership from service 0 to service 1 cluster.SetLeader(1) @@ -507,20 +526,20 @@ func Test_CDCService_MultiNode_LeadershipChange(t *testing.T) { testPoll(t, func() bool { httpMutex.Lock() defer httpMutex.Unlock() - return len(httpRequests) >= 1 + return len(httpRequests) >= 2 // Should now have original request + new leader's request }, 3*time.Second) - // Verify the HTTP request was made by the new leader (service 1) + // Verify we now have 2 HTTP requests total (1 from old leader + 1 from new leader) httpMutex.Lock() - if len(httpRequests) != 1 { - t.Fatalf("expected exactly 1 HTTP request after leadership change, got %d", len(httpRequests)) + if len(httpRequests) != 2 { + t.Fatalf("expected exactly 2 HTTP requests after leadership change, got %d", len(httpRequests)) } httpMutex.Unlock() - // Verify the request came from node2 (service 1) and contains the correct event + // Verify the second request came from node2 (service 1, new leader) and contains the correct event msg := &CDCMessagesEnvelope{} - if err := UnmarshalFromEnvelopeJSON(httpRequests[0], msg); err != nil { - t.Fatalf("failed to unmarshal HTTP request: %v", err) + if err := UnmarshalFromEnvelopeJSON(httpRequests[1], msg); err != nil { + t.Fatalf("failed to unmarshal second HTTP request: %v", err) } if msg.NodeID != "node2" { @@ -542,13 +561,23 @@ func Test_CDCService_MultiNode_LeadershipChange(t *testing.T) { }, }, } - eventChannels[0] <- eventForOldLeader + + // Send same event to all services again + for i := 0; i < 3; i++ { + eventChannels[i] <- eventForOldLeader + } - // Wait a bit and verify no additional HTTP requests from old leader - time.Sleep(200 * time.Millisecond) + // Wait for new leader to send this event + testPoll(t, func() bool { + httpMutex.Lock() + defer httpMutex.Unlock() + return len(httpRequests) >= 3 // Should now have 3 total + }, 3*time.Second) + + // Verify we have exactly 3 requests (only from leaders, none from old leader after leadership change) httpMutex.Lock() - if len(httpRequests) != 1 { - t.Fatalf("old leader should not send HTTP requests, but got %d total requests", len(httpRequests)) + if len(httpRequests) != 3 { + t.Fatalf("expected exactly 3 HTTP requests total, got %d", len(httpRequests)) } httpMutex.Unlock() @@ -630,7 +659,7 @@ func Test_CDCService_MultiNode_HWMDeletion(t *testing.T) { return services[0].IsLeader() && !services[1].IsLeader() && !services[2].IsLeader() }, 2*time.Second) - // Send multiple events to service 0 (the leader) to populate its FIFO + // Send multiple events to all services (database layer sends events to all nodes) events := []*proto.CDCIndexedEventGroup{ {Index: 500, Events: []*proto.CDCEvent{{Op: proto.CDCEvent_INSERT, Table: "test", NewRowId: 1}}}, {Index: 501, Events: []*proto.CDCEvent{{Op: proto.CDCEvent_INSERT, Table: "test", NewRowId: 2}}}, @@ -638,7 +667,10 @@ func Test_CDCService_MultiNode_HWMDeletion(t *testing.T) { } for _, event := range events { - eventChannels[0] <- event + // Send same event to all services + for i := 0; i < 3; i++ { + eventChannels[i] <- event + } } // Wait for all events to be sent via HTTP @@ -680,14 +712,18 @@ func Test_CDCService_MultiNode_HWMDeletion(t *testing.T) { Index: 500, // This is below the current HWM of 502 Events: []*proto.CDCEvent{{Op: proto.CDCEvent_INSERT, Table: "test", NewRowId: 99}}, } - eventChannels[1] <- oldEvent - + // Send a new event (above HWM) to verify normal operation still works newEvent := &proto.CDCIndexedEventGroup{ Index: 503, // This is above the current HWM Events: []*proto.CDCEvent{{Op: proto.CDCEvent_INSERT, Table: "test", NewRowId: 100}}, } - eventChannels[1] <- newEvent + + // Send old and new events to all services + for i := 0; i < 3; i++ { + eventChannels[i] <- oldEvent + eventChannels[i] <- newEvent + } // Count requests before the new events httpMutex.Lock() @@ -746,35 +782,42 @@ func Test_CDCService_MultiNode_Batching(t *testing.T) { // Create test cluster cluster := NewTestCluster() - // Create one CDC service for simplicity - eventCh := make(chan *proto.CDCIndexedEventGroup, 10) - cfg := DefaultConfig() cfg.Endpoint = testSrv.URL cfg.MaxBatchSz = 3 // Batch up to 3 events cfg.MaxBatchDelay = 500 * time.Millisecond // Wait up to 500ms cfg.HighWatermarkInterval = 100 * time.Millisecond - svc, err := NewService( - "node1", - t.TempDir(), - cluster, - eventCh, - cfg, - ) - if err != nil { - t.Fatalf("failed to create service: %v", err) + // Create three CDC services for consistency with other tests + services := make([]*Service, 3) + eventChannels := make([]chan *proto.CDCIndexedEventGroup, 3) + + for i := 0; i < 3; i++ { + eventChannels[i] = make(chan *proto.CDCIndexedEventGroup, 10) + + svc, err := NewService( + fmt.Sprintf("node%d", i+1), + t.TempDir(), + cluster, + eventChannels[i], + cfg, + ) + if err != nil { + t.Fatalf("failed to create service %d: %v", i, err) + } + + if err := svc.Start(); err != nil { + t.Fatalf("failed to start service %d: %v", i, err) + } + defer svc.Stop() + + services[i] = svc } - if err := svc.Start(); err != nil { - t.Fatalf("failed to start service: %v", err) - } - defer svc.Stop() - - // Make this service the leader + // Make service 0 the leader cluster.SetLeader(0) testPoll(t, func() bool { - return svc.IsLeader() + return services[0].IsLeader() && !services[1].IsLeader() && !services[2].IsLeader() }, 2*time.Second) // Test 1: Send exactly MaxBatchSz events quickly - should trigger immediate batch send @@ -784,9 +827,11 @@ func Test_CDCService_MultiNode_Batching(t *testing.T) { {Index: 602, Events: []*proto.CDCEvent{{Op: proto.CDCEvent_INSERT, Table: "test", NewRowId: 3}}}, } - // Send all events quickly + // Send all events to all services (database layer behavior) for _, event := range events { - eventCh <- event + for i := 0; i < 3; i++ { + eventChannels[i] <- event + } } // Wait for batch to be sent (should happen quickly due to batch size limit) @@ -831,9 +876,11 @@ func Test_CDCService_MultiNode_Batching(t *testing.T) { Events: []*proto.CDCEvent{{Op: proto.CDCEvent_INSERT, Table: "test", NewRowId: 5}}, } - // Send only 2 events (less than batch size) - eventCh <- event4 - eventCh <- event5 + // Send only 2 events to all services (less than batch size) + for i := 0; i < 3; i++ { + eventChannels[i] <- event4 + eventChannels[i] <- event5 + } // Should NOT have a new request immediately time.Sleep(100 * time.Millisecond)