Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions cmd/f3/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ var observerCmd = cli.Command{
Usage: "The maximum length of time to keep the rotated files.",
Value: 2 * 7 * 24 * time.Hour,
},
&cli.Uint64Flag{
Name: "retentionSize",
Usage: "The maximum size of the rotated files in megabytes. If not set, no limit is applied.",
Value: 0,
},
&cli.StringFlag{
Name: "dataSourceName",
Usage: "The observer database DSN",
Expand Down Expand Up @@ -144,7 +149,9 @@ var observerCmd = cli.Command{
observer.WithMaxBatchSize(cctx.Int("maxBatchSize")),
observer.WithMaxBatchDelay(cctx.Duration("maxBatchDelay")),
observer.WithChainExchangeMaxMessageAge(cctx.Duration("chainExchangeMaxMessageAge")),
observer.WithMaxRetentionSize(cctx.Uint64("retentionSize") * 1024 * 1024),
}

var identity crypto.PrivKey
if cctx.IsSet("identity") {
marshaledKey, err := os.ReadFile(cctx.String("identity"))
Expand Down
38 changes: 34 additions & 4 deletions observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (
"encoding/json"
"errors"
"fmt"
"io/fs"
"net/http"
"os"
"path"
"path/filepath"
"sort"
"sync"
"time"

Expand Down Expand Up @@ -527,7 +529,8 @@ func (o *Observer) rotateMessages(ctx context.Context) error {
if err != nil {
return err
}
var foundAtLeastOneParquet bool
retainedSize := int64(0)
var retained []fs.FileInfo
for _, entry := range dir {
if !entry.IsDir() && filepath.Ext(entry.Name()) == ".parquet" {
info, err := entry.Info()
Expand All @@ -538,15 +541,42 @@ func (o *Observer) rotateMessages(ctx context.Context) error {
if err := os.Remove(filepath.Join(o.rotatePath, entry.Name())); err != nil {
logger.Errorw("Failed to remove retention policy for file", "file", entry.Name(), "err", err)
} else {
logger.Infow("Removed old file", "olderThan", o.retention, "file", entry.Name())
logger.Infow("Removed file due to time retention policy", "olderThan", o.retention, "file", entry.Name())
}
} else {
foundAtLeastOneParquet = true
retainedSize += info.Size()
retained = append(retained, info)
}
}
}

return o.createOrReplaceMessagesView(ctx, foundAtLeastOneParquet)
logger.Infow("Retention size", "retainedSize", retainedSize, "maxRetentionSize", o.maxRetentionSize)

if o.maxRetentionSize > 0 && retainedSize > o.maxRetentionSize {
logger.Infow("Retention size exceeded, deleting oldest files", "retainedSize", retainedSize, "maxRetentionSize", o.maxRetentionSize)
// sort retained by modification time, oldest last
sort.Slice(retained, func(i, j int) bool {
return retained[i].ModTime().After(retained[j].ModTime())
})
// iterate in reverse order to delete oldest first
for i := len(retained) - 1; i >= 0; i-- {
fi := retained[i]
if retainedSize < o.maxRetentionSize {
break
}
if err := os.Remove(filepath.Join(o.rotatePath, fi.Name())); err != nil {
logger.Errorw("Failed to remove retention policy for file",
"file", fi.Name(), "err", err)
} else {
logger.Infow("Removed file due to size retention policy",
"size", fi.Size(), "file", fi.Name())
retainedSize -= fi.Size()
retained = retained[:i]
}
}
}

return o.createOrReplaceMessagesView(ctx, len(retained) > 0)
}

func (o *Observer) listenAndServeQueries() error {
Expand Down
25 changes: 21 additions & 4 deletions observer/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package observer
import (
"context"
"fmt"
"math"
"time"

"github.com/filecoin-project/go-f3/blssig"
Expand All @@ -13,7 +14,7 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/multiformats/go-multiaddr-dns"
madns "github.com/multiformats/go-multiaddr-dns"
)

type Option func(*options) error
Expand All @@ -35,9 +36,10 @@ type options struct {
queryServerListenAddress string
queryServerReadTimeout time.Duration

rotatePath string
rotateInterval time.Duration
retention time.Duration
rotatePath string
rotateInterval time.Duration
retention time.Duration
maxRetentionSize int64

pubSub *pubsub.PubSub
pubSubValidatorDisabled bool
Expand Down Expand Up @@ -81,6 +83,7 @@ func newOptions(opts ...Option) (*options, error) {
finalityCertsMaxPollInterval: 2 * time.Minute,
chainExchangeBufferSize: 1000,
chainExchangeMaxMessageAge: 3 * time.Minute,
maxRetentionSize: 0,
}
for _, apply := range opts {
if err := apply(&opt); err != nil {
Expand Down Expand Up @@ -270,6 +273,20 @@ func WithRetention(retention time.Duration) Option {
}
}

// WithMaxRetentionSize sets the maximum size of the retention directory.
// This is weakly enforced, and the directory may grow larger than this
// size. If the directory grows larger than this size, the oldest files
// will be deleted until the directory size is below this size.
func WithMaxRetentionSize(size uint64) Option {
return func(o *options) error {
if size > math.MaxInt64 {
return fmt.Errorf("max retention size must be less than or equal to %d", math.MaxInt64)
}
o.maxRetentionSize = int64(size)
return nil
}
}

func WithDataSourceName(dataSourceName string) Option {
return func(o *options) error {
o.dataSourceName = dataSourceName
Expand Down
Loading