-
Notifications
You must be signed in to change notification settings - Fork 203
Add Impersonate Service Account argument #2015
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
cf3881c
8c2e093
35a0e62
a96b601
2086be4
3b6cd51
26b6528
868c5a3
25c4480
3891fdb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,12 @@ | ||
| #!/bin/bash | ||
| set -e | ||
|
|
||
| yarn install | ||
|
|
||
| bazel build //packages/@dataform/cli:bin | ||
|
|
||
| bazel run packages/@dataform/cli:package.pack | ||
| bazel run packages/@dataform/core:package.pack | ||
|
|
||
| mv dataform-cli-3.0.59.tgz ../hephaestus-worker-base/dataform/dataform-cli-3.0.59.tgz | ||
| mv dataform-core-3.0.59.tgz ../hephaestus-worker-base/dataform/dataform-core-3.0.59.tgz | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,5 @@ | ||
| import { BigQuery, GetTablesResponse, TableField, TableMetadata } from "@google-cloud/bigquery"; | ||
| import { GoogleAuth, Impersonated } from "google-auth-library"; | ||
| import Long from "long"; | ||
| import { PromisePoolExecutor } from "promise-pool-executor"; | ||
|
|
||
|
|
@@ -17,7 +18,9 @@ import { coerceAsError } from "df/common/errors/errors"; | |
| import { retry } from "df/common/promises"; | ||
| import { dataform } from "df/protos/ts"; | ||
|
|
||
| const GOOGLE_CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"; | ||
| const EXTRA_GOOGLE_SCOPES = ["https://www.googleapis.com/auth/drive"]; | ||
| const IMPERSONATION_GOOGLE_SCOPES = [GOOGLE_CLOUD_PLATFORM_SCOPE, ...EXTRA_GOOGLE_SCOPES]; | ||
|
|
||
| const BIGQUERY_DATE_RELATED_FIELDS = [ | ||
| "BigQueryDate", | ||
|
|
@@ -37,24 +40,40 @@ export interface IBigQueryExecutionOptions { | |
| reservation?: string; | ||
| } | ||
|
|
||
| export type BigQueryClientProvider = (projectId?: string) => BigQuery; | ||
| export type BigQueryClientProvider = (projectId?: string) => BigQuery | Promise<BigQuery>; | ||
|
|
||
| export function createBigQueryClientProvider( | ||
| credentials: dataform.IBigQuery | ||
| ): BigQueryClientProvider { | ||
| const clients = new Map<string, BigQuery>(); | ||
| return (projectId?: string) => { | ||
| return async (projectId?: string) => { | ||
| projectId = projectId || credentials.projectId; | ||
| if (!clients.has(projectId)) { | ||
| clients.set( | ||
| const clientConfig: any = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| projectId, | ||
| new BigQuery({ | ||
| scopes: EXTRA_GOOGLE_SCOPES, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are |
||
| location: credentials.location | ||
| }; | ||
|
|
||
| if (credentials.impersonateServiceAccount) { | ||
| const sourceAuth = new GoogleAuth({ | ||
| projectId, | ||
| scopes: EXTRA_GOOGLE_SCOPES, | ||
| location: credentials.location, | ||
| scopes: IMPERSONATION_GOOGLE_SCOPES, | ||
| credentials: credentials.credentials && JSON.parse(credentials.credentials) | ||
| }) | ||
| ); | ||
| }); | ||
|
|
||
| const authClient = await sourceAuth.getClient(); | ||
|
|
||
| clientConfig.authClient = new Impersonated({ | ||
| sourceClient: authClient, | ||
| targetPrincipal: credentials.impersonateServiceAccount, | ||
| targetScopes: IMPERSONATION_GOOGLE_SCOPES | ||
| }); | ||
| } else { | ||
| clientConfig.credentials = credentials.credentials && JSON.parse(credentials.credentials); | ||
| } | ||
|
|
||
| clients.set(projectId, new BigQuery(clientConfig)); | ||
| } | ||
| return clients.get(projectId); | ||
| }; | ||
|
|
@@ -143,7 +162,7 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| return this.pool | ||
| .addSingleTask({ | ||
| generator: async () => { | ||
| const [rows, , apiResponse] = await this.getClient().query({ | ||
| const [rows, , apiResponse] = await (await this.getClient()).query({ | ||
| ...this.prepareQueryOptions(statement, options.rowLimit, options.bigquery, options.params), | ||
| skipParsing: true | ||
| } as any); | ||
|
|
@@ -166,8 +185,8 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| try { | ||
| await this.pool | ||
| .addSingleTask({ | ||
| generator: () => | ||
| this.getClient().query({ | ||
| generator: async () => | ||
| (await this.getClient()).query({ | ||
| useLegacySql: false, | ||
| query, | ||
| dryRun: true | ||
|
|
@@ -193,11 +212,12 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
|
|
||
| public async tables(database: string, schema?: string): Promise<dataform.ITableMetadata[]> { | ||
| const datasetIds = schema ? [schema] : await this.schemas(database); | ||
| const client = await this.getClient(database); | ||
| const tablesMetadata: dataform.ITableMetadata[] = []; | ||
|
|
||
| await Promise.all( | ||
| datasetIds.map(async datasetId => { | ||
| const [tables] = await this.getClient(database) | ||
| const [tables] = await client | ||
| .dataset(datasetId) | ||
| .getTables({ autoPaginate: true, maxResults: 1000 }); | ||
| await Promise.all( | ||
|
|
@@ -290,14 +310,17 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| } | ||
|
|
||
| public async deleteTable(target: dataform.ITarget): Promise<void> { | ||
| await this.getClient(target.database) | ||
| await (await this.getClient(target.database)) | ||
| .dataset(target.schema) | ||
| .table(target.name) | ||
| .delete({ ignoreNotFound: true }); | ||
| } | ||
|
|
||
| public async schemas(database: string): Promise<string[]> { | ||
| const data = await this.getClient(database).getDatasets({ autoPaginate: true, maxResults: 1000 }); | ||
| const data = await (await this.getClient(database)).getDatasets({ | ||
| autoPaginate: true, | ||
| maxResults: 1000 | ||
| }); | ||
| return data[0].map(dataset => dataset.id); | ||
| } | ||
|
|
||
|
|
@@ -317,7 +340,7 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| metadata.schema.fields | ||
| ); | ||
|
|
||
| await this.getClient(target.database) | ||
| await (await this.getClient(target.database)) | ||
| .dataset(target.schema) | ||
| .table(target.name) | ||
| .setMetadata({ | ||
|
|
@@ -329,7 +352,7 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
|
|
||
| private async getMetadata(target: dataform.ITarget): Promise<TableMetadata> { | ||
| try { | ||
| const table = await this.getClient(target.database) | ||
| const table = await (await this.getClient(target.database)) | ||
| .dataset(target.schema) | ||
| .table(target.name) | ||
| .getMetadata(); | ||
|
|
@@ -344,8 +367,8 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| } | ||
| } | ||
|
|
||
| private getClient(projectId?: string) { | ||
| return this.clientProvider(projectId); | ||
| private async getClient(projectId?: string) { | ||
|
kolina marked this conversation as resolved.
|
||
| return await this.clientProvider(projectId); | ||
| } | ||
|
|
||
| private async runQuery( | ||
|
|
@@ -355,12 +378,12 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| byteLimit?: number, | ||
| location?: string | ||
| ): Promise<IExecutionResult> { | ||
| const results = await new Promise<any[]>((resolve, reject) => { | ||
| const results = await new Promise<any[]>(async (resolve, reject) => { | ||
| const allRows = new LimitedResultSet({ | ||
| rowLimit, | ||
| byteLimit | ||
| }); | ||
| const stream = this.getClient().createQueryStream({ | ||
| const stream = (await this.getClient()).createQueryStream({ | ||
| query, | ||
| params, | ||
| location | ||
|
|
@@ -416,7 +439,8 @@ export class BigQueryDbAdapter implements IDbAdapter { | |
| return retry( | ||
| async () => { | ||
| try { | ||
| const job = await this.getClient().createQueryJob( | ||
| const client = await this.getClient(); | ||
| const job = await client.createQueryJob( | ||
| this.prepareQueryOptions( | ||
| query, | ||
| rowLimit, | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,6 @@ | ||
| import * as chokidar from "chokidar"; | ||
| import * as fs from "fs"; | ||
| import * as glob from "glob"; | ||
| import parseDuration from "parse-duration"; | ||
| import * as path from "path"; | ||
| import yargs from "yargs"; | ||
|
|
||
|
|
@@ -28,6 +27,7 @@ import { | |
| actuallyResolve, | ||
| assertPathExists, | ||
| compiledGraphHasErrors, | ||
| parseCliDuration, | ||
| promptForIcebergConfig, | ||
| } from "df/cli/util"; | ||
| import { createYargsCli, INamedOption } from "df/cli/yargswrapper"; | ||
|
|
@@ -174,7 +174,7 @@ const timeoutOption: INamedOption<yargs.Options> = { | |
| type: "string", | ||
| default: null, | ||
| coerce: (rawTimeoutString: string | null) => | ||
| rawTimeoutString ? parseDuration(rawTimeoutString) : null | ||
| rawTimeoutString ? parseCliDuration(rawTimeoutString) : null | ||
| } | ||
| }; | ||
|
|
||
|
|
@@ -207,6 +207,13 @@ const bigqueryJobLabelsOption: INamedOption<yargs.Options> = { | |
| } | ||
| }; | ||
|
|
||
| const impersonateServiceAccountOption: INamedOption<yargs.Options> = { | ||
| name: "impersonate-service-account", | ||
| option: { | ||
| describe: "Service account email to impersonate during authentication.", | ||
| type: "string" | ||
| } | ||
| }; | ||
| const quietCompileOption: INamedOption<yargs.Options> = { | ||
| name: "quiet", | ||
| option: { | ||
|
|
@@ -503,7 +510,13 @@ export function runCli() { | |
| format: `test [${projectDirMustExistOption.name}]`, | ||
| description: "Run the dataform project's unit tests.", | ||
| positionalOptions: [projectDirMustExistOption], | ||
| options: [credentialsOption, timeoutOption, jsonOutputOption, ...ProjectConfigOptions.allYargsOptions], | ||
| options: [ | ||
| credentialsOption, | ||
| impersonateServiceAccountOption, | ||
| timeoutOption, | ||
| jsonOutputOption, | ||
| ...ProjectConfigOptions.allYargsOptions | ||
| ], | ||
| processFn: async argv => { | ||
| if (!argv[jsonOutputOption.name]) { | ||
| print("Compiling...\n"); | ||
|
|
@@ -523,6 +536,10 @@ export function runCli() { | |
| const readCredentials = credentials.read( | ||
| getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name]) | ||
| ); | ||
| if (argv[impersonateServiceAccountOption.name]) { | ||
| (readCredentials as any).impersonateServiceAccount = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we extend |
||
| argv[impersonateServiceAccountOption.name]; | ||
| } | ||
|
|
||
| if (!compiledGraph.tests.length) { | ||
| printError("No unit tests found."); | ||
|
|
@@ -574,10 +591,10 @@ export function runCli() { | |
| }, | ||
| actionsOption, | ||
| credentialsOption, | ||
| impersonateServiceAccountOption, | ||
| fullRefreshOption, | ||
| includeDepsOption, | ||
| includeDependentsOption, | ||
| credentialsOption, | ||
| jsonOutputOption, | ||
| timeoutOption, | ||
| tagsOption, | ||
|
|
@@ -610,6 +627,10 @@ export function runCli() { | |
| const readCredentials = credentials.read( | ||
| getCredentialsPath(argv[projectDirOption.name], argv[credentialsOption.name]) | ||
| ); | ||
| if (argv[impersonateServiceAccountOption.name]) { | ||
| (readCredentials as any).impersonateServiceAccount = | ||
| argv[impersonateServiceAccountOption.name]; | ||
| } | ||
|
|
||
| const dbadapter = new BigQueryDbAdapter(readCredentials); | ||
| const executionGraph = await build( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Non-intended diff?