Skip to content

Commit ffe7a1a

Browse files
committed
Import Marcin changes
1 parent 50dd3ba commit ffe7a1a

File tree

5 files changed

+95
-31
lines changed

5 files changed

+95
-31
lines changed

protocol/communities/manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,8 @@ type ArchiveService interface {
214214
SetCodexConfig(*params.CodexConfig)
215215
StartTorrentClient() error
216216
StartCodexClient() error
217+
SetCodexClient(client *CodexClient)
218+
GetCodexClient() *CodexClient
217219
Stop() error
218220
IsReady() bool
219221
GetCommunityChatsFilters(communityID types.HexBytes) (messagingtypes.ChatFilters, error)

protocol/communities/manager_archive.go

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@ type ArchiveManager struct {
6363
codexConfig *params.CodexConfig
6464
isCodexClientStarted bool
6565
torrentTasks map[string]metainfo.Hash
66-
indexCidTasks map[string]string
6766
historyArchiveDownloadTasks map[string]*HistoryArchiveDownloadTask
6867
historyArchiveTasksWaitGroup sync.WaitGroup
6968
historyArchiveTasks sync.Map // stores `chan struct{}`
@@ -86,7 +85,6 @@ func NewArchiveManager(amc *ArchiveManagerConfig) *ArchiveManager {
8685
torrentConfig: amc.TorrentConfig,
8786
codexConfig: amc.CodexConfig,
8887
torrentTasks: make(map[string]metainfo.Hash),
89-
indexCidTasks: make(map[string]string),
9088
historyArchiveDownloadTasks: make(map[string]*HistoryArchiveDownloadTask),
9189

9290
logger: amc.Logger,
@@ -279,6 +277,16 @@ func (m *ArchiveManager) Stop() error {
279277
return nil
280278
}
281279

280+
func (m *ArchiveManager) GetCodexClient() *CodexClient {
281+
return m.codexClient
282+
}
283+
284+
func (m *ArchiveManager) SetCodexClient(client *CodexClient) {
285+
m.codexClient = client
286+
m.ArchiveFileManager.codexClient = client
287+
m.isCodexClientStarted = true
288+
}
289+
282290
func (m *ArchiveManager) torrentClientStarted() bool {
283291
return m.torrentClient != nil
284292
}
@@ -471,6 +479,7 @@ func (m *ArchiveManager) StartHistoryArchiveTasksInterval(community *Community,
471479
}
472480
case <-cancel:
473481
m.UnseedHistoryArchiveTorrent(community.ID())
482+
m.UnseedHistoryArchiveIndexCid(community.ID())
474483
m.historyArchiveTasks.Delete(id)
475484
m.historyArchiveTasksWaitGroup.Done()
476485
return
@@ -551,17 +560,25 @@ func (m *ArchiveManager) UnseedHistoryArchiveTorrent(communityID types.HexBytes)
551560
}
552561

553562
func (m *ArchiveManager) UnseedHistoryArchiveIndexCid(communityID types.HexBytes) {
554-
id := communityID.String()
563+
// Remove local index file
564+
err := m.removeCodexIndexFile(communityID)
565+
if err != nil {
566+
m.logger.Error("failed to remove local index file", zap.Error(err))
567+
}
555568

556-
if cid, exists := m.indexCidTasks[id]; exists {
557-
m.logger.Debug("Unseeding index CID for community", zap.String("id", id), zap.String("cid", cid))
558-
// ToDo: consider "unpinning" the index Cid, so that it is no longer advertised on DHT
559-
// For now, we remove it from tracking and could delete the local index file
560-
delete(m.indexCidTasks, id)
569+
// get currently advertised index Cid
570+
cid, err := m.GetHistoryArchiveIndexCid(communityID)
561571

562-
// Optional: Remove local index file if we want to clean up storage
563-
// indexFilePath := m.ArchiveFileManager.codexArchiveIndexFile(id)
564-
// os.Remove(indexFilePath)
572+
if err != nil {
573+
m.logger.Debug("failed to get history archive index CID", zap.Error(err))
574+
return
575+
}
576+
577+
m.logger.Debug("Unseeding index CID for community", zap.String("id", communityID.String()), zap.String("cid", cid))
578+
579+
err = m.codexClient.RemoveCid(cid)
580+
if err != nil {
581+
m.logger.Error("failed to remove CID from Codex", zap.Error(err))
565582
}
566583
}
567584

@@ -763,14 +780,13 @@ func (m *ArchiveManager) DownloadHistoryArchivesByIndexCid(communityID types.Hex
763780
Cancelled: false,
764781
}
765782

766-
m.indexCidTasks[id] = indexCid
767783
timeout := time.After(20 * time.Second)
768784

769785
// Create separate cancel channel for the index downloader to avoid channel competition
770786
indexDownloaderCancel := make(chan struct{})
771787

772788
// Create index downloader with path to index file using helper function
773-
indexFilePath := m.ArchiveFileManager.codexArchiveIndexFile(id)
789+
indexFilePath := m.codexArchiveIndexFilePath(communityID)
774790
indexDownloader := NewCodexIndexDownloader(m.codexClient, indexCid, indexFilePath, indexDownloaderCancel, m.logger)
775791

776792
m.logger.Debug("fetching history index from Codex", zap.String("indexCid", indexCid))
@@ -817,8 +833,13 @@ func (m *ArchiveManager) DownloadHistoryArchivesByIndexCid(communityID types.Hex
817833
}
818834

819835
if indexDownloader.IsDownloadComplete() {
836+
err := m.writeCodexIndexCidToFile(communityID, indexCid)
837+
if err != nil {
838+
m.logger.Error("failed to write Codex index CID to file", zap.Error(err))
839+
return nil, err
840+
}
820841

821-
index, err := m.ArchiveFileManager.CodexLoadHistoryArchiveIndexFromFile(m.identity, communityID)
842+
index, err := m.CodexLoadHistoryArchiveIndexFromFile(m.identity, communityID)
822843
if err != nil {
823844
return nil, err
824845
}

protocol/communities/manager_archive_file.go

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -347,9 +347,8 @@ func (m *ArchiveFileManager) createHistoryArchiveCodex(communityID types.HexByte
347347
return nil, errors.New("codex config is not initialized")
348348
}
349349

350-
codexArchiveDir := m.codexConfig.DataDir + "/codex/" + communityID.String()
351-
codexIndexPath := codexArchiveDir + "/index"
352-
codexIndexCidPath := codexArchiveDir + "/index-cid"
350+
codexArchiveDir := m.codexArchiveDirPath(communityID)
351+
codexIndexPath := m.codexArchiveIndexFilePath(communityID)
353352
// codexDataPath := codexArchiveDir + "/data"
354353

355354
m.logger.Debug("codexArchiveDir", zap.String("codexArchiveDir", codexArchiveDir))
@@ -514,12 +513,12 @@ func (m *ArchiveFileManager) createHistoryArchiveCodex(communityID types.HexByte
514513
return codexArchiveIDs, err
515514
}
516515

517-
err = os.WriteFile(codexIndexPath, codexIndexBytes, 0644) // nolint: gosec
516+
err = m.writeCodexIndexToFile(communityID, codexIndexBytes)
518517
if err != nil {
519518
return codexArchiveIDs, err
520519
}
521520

522-
err = os.WriteFile(codexIndexCidPath, []byte(cid), 0644) // nolint: gosec
521+
err = m.writeCodexIndexCidToFile(communityID, cid)
523522
if err != nil {
524523
return codexArchiveIDs, err
525524
}
@@ -568,6 +567,43 @@ func (m *ArchiveFileManager) codexArchiveIndexFile(communityID string) string {
568567
return path.Join(m.codexConfig.DataDir, "codex", communityID, "index")
569568
}
570569

570+
func (m *ArchiveFileManager) codexArchiveDirPath(communityID types.HexBytes) string {
571+
return path.Join(m.codexConfig.DataDir, communityID.String())
572+
}
573+
574+
func (m *ArchiveFileManager) codexArchiveIndexFilePath(communityID types.HexBytes) string {
575+
return path.Join(m.codexConfig.DataDir, communityID.String(), "index")
576+
}
577+
578+
func (m *ArchiveFileManager) codexArchiveIndexCidFilePath(communityID types.HexBytes) string {
579+
return path.Join(m.codexConfig.DataDir, communityID.String(), "index-cid")
580+
}
581+
582+
func (m *ArchiveFileManager) writeCodexIndexToFile(communityID types.HexBytes, bytes []byte) error {
583+
indexFilePath := m.codexArchiveIndexFilePath(communityID)
584+
return os.WriteFile(indexFilePath, bytes, 0644) // nolint: gosec
585+
}
586+
587+
func (m *ArchiveFileManager) readCodexIndexFromFile(communityID types.HexBytes) ([]byte, error) {
588+
indexFilePath := m.codexArchiveIndexFilePath(communityID)
589+
return os.ReadFile(indexFilePath)
590+
}
591+
592+
func (m *ArchiveFileManager) removeCodexIndexFile(communityID types.HexBytes) error {
593+
indexFilePath := m.codexArchiveIndexFilePath(communityID)
594+
return os.Remove(indexFilePath)
595+
}
596+
597+
func (m *ArchiveFileManager) writeCodexIndexCidToFile(communityID types.HexBytes, cid string) error {
598+
cidFilePath := m.codexArchiveIndexCidFilePath(communityID)
599+
return os.WriteFile(cidFilePath, []byte(cid), 0644) // nolint: gosec
600+
}
601+
602+
func (m *ArchiveFileManager) readCodexIndexCidFromFile(communityID types.HexBytes) ([]byte, error) {
603+
cidFilePath := m.codexArchiveIndexCidFilePath(communityID)
604+
return os.ReadFile(cidFilePath)
605+
}
606+
571607
func (m *ArchiveFileManager) createWakuMessageArchive(from time.Time, to time.Time, messages []messagingtypes.ReceivedMessage, topics [][]byte) *protobuf.WakuMessageArchive {
572608
var wakuMessages []*protobuf.WakuMessage
573609

@@ -643,8 +679,7 @@ func (m *ArchiveFileManager) GetHistoryArchiveMagnetlink(communityID types.HexBy
643679
}
644680

645681
func (m *ArchiveFileManager) GetHistoryArchiveIndexCid(communityID types.HexBytes) (string, error) {
646-
codexArchiveDir := m.torrentConfig.DataDir + "/codex/" + communityID.String()
647-
codexIndexCidPath := codexArchiveDir + "/index-cid"
682+
codexIndexCidPath := m.codexArchiveIndexCidFilePath(communityID)
648683

649684
cidData, err := os.ReadFile(codexIndexCidPath)
650685
if err != nil {
@@ -803,8 +838,7 @@ func (m *ArchiveFileManager) LoadHistoryArchiveIndexFromFile(myKey *ecdsa.Privat
803838
func (m *ArchiveFileManager) CodexLoadHistoryArchiveIndexFromFile(myKey *ecdsa.PrivateKey, communityID types.HexBytes) (*protobuf.CodexWakuMessageArchiveIndex, error) {
804839
codexWakuMessageArchiveIndexProto := &protobuf.CodexWakuMessageArchiveIndex{}
805840

806-
indexPath := m.codexArchiveIndexFile(communityID.String())
807-
indexData, err := os.ReadFile(indexPath)
841+
indexData, err := m.readCodexIndexFromFile(communityID)
808842
if err != nil {
809843
return nil, err
810844
}

protocol/communities/manager_archive_nop.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ func (tmm *ArchiveManagerNop) Stop() error {
4343
return nil
4444
}
4545

46+
func (tmm *ArchiveManagerNop) GetCodexClient() *CodexClient {
47+
return nil
48+
}
49+
50+
func (tmm *ArchiveManagerNop) SetCodexClient(client *CodexClient) {}
51+
4652
func (tmm *ArchiveManagerNop) IsReady() bool {
4753
return false
4854
}

protocol/communities/persistence.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2230,14 +2230,15 @@ func (p *Persistence) UpdateAndPruneEncryptionKeyRequests(communityID types.HexB
22302230
}
22312231

22322232
func (p *Persistence) GetArchiveDistributionPreference(communityID types.HexBytes) (string, error) {
2233-
var preference string
2234-
err := p.db.QueryRow(`SELECT preferred_distribution_method FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&preference)
2235-
if err == sql.ErrNoRows {
2236-
return "auto", nil // Default preference
2237-
} else if err != nil {
2238-
return "", err
2239-
}
2240-
return preference, nil
2233+
return "codex", nil
2234+
// var preference string
2235+
// err := p.db.QueryRow(`SELECT preferred_distribution_method FROM communities_archive_info WHERE community_id = ?`, communityID.String()).Scan(&preference)
2236+
// if err == sql.ErrNoRows {
2237+
// return "auto", nil // Default preference
2238+
// } else if err != nil {
2239+
// return "", err
2240+
// }
2241+
// return preference, nil
22412242
}
22422243

22432244
func (p *Persistence) SetArchiveDistributionPreference(communityID types.HexBytes, preference string) error {

0 commit comments

Comments
 (0)