mirror of
https://github.com/apple/foundationdb.git
synced 2026-01-25 04:18:18 +00:00
Add support for the isolate process group annotation to shutdown fdbserver processes (#11464)
* Add support for the isolate process group annotation to shutdown fdbserver processes
This commit is contained in:
committed by
GitHub
parent
eebcf8224c
commit
ffd43514f4
47
fdbkubernetesmonitor/api/annotations.go
Normal file
47
fdbkubernetesmonitor/api/annotations.go
Normal file
@@ -0,0 +1,47 @@
|
||||
// annotations.go
|
||||
//
|
||||
// This source file is part of the FoundationDB open source project
|
||||
//
|
||||
// Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
|
||||
package api
|
||||
|
||||
// Annotation keys read and written by the FDB Kubernetes monitor.
const (
	// CurrentConfigurationAnnotation holds the most recent configuration
	// stored by the monitor.
	CurrentConfigurationAnnotation = "foundationdb.org/launcher-current-configuration"

	// EnvironmentAnnotation holds the environment variables stored by the
	// monitor.
	EnvironmentAnnotation = "foundationdb.org/launcher-environment"

	// OutdatedConfigMapAnnotation is read by the monitor to learn that its
	// configuration is outdated.
	OutdatedConfigMapAnnotation = "foundationdb.org/outdated-config-map-seen"

	// DelayShutdownAnnotation sets how long the FDB Kubernetes monitor
	// process sleeps before shutting itself down. The monitor always shuts
	// down all fdbserver processes, regardless of this setting. The value
	// must be a duration such as "60s".
	DelayShutdownAnnotation = "foundationdb.org/delay-shutdown"

	// ClusterFileChangeDetectedAnnotation is updated whenever the
	// fdb.cluster file is updated.
	ClusterFileChangeDetectedAnnotation = "foundationdb.org/cluster-file-change"

	// IsolateProcessGroupAnnotation defines whether the current Pod should
	// be isolated. An isolated process group shuts down its fdbserver
	// instance but keeps the Pod and the other Kubernetes resources running
	// for debugging purposes.
	IsolateProcessGroupAnnotation = "foundationdb.org/isolate-process-group"
)
|
||||
@@ -25,13 +25,15 @@ import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
// ProcessConfiguration models the configuration for starting a FoundationDB
|
||||
// process.
|
||||
type ProcessConfiguration struct {
|
||||
// Version provides the version of FoundationDB the process should run.
|
||||
Version string `json:"version"`
|
||||
Version *Version `json:"version"`
|
||||
|
||||
// RunServers defines whether we should run the server processes.
|
||||
// This defaults to true, but you can set it to false to prevent starting
|
||||
@@ -183,3 +185,12 @@ func (configuration *ProcessConfiguration) GenerateArguments(processNumber int,
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// ShouldRunServers returns true if RunServers is unset or set to true.
|
||||
func (configuration *ProcessConfiguration) ShouldRunServers() bool {
|
||||
if configuration == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return pointer.BoolDeref(configuration.RunServers, true)
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ var _ = Describe("Testing FDB Kubernetes Monitor API", func() {
|
||||
BeforeEach(func() {
|
||||
var err error
|
||||
config := loadConfigFromFile(".testdata/default_config.json")
|
||||
Expect(config.Version).To(Equal(&Version{Major: 6, Minor: 3, Patch: 15}))
|
||||
arguments, err = config.GenerateArguments(1, map[string]string{
|
||||
"FDB_PUBLIC_IP": "10.0.0.1",
|
||||
"FDB_POD_IP": "192.168.0.1",
|
||||
@@ -245,4 +246,26 @@ var _ = Describe("Testing FDB Kubernetes Monitor API", func() {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
When("marshalling a process configuration", func() {
|
||||
var out string
|
||||
|
||||
BeforeEach(func() {
|
||||
config := &ProcessConfiguration{
|
||||
Version: &Version{
|
||||
Major: 7,
|
||||
Minor: 1,
|
||||
Patch: 57,
|
||||
},
|
||||
}
|
||||
|
||||
data, err := json.Marshal(config)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
out = string(data)
|
||||
})
|
||||
|
||||
It("should parse it correct", func() {
|
||||
Expect(out).To(Equal("{\"version\":\"7.1.57\"}"))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2023-2024 Apple Inc. and the FoundationDB project authors
|
||||
* Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
||||
185
fdbkubernetesmonitor/api/version.go
Normal file
185
fdbkubernetesmonitor/api/version.go
Normal file
@@ -0,0 +1,185 @@
|
||||
/*
|
||||
* version.go
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Version represents a version of FoundationDB.
//
// This provides convenience methods for checking features available in
// different versions.
type Version struct {
	// Major is the major version
	Major int

	// Minor is the minor version
	Minor int

	// Patch is the patch version
	Patch int

	// ReleaseCandidate is the number from the `-rc\d+` suffix of the
	// version if it exists. A value of 0 is treated as "not a release
	// candidate" by String and IsAtLeast.
	ReleaseCandidate int
}

// versionRegex describes the format of a FoundationDB version. The pattern is
// deliberately unanchored so that a version embedded in a longer string
// (e.g. "prerelease-6.2.11") is still found.
var versionRegex = regexp.MustCompile(`(\d+)\.(\d+)\.(\d+)(-rc(\d+))?`)

// MarshalJSON encodes the version as a JSON string such as "7.1.57" or
// "7.1.0-rc1".
//
// The method uses a value receiver so that both Version and *Version satisfy
// json.Marshaler; with a pointer receiver a non-addressable Version value
// would be encoded as a plain struct instead.
func (version Version) MarshalJSON() ([]byte, error) {
	// String only emits digits, dots and "-rc", so no JSON escaping is needed.
	return []byte(`"` + version.String() + `"`), nil
}

// UnmarshalJSON decodes a version from its JSON string representation.
//
// A JSON null is treated as a no-op and leaves the receiver untouched, which
// is the convention for json.Unmarshaler implementations. Any other input
// that does not contain a parsable version results in an error.
func (version *Version) UnmarshalJSON(data []byte) error {
	if string(data) == "null" {
		return nil
	}

	trimmed := strings.Trim(string(data), "\"")
	parsed, err := ParseFdbVersion(trimmed)
	if err != nil {
		return err
	}

	*version = parsed
	return nil
}

// ParseFdbVersion parses a version from its string representation.
//
// The first substring that looks like "major.minor.patch" with an optional
// "-rc<N>" suffix is used. An error is returned if no such substring exists
// or if one of the numeric components cannot be converted to an int.
func ParseFdbVersion(version string) (Version, error) {
	matches := versionRegex.FindStringSubmatch(version)
	if matches == nil {
		return Version{}, fmt.Errorf("could not parse FDB version from %s", version)
	}

	major, err := strconv.Atoi(matches[1])
	if err != nil {
		return Version{}, err
	}

	minor, err := strconv.Atoi(matches[2])
	if err != nil {
		return Version{}, err
	}

	patch, err := strconv.Atoi(matches[3])
	if err != nil {
		return Version{}, err
	}

	// The release candidate group is optional: an empty match means the
	// version is a final release. A present but unparsable number is reported
	// instead of being silently mapped to 0, which would turn a release
	// candidate into a final release.
	var rc int
	if matches[5] != "" {
		rc, err = strconv.Atoi(matches[5])
		if err != nil {
			return Version{}, err
		}
	}

	return Version{Major: major, Minor: minor, Patch: patch, ReleaseCandidate: rc}, nil
}

// String gets the string representation of an FDB version. The "-rc<N>"
// suffix is only added when ReleaseCandidate is non-zero.
func (version Version) String() string {
	if version.ReleaseCandidate == 0 {
		return fmt.Sprintf("%d.%d.%d", version.Major, version.Minor, version.Patch)
	}
	return fmt.Sprintf("%d.%d.%d-rc%d", version.Major, version.Minor, version.Patch, version.ReleaseCandidate)
}

// Compact prints the version in the major.minor format.
func (version Version) Compact() string {
	return fmt.Sprintf("%d.%d", version.Major, version.Minor)
}

// IsAtLeast determines if a version is greater than or equal to another version.
//
// Major, minor and patch are compared in that order. If all three are equal,
// a final release (ReleaseCandidate == 0) is considered newer than any of its
// release candidates, and release candidates are ordered by their number.
func (version Version) IsAtLeast(other Version) bool {
	if version.Major != other.Major {
		return version.Major > other.Major
	}
	if version.Minor != other.Minor {
		return version.Minor > other.Minor
	}
	if version.Patch != other.Patch {
		return version.Patch > other.Patch
	}

	// Same major.minor.patch from here on.
	if version.ReleaseCandidate == 0 {
		return true
	}
	if other.ReleaseCandidate == 0 {
		return false
	}
	return version.ReleaseCandidate >= other.ReleaseCandidate
}

// GetBinaryVersion returns the full version string for release candidates and
// the compact major.minor form otherwise.
// NOTE(review): the original comment says this matches the logic implemented
// in the sidecars — confirm against the sidecar implementation.
func (version Version) GetBinaryVersion() string {
	if version.ReleaseCandidate > 0 {
		return version.String()
	}
	return version.Compact()
}

// IsProtocolCompatible determines whether two versions of FDB are protocol
// compatible. Two versions are compatible when major, minor and release
// candidate number are all equal; the patch version is ignored.
func (version Version) IsProtocolCompatible(other Version) bool {
	return version.Major == other.Major && version.Minor == other.Minor && version.ReleaseCandidate == other.ReleaseCandidate
}

// NextMajorVersion returns the next major version of FoundationDB.
func (version Version) NextMajorVersion() Version {
	return Version{Major: version.Major + 1, Minor: 0, Patch: 0}
}

// NextMinorVersion returns the next minor version of FoundationDB.
func (version Version) NextMinorVersion() Version {
	return Version{Major: version.Major, Minor: version.Minor + 1, Patch: 0}
}

// NextPatchVersion returns the next patch version of FoundationDB.
func (version Version) NextPatchVersion() Version {
	return Version{Major: version.Major, Minor: version.Minor, Patch: version.Patch + 1}
}

// Equal checks if two Version are the same, including the release candidate
// number.
func (version Version) Equal(other Version) bool {
	return version.Major == other.Major &&
		version.Minor == other.Minor &&
		version.Patch == other.Patch &&
		version.ReleaseCandidate == other.ReleaseCandidate
}
|
||||
122
fdbkubernetesmonitor/api/version_test.go
Normal file
122
fdbkubernetesmonitor/api/version_test.go
Normal file
@@ -0,0 +1,122 @@
|
||||
/*
|
||||
* version_test.go
|
||||
*
|
||||
* This source file is part of the FoundationDB open source project
|
||||
*
|
||||
* Copyright 2021-2024 Apple Inc. and the FoundationDB project authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
//"encoding/json"
|
||||
//"os"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("[api] FDBVersion", func() {
|
||||
// TODO: test JSON marshalling and unmarshalling!
|
||||
|
||||
When("checking if the protocol and the version are compatible", func() {
|
||||
It("should return the correct compatibility", func() {
|
||||
version := Version{Major: 6, Minor: 2, Patch: 20}
|
||||
Expect(version.IsProtocolCompatible(Version{Major: 6, Minor: 2, Patch: 20})).To(BeTrue())
|
||||
Expect(version.IsProtocolCompatible(Version{Major: 6, Minor: 2, Patch: 22})).To(BeTrue())
|
||||
Expect(version.IsProtocolCompatible(Version{Major: 6, Minor: 3, Patch: 0})).To(BeFalse())
|
||||
Expect(version.IsProtocolCompatible(Version{Major: 6, Minor: 3, Patch: 20})).To(BeFalse())
|
||||
Expect(version.IsProtocolCompatible(Version{Major: 7, Minor: 2, Patch: 20})).To(BeFalse())
|
||||
})
|
||||
|
||||
When("release candidates differ", func() {
|
||||
It("should be incompatible", func() {
|
||||
version := Version{Major: 7, Minor: 0, Patch: 0, ReleaseCandidate: 1}
|
||||
Expect(version.IsProtocolCompatible(Version{Major: 7, Minor: 0, Patch: 0, ReleaseCandidate: 2})).To(BeFalse())
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Context("Using the fdb version", func() {
|
||||
It("should return the fdb version struct", func() {
|
||||
version, err := ParseFdbVersion("6.2.11")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(version).To(Equal(Version{Major: 6, Minor: 2, Patch: 11}))
|
||||
|
||||
version, err = ParseFdbVersion("prerelease-6.2.11")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(version).To(Equal(Version{Major: 6, Minor: 2, Patch: 11}))
|
||||
|
||||
version, err = ParseFdbVersion("test-6.2.11-test")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(version).To(Equal(Version{Major: 6, Minor: 2, Patch: 11, ReleaseCandidate: 0}))
|
||||
|
||||
version, err = ParseFdbVersion("7.0.0")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(version).To(Equal(Version{Major: 7, Minor: 0, Patch: 0, ReleaseCandidate: 0}))
|
||||
|
||||
version, err = ParseFdbVersion("7.0.0-rc1")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(version).To(Equal(Version{Major: 7, Minor: 0, Patch: 0, ReleaseCandidate: 1}))
|
||||
|
||||
version, err = ParseFdbVersion("7.1.0-rc39")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(version).To(Equal(Version{Major: 7, Minor: 1, Patch: 0, ReleaseCandidate: 39}))
|
||||
|
||||
_, err = ParseFdbVersion("6.2")
|
||||
Expect(err).To(HaveOccurred())
|
||||
Expect(err.Error()).To(Equal("could not parse FDB version from 6.2"))
|
||||
})
|
||||
|
||||
It("should format the version correctly", func() {
|
||||
version := Version{Major: 6, Minor: 2, Patch: 11}
|
||||
Expect(version.String()).To(Equal("6.2.11"))
|
||||
version = Version{Major: 6, Minor: 2, Patch: 11, ReleaseCandidate: 0}
|
||||
Expect(version.String()).To(Equal("6.2.11"))
|
||||
version = Version{Major: 6, Minor: 2, Patch: 11, ReleaseCandidate: 1}
|
||||
Expect(version.String()).To(Equal("6.2.11-rc1"))
|
||||
})
|
||||
})
|
||||
|
||||
When("getting the next version of the current FDBVersion", func() {
|
||||
It("should return the correct next version", func() {
|
||||
version := Version{Major: 6, Minor: 2, Patch: 20}
|
||||
Expect(version.NextMajorVersion()).To(Equal(Version{Major: 7, Minor: 0, Patch: 0}))
|
||||
Expect(version.NextMinorVersion()).To(Equal(Version{Major: version.Major, Minor: 3, Patch: 0}))
|
||||
Expect(version.NextPatchVersion()).To(Equal(Version{Major: version.Major, Minor: version.Minor, Patch: 21}))
|
||||
})
|
||||
})
|
||||
|
||||
When("comparing two FDBVersions", func() {
|
||||
It("should return if they are equal", func() {
|
||||
version := Version{Major: 6, Minor: 2, Patch: 20}
|
||||
Expect(version.Equal(version)).To(BeTrue())
|
||||
Expect(version.Equal(Version{Major: 7, Minor: 0, Patch: 0})).To(BeFalse())
|
||||
Expect(version.Equal(Version{Major: 7, Minor: 0, Patch: 0})).To(BeFalse())
|
||||
Expect(version.Equal(Version{Major: 6, Minor: 3, Patch: 20})).To(BeFalse())
|
||||
Expect(version.Equal(Version{Major: 6, Minor: 2, Patch: 21})).To(BeFalse())
|
||||
})
|
||||
|
||||
It("should return correct result for IsAtleast", func() {
|
||||
version := Version{Major: 7, Minor: 1, Patch: 0, ReleaseCandidate: 2}
|
||||
Expect(version.IsAtLeast(Version{Major: 7, Minor: 1, Patch: 0})).To(BeFalse())
|
||||
Expect(version.IsAtLeast(Version{Major: 7, Minor: 1, Patch: 0, ReleaseCandidate: 1})).To(BeTrue())
|
||||
Expect(version.IsAtLeast(Version{Major: 7, Minor: 1, Patch: 0, ReleaseCandidate: 3})).To(BeFalse())
|
||||
|
||||
version = Version{Major: 7, Minor: 1, Patch: 0}
|
||||
Expect(version.IsAtLeast(Version{Major: 7, Minor: 1, Patch: 0, ReleaseCandidate: 1})).To(BeTrue())
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -81,7 +81,7 @@ func copyFile(logger logr.Logger, inputPath string, outputPath string, required
|
||||
}
|
||||
|
||||
// CopyFiles copies a list of files into the output directory.
|
||||
func CopyFiles(logger logr.Logger, outputDir string, copyDetails map[string]string, requiredCopies map[string]bool) error {
|
||||
func copyFiles(logger logr.Logger, outputDir string, copyDetails map[string]string, requiredCopies map[string]bool) error {
|
||||
for inputPath, outputSubpath := range copyDetails {
|
||||
if outputSubpath == "" {
|
||||
outputSubpath = path.Base(inputPath)
|
||||
|
||||
@@ -311,7 +311,7 @@ var _ = Describe("Testing the copy methods", func() {
|
||||
copyDetails, _, err := getCopyDetails("", "", "", nil, binaries, nil, nil, "7.1.43", executionModeInit)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(copyDetails).To(HaveLen(3))
|
||||
Expect(CopyFiles(GinkgoLogr, outputBinaryDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
Expect(copyFiles(GinkgoLogr, outputBinaryDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should copy all the files", func() {
|
||||
@@ -328,7 +328,7 @@ var _ = Describe("Testing the copy methods", func() {
|
||||
copyDetails, _, err := getCopyDetails("", "", "", nil, binaries, nil, nil, "7.1.43", executionModeSidecar)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(copyDetails).To(HaveLen(3))
|
||||
Expect(CopyFiles(GinkgoLogr, outputBinaryDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
Expect(copyFiles(GinkgoLogr, outputBinaryDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should copy all the files", func() {
|
||||
@@ -366,7 +366,7 @@ var _ = Describe("Testing the copy methods", func() {
|
||||
copyDetails, _, err = getCopyDetails("", "", "", nil, nil, libraries, nil, "7.1.43", executionModeInit)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(copyDetails).To(HaveLen(3))
|
||||
Expect(CopyFiles(GinkgoLogr, outputLibraryDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
Expect(copyFiles(GinkgoLogr, outputLibraryDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should copy all the files", func() {
|
||||
@@ -384,7 +384,7 @@ var _ = Describe("Testing the copy methods", func() {
|
||||
copyDetails, _, err = getCopyDetails("", "7.1", "", nil, nil, libraries, nil, "7.1.43", executionModeInit)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(copyDetails).To(HaveLen(3))
|
||||
Expect(CopyFiles(GinkgoLogr, outputLibraryDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
Expect(copyFiles(GinkgoLogr, outputLibraryDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should copy all the files", func() {
|
||||
@@ -413,7 +413,7 @@ var _ = Describe("Testing the copy methods", func() {
|
||||
copyDetails, _, err = getCopyDetails(testInputDir, "", "", []string{"testfile"}, nil, nil, nil, "7.1.43", executionModeInit)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(copyDetails).To(HaveLen(1))
|
||||
Expect(CopyFiles(GinkgoLogr, testOutputDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
Expect(copyFiles(GinkgoLogr, testOutputDir, copyDetails, map[string]bool{})).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should copy the file", func() {
|
||||
@@ -434,7 +434,7 @@ var _ = Describe("Testing the copy methods", func() {
|
||||
|
||||
When("the file is empty", func() {
|
||||
It("should not copy the file", func() {
|
||||
Expect(CopyFiles(GinkgoLogr, testOutputDir, copyDetails, requiredFiles)).To(HaveOccurred())
|
||||
Expect(copyFiles(GinkgoLogr, testOutputDir, copyDetails, requiredFiles)).To(HaveOccurred())
|
||||
Expect(path.Join(testOutputDir, "testfile")).NotTo(BeAnExistingFile())
|
||||
})
|
||||
})
|
||||
@@ -445,7 +445,7 @@ var _ = Describe("Testing the copy methods", func() {
|
||||
})
|
||||
|
||||
It("should copy the file", func() {
|
||||
Expect(CopyFiles(GinkgoLogr, testOutputDir, copyDetails, requiredFiles)).NotTo(HaveOccurred())
|
||||
Expect(copyFiles(GinkgoLogr, testOutputDir, copyDetails, requiredFiles)).NotTo(HaveOccurred())
|
||||
Expect(path.Join(testOutputDir, "testfile")).To(BeAnExistingFile())
|
||||
})
|
||||
})
|
||||
|
||||
@@ -43,44 +43,23 @@ import (
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
)
|
||||
|
||||
const (
|
||||
// CurrentConfigurationAnnotation is the annotation we use to store the
|
||||
// latest configuration.
|
||||
CurrentConfigurationAnnotation = "foundationdb.org/launcher-current-configuration"
|
||||
|
||||
// EnvironmentAnnotation is the annotation we use to store the environment
|
||||
// variables.
|
||||
EnvironmentAnnotation = "foundationdb.org/launcher-environment"
|
||||
|
||||
// OutdatedConfigMapAnnotation is the annotation we read to get notified of
|
||||
// outdated configuration.
|
||||
OutdatedConfigMapAnnotation = "foundationdb.org/outdated-config-map-seen"
|
||||
|
||||
// DelayShutdownAnnotation defines how long the FDB Kubernetes monitor process should sleep before shutting itself down.
|
||||
// The FDB Kubernetes monitor will always shutdown all fdbserver processes, independent of this setting.
|
||||
// The value of this annotation must be a duration like "60s".
|
||||
DelayShutdownAnnotation = "foundationdb.org/delay-shutdown"
|
||||
|
||||
// ClusterFileChangeDetectedAnnotation is the annotation that will be updated if the fdb.cluster file is updated.
|
||||
ClusterFileChangeDetectedAnnotation = "foundationdb.org/cluster-file-change"
|
||||
)
|
||||
|
||||
// PodClient is a wrapper around the pod API.
|
||||
type PodClient struct {
|
||||
// kubernetesClient is a wrapper around the Kubernetes API.
|
||||
type kubernetesClient struct {
|
||||
// podMetadata is the latest metadata that was seen by the fdb-kubernetes-monitor for the corresponding Pod.
|
||||
podMetadata *metav1.PartialObjectMetadata
|
||||
|
||||
// nodeMetadata is the latest metadata that was seen by the fdb-kubernetes-monitor for the corresponding node that hosts the Pod.
|
||||
nodeMetadata *metav1.PartialObjectMetadata
|
||||
|
||||
// TimestampFeed is a channel where the pod client will send updates with
|
||||
// the values from OutdatedConfigMapAnnotation.
|
||||
// TimestampFeed is a channel where the kubernetes client will send updates with
|
||||
// the values from api.OutdatedConfigMapAnnotation. If the api.IsolateProcessGroupAnnotation is changed the current
|
||||
// timestamp will be sent to the channel to force the monitor to change its configuration accordingly.
|
||||
TimestampFeed chan int64
|
||||
|
||||
// Logger is the logger we use for this client.
|
||||
Logger logr.Logger
|
||||
|
||||
// Adds the controller runtime client to the PodClient.
|
||||
// Adds the controller runtime client to the kubernetesClient.
|
||||
client.Client
|
||||
}
|
||||
|
||||
@@ -124,14 +103,14 @@ func setupCache(namespace string, podName string, nodeName string) (client.WithW
|
||||
return internalClient, internalCache, nil
|
||||
}
|
||||
|
||||
// CreatePodClient creates a new client for working with the pod object.
|
||||
func CreatePodClient(ctx context.Context, logger logr.Logger, enableNodeWatcher bool, setupCache func(string, string, string) (client.WithWatch, cache.Cache, error)) (*PodClient, error) {
|
||||
// createPodClient creates a new client for working with the pod object.
|
||||
func createPodClient(ctx context.Context, logger logr.Logger, enableNodeWatcher bool, setupCache func(string, string, string) (client.WithWatch, cache.Cache, error)) (*kubernetesClient, error) {
|
||||
namespace := os.Getenv("FDB_POD_NAMESPACE")
|
||||
podName := os.Getenv("FDB_POD_NAME")
|
||||
nodeName := os.Getenv("FDB_NODE_NAME")
|
||||
|
||||
internalClient, internalCache, err := setupCache(namespace, podName, nodeName)
|
||||
podClient := &PodClient{
|
||||
podClient := &kubernetesClient{
|
||||
podMetadata: nil,
|
||||
nodeMetadata: nil,
|
||||
TimestampFeed: make(chan int64, 10),
|
||||
@@ -185,7 +164,7 @@ func CreatePodClient(ctx context.Context, logger logr.Logger, enableNodeWatcher
|
||||
|
||||
podClient.Client = controllerClient
|
||||
|
||||
// Fetch the current metadata before returning the PodClient
|
||||
// Fetch the current metadata before returning the kubernetesClient
|
||||
currentPodMetadata := &metav1.PartialObjectMetadata{}
|
||||
currentPodMetadata.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod"))
|
||||
err = podClient.Client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: podName}, currentPodMetadata)
|
||||
@@ -213,9 +192,9 @@ func CreatePodClient(ctx context.Context, logger logr.Logger, enableNodeWatcher
|
||||
|
||||
// retrieveEnvironmentVariables extracts the environment variables we have for
|
||||
// an argument into a map.
|
||||
func retrieveEnvironmentVariables(monitor *Monitor, argument api.Argument, target map[string]string) {
|
||||
func retrieveEnvironmentVariables(monitor *monitor, argument api.Argument, target map[string]string) {
|
||||
if argument.Source != "" {
|
||||
value, err := argument.LookupEnv(monitor.CustomEnvironment)
|
||||
value, err := argument.LookupEnv(monitor.customEnvironment)
|
||||
if err == nil {
|
||||
target[argument.Source] = value
|
||||
}
|
||||
@@ -227,39 +206,44 @@ func retrieveEnvironmentVariables(monitor *Monitor, argument api.Argument, targe
|
||||
}
|
||||
}
|
||||
|
||||
// UpdateAnnotations updates annotations on the pod after loading new
|
||||
// updateAnnotations updates annotations on the pod after loading new
|
||||
// configuration.
|
||||
func (podClient *PodClient) UpdateAnnotations(monitor *Monitor) error {
|
||||
func (podClient *kubernetesClient) updateAnnotations(monitor *monitor) error {
|
||||
environment := make(map[string]string)
|
||||
for _, argument := range monitor.ActiveConfiguration.Arguments {
|
||||
for _, argument := range monitor.activeConfiguration.Arguments {
|
||||
retrieveEnvironmentVariables(monitor, argument, environment)
|
||||
}
|
||||
environment["BINARY_DIR"] = path.Dir(monitor.ActiveConfiguration.BinaryPath)
|
||||
environment["BINARY_DIR"] = path.Dir(monitor.activeConfiguration.BinaryPath)
|
||||
jsonEnvironment, err := json.Marshal(environment)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return podClient.updateAnnotationsOnPod(map[string]string{
|
||||
CurrentConfigurationAnnotation: string(monitor.ActiveConfigurationBytes),
|
||||
EnvironmentAnnotation: string(jsonEnvironment),
|
||||
api.CurrentConfigurationAnnotation: string(monitor.activeConfigurationBytes),
|
||||
api.EnvironmentAnnotation: string(jsonEnvironment),
|
||||
})
|
||||
}
|
||||
|
||||
// updateFdbClusterTimestampAnnotation updates the ClusterFileChangeDetectedAnnotation annotation on the pod
|
||||
// after a change to the fdb.cluster file was detected, e.g. because the coordinators were changed.
|
||||
func (podClient *PodClient) updateFdbClusterTimestampAnnotation() error {
|
||||
func (podClient *kubernetesClient) updateFdbClusterTimestampAnnotation() error {
|
||||
return podClient.updateAnnotationsOnPod(map[string]string{
|
||||
ClusterFileChangeDetectedAnnotation: strconv.FormatInt(time.Now().Unix(), 10),
|
||||
api.ClusterFileChangeDetectedAnnotation: strconv.FormatInt(time.Now().Unix(), 10),
|
||||
})
|
||||
}
|
||||
|
||||
// updateAnnotationsOnPod will update the annotations with the provided annotationChanges. If an annotation exists, it
|
||||
// will be updated; if the annotation is absent, it will be added.
|
||||
func (podClient *PodClient) updateAnnotationsOnPod(annotationChanges map[string]string) error {
|
||||
func (podClient *kubernetesClient) updateAnnotationsOnPod(annotationChanges map[string]string) error {
|
||||
if podClient.podMetadata == nil {
|
||||
return fmt.Errorf("pod client has no metadata present")
|
||||
}
|
||||
|
||||
annotations := podClient.podMetadata.Annotations
|
||||
if len(annotations) == 0 {
|
||||
annotations = map[string]string{}
|
||||
}
|
||||
|
||||
if !podClient.podMetadata.DeletionTimestamp.IsZero() {
|
||||
return fmt.Errorf("pod is marked for deletion, cannot update annotations")
|
||||
@@ -304,7 +288,7 @@ func (podClient *PodClient) updateAnnotationsOnPod(annotationChanges map[string]
|
||||
}
|
||||
|
||||
// OnAdd is called when an object is added.
|
||||
func (podClient *PodClient) OnAdd(obj interface{}) {
|
||||
func (podClient *kubernetesClient) OnAdd(obj interface{}) {
|
||||
switch castedObj := obj.(type) {
|
||||
case *corev1.Pod:
|
||||
podClient.Logger.Info("Got event for OnAdd for Pod resource", "name", castedObj.Name, "namespace", castedObj.Namespace)
|
||||
@@ -324,10 +308,15 @@ func (podClient *PodClient) OnAdd(obj interface{}) {
|
||||
// OnUpdate is also called when a re-list happens, and it will
|
||||
// get called even if nothing changed. This is useful for periodically
|
||||
// evaluating or syncing something.
|
||||
func (podClient *PodClient) OnUpdate(_, newObj interface{}) {
|
||||
func (podClient *kubernetesClient) OnUpdate(_, newObj interface{}) {
|
||||
switch castedObj := newObj.(type) {
|
||||
case *corev1.Pod:
|
||||
podClient.Logger.Info("Got event for OnUpdate for Pod resource", "name", castedObj.Name, "namespace", castedObj.Namespace, "generation", castedObj.Generation)
|
||||
var previousAnnotations map[string]string
|
||||
if podClient.podMetadata != nil {
|
||||
previousAnnotations = podClient.podMetadata.Annotations
|
||||
}
|
||||
|
||||
podClient.podMetadata = &metav1.PartialObjectMetadata{
|
||||
TypeMeta: castedObj.TypeMeta,
|
||||
ObjectMeta: castedObj.ObjectMeta,
|
||||
@@ -337,14 +326,25 @@ func (podClient *PodClient) OnUpdate(_, newObj interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
annotation := podClient.podMetadata.Annotations[OutdatedConfigMapAnnotation]
|
||||
// If the IsolateProcessGroupAnnotation changes force a reload of the configuration to make sure the processes
|
||||
// will be shut down.
|
||||
previousIsolateProcessGroupAnnotationValue := previousAnnotations[api.IsolateProcessGroupAnnotation]
|
||||
newIsolateProcessGroupAnnotationValue := podClient.podMetadata.Annotations[api.IsolateProcessGroupAnnotation]
|
||||
if previousIsolateProcessGroupAnnotationValue != newIsolateProcessGroupAnnotationValue {
|
||||
podClient.Logger.Info("Got change in isolate process group annotation", "previous", previousIsolateProcessGroupAnnotationValue, "new", newIsolateProcessGroupAnnotationValue)
|
||||
podClient.TimestampFeed <- time.Now().Unix()
|
||||
// In this case we can return as the timestamp feed already has a new value.
|
||||
return
|
||||
}
|
||||
|
||||
annotation := podClient.podMetadata.Annotations[api.OutdatedConfigMapAnnotation]
|
||||
if annotation == "" {
|
||||
return
|
||||
}
|
||||
|
||||
timestamp, err := strconv.ParseInt(annotation, 10, 64)
|
||||
if err != nil {
|
||||
podClient.Logger.Error(err, "Error parsing annotation", "key", OutdatedConfigMapAnnotation, "rawAnnotation", annotation)
|
||||
podClient.Logger.Error(err, "Error parsing annotation", "key", api.OutdatedConfigMapAnnotation, "rawAnnotation", annotation)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -362,7 +362,7 @@ func (podClient *PodClient) OnUpdate(_, newObj interface{}) {
|
||||
// it will get an object of type DeletedFinalStateUnknown. This can
|
||||
// happen if the watch is closed and misses the delete event and we don't
|
||||
// notice the deletion until the subsequent re-list.
|
||||
func (podClient *PodClient) OnDelete(obj interface{}) {
|
||||
func (podClient *kubernetesClient) OnDelete(obj interface{}) {
|
||||
switch castedObj := obj.(type) {
|
||||
case *corev1.Pod:
|
||||
podClient.Logger.Info("Got event for OnDelete for Pod resource", "name", castedObj.Name, "namespace", castedObj.Namespace)
|
||||
|
||||
@@ -42,7 +42,7 @@ import (
|
||||
var _ = Describe("Testing FDB Pod client", func() {
|
||||
var enableNodeWatcher bool
|
||||
var fakeClient client.WithWatch
|
||||
var podClient *PodClient
|
||||
var podClient *kubernetesClient
|
||||
var namespace, podName, nodeName string
|
||||
var internalCache *informertest.FakeInformers
|
||||
|
||||
@@ -63,10 +63,10 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
internalCache.Scheme = fakeClient.Scheme()
|
||||
})
|
||||
|
||||
When("the PodClient was started", func() {
|
||||
When("the kubernetesClient was started", func() {
|
||||
JustBeforeEach(func() {
|
||||
var err error
|
||||
podClient, err = CreatePodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
|
||||
podClient, err = createPodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
|
||||
Expect(fncNamespace).To(Equal(namespace))
|
||||
Expect(fncPodName).To(Equal(podName))
|
||||
Expect(fncNodeName).To(Equal(nodeName))
|
||||
@@ -102,10 +102,10 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
})
|
||||
})
|
||||
|
||||
When("the PodClient handles events", func() {
|
||||
When("the kubernetesClient handles events", func() {
|
||||
BeforeEach(func() {
|
||||
var err error
|
||||
podClient, err = CreatePodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
|
||||
podClient, err = createPodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
|
||||
Expect(fncNamespace).To(Equal(namespace))
|
||||
Expect(fncPodName).To(Equal(podName))
|
||||
Expect(fncNodeName).To(Equal(nodeName))
|
||||
@@ -173,7 +173,7 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
BeforeEach(func() {
|
||||
timestamp = time.Now().Unix()
|
||||
pod.Annotations = map[string]string{
|
||||
OutdatedConfigMapAnnotation: strconv.FormatInt(timestamp, 10),
|
||||
api.OutdatedConfigMapAnnotation: strconv.FormatInt(timestamp, 10),
|
||||
}
|
||||
fakeInformer.Update(nil, pod)
|
||||
})
|
||||
@@ -193,7 +193,7 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
When("an UpdateEvent is handled that updates the OutdatedConfigMapAnnotation with a bad value", func() {
|
||||
BeforeEach(func() {
|
||||
pod.Annotations = map[string]string{
|
||||
OutdatedConfigMapAnnotation: "boom!",
|
||||
api.OutdatedConfigMapAnnotation: "boom!",
|
||||
}
|
||||
fakeInformer.Update(nil, pod)
|
||||
})
|
||||
@@ -282,12 +282,12 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
})
|
||||
})
|
||||
|
||||
When("the PodClient should update the annotations", func() {
|
||||
var monitor *Monitor
|
||||
When("the kubernetesClient should update the annotations", func() {
|
||||
var mon *monitor
|
||||
|
||||
JustBeforeEach(func() {
|
||||
var err error
|
||||
podClient, err = CreatePodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
|
||||
podClient, err = createPodClient(context.Background(), GinkgoLogr, enableNodeWatcher, func(fncNamespace string, fncPodName string, fncNodeName string) (client.WithWatch, cache.Cache, error) {
|
||||
Expect(fncNamespace).To(Equal(namespace))
|
||||
Expect(fncPodName).To(Equal(podName))
|
||||
Expect(fncNodeName).To(Equal(nodeName))
|
||||
@@ -314,13 +314,13 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
})).NotTo(HaveOccurred())
|
||||
|
||||
// Execute the update annotations.
|
||||
Expect(podClient.UpdateAnnotations(monitor)).NotTo(HaveOccurred())
|
||||
Expect(podClient.updateAnnotations(mon)).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
When("no additional env variables are set", func() {
|
||||
BeforeEach(func() {
|
||||
monitor = &Monitor{
|
||||
ActiveConfiguration: &api.ProcessConfiguration{
|
||||
mon = &monitor{
|
||||
activeConfiguration: &api.ProcessConfiguration{
|
||||
BinaryPath: "/usr/bin",
|
||||
},
|
||||
}
|
||||
@@ -330,20 +330,20 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
pod := &corev1.Pod{}
|
||||
Expect(fakeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: podName}, pod)).NotTo(HaveOccurred())
|
||||
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(CurrentConfigurationAnnotation, ""))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\"}"))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(api.CurrentConfigurationAnnotation, ""))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(api.EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\"}"))
|
||||
})
|
||||
})
|
||||
|
||||
When("one flat additional env variable is set", func() {
|
||||
BeforeEach(func() {
|
||||
GinkgoT().Setenv("TEST", "test-value")
|
||||
monitor = &Monitor{
|
||||
ActiveConfiguration: &api.ProcessConfiguration{
|
||||
mon = &monitor{
|
||||
activeConfiguration: &api.ProcessConfiguration{
|
||||
BinaryPath: "/usr/bin",
|
||||
Arguments: []api.Argument{
|
||||
{
|
||||
ArgumentType: EnvironmentAnnotation,
|
||||
ArgumentType: api.EnvironmentAnnotation,
|
||||
Source: "TEST",
|
||||
},
|
||||
},
|
||||
@@ -355,16 +355,16 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
pod := &corev1.Pod{}
|
||||
Expect(fakeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: podName}, pod)).NotTo(HaveOccurred())
|
||||
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(CurrentConfigurationAnnotation, ""))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\",\"TEST\":\"test-value\"}"))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(api.CurrentConfigurationAnnotation, ""))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(api.EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\",\"TEST\":\"test-value\"}"))
|
||||
})
|
||||
})
|
||||
|
||||
When("one nested flat additional env variable is set", func() {
|
||||
BeforeEach(func() {
|
||||
GinkgoT().Setenv("TEST", "test-value")
|
||||
monitor = &Monitor{
|
||||
ActiveConfiguration: &api.ProcessConfiguration{
|
||||
mon = &monitor{
|
||||
activeConfiguration: &api.ProcessConfiguration{
|
||||
BinaryPath: "/usr/bin",
|
||||
Arguments: []api.Argument{
|
||||
{
|
||||
@@ -385,8 +385,8 @@ var _ = Describe("Testing FDB Pod client", func() {
|
||||
pod := &corev1.Pod{}
|
||||
Expect(fakeClient.Get(context.Background(), client.ObjectKey{Namespace: namespace, Name: podName}, pod)).NotTo(HaveOccurred())
|
||||
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(CurrentConfigurationAnnotation, ""))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\",\"TEST\":\"test-value\"}"))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(api.CurrentConfigurationAnnotation, ""))
|
||||
Expect(pod.Annotations).To(HaveKeyWithValue(api.EnvironmentAnnotation, "{\"BINARY_DIR\":\"/usr\",\"TEST\":\"test-value\"}"))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -28,6 +28,8 @@ import (
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
"github.com/go-logr/zapr"
|
||||
"github.com/spf13/pflag"
|
||||
@@ -109,7 +111,7 @@ func parseFlagsAndSetEnvDefaults() error {
|
||||
}
|
||||
|
||||
func main() {
|
||||
var copyFiles, copyBinaries, copyLibraries, requiredCopyFiles []string
|
||||
var filesToCopy, copyBinaries, copyLibraries, requiredCopyFiles []string
|
||||
var inputDir, copyPrimaryLibrary, binaryOutputDirectory, certPath, keyPath string
|
||||
|
||||
pflag.StringVar(&executionModeString, "mode", "launcher", "Execution mode. Valid options are launcher, sidecar, and init")
|
||||
@@ -118,7 +120,7 @@ func main() {
|
||||
pflag.StringVar(&monitorConfFile, "input-monitor-conf", "config.json", "Name of the file in the input directory that contains the monitor configuration")
|
||||
pflag.StringVar(&logPath, "log-path", "", "Name of a file to send logs to. Logs will be sent to stdout in addition the file you pass in this argument. If this is blank, logs will only by sent to stdout")
|
||||
pflag.StringVar(&outputDir, "output-dir", ".", "Directory to copy files into")
|
||||
pflag.StringArrayVar(©Files, "copy-file", nil, "A list of files to copy")
|
||||
pflag.StringArrayVar(&filesToCopy, "copy-file", nil, "A list of files to copy")
|
||||
pflag.StringArrayVar(©Binaries, "copy-binary", nil, "A list of binaries to copy from /usr/bin")
|
||||
pflag.StringVar(&versionFilePath, "version-file", "/var/fdb/version", "Path to a file containing the current FDB version")
|
||||
pflag.StringVar(&sharedBinaryDir, "shared-binary-dir", "/var/fdb/shared-binaries/bin", "A directory containing binaries that are copied from a sidecar process")
|
||||
@@ -146,7 +148,7 @@ func main() {
|
||||
}
|
||||
currentContainerVersion := strings.TrimSpace(string(versionBytes))
|
||||
mode := executionMode(executionModeString)
|
||||
copyDetails, requiredCopies, err := getCopyDetails(inputDir, copyPrimaryLibrary, binaryOutputDirectory, copyFiles, copyBinaries, copyLibraries, requiredCopyFiles, currentContainerVersion, mode)
|
||||
copyDetails, requiredCopies, err := getCopyDetails(inputDir, copyPrimaryLibrary, binaryOutputDirectory, filesToCopy, copyBinaries, copyLibraries, requiredCopyFiles, currentContainerVersion, mode)
|
||||
if err != nil {
|
||||
logger.Error(err, "Error getting list of files to copy")
|
||||
os.Exit(1)
|
||||
@@ -164,16 +166,22 @@ func main() {
|
||||
certPath: certPath,
|
||||
keyPath: keyPath,
|
||||
}
|
||||
StartMonitor(context.Background(), logger, path.Join(inputDir, monitorConfFile), customEnvironment, processCount, promConfig, enablePprof, currentContainerVersion, enableNodeWatch)
|
||||
|
||||
parsedVersion, err := api.ParseFdbVersion(currentContainerVersion)
|
||||
if err != nil {
|
||||
logger.Error(err, "Error parsing container version", "currentContainerVersion", currentContainerVersion)
|
||||
os.Exit(1)
|
||||
}
|
||||
startMonitor(context.Background(), logger, path.Join(inputDir, monitorConfFile), customEnvironment, processCount, promConfig, enablePprof, parsedVersion, enableNodeWatch)
|
||||
case executionModeInit:
|
||||
err = CopyFiles(logger, outputDir, copyDetails, requiredCopies)
|
||||
err = copyFiles(logger, outputDir, copyDetails, requiredCopies)
|
||||
if err != nil {
|
||||
logger.Error(err, "Error copying files")
|
||||
os.Exit(1)
|
||||
}
|
||||
case executionModeSidecar:
|
||||
if mainContainerVersion != currentContainerVersion {
|
||||
err = CopyFiles(logger, outputDir, copyDetails, requiredCopies)
|
||||
err = copyFiles(logger, outputDir, copyDetails, requiredCopies)
|
||||
if err != nil {
|
||||
logger.Error(err, "Error copying files")
|
||||
os.Exit(1)
|
||||
|
||||
@@ -30,7 +30,7 @@ const (
|
||||
// processLabel represents the process label for the prometheus metrics.
|
||||
processLabel = "process"
|
||||
// namespace is the prometheus namespace for the metrics
|
||||
namespace = "fdbkubernetesmonitor"
|
||||
prometheusNamespace = "fdbkubernetesmonitor"
|
||||
// restartCountMetricName represents the name of the restart metric.
|
||||
restartCountMetricName = "restart_count"
|
||||
// configurationChangeCountMetricName represents the configuration_change_count metric.
|
||||
@@ -45,7 +45,7 @@ const (
|
||||
desiredVersionMetricName = "desired_version"
|
||||
)
|
||||
|
||||
// metrics represents the custom prometheus metrics for the Monitor.
|
||||
// metrics represents the custom prometheus metrics for the monitor.
|
||||
type metrics struct {
|
||||
// restartCount represents the total number of fdbserver process restarts.
|
||||
restartCount *prometheus.CounterVec
|
||||
@@ -98,32 +98,32 @@ func registerMetrics(reg prometheus.Registerer) *metrics {
|
||||
monitorMetrics := &metrics{
|
||||
restartCount: prometheus.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Namespace: prometheusNamespace,
|
||||
Name: restartCountMetricName,
|
||||
Help: "Number of fdbserver process restarts in total.",
|
||||
}, []string{processLabel}),
|
||||
configurationChangeCount: prometheus.NewCounter(prometheus.CounterOpts{
|
||||
Namespace: namespace,
|
||||
Namespace: prometheusNamespace,
|
||||
Name: configurationChangeCountMetricName,
|
||||
Help: "Number of observed configuration changes.",
|
||||
}),
|
||||
lastAppliedConfigurationTimestamp: prometheus.NewGauge(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Namespace: prometheusNamespace,
|
||||
Name: lastAppliedConfigurationTimestampMetricName,
|
||||
Help: "Timestamp when the last time the configuration was applied.",
|
||||
}),
|
||||
startTimestamp: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Namespace: prometheusNamespace,
|
||||
Name: startTimestampMetricName,
|
||||
Help: "Timestamp when the last time the configuration was applied.",
|
||||
}, []string{processLabel}),
|
||||
runningVersion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Namespace: prometheusNamespace,
|
||||
Name: runningVersionMetricName,
|
||||
Help: "The current running version of the fdbserver processes started by this monitor.",
|
||||
}, []string{versionLabel}),
|
||||
desiredVersion: prometheus.NewGaugeVec(prometheus.GaugeOpts{
|
||||
Namespace: namespace,
|
||||
Namespace: prometheusNamespace,
|
||||
Name: desiredVersionMetricName,
|
||||
Help: "The desired running version of the fdbserver processes started by this monitor.",
|
||||
}, []string{versionLabel}),
|
||||
|
||||
@@ -27,7 +27,7 @@ import (
|
||||
"k8s.io/utils/pointer"
|
||||
)
|
||||
|
||||
var _ = Describe("Testing Monitor metrics", func() {
|
||||
var _ = Describe("Testing monitor metrics", func() {
|
||||
When("getting the copy details", func() {
|
||||
sevenOneVersion := "7.1.57"
|
||||
sevenThreeVersion := "7.3.37"
|
||||
|
||||
@@ -33,6 +33,7 @@ import (
|
||||
"os/exec"
|
||||
"os/signal"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
@@ -60,49 +61,49 @@ const (
|
||||
fdbClusterFilePath = "/var/fdb/data/fdb.cluster"
|
||||
)
|
||||
|
||||
// Monitor provides the main monitor loop
|
||||
type Monitor struct {
|
||||
// ConfigFile defines the path to the config file to load.
|
||||
ConfigFile string
|
||||
// monitor provides the main monitor loop
|
||||
type monitor struct {
|
||||
// configFile defines the path to the config file to load.
|
||||
configFile string
|
||||
|
||||
// CurrentContainerVersion defines the version of the container. This will be the same as the fdbserver version.
|
||||
CurrentContainerVersion string
|
||||
// currentContainerVersion defines the version of the container. This will be the same as the fdbserver version.
|
||||
currentContainerVersion api.Version
|
||||
|
||||
// CustomEnvironment defines the custom environment variables to use when
|
||||
// customEnvironment defines the custom environment variables to use when
|
||||
// interpreting the monitor configuration.
|
||||
CustomEnvironment map[string]string
|
||||
customEnvironment map[string]string
|
||||
|
||||
// ActiveConfiguration defines the active process configuration.
|
||||
ActiveConfiguration *api.ProcessConfiguration
|
||||
// activeConfiguration defines the active process configuration.
|
||||
activeConfiguration *api.ProcessConfiguration
|
||||
|
||||
// ActiveConfigurationBytes defines the source data for the active process
|
||||
// activeConfigurationBytes defines the source data for the active process
|
||||
// configuration.
|
||||
ActiveConfigurationBytes []byte
|
||||
activeConfigurationBytes []byte
|
||||
|
||||
// LastConfigurationTime is the last time we successfully reloaded the
|
||||
// lastConfigurationTime is the last time we successfully reloaded the
|
||||
// configuration file.
|
||||
LastConfigurationTime time.Time
|
||||
lastConfigurationTime time.Time
|
||||
|
||||
// ProcessCount defines how many processes the
|
||||
ProcessCount int
|
||||
// processCount defines how many processes the monitor should run.
|
||||
processCount int
|
||||
|
||||
// ProcessIDs stores the PIDs of the processes that are running. A PID of
|
||||
// processIDs stores the PIDs of the processes that are running. A PID of
|
||||
// zero will indicate that a process does not have a run loop. A PID of -1
|
||||
// will indicate that a process has a run loop but is not currently running
|
||||
// the subprocess.
|
||||
ProcessIDs []int
|
||||
processIDs []int
|
||||
|
||||
// Mutex defines a mutex around working with configuration.
|
||||
// mutex defines a mutex around working with configuration.
|
||||
// This is used to synchronize access to local state like the active
|
||||
// configuration and the process IDs from multiple goroutines.
|
||||
Mutex sync.Mutex
|
||||
mutex sync.Mutex
|
||||
|
||||
// PodClient is a client for posting updates about this pod to
|
||||
// podClient is a client for posting updates about this pod to
|
||||
// Kubernetes.
|
||||
PodClient *PodClient
|
||||
podClient *kubernetesClient
|
||||
|
||||
// Logger is the logger instance for this monitor.
|
||||
Logger logr.Logger
|
||||
// logger is the logger instance for this monitor.
|
||||
logger logr.Logger
|
||||
|
||||
// metrics represents the prometheus monitor metrics.
|
||||
metrics *metrics
|
||||
@@ -112,25 +113,25 @@ type httpConfig struct {
|
||||
listenAddr, certPath, keyPath, rootCaPath string
|
||||
}
|
||||
|
||||
// StartMonitor starts the monitor loop.
|
||||
func StartMonitor(ctx context.Context, logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int, promConfig httpConfig, enableDebug bool, currentContainerVersion string, enableNodeWatcher bool) {
|
||||
podClient, err := CreatePodClient(ctx, logger, enableNodeWatcher, setupCache)
|
||||
// startMonitor starts the monitor loop.
|
||||
func startMonitor(ctx context.Context, logger logr.Logger, configFile string, customEnvironment map[string]string, processCount int, promConfig httpConfig, enableDebug bool, currentContainerVersion api.Version, enableNodeWatcher bool) {
|
||||
client, err := createPodClient(ctx, logger, enableNodeWatcher, setupCache)
|
||||
if err != nil {
|
||||
logger.Error(err, "could not create Pod client")
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
monitor := &Monitor{
|
||||
ConfigFile: configFile,
|
||||
PodClient: podClient,
|
||||
Logger: logger,
|
||||
CustomEnvironment: customEnvironment,
|
||||
ProcessCount: processCount,
|
||||
ProcessIDs: make([]int, processCount+1),
|
||||
CurrentContainerVersion: currentContainerVersion,
|
||||
mon := &monitor{
|
||||
configFile: configFile,
|
||||
podClient: client,
|
||||
logger: logger,
|
||||
customEnvironment: customEnvironment,
|
||||
processCount: processCount,
|
||||
processIDs: make([]int, processCount+1),
|
||||
currentContainerVersion: currentContainerVersion,
|
||||
}
|
||||
|
||||
go func() { monitor.WatchPodTimestamps() }()
|
||||
go func() { mon.watchPodTimestamps() }()
|
||||
|
||||
mux := http.NewServeMux()
|
||||
// Enable pprof endpoints for debugging purposes.
|
||||
@@ -152,7 +153,7 @@ func StartMonitor(ctx context.Context, logger logr.Logger, configFile string, cu
|
||||
// Enable the default go metrics.
|
||||
reg.MustRegister(collectors.NewGoCollector())
|
||||
monitorMetrics := registerMetrics(reg)
|
||||
monitor.metrics = monitorMetrics
|
||||
mon.metrics = monitorMetrics
|
||||
promHandler := promhttp.HandlerFor(reg, promhttp.HandlerOpts{})
|
||||
|
||||
// Add Prometheus support
|
||||
@@ -181,27 +182,27 @@ func StartMonitor(ctx context.Context, logger logr.Logger, configFile string, cu
|
||||
}
|
||||
}()
|
||||
|
||||
monitor.Run()
|
||||
mon.run()
|
||||
}
|
||||
|
||||
// updateCustomEnvironmentFromNodeMetadata will add the node labels and their values to the custom environment map. All the generated
|
||||
// environment variables will start with NODE_LABEL and "/" and "." will be replaced in the key as "_", e.g. from the
|
||||
// label "foundationdb.org/testing = awesome" the env variable "NODE_LABEL_FOUNDATIONDB_ORG_TESTING = awesome" will be
|
||||
// generated.
|
||||
func (monitor *Monitor) updateCustomEnvironmentFromNodeMetadata() {
|
||||
if monitor.PodClient.nodeMetadata == nil {
|
||||
func (monitor *monitor) updateCustomEnvironmentFromNodeMetadata() {
|
||||
if monitor.podClient.nodeMetadata == nil {
|
||||
return
|
||||
}
|
||||
|
||||
nodeLabels := monitor.PodClient.nodeMetadata.Labels
|
||||
nodeLabels := monitor.podClient.nodeMetadata.Labels
|
||||
for key, value := range nodeLabels {
|
||||
sanitizedKey := strings.ReplaceAll(key, "/", "_")
|
||||
sanitizedKey = strings.ReplaceAll(sanitizedKey, ".", "_")
|
||||
envKey := "NODE_LABEL_" + strings.ToUpper(sanitizedKey)
|
||||
currentValue, ok := monitor.CustomEnvironment[envKey]
|
||||
currentValue, ok := monitor.customEnvironment[envKey]
|
||||
if !ok {
|
||||
monitor.Logger.Info("adding new custom environment variable from node labels", "key", envKey, "value", value)
|
||||
monitor.CustomEnvironment[envKey] = value
|
||||
monitor.logger.Info("adding new custom environment variable from node labels", "key", envKey, "value", value)
|
||||
monitor.customEnvironment[envKey] = value
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -209,51 +210,75 @@ func (monitor *Monitor) updateCustomEnvironmentFromNodeMetadata() {
|
||||
continue
|
||||
}
|
||||
|
||||
monitor.Logger.Info("update custom environment variable from node labels", "key", envKey, "newValue", value, "currentValue", currentValue)
|
||||
monitor.CustomEnvironment[envKey] = value
|
||||
monitor.logger.Info("update custom environment variable from node labels", "key", envKey, "newValue", value, "currentValue", currentValue)
|
||||
monitor.customEnvironment[envKey] = value
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// LoadConfiguration loads the latest configuration from the config file.
|
||||
func (monitor *Monitor) LoadConfiguration() {
|
||||
file, err := os.Open(monitor.ConfigFile)
|
||||
// readConfiguration reads the latest configuration from the monitor file.
|
||||
func (monitor *monitor) readConfiguration() (*api.ProcessConfiguration, []byte) {
|
||||
file, err := os.Open(monitor.configFile)
|
||||
if err != nil {
|
||||
monitor.Logger.Error(err, "Error reading monitor config file", "monitorConfigPath", monitor.ConfigFile)
|
||||
return
|
||||
monitor.logger.Error(err, "Error reading monitor config file", "monitorConfigPath", monitor.configFile)
|
||||
return nil, nil
|
||||
}
|
||||
defer func() {
|
||||
err := file.Close()
|
||||
monitor.Logger.Error(err, "Error could not close file", "monitorConfigPath", monitor.ConfigFile)
|
||||
monitor.logger.Error(err, "Error could not close file", "monitorConfigPath", monitor.configFile)
|
||||
}()
|
||||
configuration := &api.ProcessConfiguration{}
|
||||
configurationBytes, err := io.ReadAll(file)
|
||||
if err != nil {
|
||||
monitor.Logger.Error(err, "Error reading monitor configuration", "monitorConfigPath", monitor.ConfigFile)
|
||||
monitor.logger.Error(err, "Error reading monitor configuration", "monitorConfigPath", monitor.configFile)
|
||||
}
|
||||
err = json.Unmarshal(configurationBytes, configuration)
|
||||
if err != nil {
|
||||
monitor.Logger.Error(err, "Error parsing monitor configuration", "rawConfiguration", string(configurationBytes))
|
||||
return
|
||||
monitor.logger.Error(err, "Error parsing monitor configuration", "rawConfiguration", string(configurationBytes))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if monitor.CurrentContainerVersion == configuration.Version {
|
||||
if configuration.Version == nil {
|
||||
monitor.logger.Error(err, "Error could not parse configured version", "rawConfiguration", string(configurationBytes))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// If the versions are protocol compatible don't try to point to another binary path. Otherwise, the processes will
|
||||
// cannot restart when a process crashes during a patch upgrade.
|
||||
if monitor.currentContainerVersion.IsProtocolCompatible(*configuration.Version) {
|
||||
configuration.BinaryPath = fdbserverPath
|
||||
} else {
|
||||
configuration.BinaryPath = path.Join(sharedBinaryDir, configuration.Version, "fdbserver")
|
||||
configuration.BinaryPath = path.Join(sharedBinaryDir, configuration.Version.String(), "fdbserver")
|
||||
}
|
||||
|
||||
err = checkOwnerExecutable(configuration.BinaryPath)
|
||||
if err != nil {
|
||||
monitor.Logger.Error(err, "Error with binary path for latest configuration", "configuration", configuration, "binaryPath", configuration.BinaryPath)
|
||||
return
|
||||
monitor.logger.Error(err, "Error with binary path for latest configuration", "configuration", configuration, "binaryPath", configuration.BinaryPath)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
monitor.updateCustomEnvironmentFromNodeMetadata()
|
||||
|
||||
_, err = configuration.GenerateArguments(1, monitor.CustomEnvironment)
|
||||
_, err = configuration.GenerateArguments(1, monitor.customEnvironment)
|
||||
if err != nil {
|
||||
monitor.Logger.Error(err, "Error generating arguments for latest configuration", "configuration", configuration, "binaryPath", configuration.BinaryPath)
|
||||
monitor.logger.Error(err, "Error generating arguments for latest configuration", "configuration", configuration, "binaryPath", configuration.BinaryPath)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if configuration.ShouldRunServers() {
|
||||
// In case that the process is isolated we don't want to start the servers and we should terminate the running fdbserver
|
||||
// instances.
|
||||
if monitor.processIsIsolated() {
|
||||
configuration.RunServers = pointer.Bool(false)
|
||||
}
|
||||
}
|
||||
|
||||
return configuration, configurationBytes
|
||||
}
|
||||
|
||||
// loadConfiguration loads the latest configuration from the config file.
|
||||
func (monitor *monitor) loadConfiguration() {
|
||||
configuration, configurationBytes := monitor.readConfiguration()
|
||||
if configuration == nil || len(configurationBytes) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -275,33 +300,42 @@ func checkOwnerExecutable(path string) error {
|
||||
|
||||
// acceptConfiguration is called when the monitor process parses and accepts
|
||||
// a configuration from the local config file.
|
||||
func (monitor *Monitor) acceptConfiguration(configuration *api.ProcessConfiguration, configurationBytes []byte) {
|
||||
monitor.Mutex.Lock()
|
||||
defer monitor.Mutex.Unlock()
|
||||
func (monitor *monitor) acceptConfiguration(configuration *api.ProcessConfiguration, configurationBytes []byte) {
|
||||
monitor.mutex.Lock()
|
||||
defer monitor.mutex.Unlock()
|
||||
|
||||
// If the configuration hasn't changed ignore those events to prevent noisy logging.
|
||||
if equality.Semantic.DeepEqual(monitor.ActiveConfiguration, configuration) {
|
||||
if equality.Semantic.DeepEqual(monitor.activeConfiguration, configuration) {
|
||||
return
|
||||
}
|
||||
|
||||
monitor.Logger.Info("Received new configuration file", "configuration", configuration)
|
||||
monitor.ActiveConfiguration = configuration
|
||||
monitor.ActiveConfigurationBytes = configurationBytes
|
||||
monitor.LastConfigurationTime = time.Now()
|
||||
monitor.logger.Info("Received new configuration file", "configuration", configuration)
|
||||
monitor.activeConfiguration = configuration
|
||||
monitor.activeConfigurationBytes = configurationBytes
|
||||
monitor.lastConfigurationTime = time.Now()
|
||||
// Update the prometheus metrics.
|
||||
monitor.metrics.registerConfigurationChange(configuration.Version)
|
||||
monitor.metrics.registerConfigurationChange(configuration.Version.String())
|
||||
|
||||
for processNumber := 1; processNumber <= monitor.ProcessCount; processNumber++ {
|
||||
if monitor.ProcessIDs[processNumber] == 0 {
|
||||
monitor.ProcessIDs[processNumber] = -1
|
||||
var hasRunningProcesses bool
|
||||
for processNumber := 1; processNumber <= monitor.processCount; processNumber++ {
|
||||
if monitor.processIDs[processNumber] == 0 {
|
||||
monitor.processIDs[processNumber] = -1
|
||||
tempNumber := processNumber
|
||||
go func() { monitor.RunProcess(tempNumber) }()
|
||||
go func() { monitor.runProcess(tempNumber) }()
|
||||
continue
|
||||
}
|
||||
|
||||
hasRunningProcesses = true
|
||||
}
|
||||
|
||||
err := monitor.PodClient.UpdateAnnotations(monitor)
|
||||
// If the monitor has running processes but the processes shouldn't be running, kill them with SIGTERM.
|
||||
if hasRunningProcesses && !monitor.activeConfiguration.ShouldRunServers() {
|
||||
monitor.sendSignalToProcesses(syscall.SIGTERM)
|
||||
}
|
||||
|
||||
err := monitor.podClient.updateAnnotations(monitor)
|
||||
if err != nil {
|
||||
monitor.Logger.Error(err, "Error updating pod annotations")
|
||||
monitor.logger.Error(err, "Error updating pod annotations")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -315,10 +349,10 @@ func getBackoffDuration(errorCounter int) time.Duration {
|
||||
return timeToBackoff
|
||||
}
|
||||
|
||||
// RunProcess runs a loop to continually start and watch a process.
|
||||
func (monitor *Monitor) RunProcess(processNumber int) {
|
||||
// runProcess runs a loop to continually start and watch a process.
|
||||
func (monitor *monitor) runProcess(processNumber int) {
|
||||
pid := 0
|
||||
logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "RunProcess")
|
||||
logger := monitor.logger.WithValues("processNumber", processNumber, "area", "runProcess")
|
||||
logger.Info("Starting run loop")
|
||||
startTime := time.Now()
|
||||
// Counts the successive errors that occurred during process start up. Based on the error count the backoff time
|
||||
@@ -336,10 +370,10 @@ func (monitor *Monitor) RunProcess(processNumber int) {
|
||||
errorCounter = 0
|
||||
}
|
||||
|
||||
arguments, err := monitor.ActiveConfiguration.GenerateArguments(processNumber, monitor.CustomEnvironment)
|
||||
arguments, err := monitor.activeConfiguration.GenerateArguments(processNumber, monitor.customEnvironment)
|
||||
if err != nil {
|
||||
backoffDuration := getBackoffDuration(errorCounter)
|
||||
logger.Error(err, "Error generating arguments for subprocess", "configuration", monitor.ActiveConfiguration, "errorCounter", errorCounter, "backoffDuration", backoffDuration.String())
|
||||
logger.Error(err, "Error generating arguments for subprocess", "configuration", monitor.activeConfiguration, "errorCounter", errorCounter, "backoffDuration", backoffDuration.String())
|
||||
time.Sleep(backoffDuration)
|
||||
errorCounter++
|
||||
continue
|
||||
@@ -371,7 +405,7 @@ func (monitor *Monitor) RunProcess(processNumber int) {
|
||||
}
|
||||
|
||||
// Update the prometheus metrics for the process.
|
||||
monitor.metrics.registerProcessStartup(processNumber, monitor.ActiveConfiguration.Version)
|
||||
monitor.metrics.registerProcessStartup(processNumber, monitor.activeConfiguration.Version.String())
|
||||
|
||||
if cmd.Process != nil {
|
||||
pid = cmd.Process.Pid
|
||||
@@ -430,14 +464,14 @@ func (monitor *Monitor) RunProcess(processNumber int) {
|
||||
// If the process is no longer desired, this will remove it from the process ID
|
||||
// list and return false. If the process is still desired, this will return
|
||||
// true.
|
||||
func (monitor *Monitor) processRequired(processNumber int) bool {
|
||||
monitor.Mutex.Lock()
|
||||
defer monitor.Mutex.Unlock()
|
||||
logger := monitor.Logger.WithValues("processNumber", processNumber, "area", "processRequired")
|
||||
if monitor.ProcessCount < processNumber || !pointer.BoolDeref(monitor.ActiveConfiguration.RunServers, true) {
|
||||
if monitor.ProcessIDs[processNumber] != 0 {
|
||||
func (monitor *monitor) processRequired(processNumber int) bool {
|
||||
monitor.mutex.Lock()
|
||||
defer monitor.mutex.Unlock()
|
||||
logger := monitor.logger.WithValues("processNumber", processNumber, "area", "processRequired")
|
||||
if monitor.processCount < processNumber || !monitor.activeConfiguration.ShouldRunServers() {
|
||||
if monitor.processIDs[processNumber] != 0 {
|
||||
logger.Info("Terminating run loop")
|
||||
monitor.ProcessIDs[processNumber] = 0
|
||||
monitor.processIDs[processNumber] = 0
|
||||
}
|
||||
|
||||
return false
|
||||
@@ -446,15 +480,39 @@ func (monitor *Monitor) processRequired(processNumber int) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// updateProcessID records a new Process ID from a newly launched process.
|
||||
func (monitor *Monitor) updateProcessID(processNumber int, pid int) {
|
||||
monitor.Mutex.Lock()
|
||||
defer monitor.Mutex.Unlock()
|
||||
monitor.ProcessIDs[processNumber] = pid
|
||||
// processIsIsolated returns true if the IsolateProcessGroupAnnotation is set to "true".
|
||||
func (monitor *monitor) processIsIsolated() bool {
|
||||
if monitor.podClient.podMetadata == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if monitor.podClient.podMetadata.Annotations == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
val, ok := monitor.podClient.podMetadata.Annotations[api.IsolateProcessGroupAnnotation]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
isolated, err := strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
monitor.logger.Error(err, "could not parse the value of the %s annotation", api.IsolateProcessGroupAnnotation)
|
||||
return false
|
||||
}
|
||||
|
||||
return isolated
|
||||
}
|
||||
|
||||
// WatchConfiguration detects changes to the monitor configuration file.
|
||||
func (monitor *Monitor) WatchConfiguration(watcher *fsnotify.Watcher) {
|
||||
// updateProcessID records a new Process ID from a newly launched process.
|
||||
func (monitor *monitor) updateProcessID(processNumber int, pid int) {
|
||||
monitor.mutex.Lock()
|
||||
defer monitor.mutex.Unlock()
|
||||
monitor.processIDs[processNumber] = pid
|
||||
}
|
||||
|
||||
// watchConfiguration detects changes to the monitor configuration file.
|
||||
func (monitor *monitor) watchConfiguration(watcher *fsnotify.Watcher) {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-watcher.Events:
|
||||
@@ -462,7 +520,7 @@ func (monitor *Monitor) WatchConfiguration(watcher *fsnotify.Watcher) {
|
||||
return
|
||||
}
|
||||
|
||||
monitor.Logger.Info("Detected event on monitor conf file or cluster file", "event", event)
|
||||
monitor.logger.Info("Detected event on monitor conf file or cluster file", "event", event)
|
||||
if event.Op&fsnotify.Write == fsnotify.Write || event.Op&fsnotify.Create == fsnotify.Create {
|
||||
monitor.handleFileChange(event.Name)
|
||||
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
|
||||
@@ -476,58 +534,62 @@ func (monitor *Monitor) WatchConfiguration(watcher *fsnotify.Watcher) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
monitor.Logger.Error(err, "Error watching for file system events")
|
||||
monitor.logger.Error(err, "Error watching for file system events")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleFileChange will perform the required action based on the changed/modified file.
|
||||
func (monitor *Monitor) handleFileChange(changedFile string) {
|
||||
func (monitor *monitor) handleFileChange(changedFile string) {
|
||||
if changedFile == fdbClusterFilePath {
|
||||
err := monitor.PodClient.updateFdbClusterTimestampAnnotation()
|
||||
err := monitor.podClient.updateFdbClusterTimestampAnnotation()
|
||||
if err != nil {
|
||||
monitor.Logger.Error(err, fmt.Sprintf("could not update %s annotation", ClusterFileChangeDetectedAnnotation))
|
||||
monitor.logger.Error(err, fmt.Sprintf("could not update %s annotation", api.ClusterFileChangeDetectedAnnotation))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
monitor.LoadConfiguration()
|
||||
monitor.loadConfiguration()
|
||||
}
|
||||
|
||||
// Run runs the monitor loop.
|
||||
func (monitor *Monitor) Run() {
|
||||
func (monitor *monitor) sendSignalToProcesses(signal os.Signal) {
|
||||
for processNumber, processID := range monitor.processIDs {
|
||||
if processID <= 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
subprocessLogger := monitor.logger.WithValues("processNumber", processNumber, "PID", processID)
|
||||
process, err := os.FindProcess(processID)
|
||||
if err != nil {
|
||||
subprocessLogger.Error(err, "Error finding subprocess")
|
||||
continue
|
||||
}
|
||||
subprocessLogger.Info("Sending signal to subprocess", "signal", signal)
|
||||
err = process.Signal(signal)
|
||||
if err != nil {
|
||||
subprocessLogger.Error(err, "Error signaling subprocess")
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// run runs the monitor loop.
|
||||
func (monitor *monitor) run() {
|
||||
done := make(chan bool, 1)
|
||||
signals := make(chan os.Signal, 1)
|
||||
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
|
||||
|
||||
go func() {
|
||||
latestSignal := <-signals
|
||||
monitor.Logger.Info("Received system signal", "signal", latestSignal)
|
||||
monitor.logger.Info("Received system signal", "signal", latestSignal)
|
||||
|
||||
// Reset the ProcessCount to 0 to make sure the monitor doesn't try to restart the processes.
|
||||
monitor.ProcessCount = 0
|
||||
for processNumber, processID := range monitor.ProcessIDs {
|
||||
if processID <= 0 {
|
||||
continue
|
||||
}
|
||||
// Reset the processCount to 0 to make sure the monitor doesn't try to restart the processes.
|
||||
monitor.processCount = 0
|
||||
monitor.sendSignalToProcesses(latestSignal)
|
||||
|
||||
subprocessLogger := monitor.Logger.WithValues("processNumber", processNumber, "PID", processID)
|
||||
process, err := os.FindProcess(processID)
|
||||
if err != nil {
|
||||
subprocessLogger.Error(err, "Error finding subprocess")
|
||||
continue
|
||||
}
|
||||
subprocessLogger.Info("Sending signal to subprocess", "signal", latestSignal)
|
||||
err = process.Signal(latestSignal)
|
||||
if err != nil {
|
||||
subprocessLogger.Error(err, "Error signaling subprocess")
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
annotations := monitor.PodClient.podMetadata.Annotations
|
||||
annotations := monitor.podClient.podMetadata.Annotations
|
||||
if len(annotations) > 0 {
|
||||
delayValue, ok := annotations[DelayShutdownAnnotation]
|
||||
delayValue, ok := annotations[api.DelayShutdownAnnotation]
|
||||
if ok {
|
||||
delay, err := time.ParseDuration(delayValue)
|
||||
if err == nil {
|
||||
@@ -539,13 +601,13 @@ func (monitor *Monitor) Run() {
|
||||
done <- true
|
||||
}()
|
||||
|
||||
monitor.LoadConfiguration()
|
||||
monitor.loadConfiguration()
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
monitor.Logger.Info("adding watch for file", "path", path.Base(monitor.ConfigFile))
|
||||
err = watcher.Add(monitor.ConfigFile)
|
||||
monitor.logger.Info("adding watch for file", "path", path.Base(monitor.configFile))
|
||||
err = watcher.Add(monitor.configFile)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
@@ -553,10 +615,10 @@ func (monitor *Monitor) Run() {
|
||||
defer func(watcher *fsnotify.Watcher) {
|
||||
err := watcher.Close()
|
||||
if err != nil {
|
||||
monitor.Logger.Error(err, "could not close watcher")
|
||||
monitor.logger.Error(err, "could not close watcher")
|
||||
}
|
||||
}(watcher)
|
||||
go func() { monitor.WatchConfiguration(watcher) }()
|
||||
go func() { monitor.watchConfiguration(watcher) }()
|
||||
|
||||
// The cluster file will be created and managed by the fdbserver processes, so we have to wait until the fdbserver
|
||||
// processes have been started. Except for the initial cluster creation this file should be present as soon as the
|
||||
@@ -564,12 +626,12 @@ func (monitor *Monitor) Run() {
|
||||
for {
|
||||
_, err = os.Stat(fdbClusterFilePath)
|
||||
if errors.Is(err, os.ErrNotExist) {
|
||||
monitor.Logger.Info("waiting for file to be created", "path", fdbClusterFilePath)
|
||||
monitor.logger.Info("waiting for file to be created", "path", fdbClusterFilePath)
|
||||
time.Sleep(5 * time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
monitor.Logger.Info("adding watch for file", "path", fdbClusterFilePath)
|
||||
monitor.logger.Info("adding watch for file", "path", fdbClusterFilePath)
|
||||
err = watcher.Add(fdbClusterFilePath)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
@@ -580,11 +642,11 @@ func (monitor *Monitor) Run() {
|
||||
<-done
|
||||
}
|
||||
|
||||
// WatchPodTimestamps watches the timestamp feed to reload the configuration.
|
||||
func (monitor *Monitor) WatchPodTimestamps() {
|
||||
for timestamp := range monitor.PodClient.TimestampFeed {
|
||||
if timestamp > monitor.LastConfigurationTime.Unix() {
|
||||
monitor.LoadConfiguration()
|
||||
// watchPodTimestamps watches the timestamp feed to reload the configuration.
|
||||
func (monitor *monitor) watchPodTimestamps() {
|
||||
for timestamp := range monitor.podClient.TimestampFeed {
|
||||
if timestamp > monitor.lastConfigurationTime.Unix() {
|
||||
monitor.loadConfiguration()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,41 +20,46 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/apple/foundationdb/fdbkubernetesmonitor/api"
|
||||
"k8s.io/apimachinery/pkg/util/json"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2"
|
||||
. "github.com/onsi/gomega"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
var _ = Describe("Testing FDB Kubernetes Monitor", func() {
|
||||
var _ = Describe("Testing FDB Kubernetes monitor", func() {
|
||||
When("updating the custom environment variables from the node metadata", func() {
|
||||
var monitor *Monitor
|
||||
var mon *monitor
|
||||
|
||||
BeforeEach(func() {
|
||||
monitor = &Monitor{
|
||||
Logger: GinkgoLogr,
|
||||
CustomEnvironment: map[string]string{
|
||||
mon = &monitor{
|
||||
logger: GinkgoLogr,
|
||||
customEnvironment: map[string]string{
|
||||
"testing": "testing",
|
||||
},
|
||||
PodClient: &PodClient{},
|
||||
podClient: &kubernetesClient{},
|
||||
}
|
||||
})
|
||||
|
||||
JustBeforeEach(func() {
|
||||
monitor.updateCustomEnvironmentFromNodeMetadata()
|
||||
mon.updateCustomEnvironmentFromNodeMetadata()
|
||||
})
|
||||
|
||||
When("no node metadata is present", func() {
|
||||
It("shouldn't add new entries", func() {
|
||||
Expect(monitor.CustomEnvironment).To(HaveLen(1))
|
||||
Expect(monitor.CustomEnvironment).To(HaveKeyWithValue("testing", "testing"))
|
||||
Expect(mon.customEnvironment).To(HaveLen(1))
|
||||
Expect(mon.customEnvironment).To(HaveKeyWithValue("testing", "testing"))
|
||||
})
|
||||
})
|
||||
|
||||
When("node metadata is present", func() {
|
||||
BeforeEach(func() {
|
||||
monitor.PodClient.nodeMetadata = &metav1.PartialObjectMetadata{
|
||||
mon.podClient.nodeMetadata = &metav1.PartialObjectMetadata{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Labels: map[string]string{
|
||||
// Examples are taken from: https://kubernetes.io/docs/reference/labels-annotations-taints/
|
||||
@@ -66,10 +71,10 @@ var _ = Describe("Testing FDB Kubernetes Monitor", func() {
|
||||
})
|
||||
|
||||
It("should add the new entries", func() {
|
||||
Expect(monitor.CustomEnvironment).To(HaveLen(3))
|
||||
Expect(monitor.CustomEnvironment).To(HaveKeyWithValue("testing", "testing"))
|
||||
Expect(monitor.CustomEnvironment).To(HaveKeyWithValue("NODE_LABEL_TOPOLOGY_KUBERNETES_IO_ZONE", "us-east-1c"))
|
||||
Expect(monitor.CustomEnvironment).To(HaveKeyWithValue("NODE_LABEL_KUBERNETES_IO_HOSTNAME", "ip-172-20-114-199.ec2.internal"))
|
||||
Expect(mon.customEnvironment).To(HaveLen(3))
|
||||
Expect(mon.customEnvironment).To(HaveKeyWithValue("testing", "testing"))
|
||||
Expect(mon.customEnvironment).To(HaveKeyWithValue("NODE_LABEL_TOPOLOGY_KUBERNETES_IO_ZONE", "us-east-1c"))
|
||||
Expect(mon.customEnvironment).To(HaveKeyWithValue("NODE_LABEL_KUBERNETES_IO_HOSTNAME", "ip-172-20-114-199.ec2.internal"))
|
||||
})
|
||||
})
|
||||
})
|
||||
@@ -98,4 +103,90 @@ var _ = Describe("Testing FDB Kubernetes Monitor", func() {
|
||||
60*time.Second,
|
||||
),
|
||||
)
|
||||
|
||||
When("reading the configuration file", func() {
|
||||
var mon *monitor
|
||||
var configurationFilePath string
|
||||
var configuration *api.ProcessConfiguration
|
||||
var configurationBytes []byte
|
||||
defaultVersion := api.Version{Major: 7, Minor: 1, Patch: 51}
|
||||
|
||||
BeforeEach(func() {
|
||||
tmpDir := GinkgoT().TempDir()
|
||||
// Set the fdbserverPath to make sure the executable check works.
|
||||
fdbserverPath = path.Join(tmpDir, "fdberver")
|
||||
Expect(os.WriteFile(fdbserverPath, []byte(""), 0700)).NotTo(HaveOccurred())
|
||||
configurationFilePath = path.Join(tmpDir, "config.json")
|
||||
mon = &monitor{
|
||||
logger: GinkgoLogr,
|
||||
configFile: configurationFilePath,
|
||||
customEnvironment: map[string]string{},
|
||||
podClient: &kubernetesClient{},
|
||||
currentContainerVersion: defaultVersion,
|
||||
}
|
||||
})
|
||||
|
||||
JustBeforeEach(func() {
|
||||
configuration, configurationBytes = mon.readConfiguration()
|
||||
})
|
||||
|
||||
When("the configuration file is empty", func() {
|
||||
BeforeEach(func() {
|
||||
Expect(os.WriteFile(configurationFilePath, []byte("{}"), 0600)).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should return an empty configuration", func() {
|
||||
Expect(configuration).To(BeNil())
|
||||
Expect(configurationBytes).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
When("the configuration is valid", func() {
|
||||
var out []byte
|
||||
|
||||
BeforeEach(func() {
|
||||
config := &api.ProcessConfiguration{
|
||||
Version: &defaultVersion,
|
||||
}
|
||||
|
||||
var err error
|
||||
out, err = json.Marshal(config)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(os.WriteFile(configurationFilePath, out, 0600)).NotTo(HaveOccurred())
|
||||
})
|
||||
|
||||
It("should return the configuration", func() {
|
||||
Expect(configuration).NotTo(BeNil())
|
||||
Expect(configuration.Version).To(Equal(&defaultVersion))
|
||||
Expect(configuration.BinaryPath).To(Equal(fdbserverPath))
|
||||
Expect(configuration.RunServers).To(BeNil())
|
||||
Expect(configuration.Arguments).To(BeEmpty())
|
||||
Expect(configurationBytes).To(Equal(out))
|
||||
})
|
||||
|
||||
When("the pod has the isolate annotation set to true", func() {
|
||||
BeforeEach(func() {
|
||||
mon.podClient = &kubernetesClient{
|
||||
podMetadata: &metav1.PartialObjectMetadata{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
api.IsolateProcessGroupAnnotation: "true",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
})
|
||||
|
||||
It("should return the configuration with runServers set to false", func() {
|
||||
Expect(configuration).NotTo(BeNil())
|
||||
Expect(configuration.Version).To(Equal(&defaultVersion))
|
||||
Expect(configuration.BinaryPath).To(Equal(fdbserverPath))
|
||||
Expect(configuration.RunServers).NotTo(BeNil())
|
||||
Expect(*configuration.RunServers).To(BeFalse())
|
||||
Expect(configuration.Arguments).To(BeEmpty())
|
||||
Expect(configurationBytes).To(Equal(out))
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -33,5 +33,5 @@ import (
|
||||
func TestAPIs(t *testing.T) {
|
||||
RegisterFailHandler(Fail)
|
||||
SetDefaultEventuallyTimeout(10 * time.Second)
|
||||
RunSpecs(t, "FDB Kubernetes Monitor")
|
||||
RunSpecs(t, "FDB Kubernetes monitor")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user