mirror of
https://github.com/rqlite/rqlite.git
synced 2026-01-25 04:16:26 +00:00
Simple write-to-disk sink
This commit is contained in:
@@ -1,19 +1,56 @@
|
||||
package snapshot2
|
||||
|
||||
import "github.com/hashicorp/raft"
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/hashicorp/raft"
|
||||
)
|
||||
|
||||
// Sink is a sink for writing snapshot data to a Snapshot store.
|
||||
type Sink struct {
|
||||
dir string
|
||||
meta *raft.SnapshotMeta
|
||||
|
||||
snapDirPath string
|
||||
snapTmpDirPath string
|
||||
dataFD *os.File
|
||||
opened bool
|
||||
}
|
||||
|
||||
// NewSink creates a new Sink object.
|
||||
func NewSink(meta *raft.SnapshotMeta) *Sink {
|
||||
func NewSink(dir string, meta *raft.SnapshotMeta) *Sink {
|
||||
return &Sink{
|
||||
dir: dir,
|
||||
meta: meta,
|
||||
}
|
||||
}
|
||||
|
||||
// Open opens the sink for writing.
|
||||
func (s *Sink) Open() error {
|
||||
if s.opened {
|
||||
return nil
|
||||
}
|
||||
s.opened = true
|
||||
|
||||
// Make temp snapshot directory
|
||||
s.snapDirPath = filepath.Join(s.dir, s.meta.ID)
|
||||
s.snapTmpDirPath = tmpName(s.snapDirPath)
|
||||
if err := os.MkdirAll(s.snapTmpDirPath, 0755); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dataPath := filepath.Join(s.snapTmpDirPath, "data")
|
||||
dataFD, err := os.Create(dataPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.dataFD = dataFD
|
||||
return nil
|
||||
}
|
||||
|
||||
// ID returns the ID of the snapshot.
|
||||
func (s *Sink) ID() string {
|
||||
return s.meta.ID
|
||||
@@ -21,15 +58,59 @@ func (s *Sink) ID() string {
|
||||
|
||||
// Write writes snapshot data to the sink.
|
||||
func (s *Sink) Write(p []byte) (n int, err error) {
|
||||
return 0, nil
|
||||
return s.dataFD.Write(p)
|
||||
}
|
||||
|
||||
// Close closes the sink.
|
||||
func (s *Sink) Close() error {
|
||||
return nil
|
||||
if !s.opened {
|
||||
return nil
|
||||
}
|
||||
s.opened = false
|
||||
|
||||
if err := s.dataFD.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := writeMeta(s.snapTmpDirPath, s.meta); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.Rename(s.snapTmpDirPath, s.snapDirPath); err != nil {
|
||||
return err
|
||||
}
|
||||
return syncDirMaybe(s.snapDirPath)
|
||||
}
|
||||
|
||||
// Cancel cancels the sink.
|
||||
func (s *Sink) Cancel() error {
|
||||
return nil
|
||||
if !s.opened {
|
||||
return nil
|
||||
}
|
||||
s.opened = false
|
||||
if err := s.dataFD.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
s.dataFD = nil
|
||||
return os.RemoveAll(s.snapTmpDirPath)
|
||||
}
|
||||
|
||||
// writeMeta is used to write the meta data in a given snapshot directory.
|
||||
func writeMeta(dir string, meta *raft.SnapshotMeta) error {
|
||||
fh, err := os.Create(metaPath(dir))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating meta file: %v", err)
|
||||
}
|
||||
defer fh.Close()
|
||||
|
||||
// Write out as JSON
|
||||
enc := json.NewEncoder(fh)
|
||||
if err = enc.Encode(meta); err != nil {
|
||||
return fmt.Errorf("failed to encode meta: %v", err)
|
||||
}
|
||||
|
||||
if err := fh.Sync(); err != nil {
|
||||
return err
|
||||
}
|
||||
return fh.Close()
|
||||
}
|
||||
|
||||
89
snapshot2/utils.go
Normal file
89
snapshot2/utils.go
Normal file
@@ -0,0 +1,89 @@
|
||||
package snapshot2
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func parentDir(dir string) string {
|
||||
return filepath.Dir(dir)
|
||||
}
|
||||
|
||||
func tmpName(path string) string {
|
||||
return path + tmpSuffix
|
||||
}
|
||||
|
||||
func nonTmpName(path string) string {
|
||||
return strings.TrimSuffix(path, tmpSuffix)
|
||||
}
|
||||
|
||||
func isTmpName(name string) bool {
|
||||
return filepath.Ext(name) == tmpSuffix
|
||||
}
|
||||
|
||||
func fileExists(path string) bool {
|
||||
_, err := os.Stat(path)
|
||||
return !os.IsNotExist(err)
|
||||
}
|
||||
|
||||
func dirExists(path string) bool {
|
||||
stat, err := os.Stat(path)
|
||||
return err == nil && stat.IsDir()
|
||||
}
|
||||
|
||||
func dirIsEmpty(dir string) (bool, error) {
|
||||
files, err := os.ReadDir(dir)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return len(files) == 0, nil
|
||||
}
|
||||
|
||||
func syncDir(dir string) error {
|
||||
fh, err := os.Open(dir)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer fh.Close()
|
||||
return fh.Sync()
|
||||
}
|
||||
|
||||
func removeDirSync(dir string) error {
|
||||
if err := os.RemoveAll(dir); err != nil {
|
||||
return err
|
||||
}
|
||||
return syncDirParentMaybe(dir)
|
||||
}
|
||||
|
||||
// syncDirParentMaybe syncs the parent directory of the given
|
||||
// directory, but only on non-Windows platforms.
|
||||
func syncDirParentMaybe(dir string) error {
|
||||
if runtime.GOOS == "windows" {
|
||||
return nil
|
||||
}
|
||||
return syncDir(parentDir(dir))
|
||||
}
|
||||
|
||||
// syncDirMaybe syncs the given directory, but only on non-Windows platforms.
|
||||
func syncDirMaybe(dir string) error {
|
||||
if runtime.GOOS == "windows" {
|
||||
return nil
|
||||
}
|
||||
return syncDir(dir)
|
||||
}
|
||||
|
||||
// removeAllPrefix removes all files in the given directory that have the given prefix.
|
||||
func removeAllPrefix(path, prefix string) error {
|
||||
files, err := filepath.Glob(filepath.Join(path, prefix) + "*")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, f := range files {
|
||||
if err := os.RemoveAll(f); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user