diff --git a/cmd/f3/observer.go b/cmd/f3/observer.go index c2eca8f6..948622c5 100644 --- a/cmd/f3/observer.go +++ b/cmd/f3/observer.go @@ -66,6 +66,11 @@ var observerCmd = cli.Command{ Usage: "The maximum length of time to keep the rotated files.", Value: 2 * 7 * 24 * time.Hour, }, + &cli.Uint64Flag{ + Name: "retentionSize", + Usage: "The maximum size of the rotated files in megabytes. If not set, no limit is applied.", + Value: 0, + }, &cli.StringFlag{ Name: "dataSourceName", Usage: "The observer database DSN", @@ -144,7 +149,9 @@ var observerCmd = cli.Command{ observer.WithMaxBatchSize(cctx.Int("maxBatchSize")), observer.WithMaxBatchDelay(cctx.Duration("maxBatchDelay")), observer.WithChainExchangeMaxMessageAge(cctx.Duration("chainExchangeMaxMessageAge")), + observer.WithMaxRetentionSize(cctx.Uint64("retentionSize") * 1024 * 1024), } + var identity crypto.PrivKey if cctx.IsSet("identity") { marshaledKey, err := os.ReadFile(cctx.String("identity")) diff --git a/observer/observer.go b/observer/observer.go index c10df2ab..2dcbeff2 100644 --- a/observer/observer.go +++ b/observer/observer.go @@ -8,10 +8,12 @@ import ( "encoding/json" "errors" "fmt" + "io/fs" "net/http" "os" "path" "path/filepath" + "sort" "sync" "time" @@ -527,7 +529,8 @@ func (o *Observer) rotateMessages(ctx context.Context) error { if err != nil { return err } - var foundAtLeastOneParquet bool + retainedSize := int64(0) + var retained []fs.FileInfo for _, entry := range dir { if !entry.IsDir() && filepath.Ext(entry.Name()) == ".parquet" { info, err := entry.Info() @@ -538,15 +541,42 @@ func (o *Observer) rotateMessages(ctx context.Context) error { if err := os.Remove(filepath.Join(o.rotatePath, entry.Name())); err != nil { logger.Errorw("Failed to remove retention policy for file", "file", entry.Name(), "err", err) } else { - logger.Infow("Removed old file", "olderThan", o.retention, "file", entry.Name()) + logger.Infow("Removed file due to time retention policy", "olderThan", o.retention, "file", entry.Name()) } } else { - foundAtLeastOneParquet = true + retainedSize += info.Size() + retained = append(retained, info) } } } - return o.createOrReplaceMessagesView(ctx, foundAtLeastOneParquet) + logger.Infow("Retention size", "retainedSize", retainedSize, "maxRetentionSize", o.maxRetentionSize) + + if o.maxRetentionSize > 0 && retainedSize > o.maxRetentionSize { + logger.Infow("Retention size exceeded, deleting oldest files", "retainedSize", retainedSize, "maxRetentionSize", o.maxRetentionSize) + // sort retained by modification time, oldest last + sort.Slice(retained, func(i, j int) bool { + return retained[i].ModTime().After(retained[j].ModTime()) + }) + // iterate in reverse order to delete oldest first + for i := len(retained) - 1; i >= 0; i-- { + fi := retained[i] + if retainedSize < o.maxRetentionSize { + break + } + if err := os.Remove(filepath.Join(o.rotatePath, fi.Name())); err != nil { + logger.Errorw("Failed to remove retention policy for file", + "file", fi.Name(), "err", err) + } else { + logger.Infow("Removed file due to size retention policy", + "size", fi.Size(), "file", fi.Name()) + retainedSize -= fi.Size() + retained = retained[:i] + } + } + } + + return o.createOrReplaceMessagesView(ctx, len(retained) > 0) } func (o *Observer) listenAndServeQueries() error { diff --git a/observer/options.go b/observer/options.go index 9302d3f9..680d24c3 100644 --- a/observer/options.go +++ b/observer/options.go @@ -3,6 +3,7 @@ package observer import ( "context" "fmt" + "math" "time" "github.com/filecoin-project/go-f3/blssig" @@ -13,7 +14,7 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" - "github.com/multiformats/go-multiaddr-dns" + madns "github.com/multiformats/go-multiaddr-dns" ) type Option func(*options) error @@ -35,9 +36,10 @@ type options struct { queryServerListenAddress string queryServerReadTimeout time.Duration - rotatePath string - rotateInterval time.Duration - retention time.Duration + rotatePath string + rotateInterval time.Duration + retention time.Duration + maxRetentionSize int64 pubSub *pubsub.PubSub pubSubValidatorDisabled bool @@ -81,6 +83,7 @@ func newOptions(opts ...Option) (*options, error) { finalityCertsMaxPollInterval: 2 * time.Minute, chainExchangeBufferSize: 1000, chainExchangeMaxMessageAge: 3 * time.Minute, + maxRetentionSize: 0, } for _, apply := range opts { if err := apply(&opt); err != nil { @@ -270,6 +273,20 @@ func WithRetention(retention time.Duration) Option { } } +// WithMaxRetentionSize sets the maximum size of the retention directory. +// This is weakly enforced, and the directory may grow larger than this +// size. If the directory grows larger than this size, the oldest files +// will be deleted until the directory size is below this size. +func WithMaxRetentionSize(size uint64) Option { + return func(o *options) error { + if size > math.MaxInt64 { + return fmt.Errorf("max retention size must be less than or equal to %d", math.MaxInt64) + } + o.maxRetentionSize = int64(size) + return nil + } +} + func WithDataSourceName(dataSourceName string) Option { return func(o *options) error { o.dataSourceName = dataSourceName