Skip to content

Commit 1dfebed

Browse files
fix: pipelines postgres support and multitenancy (#7371)
* fix: pipelines postgres support and multitenancy * fix: minor fixes * fix: address minor comments * fix: rename package pipelinetypes
1 parent b36d2ec commit 1dfebed

32 files changed

+816
-629
lines changed

ee/query-service/app/server.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) {
217217

218218
// ingestion pipelines manager
219219
logParsingPipelineController, err := logparsingpipeline.NewLogParsingPipelinesController(
220-
serverOptions.SigNoz.SQLStore.SQLxDB(), integrationsController.GetPipelinesForInstalledIntegrations,
220+
serverOptions.SigNoz.SQLStore, integrationsController.GetPipelinesForInstalledIntegrations,
221221
)
222222
if err != nil {
223223
return nil, err

pkg/query-service/agentConf/manager.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,7 @@ func (m *Manager) RecommendAgentConfig(currentConfYaml []byte) (
106106
return nil, "", errors.Wrap(apiErr.ToError(), "failed to get latest agent config version")
107107
}
108108

109-
updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(
110-
recommendation, latestConfig,
111-
)
109+
updatedConf, serializedSettingsUsed, apiErr := feature.RecommendAgentConfig(recommendation, latestConfig)
112110
if apiErr != nil {
113111
return nil, "", errors.Wrap(apiErr.ToError(), fmt.Sprintf(
114112
"failed to generate agent config recommendation for %s", featureType,

pkg/query-service/app/http_handler.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ import (
6060
"github.com/SigNoz/signoz/pkg/query-service/postprocess"
6161
"github.com/SigNoz/signoz/pkg/types"
6262
"github.com/SigNoz/signoz/pkg/types/authtypes"
63+
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
6364

6465
"go.uber.org/zap"
6566

@@ -4462,6 +4463,11 @@ func (aH *APIHandler) PreviewLogsPipelinesHandler(w http.ResponseWriter, r *http
44624463
}
44634464

44644465
func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Request) {
4466+
claims, ok := authtypes.ClaimsFromContext(r.Context())
4467+
if !ok {
4468+
render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
4469+
return
4470+
}
44654471

44664472
version, err := parseAgentConfigVersion(r)
44674473
if err != nil {
@@ -4473,9 +4479,9 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
44734479
var apierr *model.ApiError
44744480

44754481
if version != -1 {
4476-
payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), version)
4482+
payload, apierr = aH.listLogsPipelinesByVersion(context.Background(), claims.OrgID, version)
44774483
} else {
4478-
payload, apierr = aH.listLogsPipelines(context.Background())
4484+
payload, apierr = aH.listLogsPipelines(context.Background(), claims.OrgID)
44794485
}
44804486

44814487
if apierr != nil {
@@ -4486,7 +4492,7 @@ func (aH *APIHandler) ListLogsPipelinesHandler(w http.ResponseWriter, r *http.Re
44864492
}
44874493

44884494
// listLogsPipelines lists logs piplines for latest version
4489-
func (aH *APIHandler) listLogsPipelines(ctx context.Context) (
4495+
func (aH *APIHandler) listLogsPipelines(ctx context.Context, orgID string) (
44904496
*logparsingpipeline.PipelinesResponse, *model.ApiError,
44914497
) {
44924498
// get lateset agent config
@@ -4516,7 +4522,7 @@ func (aH *APIHandler) listLogsPipelines(ctx context.Context) (
45164522
}
45174523

45184524
// listLogsPipelinesByVersion lists pipelines along with config version history
4519-
func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version int) (
4525+
func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, orgID string, version int) (
45204526
*logparsingpipeline.PipelinesResponse, *model.ApiError,
45214527
) {
45224528
payload, err := aH.LogsParsingPipelineController.GetPipelinesByVersion(ctx, version)
@@ -4537,7 +4543,13 @@ func (aH *APIHandler) listLogsPipelinesByVersion(ctx context.Context, version in
45374543

45384544
func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request) {
45394545

4540-
req := logparsingpipeline.PostablePipelines{}
4546+
claims, ok := authtypes.ClaimsFromContext(r.Context())
4547+
if !ok {
4548+
render.Error(w, errorsV2.Newf(errorsV2.TypeUnauthenticated, errorsV2.CodeUnauthenticated, "unauthenticated"))
4549+
return
4550+
}
4551+
4552+
req := pipelinetypes.PostablePipelines{}
45414553

45424554
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
45434555
RespondError(w, model.BadRequest(err), nil)
@@ -4546,7 +4558,7 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
45464558

45474559
createPipeline := func(
45484560
ctx context.Context,
4549-
postable []logparsingpipeline.PostablePipeline,
4561+
postable []pipelinetypes.PostablePipeline,
45504562
) (*logparsingpipeline.PipelinesResponse, *model.ApiError) {
45514563
if len(postable) == 0 {
45524564
zap.L().Warn("found no pipelines in the http request, this will delete all the pipelines")
@@ -4557,7 +4569,7 @@ func (aH *APIHandler) CreateLogsPipeline(w http.ResponseWriter, r *http.Request)
45574569
return nil, validationErr
45584570
}
45594571

4560-
return aH.LogsParsingPipelineController.ApplyPipelines(ctx, postable)
4572+
return aH.LogsParsingPipelineController.ApplyPipelines(ctx, claims.OrgID, postable)
45614573
}
45624574

45634575
res, err := createPipeline(r.Context(), req.Pipelines)

pkg/query-service/app/integrations/controller.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@ import (
55
"fmt"
66

77
"github.com/SigNoz/signoz/pkg/query-service/agentConf"
8-
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
98
"github.com/SigNoz/signoz/pkg/query-service/model"
109
"github.com/SigNoz/signoz/pkg/sqlstore"
1110
"github.com/SigNoz/signoz/pkg/types"
11+
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
1212
)
1313

1414
type Controller struct {
@@ -124,7 +124,7 @@ func (c *Controller) Uninstall(
124124

125125
func (c *Controller) GetPipelinesForInstalledIntegrations(
126126
ctx context.Context,
127-
) ([]logparsingpipeline.Pipeline, *model.ApiError) {
127+
) ([]pipelinetypes.GettablePipeline, *model.ApiError) {
128128
return c.mgr.GetPipelinesForInstalledIntegrations(ctx)
129129
}
130130

pkg/query-service/app/integrations/manager.go

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ import (
77
"strings"
88
"time"
99

10-
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
1110
"github.com/SigNoz/signoz/pkg/query-service/model"
1211
"github.com/SigNoz/signoz/pkg/query-service/rules"
1312
"github.com/SigNoz/signoz/pkg/query-service/utils"
1413
"github.com/SigNoz/signoz/pkg/types"
14+
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
1515
"github.com/google/uuid"
1616
"github.com/jmoiron/sqlx"
1717
)
@@ -39,7 +39,7 @@ type IntegrationAssets struct {
3939
}
4040

4141
type LogsAssets struct {
42-
Pipelines []logparsingpipeline.PostablePipeline `json:"pipelines"`
42+
Pipelines []pipelinetypes.PostablePipeline `json:"pipelines"`
4343
}
4444

4545
type IntegrationConfigStep struct {
@@ -257,33 +257,34 @@ func (m *Manager) UninstallIntegration(
257257

258258
func (m *Manager) GetPipelinesForInstalledIntegrations(
259259
ctx context.Context,
260-
) ([]logparsingpipeline.Pipeline, *model.ApiError) {
260+
) ([]pipelinetypes.GettablePipeline, *model.ApiError) {
261261
installedIntegrations, apiErr := m.getInstalledIntegrations(ctx)
262262
if apiErr != nil {
263263
return nil, apiErr
264264
}
265265

266-
pipelines := []logparsingpipeline.Pipeline{}
266+
gettablePipelines := []pipelinetypes.GettablePipeline{}
267267
for _, ii := range installedIntegrations {
268268
for _, p := range ii.Assets.Logs.Pipelines {
269-
pp := logparsingpipeline.Pipeline{
269+
gettablePipelines = append(gettablePipelines, pipelinetypes.GettablePipeline{
270270
// Alias is used for identifying integration pipelines. Id can't be used for this
271271
// since versioning while saving pipelines requires a new id for each version
272272
// to avoid altering history when pipelines are edited/reordered etc
273-
Alias: AliasForIntegrationPipeline(ii.Id, p.Alias),
274-
Id: uuid.NewString(),
275-
OrderId: p.OrderId,
276-
Enabled: p.Enabled,
277-
Name: p.Name,
278-
Description: &p.Description,
279-
Filter: p.Filter,
280-
Config: p.Config,
281-
}
282-
pipelines = append(pipelines, pp)
273+
StoreablePipeline: pipelinetypes.StoreablePipeline{
274+
Alias: AliasForIntegrationPipeline(ii.Id, p.Alias),
275+
ID: uuid.NewString(),
276+
OrderID: p.OrderID,
277+
Enabled: p.Enabled,
278+
Name: p.Name,
279+
Description: p.Description,
280+
},
281+
Filter: p.Filter,
282+
Config: p.Config,
283+
})
283284
}
284285
}
285286

286-
return pipelines, nil
287+
return gettablePipelines, nil
287288
}
288289

289290
func (m *Manager) dashboardUuid(integrationId string, dashboardId string) string {

pkg/query-service/app/integrations/pipeline_utils.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package integrations
33
import (
44
"strings"
55

6-
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
76
"github.com/SigNoz/signoz/pkg/query-service/constants"
7+
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
88
)
99

1010
const IntegrationPipelineIdSeparator string = "--"
@@ -20,7 +20,7 @@ func AliasForIntegrationPipeline(
2020

2121
// Returns ptr to integration_id string if `p` is a pipeline for an installed integration.
2222
// Returns null otherwise.
23-
func IntegrationIdForPipeline(p logparsingpipeline.Pipeline) *string {
23+
func IntegrationIdForPipeline(p pipelinetypes.GettablePipeline) *string {
2424
if strings.HasPrefix(p.Alias, constants.IntegrationPipelineIdPrefix) {
2525
parts := strings.Split(p.Alias, IntegrationPipelineIdSeparator)
2626
if len(parts) < 2 {

pkg/query-service/app/integrations/test_utils.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ import (
55
"slices"
66
"testing"
77

8-
"github.com/SigNoz/signoz/pkg/query-service/app/logparsingpipeline"
98
"github.com/SigNoz/signoz/pkg/query-service/model"
109
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
1110
"github.com/SigNoz/signoz/pkg/query-service/rules"
1211
"github.com/SigNoz/signoz/pkg/query-service/utils"
1312
"github.com/SigNoz/signoz/pkg/types"
13+
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
1414
)
1515

1616
func NewTestIntegrationsManager(t *testing.T) *Manager {
@@ -59,7 +59,7 @@ func (t *TestAvailableIntegrationsRepo) list(
5959
},
6060
Assets: IntegrationAssets{
6161
Logs: LogsAssets{
62-
Pipelines: []logparsingpipeline.PostablePipeline{
62+
Pipelines: []pipelinetypes.PostablePipeline{
6363
{
6464
Name: "pipeline1",
6565
Alias: "pipeline1",
@@ -78,7 +78,7 @@ func (t *TestAvailableIntegrationsRepo) list(
7878
},
7979
},
8080
},
81-
Config: []logparsingpipeline.PipelineOperator{
81+
Config: []pipelinetypes.PipelineOperator{
8282
{
8383
OrderId: 1,
8484
ID: "add",
@@ -127,7 +127,7 @@ func (t *TestAvailableIntegrationsRepo) list(
127127
},
128128
Assets: IntegrationAssets{
129129
Logs: LogsAssets{
130-
Pipelines: []logparsingpipeline.PostablePipeline{
130+
Pipelines: []pipelinetypes.PostablePipeline{
131131
{
132132
Name: "pipeline2",
133133
Alias: "pipeline2",
@@ -146,7 +146,7 @@ func (t *TestAvailableIntegrationsRepo) list(
146146
},
147147
},
148148
},
149-
Config: []logparsingpipeline.PipelineOperator{
149+
Config: []pipelinetypes.PipelineOperator{
150150
{
151151
OrderId: 1,
152152
ID: "add",

pkg/query-service/app/logparsingpipeline/collector_config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/SigNoz/signoz/pkg/query-service/constants"
1212
coreModel "github.com/SigNoz/signoz/pkg/query-service/model"
13+
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
1314
"github.com/pkg/errors"
1415
"go.uber.org/zap"
1516
)
@@ -164,7 +165,7 @@ func checkDuplicateString(pipeline []string) bool {
164165

165166
func GenerateCollectorConfigWithPipelines(
166167
config []byte,
167-
pipelines []Pipeline,
168+
pipelines []pipelinetypes.GettablePipeline,
168169
) ([]byte, *coreModel.ApiError) {
169170
var collectorConf map[string]interface{}
170171
err := yaml.Unmarshal([]byte(config), &collectorConf)

pkg/query-service/app/logparsingpipeline/collector_config_test.go

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/SigNoz/signoz/pkg/query-service/constants"
99
"github.com/SigNoz/signoz/pkg/query-service/model"
1010
v3 "github.com/SigNoz/signoz/pkg/query-service/model/v3"
11+
"github.com/SigNoz/signoz/pkg/types/pipelinetypes"
1112
. "github.com/smartystreets/goconvey/convey"
1213
"github.com/stretchr/testify/require"
1314
"gopkg.in/yaml.v3"
@@ -230,12 +231,14 @@ func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *test
230231
- memory
231232
`)
232233

233-
makeTestPipeline := func(name string, alias string) Pipeline {
234-
return Pipeline{
235-
OrderId: 1,
236-
Name: name,
237-
Alias: alias,
238-
Enabled: true,
234+
makeTestPipeline := func(name string, alias string) pipelinetypes.GettablePipeline {
235+
return pipelinetypes.GettablePipeline{
236+
StoreablePipeline: pipelinetypes.StoreablePipeline{
237+
OrderID: 1,
238+
Name: name,
239+
Alias: alias,
240+
Enabled: true,
241+
},
239242
Filter: &v3.FilterSet{
240243
Operator: "AND",
241244
Items: []v3.FilterItem{
@@ -250,7 +253,7 @@ func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *test
250253
},
251254
},
252255
},
253-
Config: []PipelineOperator{
256+
Config: []pipelinetypes.PipelineOperator{
254257
{
255258
ID: "regex",
256259
Type: "regex_parser",
@@ -264,7 +267,7 @@ func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *test
264267
}
265268
}
266269

267-
testPipelines := []Pipeline{
270+
testPipelines := []pipelinetypes.GettablePipeline{
268271
makeTestPipeline("test pipeline 1", "pipeline-alias"),
269272
makeTestPipeline("test pipeline 2", "pipeline-alias"),
270273
}
@@ -299,12 +302,14 @@ func TestPipelineAliasCollisionsDontResultInDuplicateCollectorProcessors(t *test
299302
func TestPipelineRouterWorksEvenIfFirstOpIsDisabled(t *testing.T) {
300303
require := require.New(t)
301304

302-
testPipelines := []Pipeline{
305+
testPipelines := []pipelinetypes.GettablePipeline{
303306
{
304-
OrderId: 1,
305-
Name: "pipeline1",
306-
Alias: "pipeline1",
307-
Enabled: true,
307+
StoreablePipeline: pipelinetypes.StoreablePipeline{
308+
OrderID: 1,
309+
Name: "pipeline1",
310+
Alias: "pipeline1",
311+
Enabled: true,
312+
},
308313
Filter: &v3.FilterSet{
309314
Operator: "AND",
310315
Items: []v3.FilterItem{
@@ -319,7 +324,7 @@ func TestPipelineRouterWorksEvenIfFirstOpIsDisabled(t *testing.T) {
319324
},
320325
},
321326
},
322-
Config: []PipelineOperator{
327+
Config: []pipelinetypes.PipelineOperator{
323328
{
324329
OrderId: 1,
325330
ID: "add",
@@ -370,12 +375,14 @@ func TestPipelineRouterWorksEvenIfFirstOpIsDisabled(t *testing.T) {
370375
func TestPipeCharInAliasDoesntBreakCollectorConfig(t *testing.T) {
371376
require := require.New(t)
372377

373-
testPipelines := []Pipeline{
378+
testPipelines := []pipelinetypes.GettablePipeline{
374379
{
375-
OrderId: 1,
376-
Name: "test | pipeline",
377-
Alias: "test|pipeline",
378-
Enabled: true,
380+
StoreablePipeline: pipelinetypes.StoreablePipeline{
381+
OrderID: 1,
382+
Name: "test | pipeline",
383+
Alias: "test|pipeline",
384+
Enabled: true,
385+
},
379386
Filter: &v3.FilterSet{
380387
Operator: "AND",
381388
Items: []v3.FilterItem{
@@ -390,7 +397,7 @@ func TestPipeCharInAliasDoesntBreakCollectorConfig(t *testing.T) {
390397
},
391398
},
392399
},
393-
Config: []PipelineOperator{
400+
Config: []pipelinetypes.PipelineOperator{
394401
{
395402
OrderId: 1,
396403
ID: "add",

0 commit comments

Comments
 (0)