diff --git a/.changeset/vault-improve-logging.md b/.changeset/vault-improve-logging.md new file mode 100644 index 00000000000..0783ad1fea1 --- /dev/null +++ b/.changeset/vault-improve-logging.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +Add structured vault request ID logging across capability, OCR plugin, and workflow secrets paths. #internal diff --git a/core/capabilities/vault/capability.go b/core/capabilities/vault/capability.go index c8d42037637..48b28e7fbae 100644 --- a/core/capabilities/vault/capability.go +++ b/core/capabilities/vault/capability.go @@ -121,20 +121,9 @@ func (s *Capability) Execute(ctx context.Context, request capabilities.Capabilit } } - // We need to generate sufficiently unique IDs accounting for two cases: - // 1. called during the subscription phase, in which case the executionID will be blank - // 2. called during execution, in which case it'll be present. - // The reference ID is unique per phase, so we need to differentiate when generating - // an ID. md := request.Metadata - phaseOrExecution := md.WorkflowExecutionID - if phaseOrExecution == "" { - phaseOrExecution = "subscription" - } - id := fmt.Sprintf("%s::%s::%s", md.WorkflowID, phaseOrExecution, md.ReferenceID) - - // Workflow DON reads populate secret identifiers explicitly; OCR paths do not rely on legacy - // top-level protobuf identity fields. + id := vaultutils.BuildWorkflowGetSecretsRequestID(md) + s.lggr.Debugw("received workflow get secrets request", "requestID", id, "request", r.String()) resp, err := s.handleRequest(ctx, id, r) if err != nil { @@ -160,7 +149,7 @@ func (s *Capability) Execute(ctx context.Context, request capabilities.Capabilit } func (s *Capability) CreateSecrets(ctx context.Context, request *vaultcommon.CreateSecretsRequest) (*vaulttypes.Response, error) { - s.lggr.Debugw("received create secrets request", "request", request.String()) + s.lggr.Debugw("received create secrets request", "requestID", request.RequestId, "request", request.String()) if err := validateEncryptedSecretsUniformOwners(request.EncryptedSecrets); err != nil { return nil, err } @@ -173,7 +162,7 @@ func (s *Capability) CreateSecrets(ctx context.Context, request *vaultcommon.Cre } func (s *Capability) UpdateSecrets(ctx context.Context, request *vaultcommon.UpdateSecretsRequest) (*vaulttypes.Response, error) { - s.lggr.Debugw("received update secrets request", "request", request.String()) + s.lggr.Debugw("received update secrets request", "requestID", request.RequestId, "request", request.String()) if err := validateEncryptedSecretsUniformOwners(request.EncryptedSecrets); err != nil { return nil, err } @@ -186,7 +175,7 @@ func (s *Capability) UpdateSecrets(ctx context.Context, request *vaultcommon.Upd } func (s *Capability) DeleteSecrets(ctx context.Context, request *vaultcommon.DeleteSecretsRequest) (*vaulttypes.Response, error) { - s.lggr.Debugw("received delete secrets request", "request", request.String()) + s.lggr.Debugw("received delete secrets request", "requestID", request.RequestId, "request", request.String()) err := s.ValidateDeleteSecretsRequest(ctx, request) if err != nil { s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "request", request.String(), "err", err) @@ -200,7 +189,7 @@ func (s *Capability) DeleteSecrets(ctx context.Context, request *vaultcommon.Del } func (s *Capability) GetSecrets(ctx context.Context, requestID string, request *vaultcommon.GetSecretsRequest) (*vaulttypes.Response, error) { - s.lggr.Debugw("received get secrets request", "request", request.String()) + s.lggr.Debugw("received get secrets request", "requestID", requestID, "request", request.String()) if err := s.ValidateGetSecretsRequest(ctx, request); err != nil { s.lggr.Debugw("failed validation checks", "requestID", requestID, "request", request.String(), "err", err) return nil, err @@ -211,7 +200,7 @@ func (s *Capability) GetSecrets(ctx context.Context, requestID string, request * } func (s *Capability) ListSecretIdentifiers(ctx context.Context, request *vaultcommon.ListSecretIdentifiersRequest) (*vaulttypes.Response, error) { - s.lggr.Debugw("received list secret identifiers request", "request", request.String()) + s.lggr.Debugw("received list secret identifiers request", "requestID", request.RequestId, "request", request.String()) err := s.ValidateListSecretIdentifiersRequest(ctx, request) if err != nil { s.lggr.Debugw("failed validation checks", "requestID", request.RequestId, "request", request.String(), "err", err) diff --git a/core/capabilities/vault/capability_test.go b/core/capabilities/vault/capability_test.go index 4b92a8eaeb3..e4ab19c2e8a 100644 --- a/core/capabilities/vault/capability_test.go +++ b/core/capabilities/vault/capability_test.go @@ -25,6 +25,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/settings/limits" coreCapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaultutils" "github.com/smartcontractkit/chainlink/v2/core/logger" ) @@ -52,7 +53,11 @@ func TestCapability_CapabilityCall(t *testing.T) { workflowExecutionID := "test-workflow-execution-id" referenceID := "test-reference-id" - requestID := fmt.Sprintf("%s::%s::%s", workflowID, workflowExecutionID, referenceID) + requestID := vaultutils.BuildWorkflowGetSecretsRequestID(capabilities.RequestMetadata{ + WorkflowID: workflowID, + WorkflowExecutionID: workflowExecutionID, + ReferenceID: referenceID, + }) sid := &vault.SecretIdentifier{ Key: "Foo", @@ -148,7 +153,10 @@ func TestCapability_CapabilityCall_DuringSubscriptionPhase(t *testing.T) { workflowID := "test-workflow-id" referenceID := "0" - requestID := fmt.Sprintf("%s::%s::%s", workflowID, "subscription", referenceID) + requestID := vaultutils.BuildWorkflowGetSecretsRequestID(capabilities.RequestMetadata{ + WorkflowID: workflowID, + ReferenceID: referenceID, + }) sid := &vault.SecretIdentifier{ Key: "Foo", @@ -463,7 +471,11 @@ func TestCapability_CapabilityCall_SecretIdentifierOwnerMismatch(t *testing.T) { require.NoError(t, err) servicetest.Run(t, capability) - requestID := fmt.Sprintf("%s::%s::%s", "wf-id", "exec-id", "ref-id") + requestID := vaultutils.BuildWorkflowGetSecretsRequestID(capabilities.RequestMetadata{ + WorkflowID: "wf-id", + WorkflowExecutionID: "exec-id", + ReferenceID: "ref-id", + }) reqs := []*vault.SecretRequest{} for _, s := range tc.secretOwners { @@ -809,7 +821,11 @@ func TestCapability_CapabilityCall_ReturnsIncorrectType(t *testing.T) { workflowExecutionID := "test-workflow-execution-id" referenceID := "test-reference-id" - requestID := fmt.Sprintf("%s::%s::%s", workflowID, workflowExecutionID, referenceID) + requestID := vaultutils.BuildWorkflowGetSecretsRequestID(capabilities.RequestMetadata{ + WorkflowID: workflowID, + WorkflowExecutionID: workflowExecutionID, + ReferenceID: referenceID, + }) sid := &vault.SecretIdentifier{ Key: "Foo", @@ -883,7 +899,11 @@ func TestCapability_CapabilityCall_TimeOut(t *testing.T) { workflowExecutionID := "test-workflow-execution-id" referenceID := "test-reference-id" - requestID := fmt.Sprintf("%s::%s::%s", workflowID, workflowExecutionID, referenceID) + requestID := vaultutils.BuildWorkflowGetSecretsRequestID(capabilities.RequestMetadata{ + WorkflowID: workflowID, + WorkflowExecutionID: workflowExecutionID, + ReferenceID: referenceID, + }) sid := &vault.SecretIdentifier{ Key: "Foo", diff --git a/core/capabilities/vault/gw_handler.go b/core/capabilities/vault/gw_handler.go index c52e21a1aeb..22cfae0b01b 100644 --- a/core/capabilities/vault/gw_handler.go +++ b/core/capabilities/vault/gw_handler.go @@ -173,8 +173,13 @@ func (h *GatewayHandler) Methods() []string { return vaulttypes.Methods } +func (h *GatewayHandler) requestLogger(req *jsonrpc.Request[json.RawMessage], gatewayID string) logger.Logger { + return h.lggr.With("requestID", req.ID, "method", req.Method, "gatewayID", gatewayID) +} + func (h *GatewayHandler) HandleGatewayMessage(ctx context.Context, gatewayID string, req *jsonrpc.Request[json.RawMessage]) (err error) { - h.lggr.Debugw("received message from gateway", "gatewayID", gatewayID, "req", req, "requestID", req.ID) + reqLggr := h.requestLogger(req, gatewayID) + reqLggr.Debugw("received message from gateway", "req", req) var response *jsonrpc.Response[json.RawMessage] var authResult *AuthResult @@ -219,11 +224,11 @@ func (h *GatewayHandler) HandleGatewayMessage(ctx context.Context, gatewayID str } if err = h.gatewayConnector.SendToGateway(ctx, gatewayID, response); err != nil { - h.lggr.Errorw("Failed to send message to gateway", "gatewayID", gatewayID, "error", err) + reqLggr.Errorw("Failed to send message to gateway", "error", err) return err } - h.lggr.Infow("Sent message to gateway", "gatewayID", gatewayID, "resp", response, "requestID", req.ID) + reqLggr.Infow("Sent message to gateway", "resp", response) h.metrics.requestSuccess.Add(ctx, 1, metric.WithAttributes( attribute.String("gateway_id", gatewayID), )) @@ -387,7 +392,7 @@ func (h *GatewayHandler) errorResponse( errorCode api.ErrorCode, err error, ) *jsonrpc.Response[json.RawMessage] { - h.lggr.Errorw("gateway handler error response", "gatewayID", gatewayID, "requestID", req.ID, "method", req.Method, "errorCode", errorCode, "error", err) + h.requestLogger(req, gatewayID).Errorw("gateway handler error response", "errorCode", errorCode, "error", err) h.metrics.requestInternalError.Add(ctx, 1, metric.WithAttributes( attribute.String("gateway_id", gatewayID), attribute.String("error", errorCode.String()), diff --git a/core/capabilities/vault/vaultutils/request_id.go b/core/capabilities/vault/vaultutils/request_id.go new file mode 100644 index 00000000000..3769227d7df --- /dev/null +++ b/core/capabilities/vault/vaultutils/request_id.go @@ -0,0 +1,25 @@ +package vaultutils + +import ( + "fmt" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) + +const subscriptionPhaseKey = "subscription" + +// BuildWorkflowGetSecretsRequestID returns the pending-queue / OCR request ID for a +// workflow GetSecrets call. This is the only place this format should be defined. +// +// We need to generate sufficiently unique IDs accounting for two cases: +// 1. called during the subscription phase, in which case the executionID will be blank +// 2. called during execution, in which case it'll be present. +// The reference ID is unique per phase, so we need to differentiate when generating +// an ID. +func BuildWorkflowGetSecretsRequestID(md capabilities.RequestMetadata) string { + phaseOrExecution := md.WorkflowExecutionID + if phaseOrExecution == "" { + phaseOrExecution = subscriptionPhaseKey + } + return fmt.Sprintf("%s::%s::%s", md.WorkflowID, phaseOrExecution, md.ReferenceID) +} diff --git a/core/capabilities/vault/vaultutils/request_id_test.go b/core/capabilities/vault/vaultutils/request_id_test.go new file mode 100644 index 00000000000..7b5585403bd --- /dev/null +++ b/core/capabilities/vault/vaultutils/request_id_test.go @@ -0,0 +1,52 @@ +package vaultutils + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" +) + +func TestBuildWorkflowGetSecretsRequestID(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + md capabilities.RequestMetadata + want string + }{ + { + name: "execution path", + md: capabilities.RequestMetadata{ + WorkflowID: "wf-1", + WorkflowExecutionID: "abc123sha", + ReferenceID: "42", + }, + want: "wf-1::abc123sha::42", + }, + { + name: "subscription path", + md: capabilities.RequestMetadata{ + WorkflowID: "wf-1", + ReferenceID: "7", + }, + want: "wf-1::subscription::7", + }, + { + name: "gate off empty workflow ID", + md: capabilities.RequestMetadata{ + WorkflowExecutionID: "abc123sha", + ReferenceID: "42", + }, + want: "::abc123sha::42", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tt.want, BuildWorkflowGetSecretsRequestID(tt.md)) + }) + } +} diff --git a/core/services/ocr2/plugins/vault/plugin.go b/core/services/ocr2/plugins/vault/plugin.go index dbd2ff3adb5..25ec3666619 100644 --- a/core/services/ocr2/plugins/vault/plugin.go +++ b/core/services/ocr2/plugins/vault/plugin.go @@ -589,9 +589,10 @@ type pendingQueueStore interface { } func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, keyValueReader ocr3_1types.KeyValueStateReader, blobBroadcastFetcher ocr3_1types.BlobBroadcastFetcher) (types.Observation, error) { + l := r.roundLggr(seqNr) start := time.Now() defer func() { - r.lggr.Debugw("observation finished", "seqNr", seqNr, "elapsed", time.Since(start)) + l.Debugw("observation finished", "elapsed", time.Since(start)) }() readKV := NewReadStore(keyValueReader, r.metrics) @@ -609,7 +610,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type // Avoid log spam by only logging if we have any requests to process. if len(currentPendingQueueItems) > 0 { - r.lggr.Debugw("observation started", "seqNr", seqNr, "batchSize", len(currentPendingQueueItems)) + l.Debugw("observation started", "batchSize", len(currentPendingQueueItems)) } obspb := &vaultcommon.Observations{} @@ -707,7 +708,7 @@ func (r *ReportingPlugin) Observation(ctx context.Context, seqNr uint64, aq type // Avoid log spam by only logging if we have any requests to process. if len(currentPendingQueueItems) > 0 { - r.lggr.Debugw("observation complete", "ids", observedIDs, "batchSize", len(currentPendingQueueItems)) + l.Debugw("observation complete", "ids", observedIDs, "batchSize", len(currentPendingQueueItems)) } return types.Observation(obsb), nil } @@ -730,32 +731,30 @@ func (r *ReportingPlugin) appendPendingQueueObservations( payload, err := req.Item.UnmarshalNew() if err != nil { - r.lggr.Errorw("failed to unmarshal request payload", "id", req.Id, "error", err) + r.roundLggr(seqNr).Errorw("failed to unmarshal request payload", "requestID", req.Id, "error", err) continue } switch tp := payload.(type) { case *vaultcommon.GetSecretsRequest: - r.observeGetSecrets(ctx, readKV, tp, o) + r.observeGetSecrets(ctx, seqNr, req.Id, readKV, tp, o) case *vaultcommon.CreateSecretsRequest: - r.observeCreateSecrets(ctx, readKV, tp, o) + r.observeCreateSecrets(ctx, seqNr, req.Id, readKV, tp, o) case *vaultcommon.UpdateSecretsRequest: - r.observeUpdateSecrets(ctx, readKV, tp, o) + r.observeUpdateSecrets(ctx, seqNr, req.Id, readKV, tp, o) case *vaultcommon.DeleteSecretsRequest: - r.observeDeleteSecrets(ctx, readKV, tp, o) + r.observeDeleteSecrets(ctx, seqNr, req.Id, readKV, tp, o) case *vaultcommon.ListSecretIdentifiersRequest: - r.observeListSecretIdentifiers(ctx, readKV, tp, o) + r.observeListSecretIdentifiers(ctx, seqNr, req.Id, readKV, tp, o) default: - r.lggr.Errorw("unknown request type, skipping...", "requestType", fmt.Sprintf("%T", payload), "id", req.Id) + r.typedRequestLggr(seqNr, req.Id, fmt.Sprintf("%T", payload)).Errorw("unknown request type, skipping...") continue } obspb.Observations = append(obspb.Observations, o) if applyWireCap && proto.Size(obspb) > r.maxObservationBytes { obspb.Observations = obspb.Observations[:len(obspb.Observations)-1] - r.lggr.Warnw("observation proto would exceed max observation bytes; stopping pending-queue observation pack", - "seqNr", seqNr, - "id", req.Id, + r.requestLggr(seqNr, req.Id).Warnw("observation proto would exceed max observation bytes; stopping pending-queue observation pack", "maxObservationBytes", r.maxObservationBytes, "packedObservationCount", len(obspb.Observations), "pendingQueueItemCount", len(currentPendingQueueItems), @@ -840,7 +839,8 @@ func (r *ReportingPlugin) broadcastBlobPayloads( return filtered, nil } -func (r *ReportingPlugin) observeGetSecrets(ctx context.Context, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { +func (r *ReportingPlugin) observeGetSecrets(ctx context.Context, seqNr uint64, requestID string, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { + l := r.typedRequestLggr(seqNr, requestID, "GetSecrets") tp := req.(*vaultcommon.GetSecretsRequest) o.RequestType = vaultcommon.RequestType_GET_SECRETS if !r.optimizationsEnabled(ctx) { @@ -864,7 +864,7 @@ func (r *ReportingPlugin) observeGetSecrets(ctx context.Context, reader ReadKVSt for _, secretRequest := range tp.Requests { resp, ierr := r.observeGetSecretsRequest(ctx, reader, secretRequest, requestsCountForID) if ierr != nil { - logUserErrorAware(r.lggr, "failed to observe get secret request item", ierr, "id", secretRequest.Id) + logUserErrorAware(l, "failed to observe get secret request item", ierr, "id", secretRequest.Id) errorMsg := userFacingError(ierr, "failed to handle get secret request") resps = append(resps, &vaultcommon.SecretResponse{ Id: secretRequest.Id, @@ -873,7 +873,7 @@ func (r *ReportingPlugin) observeGetSecrets(ctx context.Context, reader ReadKVSt }, }) } else { - r.lggr.Debugw("observed get secret request item", "id", resp.Id) + l.Debugw("observed get secret request item", "id", resp.Id) resps = append(resps, resp) } } @@ -991,13 +991,13 @@ func (r *ReportingPlugin) observeGetSecretsRequest(ctx context.Context, reader R }, nil } -func (r *ReportingPlugin) observeCreateSecrets(ctx context.Context, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { +func (r *ReportingPlugin) observeCreateSecrets(ctx context.Context, seqNr uint64, requestID string, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { + l := r.typedRequestLggr(seqNr, requestID, "CreateSecrets") tp := req.(*vaultcommon.CreateSecretsRequest) o.RequestType = vaultcommon.RequestType_CREATE_SECRETS o.Request = &vaultcommon.Observation_CreateSecretsRequest{ CreateSecretsRequest: tp, } - l := r.lggr.With("requestID", tp.RequestId, "requestType", "CreateSecrets") requestsCountForID := map[string]int{} for _, sr := range tp.EncryptedSecrets { @@ -1070,13 +1070,13 @@ func (r *ReportingPlugin) observeCreateSecretRequest(ctx context.Context, _ Read return id, nil } -func (r *ReportingPlugin) observeUpdateSecrets(ctx context.Context, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { +func (r *ReportingPlugin) observeUpdateSecrets(ctx context.Context, seqNr uint64, requestID string, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { + l := r.typedRequestLggr(seqNr, requestID, "UpdateSecrets") tp := req.(*vaultcommon.UpdateSecretsRequest) o.RequestType = vaultcommon.RequestType_UPDATE_SECRETS o.Request = &vaultcommon.Observation_UpdateSecretsRequest{ UpdateSecretsRequest: tp, } - l := r.lggr.With("requestID", tp.RequestId, "requestType", "UpdateSecrets") requestsCountForID := map[string]int{} for _, sr := range tp.EncryptedSecrets { @@ -1129,15 +1129,15 @@ func (r *ReportingPlugin) observeUpdateSecretRequest(ctx context.Context, reader return r.observeCreateSecretRequest(ctx, reader, secretRequest, requestsCountForID) } -func (r *ReportingPlugin) observeListSecretIdentifiers(ctx context.Context, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { +func (r *ReportingPlugin) observeListSecretIdentifiers(ctx context.Context, seqNr uint64, requestID string, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { tp := req.(*vaultcommon.ListSecretIdentifiersRequest) + l := r.typedRequestLggr(seqNr, requestID, "ListSecretIdentifiers").With("owner", tp.Owner) o.RequestType = vaultcommon.RequestType_LIST_SECRET_IDENTIFIERS o.Request = &vaultcommon.Observation_ListSecretIdentifiersRequest{ ListSecretIdentifiersRequest: tp, } - l := r.lggr.With("requestId", tp.RequestId, "requestType", "ListSecretIdentifiers", "owner", tp.Owner) - resp, err := r.processListSecretIdentifiersRequest(ctx, l, reader, tp) + resp, err := r.processListSecretIdentifiersRequest(ctx, seqNr, requestID, reader, tp) if err != nil { l.Debugw("failed to process list secret identifiers request", "error", err) o.Response = &vaultcommon.Observation_ListSecretIdentifiersResponse{ @@ -1155,7 +1155,7 @@ func (r *ReportingPlugin) observeListSecretIdentifiers(ctx context.Context, read } } -func (r *ReportingPlugin) processListSecretIdentifiersRequest(ctx context.Context, l logger.Logger, reader ReadKVStore, req *vaultcommon.ListSecretIdentifiersRequest) (*vaultcommon.ListSecretIdentifiersResponse, error) { +func (r *ReportingPlugin) processListSecretIdentifiersRequest(ctx context.Context, seqNr uint64, requestID string, reader ReadKVStore, req *vaultcommon.ListSecretIdentifiersRequest) (*vaultcommon.ListSecretIdentifiersResponse, error) { if req.Owner == "" { return nil, errors.New("invalid request: owner cannot be empty") } @@ -1168,7 +1168,7 @@ func (r *ReportingPlugin) processListSecretIdentifiersRequest(ctx context.Contex if md == nil { // No metadata, so the list is empty. // The user hasn't added any items to the vault DON yet. - l.Debugw("successfully read metadata for owner: no metadata found, returning empty list") + r.typedRequestLggr(seqNr, requestID, "ListSecretIdentifiers").With("owner", req.Owner).Debugw("successfully read metadata for owner: no metadata found, returning empty list") return &vaultcommon.ListSecretIdentifiersResponse{Identifiers: []*vaultcommon.SecretIdentifier{}, Success: true}, nil } @@ -1196,13 +1196,13 @@ func (r *ReportingPlugin) processListSecretIdentifiersRequest(ctx context.Contex }, nil } -func (r *ReportingPlugin) observeDeleteSecrets(ctx context.Context, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { +func (r *ReportingPlugin) observeDeleteSecrets(ctx context.Context, seqNr uint64, requestID string, reader ReadKVStore, req proto.Message, o *vaultcommon.Observation) { + l := r.typedRequestLggr(seqNr, requestID, "DeleteSecrets") tp := req.(*vaultcommon.DeleteSecretsRequest) o.RequestType = vaultcommon.RequestType_DELETE_SECRETS o.Request = &vaultcommon.Observation_DeleteSecretsRequest{ DeleteSecretsRequest: tp, } - l := r.lggr.With("requestId", tp.RequestId, "requestType", "DeleteSecrets") requestsCountForID := map[string]int{} for _, sr := range tp.Ids { @@ -1330,8 +1330,10 @@ func logUserErrorAware(l logger.Logger, msg string, err error, keysAndValues ... } func (r *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, aq types.AttributedQuery, ao types.AttributedObservation, keyValueReader ocr3_1types.KeyValueStateReader, blobFetcher ocr3_1types.BlobFetcher) error { + valLggr := r.roundLggr(seqNr).With("oracleID", ao.Observer) obs := &vaultcommon.Observations{} if err := proto.Unmarshal([]byte(ao.Observation), obs); err != nil { + valLggr.Debugw("validate observation failed", "error", err) return errors.New("failed to unmarshal observations: " + err.Error()) } @@ -1360,6 +1362,7 @@ func (r *ReportingPlugin) ValidateObservation(ctx context.Context, seqNr uint64, for _, o := range obs.Observations { err = r.validateObservation(ctx, o, pendingGetSecretsByID) if err != nil { + valLggr.Debugw("validate observation failed", "requestID", o.Id, "error", err) return errors.New("invalid observation: " + err.Error()) } @@ -1714,6 +1717,7 @@ func (r *ReportingPlugin) validateListSecretIdentifiersObservation(ctx context.C } func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq types.AttributedQuery, aos []types.AttributedObservation, keyValueReadWriter ocr3_1types.KeyValueStateReadWriter, blobFetcher ocr3_1types.BlobFetcher) (ocr3_1types.ReportsPlusPrecursor, error) { + l := r.roundLggr(seqNr) writeKV := NewWriteStore(keyValueReadWriter, r.metrics) marshalledObs := map[uint8]*vaultcommon.Observations{} @@ -1750,7 +1754,7 @@ func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq } } - r.lggr.Debugw("stateTransition started", "oracleIDsToRequestIDs", oidsToReqIDs) + l.Debugw("stateTransition started", "oracleIDsToRequestIDs", oidsToReqIDs) os := &vaultcommon.Outcomes{ Outcomes: []*vaultcommon.Outcome{}, @@ -1790,14 +1794,14 @@ func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq } else { chosen = shaToObs[sha] } - r.lggr.Debugw("sufficient observations for sha", "sha", sha, "requestType", "GetSecrets", "count", len(obs), "threshold", 2*r.onchainCfg.F+1, "id", id) + l.Debugw("sufficient observations for sha", "sha", sha, "requestType", "GetSecrets", "count", len(obs), "threshold", 2*r.onchainCfg.F+1, "requestID", id) case o.RequestType != vaultcommon.RequestType_GET_SECRETS && len(obs) >= r.onchainCfg.F+1: // F+1 means that at least 1 honest node has provided this observation, so that's enough for all other request // types. // Technically we could have two shas with F+1 observations. If that happens we'll pick the last one. // This is deterministic since we're sorting by shas above. chosen = shaToObs[sha] - r.lggr.Debugw("sufficient observations for sha", "sha", sha, "count", len(obs), "threshold", r.onchainCfg.F+1, "id", id) + l.Debugw("sufficient observations for sha", "sha", sha, "count", len(obs), "threshold", r.onchainCfg.F+1, "requestID", id) } } @@ -1806,7 +1810,7 @@ func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq for sha, obs := range shaToObs { shaToObsCount[sha] = len(obs) } - r.lggr.Warnw("insufficient observations found for id", "id", id, "shaToObsCount", shaToObsCount) + l.Warnw("insufficient observations found for requestID", "requestID", id, "shaToObsCount", shaToObsCount) continue } @@ -1836,8 +1840,8 @@ func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq os.Outcomes = append(os.Outcomes, o) if r.optimizationsEnabled(ctx) && proto.Size(os) > r.maxReportsPlusPrecursorBytes { os.Outcomes = os.Outcomes[:len(os.Outcomes)-1] - r.lggr.Warnw("state transition: more observations than can be included in response", - "id", id, + l.Warnw("state transition: more observations than can be included in response", + "requestID", id, "maxReportsPlusPrecursorBytes", r.maxReportsPlusPrecursorBytes, "packedOutcomeCount", len(os.Outcomes), "scheduledRequestIDs", len(obsMap), @@ -1865,7 +1869,7 @@ func (r *ReportingPlugin) StateTransition(ctx context.Context, seqNr uint64, aq } if len(os.Outcomes) > 0 { - r.lggr.Debugw("State transition complete", "count", len(os.Outcomes), "err", err) + l.Debugw("State transition complete", "outcomeCount", len(os.Outcomes)) } return ocr3_1types.ReportsPlusPrecursor(ospb), nil } @@ -2437,6 +2441,7 @@ func (r *ReportingPlugin) Committed(ctx context.Context, seqNr uint64, keyValueR } func (r *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlusPrecursor ocr3_1types.ReportsPlusPrecursor) ([]ocr3types.ReportPlus[[]byte], error) { + l := r.roundLggr(seqNr) outcomes := &vaultcommon.Outcomes{} err := proto.Unmarshal([]byte(reportsPlusPrecursor), outcomes) if err != nil { @@ -2449,7 +2454,7 @@ func (r *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlus case vaultcommon.RequestType_GET_SECRETS: rep, err := r.generateProtoReport(o.Id, o.RequestType, o.GetGetSecretsResponse()) if err != nil { - r.lggr.Errorw("failed to generate Proto report", "error", err, "id", o.Id) + l.Errorw("failed to generate Proto report", "error", err, "requestID", o.Id) continue } @@ -2463,7 +2468,7 @@ func (r *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlus } rep, err := r.generateJSONReport(o.Id, o.RequestType, createResp, r.signedResponseRequestIDEnabled(ctx)) if err != nil { - r.lggr.Errorw("failed to generate JSON report", "error", err, "id", o.Id) + l.Errorw("failed to generate JSON report", "error", err, "requestID", o.Id) continue } @@ -2477,7 +2482,7 @@ func (r *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlus } rep, err := r.generateJSONReport(o.Id, o.RequestType, updateResp, r.signedResponseRequestIDEnabled(ctx)) if err != nil { - r.lggr.Errorw("failed to generate JSON report", "error", err, "id", o.Id) + l.Errorw("failed to generate JSON report", "error", err, "requestID", o.Id) continue } @@ -2491,7 +2496,7 @@ func (r *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlus } rep, err := r.generateJSONReport(o.Id, o.RequestType, deleteResp, r.signedResponseRequestIDEnabled(ctx)) if err != nil { - r.lggr.Errorw("failed to generate JSON report", "error", err, "id", o.Id) + l.Errorw("failed to generate JSON report", "error", err, "requestID", o.Id) continue } @@ -2505,7 +2510,7 @@ func (r *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlus } rep, err := r.generateJSONReport(o.Id, o.RequestType, listResp, r.signedResponseRequestIDEnabled(ctx)) if err != nil { - r.lggr.Errorw("failed to generate JSON report", "error", err, "id", o.Id) + l.Errorw("failed to generate JSON report", "error", err, "requestID", o.Id) continue } @@ -2517,7 +2522,7 @@ func (r *ReportingPlugin) Reports(ctx context.Context, seqNr uint64, reportsPlus } if len(reports) > 0 { - r.lggr.Debugw("Reports complete", "count", len(reports)) + l.Debugw("Reports complete", "reportCount", len(reports)) } return reports, nil } diff --git a/core/services/ocr2/plugins/vault/plugin_test.go b/core/services/ocr2/plugins/vault/plugin_test.go index 68b8b0b109e..1b27096a70b 100644 --- a/core/services/ocr2/plugins/vault/plugin_test.go +++ b/core/services/ocr2/plugins/vault/plugin_test.go @@ -2844,7 +2844,7 @@ func TestPlugin_StateTransition_InsufficientObservations(t *testing.T) { assert.Empty(t, os.Outcomes, 0) - assert.Equal(t, 1, observed.FilterMessage("insufficient observations found for id").Len()) + assert.Equal(t, 1, observed.FilterMessage("insufficient observations found for requestID").Len()) } func TestPlugin_StateTransition_GetSecretsRequest_ResponseSizeWithinLimit(t *testing.T) { @@ -3202,7 +3202,7 @@ func TestPlugin_StateTransition_ShasDontMatch(t *testing.T) { assert.Empty(t, os.Outcomes) - assert.Equal(t, 1, observed.FilterMessage("insufficient observations found for id").Len()) + assert.Equal(t, 1, observed.FilterMessage("insufficient observations found for requestID").Len()) } func TestPlugin_StateTransition_AggregatesValidationErrors(t *testing.T) { diff --git a/core/services/ocr2/plugins/vault/plugin_utils.go b/core/services/ocr2/plugins/vault/plugin_utils.go index 0afdd4426ad..8fee27d5236 100644 --- a/core/services/ocr2/plugins/vault/plugin_utils.go +++ b/core/services/ocr2/plugins/vault/plugin_utils.go @@ -199,3 +199,15 @@ func initializePluginLimits(ctx context.Context, limitsFactory limits.Factory) ( MaxPerOracleUnexpiredBlobCount: maxPerOracleUnexpiredBlobCount, }, nil } + +func (r *ReportingPlugin) roundLggr(seqNr uint64) logger.Logger { + return r.lggr.With("seqNr", seqNr) +} + +func (r *ReportingPlugin) requestLggr(seqNr uint64, requestID string) logger.Logger { + return r.roundLggr(seqNr).With("requestID", requestID) +} + +func (r *ReportingPlugin) typedRequestLggr(seqNr uint64, requestID, requestType string) logger.Logger { + return r.requestLggr(seqNr, requestID).With("requestType", requestType) +} diff --git a/core/services/ocr2/plugins/vault/transmitter.go b/core/services/ocr2/plugins/vault/transmitter.go index 52181ec6171..9eb4e5551a8 100644 --- a/core/services/ocr2/plugins/vault/transmitter.go +++ b/core/services/ocr2/plugins/vault/transmitter.go @@ -87,7 +87,7 @@ func (c *Transmitter) Transmit(ctx context.Context, cd types.ConfigDigest, seqNr signatures[i] = s.Signature } - c.lggr.Debugw("transmitting report", "requestID", info.Id, "requestType", info.Format.String()) + c.lggr.Debugw("transmitting report", "seqNr", seqNr, "requestID", info.Id, "requestType", info.Format.String()) c.lifecycle.RecordTransmitted(ctx, info.Id, seqNr, time.Now()) c.handler.SendResponse(ctx, &vaulttypes.Response{ ID: info.Id, diff --git a/core/services/workflows/v2/secrets.go b/core/services/workflows/v2/secrets.go index 82ee7ef124a..4234ddf9b4a 100644 --- a/core/services/workflows/v2/secrets.go +++ b/core/services/workflows/v2/secrets.go @@ -28,6 +28,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" sdkpb "github.com/smartcontractkit/chainlink-protos/cre/go/sdk" "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaulttypes" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/vault/vaultutils" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/types" "github.com/smartcontractkit/chainlink-common/keystore/corekeys/workflowkey" @@ -118,12 +119,34 @@ func keyFor(owner, namespace, id string) string { return fmt.Sprintf("%s::%s::%s", owner, namespace, id) } +func (s *secretsFetcher) vaultGetSecretsMetadata(ctx context.Context, callbackID int64) capabilities.RequestMetadata { + metadata := capabilities.RequestMetadata{ + WorkflowOwner: s.workflowOwner, + WorkflowName: s.workflowName, + WorkflowExecutionID: sha(s.phaseID, strconv.FormatInt(callbackID, 10)), + ReferenceID: strconv.FormatInt(callbackID, 10), + } + if propagateOrgIDMeta, _ := cresettings.Default.PropagateOrgIDInRequestMetadata.GetOrDefault(ctx, s.creSettingsGetter); propagateOrgIDMeta && s.orgID != "" { + metadata.OrgID = s.orgID + // WorkflowID is under this gate because we previously skipped setting workflowID on SecretsFetcher entirely. Now setting it safely. + metadata.WorkflowID = s.workflowID + } + return metadata +} + func (s *secretsFetcher) GetSecrets(ctx context.Context, request *sdkpb.GetSecretsRequest) ([]*sdkpb.SecretResponse, error) { ctx = contexts.WithCRE(ctx, contexts.CRE{ Org: s.orgID, Owner: s.workflowOwner, Workflow: s.workflowID, }) + var callbackID int64 + if request != nil { + callbackID = int64(request.CallbackId) + } + metadata := s.vaultGetSecretsMetadata(ctx, callbackID) + vaultRequestID := vaultutils.BuildWorkflowGetSecretsRequestID(metadata) + s.lggr.Debugw("get secrets request received", "vaultRequestID", vaultRequestID, "metadata", metadata) s.callCounter.mu.Lock() secretsCalled := s.callCounter.called + 1 if err := s.secretsCallsLimit.Check(ctx, secretsCalled); err != nil { @@ -143,8 +166,12 @@ func (s *secretsFetcher) GetSecrets(ctx context.Context, request *sdkpb.GetSecre }() getSecretsDuration := time.Since(start).Milliseconds() if err != nil { - // Log errors when secrets fetching fails, for troubleshooting and debugging - s.lggr.Warnw("Secrets fetching failed for request", "request", request, "error", err, "requestLatency", getSecretsDuration) + s.lggr.Warnw("Secrets fetching failed for request", + "vaultRequestID", vaultRequestID, + "metadata", metadata, + "error", err, + "requestLatency", getSecretsDuration, + ) } s.metrics.With( "workflowOwner", s.workflowOwner, @@ -258,17 +285,8 @@ func (s *secretsFetcher) GetRawSecrets(ctx context.Context, request *sdkpb.GetSe if err != nil { return nil, fmt.Errorf("failed to get encryption keys: %w", err) } - metadata := capabilities.RequestMetadata{ - WorkflowOwner: s.workflowOwner, - WorkflowName: s.workflowName, - WorkflowExecutionID: sha(s.phaseID, strconv.FormatInt(int64(request.CallbackId), 10)), - ReferenceID: strconv.FormatInt(int64(request.CallbackId), 10), - } - if propagateOrgIDMeta, _ := cresettings.Default.PropagateOrgIDInRequestMetadata.GetOrDefault(ctx, s.creSettingsGetter); propagateOrgIDMeta && s.orgID != "" { - metadata.OrgID = s.orgID - // WorkflowID is under this gate because we previously skipped setting workflowID on SecretsFetcher entirely. Now setting it safely. - metadata.WorkflowID = s.workflowID - } + metadata := s.vaultGetSecretsMetadata(ctx, int64(request.CallbackId)) + vaultRequestID := vaultutils.BuildWorkflowGetSecretsRequestID(metadata) vp := &vault.GetSecretsRequest{ Requests: make([]*vault.SecretRequest, 0), } @@ -300,8 +318,8 @@ func (s *secretsFetcher) GetRawSecrets(ctx context.Context, request *sdkpb.GetSe return nil, fmt.Errorf("failed to convert vault request to any: %w", err) } - lggr := logger.With(s.lggr, "requestedKeys", logKeys, "metadata", metadata) - lggr.Debug("fetching secrets...") + lggr := logger.With(s.lggr, "requestedKeys", logKeys, "vaultRequestID", vaultRequestID, "metadata", metadata) + lggr.Debugw("fetching secrets from vault") capabilityResponse, err := vaultCap.Execute(ctx, capabilities.CapabilityRequest{ Payload: anypbReq, @@ -314,7 +332,7 @@ func (s *secretsFetcher) GetRawSecrets(ctx context.Context, request *sdkpb.GetSe return nil, fmt.Errorf("failed to execute vault.GetSecrets: %w", err) } - lggr.Debug("successfully fetched secrets from vault capability") + lggr.Debugw("successfully fetched secrets from vault capability") batchedVaultResponse := &vault.GetSecretsResponse{} err = capabilityResponse.Payload.UnmarshalTo(batchedVaultResponse)