mirror of
https://github.com/apple/foundationdb.git
synced 2026-01-25 04:18:18 +00:00
Implement MockS3ServerChaos: S3 Error Injection for Testing (#12515)
* Implement MockS3ServerChaos: S3 Error Injection for Testing

  Add chaos injection system for MockS3Server following AsyncFileChaos pattern. Tests S3BlobStore client resilience against realistic S3 failures.

  - MockS3ServerChaos wrapper with configurable fault injection
  - S3FaultInjector with error/throttle/delay/corruption rates
  - Chaos support integrated into S3ClientWorkload
  - Simplified URL parsing and improved error handling
  - Multipart upload idempotency and BUGGIFY fix
  - ChaosMetrics tracking for S3 events
  - Comprehensive test suite with multiple chaos levels

  Design doc: design/mocks3server_chaos_design.md
  Tests: tests/slow/S3ClientWorkloadWithChaos.toml

  Currently adds light/medium/heavy chaos to the simple s3client test. Follow-on changes will add similar chaos for bulkload via S3 and for backup via S3.

* Mostly around registration of http server across test runs inside a workflow
* Formatting
* Missing state qualifier
* Per-file changes:
  - fdbclient/S3BlobStore.actor.cpp: Fix bug where we are accumulating headers across retries.
  - fdbserver/MockS3Server.actor.cpp, fdbserver/MockS3ServerChaos.actor.cpp, fdbserver/include/fdbserver/MockS3Server.h: Add check if we should persist mocks3 data.
  - tests/slow/S3ClientWorkloadWithChaos.toml: Use one mocks3, the one that does chaos for all loads.
122
design/mocks3server_chaos_design.md
Normal file
@@ -0,0 +1,122 @@
# MockS3ServerChaos: S3 Error Injection for Testing

## Overview

Create a MockS3ServerChaos implementation, modeled after the existing [`AsyncFileChaos`](https://github.com/apple/foundationdb/tree/main/fdbrpc/include/fdbrpc/AsyncFileChaos.h#L32) pattern. This enables comprehensive testing of S3 client error handling, retry logic, and resilience against realistic failure scenarios.

Philosophy: MockS3 should be more intolerant/strict than real S3.

## Problem

FoundationDB's S3BlobStore client needs thorough testing against realistic S3 failure scenarios, but the existing [`MockS3Server`](https://github.com/apple/foundationdb/tree/main/fdbserver/MockS3Server.actor.cpp#L43) only provides deterministic "happy path" responses. Real S3 services exhibit various error conditions that clients must handle gracefully.

## Design

**MockS3ServerChaos** is a new class that acts as a chaos-enabled wrapper around the base MockS3Server, following the established chaos injection pattern.

### Chaos Control System

MockS3ServerChaos will follow the [`AsyncFileChaos`](https://github.com/apple/foundationdb/tree/main/fdbrpc/include/fdbrpc/AsyncFileChaos.h#L54) pattern with **fault injector-driven chaos**:

#### **S3FaultInjector** - Configurable Rates (Primary Control)

```
// S3FaultInjector provides configurable rates (0.0-1.0) and is registered via g_network->global().
// S3FaultInjector::injector() is the typed accessor used in MockS3ServerChaos.actor.cpp.
auto injector = S3FaultInjector::injector();
if (injector) {
    double errorRate = injector->getErrorRate();       // 0.0-1.0 probability
    double throttleRate = injector->getThrottleRate(); // 0.0-1.0 probability
    double delayRate = injector->getDelayRate();       // 0.0-1.0 probability
    double maxDelay = injector->getMaxDelay();         // seconds

    // Runtime decision using deterministic random
    if (deterministicRandom()->random01() < errorRate) {
        // Inject error based on configured probability
    }
}
```
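
The rate-based decision above can be tried outside of Flow with a small standalone sketch. Everything here is an assumption made for illustration: `SketchFaultInjector` and `countInjectedErrors` are hypothetical names, and a seeded `std::mt19937_64` stands in for `deterministicRandom()`. The point it shows is that the same seed reproduces the same sequence of injection decisions.

```cpp
#include <cassert>
#include <cstdint>
#include <random>

// Hypothetical stand-in for S3FaultInjector: holds a single probability.
struct SketchFaultInjector {
    double errorRate = 0.0; // 0.0-1.0

    explicit SketchFaultInjector(double rate) : errorRate(rate) {
        assert(rate >= 0.0 && rate <= 1.0);
    }
};

// Counts how many of `requests` would receive an injected error for a given seed.
// The seeded generator plays the role of deterministicRandom(): same seed, same draws.
int countInjectedErrors(const SketchFaultInjector& injector, std::uint64_t seed, int requests) {
    std::mt19937_64 rng(seed);
    std::uniform_real_distribution<double> random01(0.0, 1.0);
    int injected = 0;
    for (int i = 0; i < requests; ++i) {
        if (random01(rng) < injector.errorRate) {
            ++injected;
        }
    }
    return injected;
}
```

Calling `countInjectedErrors` twice with the same injector and seed yields the same count, which is what makes a failing chaos run replayable.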

#### **BUGGIFY** - Occasional Extra Chaos

`BUGGIFY` is a **probabilistic macro** (not boolean) that evaluates to true with low probability:

```
if (BUGGIFY) {
    // Occasionally inject extra chaos - executes randomly, not always
    // Used for additional chaos beyond configured rates
}
```

### Key Features to Implement

#### 1. **Realistic S3 Error Simulation**

* **HTTP errors**: 429 (throttling), 503 (service unavailable), 500/502 (server errors)
* **Auth errors**: 401 (unauthorized), 406 (not acceptable)
* **S3-specific errors**: InvalidToken, ExpiredToken (matching [`S3BlobStore.actor.cpp`](https://github.com/apple/foundationdb/tree/main/fdbclient/S3BlobStore.actor.cpp#L1241) patterns)
* **Connection issues**: Connection drops, timeouts
* **Data corruption**: Malformed responses, bit flips

#### 2. **Fault Injector-Driven Rates**

* **No master switch** - chaos is controlled by fault injector presence and rates
* **Configurable probabilities**: Each fault type has independent rate (0.0-1.0)
* **Deterministic randomness**: Reproducible chaos using `deterministicRandom()->random01()`
* **Rate-based decisions**: `if (random < rate)` pattern like [`AsyncFileChaos`](https://github.com/apple/foundationdb/tree/main/fdbrpc/include/fdbrpc/AsyncFileChaos.h#L96)

#### 3. **Operation-Specific Targeting**

Supports targeted chaos injection with configurable multipliers:

* **Read operations** (GET/HEAD objects)
* **Write operations** (PUT objects)
* **Delete operations** (DELETE objects)
* **Multipart upload operations** (initiate/upload/complete/abort)
* **List operations** (bucket listing)
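
To make the multiplier idea concrete, here is a minimal sketch of how a base rate and a per-operation multiplier might combine. `SketchOp`, `SketchMultipliers`, and `effectiveRate` are hypothetical names, and clamping the product to 1.0 is an assumption of this sketch rather than something the design states.

```cpp
#include <algorithm>
#include <cassert>

enum class SketchOp { Read, Write, Delete, List, Multipart };

// Hypothetical per-operation multipliers; 1.0 leaves the base rate unchanged.
struct SketchMultipliers {
    double read = 1.0;
    double write = 1.0;
    double del = 1.0;
    double list = 1.0;
};

// Scales a base probability by the multiplier for `op`.
double effectiveRate(double baseRate, SketchOp op, const SketchMultipliers& m) {
    assert(baseRate >= 0.0 && baseRate <= 1.0);
    double multiplier = 1.0;
    switch (op) {
    case SketchOp::Read:
        multiplier = m.read;
        break;
    case SketchOp::Write:
    case SketchOp::Multipart: // multipart shares the write multiplier
        multiplier = m.write;
        break;
    case SketchOp::Delete:
        multiplier = m.del;
        break;
    case SketchOp::List:
        multiplier = m.list;
        break;
    }
    // Assumption: keep the result a valid probability.
    return std::min(1.0, std::max(0.0, baseRate * multiplier));
}
```

With a write multiplier of 2.0 and a base error rate of 0.2, writes and multipart requests would fail about 40% of the time while other operations stay at their own scaled rates.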

#### 4. **Advanced Chaos Patterns** TODO

* **BUGGIFY extras**: Occasional additional chaos beyond configured rates
* **Delay jitter**: Variable delays with configurable patterns
* **Retry-After headers**: Realistic throttling responses with retry guidance
* **Burst patterns**: Periodic error rate spikes

#### 5. **Framework Integration**

* **Fault injector pattern**: Uses `g_network->global()` like [`DiskFailureInjector`](https://github.com/apple/foundationdb/tree/main/fdbrpc/include/fdbrpc/AsyncFileChaos.h#L56) and [`BitFlipper`](https://github.com/apple/foundationdb/tree/main/fdbrpc/include/fdbrpc/AsyncFileChaos.h#L93)
* **No boolean master switch**: Always potentially active when fault injectors present
* **Metrics integration**: Tracks chaos events via existing [`ChaosMetrics`](https://github.com/apple/foundationdb/tree/main/fdbrpc/include/fdbrpc/AsyncFileChaos.h#L28) system
* **Trace events**: Comprehensive logging for debugging and analysis

## Architecture

```
MockS3ServerChaos Configuration:
├── S3FaultInjector (primary control) - Configurable fault rates:
│   ├── errorRate: 0.0-1.0 (0% to 100% error probability)
│   ├── throttleRate: 0.0-1.0 (throttling probability)
│   ├── delayRate: 0.0-1.0 (delay probability)
│   └── maxDelay: seconds (maximum delay time)
├── BUGGIFY (occasional extras) - Probabilistic additional chaos
└── Runtime Decision Logic:
    ├── Operation classification (GET/PUT/DELETE/multipart/list)
    ├── Deterministic random + configured rates
    ├── Chaos injection actors (delay/error/corruption)
    ├── S3-compatible error response generation
    └── Base MockS3RequestHandler delegation
```

## Usage

Replace [`startMockS3Server()`](https://github.com/apple/foundationdb/tree/main/fdbserver/include/fdbserver/MockS3Server.h#L47) calls with `startMockS3ServerChaos()` in simulation tests:

```
// Before:
wait(startMockS3Server(listenAddress));

// After:
wait(startMockS3ServerChaos(listenAddress));
```

Chaos behavior is controlled by **S3FaultInjector rates** (0.0-1.0), with **BUGGIFY providing occasional extra chaos** - no master boolean switch.
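
The wrapper relationship can be illustrated without Flow. The following is only a sketch under stated assumptions: `SketchResponse`, `SketchHandler`, and `withChaos` are hypothetical names, and a caller-supplied `shouldFail` predicate stands in for the injector-driven decision. It shows the short-circuit: when a fault is injected, the base handler is never invoked.

```cpp
#include <cassert>
#include <functional>
#include <string>

struct SketchResponse {
    int code = 0;
    std::string body;
};

using SketchHandler = std::function<void(const std::string& resource, SketchResponse& response)>;

// Wraps `base` so that an injected failure answers 503 and skips the base handler.
SketchHandler withChaos(SketchHandler base, std::function<bool()> shouldFail) {
    assert(base && shouldFail);
    return [base, shouldFail](const std::string& resource, SketchResponse& response) {
        if (shouldFail()) {
            response.code = 503;
            response.body = "ServiceUnavailable";
            return; // injected error: the base handler never sees the request
        }
        base(resource, response); // normal path: delegate unchanged
    };
}
```

Because the wrapper has the same call shape as the handler it wraps, swapping one for the other is a one-line change at the call site, which mirrors the before/after usage shown above.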
|
||||
|
||||
@@ -1097,6 +1097,21 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
|
||||
state std::string canonicalURI = resource;
|
||||
// Set the resource on each loop so we don't double-encode when we set it to `getCanonicalURI` below.
|
||||
req->resource = resource;
|
||||
|
||||
// Reset headers to initial state for this retry attempt to prevent header accumulation
|
||||
// across retries and potential map corruption
|
||||
req->data.headers = headers;
|
||||
req->data.headers["Host"] = bstore->host;
|
||||
req->data.headers["Accept"] = "application/xml";
|
||||
// Re-merge extraHeaders
|
||||
for (const auto& [k, v] : bstore->extraHeaders) {
|
||||
std::string& fieldValue = req->data.headers[k];
|
||||
if (!fieldValue.empty()) {
|
||||
fieldValue.append(",");
|
||||
}
|
||||
fieldValue.append(v);
|
||||
}
|
||||
|
||||
state UID connID = UID();
|
||||
state double reqStartTimer;
|
||||
state double connectStartTimer = g_network->timer();
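The header reset in this hunk matters because the extra headers are appended with a comma separator. A standalone sketch of the effect, assuming `std::map` in place of the HTTP header type; `SketchHeaders` and `headersForAttempt` are hypothetical names, not FoundationDB APIs:

```cpp
#include <cassert>
#include <map>
#include <string>

using SketchHeaders = std::map<std::string, std::string>;

// Builds the headers for one attempt from a starting set plus the extra headers.
// Extra values are comma-appended, so the starting set must be pristine each time.
SketchHeaders headersForAttempt(const SketchHeaders& initial, const SketchHeaders& extra) {
    assert(!initial.count("")); // an empty header name would be a caller bug
    SketchHeaders headers = initial; // fresh copy for this attempt
    for (const auto& kv : extra) {
        std::string& value = headers[kv.first];
        if (!value.empty()) {
            value.append(",");
        }
        value.append(kv.second);
    }
    return headers;
}
```

Starting every attempt from the original headers gives identical results on each retry; feeding the previous attempt's headers back in would grow a value such as `a` into `a,a`, then `a,a,a`, and so on.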
|
||||
@@ -1223,7 +1238,14 @@ ACTOR Future<Reference<HTTP::IncomingResponse>> doRequest_impl(Reference<S3BlobS
|
||||
}
|
||||
|
||||
Reference<HTTP::IncomingResponse> _r = wait(timeoutError(reqF, requestTimeout));
|
||||
if (g_network->isSimulated() && BUGGIFY && deterministicRandom()->random01() < 0.1) {
|
||||
// Don't simulate token errors for multipart complete operations (POST with uploadId but no partNumber)
|
||||
// because changing a successful 200 to 400 after the server has already completed and removed
|
||||
// the upload causes the client to infinitely retry with a phantom upload ID. This seems too much of
|
||||
// an artificial scenario.
|
||||
bool isMultipartComplete = verb == "POST" && resource.find("uploadId=") != std::string::npos &&
|
||||
resource.find("partNumber=") == std::string::npos;
|
||||
if (g_network->isSimulated() && BUGGIFY && deterministicRandom()->random01() < 0.1 &&
|
||||
!isMultipartComplete) {
|
||||
// simulate an error from s3
|
||||
_r->code = badRequestCode;
|
||||
simulateS3TokenError = true;
|
||||
|
||||
@@ -418,6 +418,10 @@ public:
|
||||
int nextHTTPPort = 5000;
|
||||
bool httpProtected = false;
|
||||
|
||||
// Truly simulator-global registry for MockS3ServerChaos to prevent duplicate registrations
|
||||
// across all simulated processes (must stay in sync with httpHandlers)
|
||||
std::set<std::string> registeredMockS3ChaosServers;
|
||||
|
||||
flowGlobalType global(int id) const final;
|
||||
void setGlobal(size_t id, flowGlobalType v) final;
|
||||
|
||||
|
||||
@@ -757,13 +757,34 @@ public:
|
||||
|
||||
TraceEvent("MockS3MultipartStart").detail("Bucket", bucket).detail("Object", object);
|
||||
|
||||
// Create multipart upload
|
||||
MultipartUpload upload(bucket, object);
|
||||
state std::string uploadId = upload.uploadId;
|
||||
getGlobalStorage().multipartUploads[uploadId] = std::move(upload);
|
||||
// Check if there's already an in-progress upload for this bucket/object
|
||||
// This makes multipart initiation idempotent - retries return the same upload ID
|
||||
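// Note: real S3 issues a new upload ID on every initiation and allows multiple concurrent uploads for the
// same object; reusing the ID here is a deliberate mock simplification so that retried initiations are safe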
|
||||
std::string existingUploadId;
|
||||
for (const auto& pair : getGlobalStorage().multipartUploads) {
|
||||
if (pair.second.bucket == bucket && pair.second.object == object) {
|
||||
existingUploadId = pair.first;
|
||||
TraceEvent("MockS3MultipartStartIdempotent")
|
||||
.detail("Bucket", bucket)
|
||||
.detail("Object", object)
|
||||
.detail("ExistingUploadId", existingUploadId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Persist multipart state
|
||||
wait(persistMultipartState(uploadId));
|
||||
state std::string uploadId;
|
||||
if (!existingUploadId.empty()) {
|
||||
uploadId = existingUploadId;
|
||||
} else {
|
||||
MultipartUpload upload(bucket, object);
|
||||
uploadId = upload.uploadId;
|
||||
getGlobalStorage().multipartUploads[uploadId] = std::move(upload);
|
||||
TraceEvent("MockS3MultipartStarted").detail("UploadId", uploadId);
|
||||
}
|
||||
|
||||
if (getGlobalStorage().persistenceEnabled) {
|
||||
wait(persistMultipartState(uploadId));
|
||||
}
|
||||
|
||||
// Generate XML response
|
||||
std::string xml = format("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
|
||||
@@ -778,8 +799,6 @@ public:
|
||||
|
||||
self->sendXMLResponse(response, 200, xml);
|
||||
|
||||
TraceEvent("MockS3MultipartStarted").detail("UploadId", uploadId);
|
||||
|
||||
return Void();
|
||||
}
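The idempotent initiation logic in this handler can be sketched in isolation. This is an illustration under assumptions, not the mock's code: `SketchMultipartStore` and `SketchUpload` are hypothetical names, and a counter replaces the real upload ID generation.

```cpp
#include <cassert>
#include <map>
#include <string>

struct SketchUpload {
    std::string bucket;
    std::string object;
};

class SketchMultipartStore {
public:
    // Returns the ID of an in-progress upload for bucket/object if one exists,
    // otherwise creates a new upload and returns its ID.
    std::string initiate(const std::string& bucket, const std::string& object) {
        assert(!bucket.empty() && !object.empty());
        for (const auto& entry : uploads_) {
            if (entry.second.bucket == bucket && entry.second.object == object) {
                return entry.first; // retry of an earlier initiation
            }
        }
        std::string id = "upload-" + std::to_string(nextId_++);
        uploads_[id] = SketchUpload{ bucket, object };
        return id;
    }

    // Completing (or aborting) removes the upload, so a later initiation gets a new ID.
    void complete(const std::string& uploadId) { uploads_.erase(uploadId); }

private:
    std::map<std::string, SketchUpload> uploads_;
    int nextId_ = 1;
};
```

The linear scan is fine at test scale; the trade-off of this design is that two genuinely independent uploads to the same key would share one ID until the first is completed or aborted.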
|
||||
|
||||
@@ -814,7 +833,9 @@ public:
|
||||
uploadIter->second.parts[partNumber] = { etag, req->data.content };
|
||||
|
||||
// Persist multipart state (includes all parts)
|
||||
wait(persistMultipartState(uploadId));
|
||||
if (getGlobalStorage().persistenceEnabled) {
|
||||
wait(persistMultipartState(uploadId));
|
||||
}
|
||||
|
||||
// Return ETag in response
|
||||
response->code = 200;
|
||||
@@ -875,11 +896,15 @@ public:
|
||||
: "EMPTY");
|
||||
|
||||
// Persist final object
|
||||
wait(persistObject(bucket, object));
|
||||
if (getGlobalStorage().persistenceEnabled) {
|
||||
wait(persistObject(bucket, object));
|
||||
}
|
||||
|
||||
// Clean up multipart upload (in-memory and persisted)
|
||||
getGlobalStorage().multipartUploads.erase(uploadId);
|
||||
wait(deletePersistedMultipart(uploadId));
|
||||
if (getGlobalStorage().persistenceEnabled) {
|
||||
wait(deletePersistedMultipart(uploadId));
|
||||
}
|
||||
|
||||
// Generate completion XML response
|
||||
std::string xml = format("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
|
||||
@@ -918,7 +943,9 @@ public:
|
||||
|
||||
// Remove multipart upload (in-memory and persisted)
|
||||
getGlobalStorage().multipartUploads.erase(uploadId);
|
||||
wait(deletePersistedMultipart(uploadId));
|
||||
if (getGlobalStorage().persistenceEnabled) {
|
||||
wait(deletePersistedMultipart(uploadId));
|
||||
}
|
||||
|
||||
response->code = 204; // No Content
|
||||
response->data.contentLen = 0;
|
||||
@@ -1026,7 +1053,9 @@ public:
|
||||
.detail("StoredSize", getGlobalStorage().buckets[bucket][object].content.size());
|
||||
|
||||
// Persist object to disk
|
||||
wait(persistObject(bucket, object));
|
||||
if (getGlobalStorage().persistenceEnabled) {
|
||||
wait(persistObject(bucket, object));
|
||||
}
|
||||
|
||||
response->code = 200;
|
||||
response->data.headers["ETag"] = etag;
|
||||
@@ -1141,7 +1170,9 @@ public:
|
||||
}
|
||||
|
||||
// Delete persisted object
|
||||
wait(deletePersistedObject(bucket, object));
|
||||
if (getGlobalStorage().persistenceEnabled) {
|
||||
wait(deletePersistedObject(bucket, object));
|
||||
}
|
||||
|
||||
response->code = 204; // No Content
|
||||
response->data.contentLen = 0;
|
||||
@@ -1462,6 +1493,12 @@ static void clearSingletonState() {
|
||||
TraceEvent("MockS3ServerImpl_StateCleared");
|
||||
}
|
||||
|
||||
// Process a Mock S3 request directly (for wrapping/chaos injection)
|
||||
Future<Void> processMockS3Request(Reference<HTTP::IncomingRequest> req, Reference<HTTP::OutgoingResponse> response) {
|
||||
static MockS3ServerImpl serverInstance;
|
||||
return MockS3ServerImpl::handleRequest(&serverInstance, req, response);
|
||||
}
|
||||
|
||||
// Request Handler Implementation - Each handler instance works with global storage
|
||||
Future<Void> MockS3RequestHandler::handleRequest(Reference<HTTP::IncomingRequest> req,
|
||||
Reference<HTTP::OutgoingResponse> response) {
|
||||
@@ -1473,10 +1510,7 @@ Future<Void> MockS3RequestHandler::handleRequest(Reference<HTTP::IncomingRequest
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Create a temporary instance just to use its static handleRequest method
|
||||
// All actual storage is in g_mockS3Storage which is truly global
|
||||
static MockS3ServerImpl serverInstance;
|
||||
return MockS3ServerImpl::handleRequest(&serverInstance, req, response);
|
||||
return processMockS3Request(req, response);
|
||||
}
|
||||
|
||||
Reference<HTTP::IRequestHandler> MockS3RequestHandler::clone() {
|
||||
@@ -1509,15 +1543,8 @@ ACTOR Future<Void> registerMockS3Server_impl(std::string ip, std::string port) {
|
||||
}
|
||||
|
||||
try {
|
||||
TraceEvent("MockS3ServerDiagnostic")
|
||||
.detail("Phase", "Calling registerSimHTTPServer")
|
||||
.detail("Address", serverKey);
|
||||
|
||||
wait(g_simulator->registerSimHTTPServer(ip, port, makeReference<MockS3RequestHandler>()));
|
||||
registeredServers[serverKey] = true;
|
||||
|
||||
// Enable persistence automatically for all MockS3 instances
|
||||
// This ensures all tests using MockS3 get persistence enabled
|
||||
// Enable persistence BEFORE registering the server to prevent race conditions
|
||||
// where requests arrive before persistence is configured
|
||||
if (!getGlobalStorage().persistenceEnabled) {
|
||||
std::string persistenceDir = "simfdb/mocks3";
|
||||
enableMockS3Persistence(persistenceDir);
|
||||
@@ -1529,6 +1556,13 @@ ACTOR Future<Void> registerMockS3Server_impl(std::string ip, std::string port) {
|
||||
wait(loadMockS3PersistedStateFuture());
|
||||
}
|
||||
|
||||
TraceEvent("MockS3ServerDiagnostic")
|
||||
.detail("Phase", "Calling registerSimHTTPServer")
|
||||
.detail("Address", serverKey);
|
||||
|
||||
wait(g_simulator->registerSimHTTPServer(ip, port, makeReference<MockS3RequestHandler>()));
|
||||
registeredServers[serverKey] = true;
|
||||
|
||||
TraceEvent("MockS3ServerRegistered").detail("Address", serverKey).detail("Success", true);
|
||||
|
||||
TraceEvent("MockS3ServerDiagnostic")
|
||||
@@ -1575,6 +1609,8 @@ ACTOR Future<Void> startMockS3Server(NetworkAddress listenAddress) {
|
||||
// Clear all MockS3 global storage - called at the start of each simulation test
|
||||
void clearMockS3Storage() {
|
||||
getGlobalStorage().clearStorage();
|
||||
// Note: Do NOT clear chaos server registry here - it must persist across tests
|
||||
// like the simulator's httpHandlers map, to prevent duplicate registration attempts
|
||||
}
|
||||
|
||||
// Enable persistence for MockS3 storage
|
||||
@@ -1583,6 +1619,11 @@ void enableMockS3Persistence(const std::string& persistenceDir) {
|
||||
TraceEvent("MockS3PersistenceConfigured").detail("Directory", persistenceDir);
|
||||
}
|
||||
|
||||
// Check if MockS3 persistence is currently enabled
|
||||
bool isMockS3PersistenceEnabled() {
|
||||
return getGlobalStorage().persistenceEnabled;
|
||||
}
|
||||
|
||||
// ACTOR: Load persisted objects from disk
|
||||
ACTOR static Future<Void> loadPersistedObjects(std::string persistenceDir) {
|
||||
state std::string objectsDir = persistenceDir + "/objects"; // State variable before any early returns
|
||||
|
||||
362
fdbserver/MockS3ServerChaos.actor.cpp
Normal file
@@ -0,0 +1,362 @@
|
||||
/*
|
||||
* MockS3ServerChaos.actor.cpp
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2013-2025 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// Design: design/mocks3server_chaos_design.md
|
||||
|
||||
#include "fdbserver/MockS3ServerChaos.h"
|
||||
#include "fdbserver/MockS3Server.h"
|
||||
#include "flow/ChaosMetrics.h"
|
||||
#include "fdbrpc/simulator.h"
|
||||
#include "flow/Trace.h"
|
||||
#include "flow/IRandom.h"
|
||||
#include "flow/actorcompiler.h"
|
||||
|
||||
// Clear the chaos server registry (for testing/debugging only)
|
||||
// NOTE: In production simulation tests, the registry should NOT be cleared between tests,
|
||||
// as it must stay in sync with the simulator's persistent httpHandlers map to prevent
|
||||
// duplicate registration attempts that would trigger assertions.
|
||||
void clearMockS3ChaosRegistry() {
|
||||
if (g_network && g_simulator) {
|
||||
size_t previousSize = g_simulator->registeredMockS3ChaosServers.size();
|
||||
g_simulator->registeredMockS3ChaosServers.clear();
|
||||
TraceEvent("MockS3ChaosRegistryCleared").detail("PreviousSize", previousSize);
|
||||
}
|
||||
}
|
||||
// Helper function to classify S3 operations
// Note: matching is by substring on "uploads", so only requests whose resource contains that text
// (e.g. multipart initiation) are classified as MULTIPART. S3Operation::LIST is never returned here;
// list requests arrive as GET and are classified as READ.
S3Operation classifyS3Operation(const std::string& method, const std::string& resource) {
    if (method == "GET" || method == "HEAD") {
        return S3Operation::READ;
    } else if (method == "PUT") {
        if (resource.find("uploads") != std::string::npos) {
            return S3Operation::MULTIPART;
        }
        return S3Operation::WRITE;
    } else if (method == "DELETE") {
        return S3Operation::DELETE;
    } else if (method == "POST") {
        if (resource.find("uploads") != std::string::npos) {
            return S3Operation::MULTIPART;
        }
        return S3Operation::WRITE;
    } else {
        return S3Operation::READ; // Default fallback
    }
}
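For comparison, a stricter classifier could look at query parameters instead of searching the whole resource string. The sketch below is an alternative under assumptions, not what this commit does: `SketchOp`, `hasQueryParam`, and `classify` are hypothetical names, and treating `list-type` (the ListObjectsV2 query parameter) as the marker for list requests is an assumption about how the client issues listings.

```cpp
#include <cassert>
#include <string>

enum class SketchOp { Read, Write, Delete, List, Multipart };

// Returns true if `query` contains parameter `name`, with or without a value.
bool hasQueryParam(const std::string& query, const std::string& name) {
    std::size_t pos = 0;
    while (pos <= query.size()) {
        std::size_t end = query.find('&', pos);
        if (end == std::string::npos) {
            end = query.size();
        }
        std::string param = query.substr(pos, end - pos);
        if (param.substr(0, param.find('=')) == name) {
            return true;
        }
        pos = end + 1;
    }
    return false;
}

// Classifies by HTTP method plus query parameters; the object path is ignored.
SketchOp classify(const std::string& method, const std::string& resource) {
    assert(!method.empty());
    std::size_t q = resource.find('?');
    std::string query = (q == std::string::npos) ? "" : resource.substr(q + 1);
    if (hasQueryParam(query, "uploads") || hasQueryParam(query, "uploadId")) {
        return SketchOp::Multipart; // initiate, part upload, complete, abort
    }
    if (method == "GET" && hasQueryParam(query, "list-type")) {
        return SketchOp::List;
    }
    if (method == "PUT" || method == "POST") {
        return SketchOp::Write;
    }
    if (method == "DELETE") {
        return SketchOp::Delete;
    }
    return SketchOp::Read;
}
```

In this variant an object stored under a path such as `/bucket/uploads/file.txt` stays a plain read, and part uploads carrying `uploadId` count as multipart.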

// Get operation-specific multiplier for chaos rates
double getOperationMultiplier(S3Operation op) {
    auto injector = S3FaultInjector::injector();
    switch (op) {
    case S3Operation::READ:
        return injector->getReadMultiplier();
    case S3Operation::WRITE:
        return injector->getWriteMultiplier();
    case S3Operation::DELETE:
        return injector->getDeleteMultiplier();
    case S3Operation::LIST:
        return injector->getListMultiplier();
    case S3Operation::MULTIPART:
        return injector->getWriteMultiplier(); // Use write multiplier for multipart
    default:
        return 1.0;
    }
}
|
||||
|
||||
// Generate S3-compatible error XML
|
||||
std::string generateS3ErrorXML(const std::string& code, const std::string& message, const std::string& resource) {
|
||||
return format("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n"
|
||||
"<Error>\n"
|
||||
" <Code>%s</Code>\n"
|
||||
" <Message>%s</Message>\n"
|
||||
" <Resource>%s</Resource>\n"
|
||||
" <RequestId>%s</RequestId>\n"
|
||||
"</Error>",
|
||||
code.c_str(),
|
||||
message.c_str(),
|
||||
resource.c_str(),
|
||||
deterministicRandom()->randomUniqueID().toString().c_str());
|
||||
}
|
||||
|
||||
// Phase 1: Inject delay if configured
|
||||
ACTOR Future<Void> maybeInjectDelay(S3Operation op) {
|
||||
auto injector = S3FaultInjector::injector();
|
||||
double delayRate = injector->getDelayRate() * getOperationMultiplier(op);
|
||||
|
||||
if (delayRate > 0.0 && deterministicRandom()->random01() < delayRate) {
|
||||
double maxDelay = injector->getMaxDelay();
|
||||
double delayTime = deterministicRandom()->random01() * maxDelay;
|
||||
|
||||
TraceEvent("MockS3ChaosDelay")
|
||||
.detail("Operation", (int)op)
|
||||
.detail("Delay", delayTime)
|
||||
.detail("MaxDelay", maxDelay);
|
||||
|
||||
wait(delay(delayTime));
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Phase 2: Inject errors if configured
|
||||
ACTOR Future<bool> maybeInjectError(Reference<HTTP::IncomingRequest> req,
|
||||
Reference<HTTP::OutgoingResponse> response,
|
||||
S3Operation op) {
|
||||
auto injector = S3FaultInjector::injector();
|
||||
double errorRate = injector->getErrorRate() * getOperationMultiplier(op);
|
||||
|
||||
if (errorRate <= 0.0 || deterministicRandom()->random01() >= errorRate) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check for throttling (429) first
|
||||
double throttleRate = injector->getThrottleRate() * getOperationMultiplier(op);
|
||||
if (throttleRate > 0.0 && deterministicRandom()->random01() < throttleRate) {
|
||||
response->code = 429;
|
||||
response->data.headers["Content-Type"] = "application/xml";
|
||||
response->data.headers["Retry-After"] = "1";
|
||||
|
||||
std::string errorXML = generateS3ErrorXML("Throttling", "Request was throttled. Please retry.", req->resource);
|
||||
response->data.content = new UnsentPacketQueue();
|
||||
response->data.contentLen = errorXML.size();
|
||||
PacketBuffer* buffer = response->data.content->getWriteBuffer(errorXML.size());
|
||||
PacketWriter writer(buffer, nullptr, Unversioned());
|
||||
writer.serializeBytes(errorXML);
|
||||
writer.finish();
|
||||
|
||||
TraceEvent("MockS3ChaosThrottle").detail("Operation", (int)op).detail("Resource", req->resource);
|
||||
|
||||
// Update metrics
|
||||
auto metrics = g_network->global(INetwork::enChaosMetrics);
|
||||
if (metrics) {
|
||||
static_cast<ChaosMetrics*>(metrics)->s3Throttles++;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// Check for general errors (500, 502, 503, 401, 406)
|
||||
if (deterministicRandom()->random01() < errorRate) {
|
||||
// Select error type based on weighted distribution
|
||||
double errorType = deterministicRandom()->random01();
|
||||
|
||||
std::string errorXML;
|
||||
std::string errorType_str;
|
||||
int errorCode;
|
||||
|
||||
if (errorType < 0.4) {
|
||||
// 40% - 503 Service Unavailable
|
||||
errorCode = 503;
|
||||
errorType_str = "ServiceUnavailable";
|
||||
errorXML = generateS3ErrorXML(
|
||||
"ServiceUnavailable", "The service is temporarily unavailable. Please retry.", req->resource);
|
||||
} else if (errorType < 0.7) {
|
||||
// 30% - 500 Internal Server Error
|
||||
errorCode = 500;
|
||||
errorType_str = "InternalError";
|
||||
errorXML =
|
||||
generateS3ErrorXML("InternalError", "We encountered an internal error. Please retry.", req->resource);
|
||||
} else if (errorType < 0.85) {
|
||||
// 15% - 502 Bad Gateway
|
||||
errorCode = 502;
|
||||
errorType_str = "BadGateway";
|
||||
errorXML = generateS3ErrorXML("BadGateway", "Bad gateway error occurred.", req->resource);
|
||||
} else if (errorType < 0.92) {
|
||||
// 7% - 401 Unauthorized
|
||||
errorCode = 401;
|
||||
errorType_str = "InvalidToken";
|
||||
errorXML = generateS3ErrorXML("InvalidToken", "The provided token is invalid.", req->resource);
|
||||
} else {
|
||||
// 8% - 406 Not Acceptable / Token expired
|
||||
errorCode = 406;
|
||||
errorType_str = "ExpiredToken";
|
||||
errorXML = generateS3ErrorXML("ExpiredToken", "The provided token has expired.", req->resource);
|
||||
}
|
||||
|
||||
response->code = errorCode;
|
||||
response->data.headers["Content-Type"] = "application/xml";
|
||||
response->data.content = new UnsentPacketQueue();
|
||||
response->data.contentLen = errorXML.size();
|
||||
PacketBuffer* buffer = response->data.content->getWriteBuffer(errorXML.size());
|
||||
PacketWriter writer(buffer, nullptr, Unversioned());
|
||||
writer.serializeBytes(errorXML);
|
||||
writer.finish();
|
||||
|
||||
TraceEvent("MockS3ChaosError")
|
||||
.detail("Operation", (int)op)
|
||||
.detail("ErrorType", errorType_str)
|
||||
.detail("ErrorCode", errorCode)
|
||||
.detail("Resource", req->resource);
|
||||
|
||||
// Update metrics
|
||||
auto metrics = g_network->global(INetwork::enChaosMetrics);
|
||||
if (metrics) {
|
||||
static_cast<ChaosMetrics*>(metrics)->s3Errors++;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
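The weighted selection in `maybeInjectError` is a cumulative-threshold lookup over a single uniform draw. In the listing above, a request reaches this table only after passing the initial `errorRate` gate, missing the throttle roll, and passing a second `errorRate` roll. A standalone sketch of the table itself, where `SketchError` and `pickError` are hypothetical names and the thresholds are copied from the listing:

```cpp
#include <cassert>
#include <string>

struct SketchError {
    int httpCode;
    std::string s3Code;
};

// Maps a uniform draw in [0, 1) to an error using cumulative thresholds:
// 40% 503, 30% 500, 15% 502, 7% 401, 8% 406.
SketchError pickError(double draw) {
    assert(draw >= 0.0 && draw < 1.0);
    if (draw < 0.40) {
        return { 503, "ServiceUnavailable" };
    }
    if (draw < 0.70) {
        return { 500, "InternalError" };
    }
    if (draw < 0.85) {
        return { 502, "BadGateway" };
    }
    if (draw < 0.92) {
        return { 401, "InvalidToken" };
    }
    return { 406, "ExpiredToken" };
}
```

Keeping the table in one pure function like this makes the distribution easy to unit test separately from response construction.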
|
||||
|
||||
// Phase 4: Corrupt response data if configured
|
||||
ACTOR Future<Void> maybeCorruptResponse(Reference<HTTP::OutgoingResponse> response, S3Operation op) {
|
||||
auto injector = S3FaultInjector::injector();
|
||||
double corruptionRate = injector->getCorruptionRate() * getOperationMultiplier(op);
|
||||
|
||||
if (corruptionRate <= 0.0 || deterministicRandom()->random01() >= corruptionRate) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Only corrupt successful responses
|
||||
if (response->code >= 200 && response->code < 300) {
|
||||
// Invalidate ETag to simulate data corruption
|
||||
if (response->data.headers.find("ETag") != response->data.headers.end()) {
|
||||
response->data.headers["ETag"] = "\"corrupted-" + deterministicRandom()->randomUniqueID().toString() + "\"";
|
||||
}
|
||||
|
||||
TraceEvent("MockS3ChaosCorruption").detail("Operation", (int)op).detail("Resource", "response_data");
|
||||
|
||||
// Update metrics
|
||||
auto metrics = g_network->global(INetwork::enChaosMetrics);
|
||||
if (metrics) {
|
||||
static_cast<ChaosMetrics*>(metrics)->s3Corruptions++;
|
||||
}
|
||||
}
|
||||
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Core chaos server implementation
|
||||
class MockS3ChaosServerImpl {
|
||||
public:
|
||||
ACTOR static Future<Void> handleRequest(Reference<HTTP::IncomingRequest> req,
|
||||
Reference<HTTP::OutgoingResponse> response) {
|
||||
// Classify the operation type (must be state since used after wait())
|
||||
state S3Operation op = classifyS3Operation(req->verb, req->resource);
|
||||
|
||||
TraceEvent("MockS3ChaosRequest")
|
||||
.detail("Method", req->verb)
|
||||
.detail("Resource", req->resource)
|
||||
.detail("Operation", (int)op)
|
||||
.detail("ContentLength", req->data.contentLen);
|
||||
|
||||
// Phase 1: Inject delay if configured
|
||||
wait(maybeInjectDelay(op));
|
||||
|
||||
// Phase 2: Check if we should inject an error
|
||||
state bool errorInjected = wait(maybeInjectError(req, response, op));
|
||||
if (errorInjected) {
|
||||
return Void();
|
||||
}
|
||||
|
||||
// Phase 3: Delegate to base MockS3Server for normal processing
|
||||
// Use the exposed processMockS3Request function to access the shared server instance
|
||||
wait(processMockS3Request(req, response));
|
||||
|
||||
// Phase 4: Possibly corrupt the response data
|
||||
wait(maybeCorruptResponse(response, op));
|
||||
|
||||
return Void();
|
||||
}
|
||||
};
|
||||
|
||||
// Public interface implementation
|
||||
Future<Void> MockS3ChaosRequestHandler::handleRequest(Reference<HTTP::IncomingRequest> req,
|
||||
Reference<HTTP::OutgoingResponse> response) {
|
||||
    return MockS3ChaosServerImpl::handleRequest(req, response);
}

Reference<HTTP::IRequestHandler> MockS3ChaosRequestHandler::clone() {
    return makeReference<MockS3ChaosRequestHandler>();
}

// Safe server registration that prevents conflicts using truly simulator-global registry
ACTOR Future<Void> registerMockS3ChaosServer(std::string ip, std::string port) {
    state std::string serverKey = ip + ":" + port;
    ASSERT(g_simulator);

    TraceEvent("MockS3ChaosServerRegistration")
        .detail("Phase", "Start")
        .detail("IP", ip)
        .detail("Port", port)
        .detail("ServerKey", serverKey)
        .detail("IsSimulated", g_network->isSimulated())
        .detail("AlreadyRegistered", g_simulator->registeredMockS3ChaosServers.count(serverKey) > 0);

    // Check if server is already registered using truly simulator-global registry
    if (g_simulator->registeredMockS3ChaosServers.count(serverKey)) {
        TraceEvent(SevWarn, "MockS3ChaosServerAlreadyRegistered").detail("Address", serverKey);
        return Void();
    }

    try {
        // Enable persistence BEFORE registering the server to prevent race conditions
        // where requests arrive before persistence is configured
        if (!isMockS3PersistenceEnabled()) {
            std::string persistenceDir = "simfdb/mocks3";
            enableMockS3Persistence(persistenceDir);
            TraceEvent("MockS3ChaosServerPersistenceEnabled")
                .detail("Address", serverKey)
                .detail("PersistenceDir", persistenceDir);

            // Load any previously persisted state (for crash recovery in simulation)
            wait(loadMockS3PersistedStateFuture());
        }

        wait(g_simulator->registerSimHTTPServer(ip, port, makeReference<MockS3ChaosRequestHandler>()));
        g_simulator->registeredMockS3ChaosServers.insert(serverKey);

        TraceEvent("MockS3ChaosServerRegistered")
            .detail("Address", serverKey)
            .detail("Success", true)
            .detail("TotalRegistered", g_simulator->registeredMockS3ChaosServers.size());

    } catch (Error& e) {
        TraceEvent(SevError, "MockS3ChaosServerRegistrationFailed")
            .error(e)
            .detail("Address", serverKey)
            .detail("ErrorCode", e.code())
            .detail("ErrorName", e.name());
        throw;
    }

    return Void();
}
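
The registration actor above checks a shared registry first, performs the fallible work, and records the address only after that work succeeds, so a failed attempt can be retried. The following is a minimal standalone sketch of that ordering in plain C++ rather than Flow. `ServerRegistry` and `registerOnce` are hypothetical names used for illustration; they are not part of FoundationDB.

```cpp
#include <cstddef>
#include <functional>
#include <set>
#include <stdexcept>
#include <string>

// Hypothetical stand-in for a simulator-global registry of "ip:port" keys.
class ServerRegistry {
public:
    // Returns true if doRegister ran, false if the address was already known.
    // The key is recorded only after doRegister returns normally, so an
    // attempt that throws leaves the registry unchanged.
    bool registerOnce(const std::string& ip,
                      const std::string& port,
                      const std::function<void()>& doRegister) {
        const std::string key = ip + ":" + port;
        if (registered_.count(key) > 0) {
            return false; // duplicate: skip the work entirely
        }
        doRegister(); // may throw; the key is then not inserted
        registered_.insert(key);
        return true;
    }

    std::size_t size() const { return registered_.size(); }

private:
    std::set<std::string> registered_;
};
```

A second call with the same address is a no-op, which is the property that lets several tests in one simulation run share a single handler.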

// Public Interface Implementation
ACTOR Future<Void> startMockS3ServerChaos(NetworkAddress listenAddress) {
    ASSERT(g_network->isSimulated());

    TraceEvent("MockS3ChaosServerStart")
        .detail("Address", listenAddress.toString())
        .detail("IP", listenAddress.ip.toString())
        .detail("Port", listenAddress.port);

    // Register the chaos-enabled HTTP server
    wait(registerMockS3ChaosServer(listenAddress.ip.toString(), std::to_string(listenAddress.port)));

    TraceEvent("MockS3ChaosServerStarted").detail("Address", listenAddress.toString()).detail("Ready", true);

    return Void();
}

#include "flow/unactorcompiler.h"
@@ -72,3 +72,13 @@ Future<Void> registerMockS3Server(std::string ip, std::string port);
// persistenceDir: Directory where data will be stored (e.g., "simfdb/mocks3")
// Creates directory structure: <persistenceDir>/objects/<bucket>/ and <persistenceDir>/multipart/
void enableMockS3Persistence(const std::string& persistenceDir);

// Check if MockS3 persistence is currently enabled
bool isMockS3PersistenceEnabled();

// Load any previously persisted MockS3 state from disk
Future<Void> loadMockS3PersistedStateFuture();

// Process a Mock S3 request directly (for wrapping/chaos injection)
// This is the low-level request processor used by MockS3RequestHandler
Future<Void> processMockS3Request(Reference<HTTP::IncomingRequest> req, Reference<HTTP::OutgoingResponse> response);

114
fdbserver/include/fdbserver/MockS3ServerChaos.h
Normal file
@@ -0,0 +1,114 @@
/*
 * MockS3ServerChaos.h
 *
 * This source file is part of the FoundationDB open source project
 *
 * Copyright 2013-2025 Apple Inc. and the FoundationDB project authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include "flow/flow.h"
#include "flow/network.h"
#include "fdbrpc/HTTP.h"
#include <atomic>

// MockS3ServerChaos - Comprehensive S3 chaos injection for FoundationDB simulation testing
//
// DESIGN:
//   See design/mocks3server_chaos_design.md for full design documentation
//   https://github.com/apple/foundationdb/tree/main/design/mocks3server_chaos_design.md
//
// OVERVIEW:
//   Chaos-enabled wrapper around MockS3Server that injects realistic S3 failures
//   to test client resilience and error handling. Follows AsyncFileChaos pattern.
//
// PHILOSOPHY:
//   "MockS3 should be more intolerant/strict than real S3" - surface bugs early
//   by providing comprehensive fault injection in a controlled, deterministic environment.
//
// CONFIGURATION:
//   Chaos is controlled by S3FaultInjector rates (0.0-1.0) via g_network->global(enS3FaultInjector).
//   No master boolean switch - fine-grained control per fault type.
//
//   S3FaultInjector::injector()->setErrorRate(0.1); // 10% HTTP errors
//   S3FaultInjector::injector()->setThrottleRate(0.05); // 5% throttling
//   S3FaultInjector::injector()->setDelayRate(0.1); // 10% delays
//   S3FaultInjector::injector()->setCorruptionRate(0.01); // 1% corruption
//   S3FaultInjector::injector()->setMaxDelay(5.0); // Up to 5s delays
//
// FAULT TYPES:
//   - HTTP Errors: 429 (throttling), 500/502/503 (server errors), 401/406 (auth)
//   - Token Issues: InvalidToken, ExpiredToken, TokenRefreshRequired
//   - Network: Connection drops, timeouts, retry-after headers
//   - Data Corruption: ETag invalidation, response truncation
//   - Operation Targeting: Different rates for GET/PUT/DELETE/multipart/list
//
// USAGE:
//   // Start chaos server
//   wait(startMockS3ServerChaos(NetworkAddress::parse("127.0.0.1:8080")));
//
//   // Use with S3BlobStoreEndpoint
//   auto blobStore = S3BlobStoreEndpoint::fromString("blobstore://key:secret@127.0.0.1:8080", ...);
//
// INTEGRATION:
//   - Integrates with S3BlobStoreEndpoint for production client testing
//   - Works with existing workloads by replacing MockS3Server with MockS3ServerChaos
//   - Supports all S3 operations: PUT, GET, HEAD, DELETE, multipart, list
//   - Deterministic randomness based on simulation seed for reproducibility
//
// METRICS:
//   ChaosMetrics tracks injected faults: s3Errors, s3Throttles, s3Delays, s3Corruptions
//
// EXAMPLES:
//   See tests/slow/S3ClientWorkloadWithChaos.toml for comprehensive test configurations
//   See fdbserver/workloads/S3ClientWorkload.actor.cpp for usage example

// S3 Operation Types for targeted chaos
enum class S3Operation {
    READ, // GET, HEAD
    WRITE, // PUT (single and multipart)
    DELETE, // DELETE
    LIST, // List objects
    MULTIPART // Multipart operations (initiate, upload, complete, abort)
};

// HTTP request handler with chaos injection for Mock S3 Server
class MockS3ChaosRequestHandler : public HTTP::IRequestHandler, public ReferenceCounted<MockS3ChaosRequestHandler> {
public:
    MockS3ChaosRequestHandler() : destructing(false) {}

    // Prevent virtual function calls during destruction
    ~MockS3ChaosRequestHandler() { destructing = true; }

    Future<Void> handleRequest(Reference<HTTP::IncomingRequest> req,
                               Reference<HTTP::OutgoingResponse> response) override;
    Reference<HTTP::IRequestHandler> clone() override;

    void addref() override { ReferenceCounted<MockS3ChaosRequestHandler>::addref(); }
    void delref() override { ReferenceCounted<MockS3ChaosRequestHandler>::delref(); }

private:
    std::atomic<bool> destructing;
};

// Public interface
Future<Void> startMockS3ServerChaos(const NetworkAddress& listenAddress);

// Clear the chaos server registry (for testing/debugging only)
// NOTE: In production simulation tests, the registry should NOT be cleared between tests,
// as it must stay in sync with the simulator's persistent httpHandlers map to prevent
// duplicate registration attempts that would trigger assertions.
void clearMockS3ChaosRegistry();
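
The header comment in MockS3ServerChaos.h describes independent per-fault rates in the range 0.0-1.0 and deterministic randomness. The sketch below shows one way a per-request decision could be made from such rates. It is an illustration only: the real decision logic lives in `MockS3ChaosServerImpl`, which is not shown in this section, so the check order, the `Fault` and `FaultRates` types, and the use of `std::mt19937` in place of the simulator's seeded random source are all assumptions.

```cpp
#include <random>

// Hypothetical fault categories, mirroring the four configurable rates.
enum class Fault { None, Error, Throttle, Delay, Corruption };

// Hypothetical plain-data copy of the four rates (each in 0.0-1.0).
struct FaultRates {
    double error = 0.0;
    double throttle = 0.0;
    double delay = 0.0;
    double corruption = 0.0;
};

// Draws one uniform number per fault type, in a fixed order, and returns
// the first fault whose draw falls below its rate. With a seeded generator
// the sequence of decisions is reproducible on a given standard library.
Fault pickFault(const FaultRates& rates, std::mt19937& rng) {
    std::uniform_real_distribution<double> uniform(0.0, 1.0);
    if (uniform(rng) < rates.error) return Fault::Error;
    if (uniform(rng) < rates.throttle) return Fault::Throttle;
    if (uniform(rng) < rates.delay) return Fault::Delay;
    if (uniform(rng) < rates.corruption) return Fault::Corruption;
    return Fault::None;
}
```

With every rate at 0.0 the function always returns `Fault::None`, which matches the idea of a chaos server that behaves like the stable one when nothing is configured.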
@@ -19,6 +19,7 @@
 */

#include "fdbserver/MockS3Server.h"
#include "fdbserver/MockS3ServerChaos.h"

#include "fdbrpc/HTTP.h"
#include "fdbrpc/simulator.h"
@@ -49,6 +50,14 @@ struct S3ClientWorkload : TestWorkload {
    std::string credentials;
    std::string simfdbDir;

    // Chaos injection options
    bool enableChaos;
    double errorRate;
    double throttleRate;
    double delayRate;
    double corruptionRate;
    double maxDelay;

    S3ClientWorkload(WorkloadContext const& wcx) : TestWorkload(wcx), enabled(true), pass(true) {
        s3Url = getOption(options, "s3Url"_sr, ""_sr).toString();
        if (s3Url.empty()) {
@@ -58,6 +67,14 @@ struct S3ClientWorkload : TestWorkload {
        simfdbDir = getOption(options, "simfdb"_sr, "simfdb"_sr).toString();
        // Place credentials file in the simulation root, NOT inside the server's data dir (simfdbDir)
        credentials = "S3ClientWorkload.blob-credentials.json";

        // Initialize chaos options
        enableChaos = getOption(options, "enableChaos"_sr, false);
        errorRate = getOption(options, "errorRate"_sr, 0.1);
        throttleRate = getOption(options, "throttleRate"_sr, 0.05);
        delayRate = getOption(options, "delayRate"_sr, 0.1);
        corruptionRate = getOption(options, "corruptionRate"_sr, 0.01);
        maxDelay = getOption(options, "maxDelay"_sr, 2.0);
    }
    ~S3ClientWorkload() {
        if (pass) {
@@ -96,32 +113,41 @@ private:
    }

    // Add the basename of a file to the URL path
    // Uses S3BlobStoreEndpoint::fromString() for robust URL parsing (similar to
    // BlobMetadataUtils::getBlobMetadataPartitionedURL)
    static std::string addFileToUrl(std::string filePath, std::string baseUrl) {
        std::string basename = ::basename(const_cast<char*>(filePath.c_str()));

        // Parse the URL and append the basename to the path
        try {
            // Find the position after the host:port part
            size_t hostEnd = baseUrl.find('/', baseUrl.find("://") + 3);
            if (hostEnd == std::string::npos) {
                hostEnd = baseUrl.length();
            std::string resource;
            std::string error;
            S3BlobStoreEndpoint::ParametersT parameters;
            Reference<S3BlobStoreEndpoint> endpoint =
                S3BlobStoreEndpoint::fromString(baseUrl, {}, &resource, &error, &parameters);

            if (!error.empty() || !endpoint) {
                TraceEvent(SevError, "S3ClientWorkloadURLParseError").detail("URL", baseUrl).detail("Error", error);
                throw backup_invalid_url();
            }

            // Find the query string start
            size_t queryStart = baseUrl.find('?', hostEnd);
            if (queryStart == std::string::npos) {
                queryStart = baseUrl.length();
            // If there's an existing resource in the URL, find it and append the basename after it
            if (!resource.empty()) {
                size_t resourceStart = baseUrl.find(resource);
                if (resourceStart == std::string::npos) {
                    throw backup_invalid_url();
                }
                // Insert "/basename" after the existing resource
                std::string separator = (resource.back() == '/') ? "" : "/";
                return baseUrl.insert(resourceStart + resource.size(), separator + basename);
            } else {
                // No resource in URL, need to insert before query string
                size_t queryStart = baseUrl.find('?');
                if (queryStart != std::string::npos) {
                    return baseUrl.insert(queryStart, "/" + basename);
                } else {
                    return baseUrl + "/" + basename;
                }
            }

            // Get the current path
            std::string currentPath = baseUrl.substr(hostEnd, queryStart - hostEnd);
            // Ensure there's always a path separator
            if (currentPath.empty() || currentPath.back() != '/') {
                currentPath += '/';
            }

            // Construct the new URL
            return baseUrl.substr(0, hostEnd) + currentPath + basename + baseUrl.substr(queryStart);
        } catch (Error& e) {
            TraceEvent(SevError, "S3ClientWorkloadURLParseError")
                .error(e)
@@ -165,10 +191,12 @@ private:
        // --- END PRE-TEST CLEANUP ---

        // --- BEGIN PER-RUN ISOLATION & CLEANUP ---
        // Create a unique directory for this workload instance
        // Create a unique directory for this workload instance inside simfdb
        // This keeps test artifacts in the simulation directory like other workloads
        // Use deterministic directory name instead of random UID to ensure deterministic behavior
        state std::string uniqueRunDir =
            format("s3_workload_run_%08x_%08x", self->clientId, deterministicRandom()->randomInt(0, 1000000));
            joinPath(self->simfdbDir,
                     format("s3_workload_run_%08x_%08x", self->clientId, deterministicRandom()->randomInt(0, 1000000)));
        try {
            platform::createDirectory(uniqueRunDir);
            TraceEvent(SevDebug, "S3ClientWorkloadCreatedRunDir").detail("Dir", uniqueRunDir);
@@ -243,26 +271,44 @@ private:
            throw file_not_found();
        }

        // Cleanup local files (now inside uniqueRunDir)
        // Cleanup local files - each operation handles its own errors non-fatally
        // Delete credentials file
        try {
            deleteFile(self->credentials);
            deleteFile(download);
            TraceEvent(SevDebug, "S3ClientWorkloadCleanedLocalFiles")
                .detail("CredentialsFile", self->credentials)
                .detail("DownloadFile", download);
            // Attempt to cleanup the unique run directory itself
            platform::eraseDirectoryRecursive(uniqueRunDir);
            TraceEvent(SevDebug, "S3ClientWorkloadCleanedRunDir").detail("Dir", uniqueRunDir);
            if (fileExists(self->credentials)) {
                deleteFile(self->credentials);
                TraceEvent(SevDebug, "S3ClientWorkloadDeletedCredentials").detail("File", self->credentials);
            }
        } catch (Error& e) {
            // Log if cleanup fails, but don't fail the test just for this
            TraceEvent(SevWarn, "S3ClientWorkloadCleanupError").errorUnsuppressed(e);
            TraceEvent(SevWarn, "S3ClientWorkloadCredentialsCleanupFailed").error(e).detail("File", self->credentials);
        }

        // Delete download file
        try {
            if (fileExists(download)) {
                deleteFile(download);
                TraceEvent(SevDebug, "S3ClientWorkloadDeletedDownload").detail("File", download);
            }
        } catch (Error& e) {
            TraceEvent(SevWarn, "S3ClientWorkloadDownloadCleanupFailed").error(e).detail("File", download);
        }

        // Delete run directory - may fail in simulation due to timing/locking
        try {
            platform::eraseDirectoryRecursive(uniqueRunDir);
            TraceEvent(SevInfo, "S3ClientWorkloadCleanedRunDir").detail("Dir", uniqueRunDir);
        } catch (Error& e) {
            TraceEvent(SevWarn, "S3ClientWorkloadRunDirCleanupFailed")
                .error(e)
                .detail("Dir", uniqueRunDir)
                .detail("Reason", "Non-fatal in simulation");
        }

        return Void();
}
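
The cleanup hunk above moves from one shared try/catch to one per step, so a failure to delete one artifact no longer skips the remaining deletions. A minimal standalone sketch of that pattern follows. `runCleanupSteps` is a hypothetical helper written for this illustration; it uses standard exceptions rather than Flow's `Error` and returns the failed step names instead of emitting trace events.

```cpp
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

using CleanupStep = std::pair<std::string, std::function<void()>>;

// Runs every step in order, even if earlier ones throw, and returns the
// names of the steps that failed so the caller can log them as warnings.
std::vector<std::string> runCleanupSteps(const std::vector<CleanupStep>& steps) {
    std::vector<std::string> failed;
    for (const auto& step : steps) {
        try {
            step.second();
        } catch (const std::exception&) {
            failed.push_back(step.first); // non-fatal: record and continue
        }
    }
    return failed;
}
```

The caller decides whether a non-empty result should fail the test; in the workload above each failure is only logged at warning severity.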

    ACTOR Future<Void> _setup(S3ClientWorkload* self, Database cx) {
        // Only client 0 registers the MockS3Server to avoid duplicates
        // Only client 0 registers the MockS3Server to avoid unnecessary duplicate trace events
        // Note: Both startMockS3ServerChaos() and registerSimHTTPServer() have internal duplicate detection
        if (self->clientId == 0) {
            // Check if we're using a local mock server URL pattern
            bool useMockS3 = self->s3Url.find("127.0.0.1") != std::string::npos ||
@@ -270,17 +316,63 @@ private:
                             self->s3Url.find("mock-s3-server") != std::string::npos;

            if (useMockS3 && g_network->isSimulated()) {
                TraceEvent("S3ClientWorkload").detail("Phase", "Registering MockS3Server").detail("URL", self->s3Url);
                // Check if 127.0.0.1:8080 is already registered in simulator's httpHandlers
                std::string serverKey = "127.0.0.1:8080";
                bool alreadyRegistered = g_simulator->httpHandlers.count(serverKey) > 0;

                // Register MockS3Server with IP address - simulation environment doesn't support hostname resolution.
                // Persistence is automatically enabled in registerMockS3Server()
                wait(registerMockS3Server("127.0.0.1", "8080"));
                if (alreadyRegistered) {
                    TraceEvent("S3ClientWorkload")
                        .detail("Phase", "MockS3Server Already Registered")
                        .detail("Address", serverKey)
                        .detail("ChaosRequested", self->enableChaos)
                        .detail("Reason", "Reusing existing HTTP handler from previous test");
                } else if (self->enableChaos) {
                    TraceEvent("S3ClientWorkload")
                        .detail("Phase", "Starting MockS3ServerChaos")
                        .detail("URL", self->s3Url);

                TraceEvent("S3ClientWorkload")
                    .detail("Phase", "MockS3Server Registered")
                    .detail("Address", "127.0.0.1:8080");
                    // Start MockS3ServerChaos - has internal duplicate detection
                    NetworkAddress listenAddress(IPAddress(0x7f000001), 8080);
                    wait(startMockS3ServerChaos(listenAddress));

                    TraceEvent("S3ClientWorkload")
                        .detail("Phase", "MockS3ServerChaos Started")
                        .detail("Address", "127.0.0.1:8080");
                } else {
                    TraceEvent("S3ClientWorkload")
                        .detail("Phase", "Registering MockS3Server")
                        .detail("URL", self->s3Url);

                    // Register regular MockS3Server using the proper registration function
                    // which automatically enables persistence
                    wait(registerMockS3Server("127.0.0.1", "8080"));

                    TraceEvent("S3ClientWorkload")
                        .detail("Phase", "MockS3Server Registered")
                        .detail("Address", "127.0.0.1:8080");
                }
            }
        }

        // Configure chaos rates for all clients if chaos is enabled
        // This allows each test to have different chaos rates
        if (self->enableChaos && g_network->isSimulated()) {
            auto injector = S3FaultInjector::injector();
            injector->setErrorRate(self->errorRate);
            injector->setThrottleRate(self->throttleRate);
            injector->setDelayRate(self->delayRate);
            injector->setCorruptionRate(self->corruptionRate);
            injector->setMaxDelay(self->maxDelay);

            TraceEvent("S3ClientWorkload")
                .detail("Phase", "Chaos Configured")
                .detail("ClientID", self->clientId)
                .detail("ErrorRate", self->errorRate)
                .detail("ThrottleRate", self->throttleRate)
                .detail("DelayRate", self->delayRate)
                .detail("CorruptionRate", self->corruptionRate);
        }

        return Void();
    }
};

@@ -38,6 +38,10 @@ struct ChaosMetrics {
    void clear();
    unsigned int diskDelays;
    unsigned int bitFlips;
    unsigned int s3Errors;
    unsigned int s3Throttles;
    unsigned int s3Delays;
    unsigned int s3Corruptions;
    double startTime;

    void getFields(TraceEvent* e);
@@ -81,4 +85,48 @@ private: // construction
    BitFlipper(BitFlipper const&) = delete;
};

// S3 Fault Injector - Controls chaos injection rates for S3 operations
struct S3FaultInjector {
    static S3FaultInjector* injector();

    // Basic chaos rates (0.0-1.0)
    void setErrorRate(double rate) { errorRate = rate; }
    void setThrottleRate(double rate) { throttleRate = rate; }
    void setDelayRate(double rate) { delayRate = rate; }
    void setCorruptionRate(double rate) { corruptionRate = rate; }
    void setMaxDelay(double seconds) { maxDelay = seconds; }

    double getErrorRate() const { return errorRate; }
    double getThrottleRate() const { return throttleRate; }
    double getDelayRate() const { return delayRate; }
    double getCorruptionRate() const { return corruptionRate; }
    double getMaxDelay() const { return maxDelay; }

    // Operation-specific multipliers
    void setReadMultiplier(double mult) { readMultiplier = mult; }
    void setWriteMultiplier(double mult) { writeMultiplier = mult; }
    void setDeleteMultiplier(double mult) { deleteMultiplier = mult; }
    void setListMultiplier(double mult) { listMultiplier = mult; }

    double getReadMultiplier() const { return readMultiplier; }
    double getWriteMultiplier() const { return writeMultiplier; }
    double getDeleteMultiplier() const { return deleteMultiplier; }
    double getListMultiplier() const { return listMultiplier; }

private: // members
    double errorRate = 0.0;
    double throttleRate = 0.0;
    double delayRate = 0.0;
    double corruptionRate = 0.0;
    double maxDelay = 5.0;
    double readMultiplier = 1.0;
    double writeMultiplier = 1.0;
    double deleteMultiplier = 1.0;
    double listMultiplier = 1.0;

private: // construction
    S3FaultInjector() = default;
    S3FaultInjector(S3FaultInjector const&) = delete;
};
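
`S3FaultInjector` stores base rates alongside per-operation multipliers, but this header does not show how the two are combined. One plausible reading, assumed here purely for illustration, is that the effective probability for an operation is the base rate scaled by that operation's multiplier and clamped to the range 0.0-1.0. The `Op`, `Multipliers`, and `effectiveRate` names below are hypothetical and are not FoundationDB APIs.

```cpp
#include <algorithm>

// Hypothetical operation classes matching the four multipliers in the struct.
enum class Op { Read, Write, Delete, List };

// Hypothetical plain-data copy of the multipliers (all default to 1.0).
struct Multipliers {
    double read = 1.0;
    double write = 1.0;
    double del = 1.0;
    double list = 1.0;
};

// Assumed combination rule: base rate times the operation's multiplier,
// clamped so the result is always a valid probability.
double effectiveRate(double baseRate, Op op, const Multipliers& m) {
    double mult = 1.0;
    switch (op) {
    case Op::Read:
        mult = m.read;
        break;
    case Op::Write:
        mult = m.write;
        break;
    case Op::Delete:
        mult = m.del;
        break;
    case Op::List:
        mult = m.list;
        break;
    }
    return std::min(1.0, std::max(0.0, baseRate * mult));
}
```

Under this rule a multiplier of 0.0 switches faults off for one operation class, and the default of 1.0 leaves the base rate unchanged.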

#endif // FLOW_CHAOSMETRICS_H

@@ -172,6 +172,7 @@ public:
        enMetrics = 20,
        enGrpcState = 21,
        enProxy = 22,
        enS3FaultInjector = 23,
        COUNT // Add new fields before this enumerator
    };

@@ -37,7 +37,10 @@ void ChaosMetrics::clear() {
}

void ChaosMetrics::getFields(TraceEvent* e) {
    std::pair<const char*, unsigned int> metrics[] = { { "DiskDelays", diskDelays }, { "BitFlips", bitFlips } };
    std::pair<const char*, unsigned int> metrics[] = {
        { "DiskDelays", diskDelays }, { "BitFlips", bitFlips }, { "S3Errors", s3Errors },
        { "S3Throttles", s3Throttles }, { "S3Delays", s3Delays }, { "S3Corruptions", s3Corruptions }
    };
    if (e != nullptr) {
        for (auto& m : metrics) {
            char c = m.first[0];
@@ -107,6 +110,15 @@ BitFlipper* BitFlipper::flipper() {
    return static_cast<BitFlipper*>(res);
}

S3FaultInjector* S3FaultInjector::injector() {
    auto res = g_network->global(INetwork::enS3FaultInjector);
    if (!res) {
        res = new S3FaultInjector();
        g_network->setGlobal(INetwork::enS3FaultInjector, res);
    }
    return static_cast<S3FaultInjector*>(res);
}
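
`S3FaultInjector::injector()` follows the same lazy-singleton shape as `BitFlipper::flipper()`: look up a numbered global slot, create the object on first use, and return the cached pointer afterwards. A minimal standalone sketch of that pattern follows. `FakeNetwork`, `Injector`, and `kInjectorSlot` are hypothetical stand-ins written for this illustration, not the real `INetwork` interface.

```cpp
#include <array>
#include <cstddef>

// Hypothetical miniature of a network object with numbered void* slots.
class FakeNetwork {
public:
    void* global(std::size_t id) const { return slots_[id]; }
    void setGlobal(std::size_t id, void* value) { slots_[id] = value; }

private:
    std::array<void*, 32> slots_{}; // value-initialized: all nullptr
};

// Slot number chosen to echo enS3FaultInjector = 23; otherwise arbitrary.
constexpr std::size_t kInjectorSlot = 23;

struct Injector {
    double errorRate = 0.0;

    // Creates the instance on first call and caches it in the slot.
    // The object is intentionally never freed, as in the code above.
    static Injector* get(FakeNetwork& net) {
        void* res = net.global(kInjectorSlot);
        if (!res) {
            res = new Injector();
            net.setGlobal(kInjectorSlot, res);
        }
        return static_cast<Injector*>(res);
    }
};
```

Because every caller on the same network object receives the same pointer, a rate set by one caller is visible to all others, and the most recent setter wins.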

bool IPAddress::operator==(const IPAddress& rhs) const {
    return addr == rhs.addr;
}

@@ -162,6 +162,7 @@ if(WITH_PYTHON)
  add_fdb_test(TEST_FILES slow/BulkDumpingS3.toml)
  add_fdb_test(TEST_FILES fast/BulkLoading.toml)
  add_fdb_test(TEST_FILES slow/S3Client.toml)
  add_fdb_test(TEST_FILES slow/S3ClientWorkloadWithChaos.toml)
  add_fdb_test(TEST_FILES fast/CloggedSideband.toml)
  add_fdb_test(TEST_FILES fast/CompressionUtilsUnit.toml IGNORE)
  add_fdb_test(TEST_FILES fast/ConfigureLocked.toml)

58
tests/slow/S3ClientWorkloadWithChaos.toml
Normal file
@@ -0,0 +1,58 @@
# S3ClientWorkload with Chaos Injection Tests
# Tests the original S3ClientWorkload against MockS3ServerChaos

[[test]]
# S3ClientWorkload with stable MockS3Server (chaos server with 0% injection)
testTitle = "S3ClientWorkloadStable"

[[test.workload]]
testName = "S3ClientWorkload"
enableChaos = true
s3Url = 'blobstore://testkey:testsecret:testtoken@127.0.0.1:8080/?bucket=s3clientworkload&region=us-east-1&secure_connection=0&bypass_simulation=0&global_connection_pool=0'
errorRate = 0.0
throttleRate = 0.0
delayRate = 0.0
corruptionRate = 0.0
maxDelay = 0.0

[[test]]
# S3ClientWorkload with light chaos injection
testTitle = "S3ClientWorkloadLightChaos"

[[test.workload]]
testName = "S3ClientWorkload"
enableChaos = true
s3Url = 'blobstore://testkey:testsecret:testtoken@127.0.0.1:8080/?bucket=s3clientworkload&region=us-east-1&secure_connection=0&bypass_simulation=0&global_connection_pool=0'
errorRate = 0.05
throttleRate = 0.02
delayRate = 0.1
corruptionRate = 0.01
maxDelay = 1.0

[[test]]
# S3ClientWorkload with medium chaos injection
testTitle = "S3ClientWorkloadMediumChaos"

[[test.workload]]
testName = "S3ClientWorkload"
enableChaos = true
s3Url = 'blobstore://testkey:testsecret:testtoken@127.0.0.1:8080/?bucket=s3clientworkload&region=us-east-1&secure_connection=0&bypass_simulation=0&global_connection_pool=0'
errorRate = 0.15
throttleRate = 0.08
delayRate = 0.2
corruptionRate = 0.03
maxDelay = 2.0

[[test]]
# S3ClientWorkload with heavy chaos injection
testTitle = "S3ClientWorkloadHeavyChaos"

[[test.workload]]
testName = "S3ClientWorkload"
enableChaos = true
s3Url = 'blobstore://testkey:testsecret:testtoken@127.0.0.1:8080/?bucket=s3clientworkload&region=us-east-1&secure_connection=0&bypass_simulation=0&global_connection_pool=0'
errorRate = 0.3
throttleRate = 0.15
delayRate = 0.4
corruptionRate = 0.05
maxDelay = 3.0
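
To get a feel for how much retry pressure these chaos levels put on the client, the rates can be turned into a rough expected attempt count. The sketch below rests on assumptions that this section does not confirm: that error and throttle injection are independent per request, that either one fails the request, that the client retries until success, and that delays and corruption are ignored. Both function names are hypothetical.

```cpp
// Probability that at least one of two independent faults fires on a request.
double combinedFailureRate(double errorRate, double throttleRate) {
    return 1.0 - (1.0 - errorRate) * (1.0 - throttleRate);
}

// Mean number of attempts until the first success when each attempt fails
// independently with probability failureRate (geometric distribution).
// Requires failureRate < 1.0.
double expectedAttempts(double failureRate) {
    return 1.0 / (1.0 - failureRate);
}
```

Under these assumptions the heavy configuration (errorRate 0.3, throttleRate 0.15) gives a combined failure rate of 0.405 and roughly 1.68 attempts per successful request, while the stable configuration gives exactly one attempt.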