次の方法で共有


SDK を使用して外部アプリを Lakebase に接続する

Important

Lakebase 自動スケールは、ベータ版としてeastus2westeuropewestusの各リージョンで利用できます。

Lakebase 自動スケーリングは、自動スケール コンピューティング、ゼロへのスケーリング、分岐、インスタント リストアを備えた最新バージョンの Lakebase です。 Lakebase Provisioned との機能の比較については、バージョンの選択を参照してください。

このガイドでは、OAuth トークンローテーションで標準の Postgres ドライバー (psycopg、pgx、JDBC) を使用して、外部アプリケーションを Lakebase 自動スケーリングに接続する方法について説明します。 Azure Databricks SDK は、各新しい接続を開くときに generate_database_credential() を呼び出すサービス プリンシパルと接続プールと共に使用するため、接続するたびに新しいトークン (60 分の有効期間) を取得します。 Python、Java、Go の例を示します。 自動資格情報管理を使用して簡単にセットアップできるように、 代わりに Azure Databricks Apps を検討してください。

ビルドする内容: OAuth トークンローテーションを使用して外部アプリケーションから Lakebase 自動スケーリングに接続し、接続が機能することを確認する接続パターン。

Databricks SDK (Python v0.89.0 以降、Java v0.73.0 以降、または Go v0.109.0 以降) が必要です。 次の手順を実行します。

:::tip その他の言語 Databricks SDK がサポートされていない言語 (Node.js、Ruby、PHP、Elixir、Rust など) については、「 API を使用して外部アプリを Lakebase に接続する」を参照してください。 :::

トークンスコープ: データベース資格情報トークンはワークスペース スコープです。 endpoint パラメーターが必要ですが、返されたトークンは、サービス プリンシパルにアクセス許可があるワークスペース内の任意のデータベースまたはプロジェクトにアクセスできます。

動作方法

Databricks SDK は、ワークスペース トークン管理を自動的に処理することで OAuth 認証を簡素化します。

SDK OAuth フロー

アプリケーションは、エンドポイント パラメーターを使用して generate_database_credential() を呼び出します。 SDK は、ワークスペース OAuth トークンを内部的に取得し (コードは必要ありません)、Lakebase API からデータベース資格情報を要求して、アプリケーションに返します。 その後、Postgres に接続するときに、この資格情報をパスワードとして使用します。

ワークスペース OAuth トークンとデータベース資格情報の両方が 60 分後に期限切れになります。 接続プールは、新しい接続を作成するときに generate_database_credential() を呼び出すことによって自動更新を処理します。

1. OAuth シークレットを使用してサービス プリンシパルを作成する

OAuth シークレットを使用して Azure Databricks サービス プリンシパルを作成します。 詳細については、「 サービス プリンシパル アクセスの承認」を参照してください。 外部アプリをビルドする場合は、次の点に注意してください。

  • シークレットを任意の有効期間 (最大 730 日間) に設定します。 これにより、シークレットを更新する必要がある頻度が定義されます。これは、ローテーションを使用してデータベース資格情報を生成するために使用されます。
  • サービス プリンシパルに対して "ワークスペース アクセス" を有効にします ([設定] → [ID とアクセス] → [サービス プリンシパル] → {name} → [構成] タブ)。 新しいデータベース資格情報を生成するために必要です。
  • クライアント ID (UUID) をメモします。 これは、アプリのセットアップと PGUSERで一致する Postgres ロールを作成するときに使用します。

2. サービス プリンシパルの Postgres ロールを作成する

Lakebase UI では、パスワードベースのロールのみがサポートされます。 手順 1 のクライアント ID を使用して、Lakebase SQL エディターで OAuth ロールを作成します (表示名ではありません。ロール名では大文字と小文字が区別されます)。

-- Enable the auth extension (if not already enabled)
CREATE EXTENSION IF NOT EXISTS databricks_auth;

-- Create OAuth role using the service principal client ID
SELECT databricks_create_role('{client-id}', 'SERVICE_PRINCIPAL');

-- Grant database permissions
GRANT CONNECT ON DATABASE databricks_postgres TO "{client-id}";
GRANT USAGE ON SCHEMA public TO "{client-id}";
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO "{client-id}";
ALTER DEFAULT PRIVILEGES IN SCHEMA public
  GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO "{client-id}";

{client-id}をサービス プリンシパル クライアント ID に置き換えます。 OAuth ロールの作成を参照してください。

3. 接続の詳細を取得する

Lakebase コンソールでプロジェクトから [ 接続] をクリックし、[ブランチとエンドポイント] を選択し、 ホストデータベース (通常は databricks_postgres)、 エンドポイント名 (形式: projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>) をメモします。

または、CLI を使用します。

databricks postgres list-endpoints projects/<project-id>/branches/<branch-id>

詳細については、 接続文字列 を参照してください。

4. 環境変数を設定する

アプリケーションを実行する前に、次の環境変数を設定します。

# Databricks workspace authentication
export DATABRICKS_HOST="https://your-workspace.databricks.com"
export DATABRICKS_CLIENT_ID="<service-principal-client-id>"
export DATABRICKS_CLIENT_SECRET="<your-oauth-secret>"

# Lakebase connection details (from step 3)
export ENDPOINT_NAME="projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>"
export PGHOST="<endpoint-id>.database.<region>.databricks.com"
export PGDATABASE="databricks_postgres"
export PGUSER="<service-principal-client-id>"   # Same UUID as step 1
export PGPORT="5432"
export PGSSLMODE="require"   # Python only

5. 接続コードを追加する

Python

この例では、psycopg3 をカスタム接続クラスと共に使用し、プールが新しい接続を作成するときに新しいトークンを生成します。

import os
from databricks.sdk import WorkspaceClient
import psycopg
from psycopg_pool import ConnectionPool

# Initialize Databricks SDK
workspace_client = None

def _get_workspace_client():
    """Get or create the workspace client for OAuth."""
    global workspace_client
    if workspace_client is None:
        workspace_client = WorkspaceClient(
            host=os.environ["DATABRICKS_HOST"],
            client_id=os.environ["DATABRICKS_CLIENT_ID"],
            client_secret=os.environ["DATABRICKS_CLIENT_SECRET"],
        )
    return workspace_client

def _get_endpoint_name():
    """Get endpoint name from environment."""
    name = os.environ.get("ENDPOINT_NAME")
    if not name:
        raise ValueError(
            "ENDPOINT_NAME must be set (format: projects/<id>/branches/<id>/endpoints/<id>)"
        )
    return name

class OAuthConnection(psycopg.Connection):
    """Custom connection class that generates a fresh OAuth token per connection."""

    @classmethod
    def connect(cls, conninfo="", **kwargs):
        endpoint_name = _get_endpoint_name()
        client = _get_workspace_client()
        # Generate database credential (tokens are workspace-scoped)
        credential = client.postgres.generate_database_credential(
            endpoint=endpoint_name
        )
        kwargs["password"] = credential.token
        return super().connect(conninfo, **kwargs)

# Create connection pool with OAuth token rotation
def get_connection_pool():
    """Get or create the connection pool."""
    database = os.environ["PGDATABASE"]
    user = os.environ["PGUSER"]
    host = os.environ["PGHOST"]
    port = os.environ.get("PGPORT", "5432")
    sslmode = os.environ.get("PGSSLMODE", "require")

    conninfo = f"dbname={database} user={user} host={host} port={port} sslmode={sslmode}"

    return ConnectionPool(
        conninfo=conninfo,
        connection_class=OAuthConnection,
        min_size=1,
        max_size=10,
        open=True,
    )

# Use the pool in your application
pool = get_connection_pool()
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT current_user, current_database()")
        print(cur.fetchone())

依存関係:databricks-sdk>=0.89.0psycopg[binary,pool]>=3.1.0

Go

この例では、新しい接続ごとに新しいトークンを生成する BeforeConnect コールバックで pgxpool を使用します。

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/databricks/databricks-sdk-go"
	"github.com/databricks/databricks-sdk-go/service/postgres"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

func createConnectionPool(ctx context.Context) (*pgxpool.Pool, error) {
	// Initialize Databricks workspace client
	w, err := databricks.NewWorkspaceClient(&databricks.Config{
		Host:         os.Getenv("DATABRICKS_HOST"),
		ClientID:     os.Getenv("DATABRICKS_CLIENT_ID"),
		ClientSecret: os.Getenv("DATABRICKS_CLIENT_SECRET"),
	})
	if err != nil {
		return nil, err
	}

	// Build connection string
	connStr := fmt.Sprintf("host=%s port=%s dbname=%s user=%s sslmode=require",
		os.Getenv("PGHOST"),
		os.Getenv("PGPORT"),
		os.Getenv("PGDATABASE"),
		os.Getenv("PGUSER"))

	config, err := pgxpool.ParseConfig(connStr)
	if err != nil {
		return nil, err
	}

	// Configure pool
	config.MaxConns = 10
	config.MinConns = 1
	config.MaxConnLifetime = 45 * time.Minute
	config.MaxConnIdleTime = 15 * time.Minute

	// Generate fresh token for each new connection
	config.BeforeConnect = func(ctx context.Context, connConfig *pgx.ConnConfig) error {
		credential, err := w.Postgres.GenerateDatabaseCredential(ctx,
			postgres.GenerateDatabaseCredentialRequest{
				Endpoint: os.Getenv("ENDPOINT_NAME"),
			})
		if err != nil {
			return err
		}
		connConfig.Password = credential.Token
		return nil
	}

	return pgxpool.NewWithConfig(ctx, config)
}

func main() {
	ctx := context.Background()
	pool, err := createConnectionPool(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer pool.Close()

	var user, database string
	err = pool.QueryRow(ctx, "SELECT current_user, current_database()").Scan(&user, &database)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Printf("Connected as: %s to database: %s\n", user, database)
}

依存 関係: Databricks SDK for Go v0.109.0 以降 (github.com/databricks/databricks-sdk-go)、pgx ドライバー (github.com/jackc/pgx/v5)

メモ:BeforeConnectコールバックにより、新しい接続ごとに新しい OAuth トークンが確保され、実行時間の長いアプリケーションの自動トークン ローテーションが処理されます。

Java

この例では、HikariCP で JDBC を使用し、プールが新しい接続を作成するたびに新しいトークンを生成するカスタム DataSource を使用します。

import java.sql.*;
import javax.sql.DataSource;
import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.core.DatabricksConfig;
import com.databricks.sdk.service.postgres.*;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

public class LakebaseConnection {

    private static WorkspaceClient workspaceClient() {
        String host = System.getenv("DATABRICKS_HOST");
        String clientId = System.getenv("DATABRICKS_CLIENT_ID");
        String clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET");

        return new WorkspaceClient(new DatabricksConfig()
            .setHost(host)
            .setClientId(clientId)
            .setClientSecret(clientSecret));
    }

    private static DataSource createDataSource() {
        WorkspaceClient w = workspaceClient();
        String endpointName = System.getenv("ENDPOINT_NAME");
        String host = System.getenv("PGHOST");
        String database = System.getenv("PGDATABASE");
        String user = System.getenv("PGUSER");
        String port = System.getenv().getOrDefault("PGPORT", "5432");

        String jdbcUrl = "jdbc:postgresql://" + host + ":" + port +
                         "/" + database + "?sslmode=require";

        // DataSource that returns a new connection with a fresh token (tokens are workspace-scoped)
        DataSource tokenDataSource = new DataSource() {
            @Override
            public Connection getConnection() throws SQLException {
                DatabaseCredential cred = w.postgres().generateDatabaseCredential(
                    new GenerateDatabaseCredentialRequest().setEndpoint(endpointName)
                );
                return DriverManager.getConnection(jdbcUrl, user, cred.getToken());
            }

            @Override
            public Connection getConnection(String u, String p) {
                throw new UnsupportedOperationException();
            }
            // ... other DataSource methods (getLogWriter, etc.)
        };

        // Wrap in HikariCP for connection pooling
        HikariConfig config = new HikariConfig();
        config.setDataSource(tokenDataSource);
        config.setMaximumPoolSize(10);
        config.setMinimumIdle(1);
        // Recycle connections before 60-min token expiry
        config.setMaxLifetime(45 * 60 * 1000L);

        return new HikariDataSource(config);
    }

    public static void main(String[] args) throws SQLException {
        DataSource pool = createDataSource();

        try (Connection conn = pool.getConnection();
             Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT current_user, current_database()")) {
            if (rs.next()) {
                System.out.println("User: " + rs.getString(1));
                System.out.println("Database: " + rs.getString(2));
            }
        }
    }
}

依存 関係: Databricks SDK for Java v0.73.0+ (com.databricks:databricks-sdk-java)、PostgreSQL JDBC ドライバー (org.postgresql:postgresql)、HikariCP (com.zaxxer:HikariCP)

6. 接続を実行して確認する

Python

依存関係をインストールします。

pip install databricks-sdk psycopg[binary,pool]

実行:

# Save all the code from step 5 (above) as db.py, then run:
from db import get_connection_pool

pool = get_connection_pool()
with pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT current_user, current_database()")
        print(cur.fetchone())

想定される出力:

('c00f575e-d706-4f6b-b62c-e7a14850571b', 'databricks_postgres')

current_user手順 1 のサービス プリンシパル クライアント ID と一致する場合、OAuth トークンのローテーションが機能しています。

Java

メモ: これは、上記の Java 例の依存関係を pom.xmlに含む Maven プロジェクトがあることを前提としています。

依存関係をインストールします。

mvn install

実行:

mvn exec:java -Dexec.mainClass="com.example.LakebaseConnection"

想定される出力:

User: c00f575e-d706-4f6b-b62c-e7a14850571b
Database: databricks_postgres

ユーザーが手順 1 のサービス プリンシパル クライアント ID と一致する場合、OAuth トークンのローテーションが機能しています。

Go

依存関係をインストールします。

go mod init myapp
go get github.com/databricks/databricks-sdk-go
go get github.com/jackc/pgx/v5

実行:

go run main.go

想定される出力:

Connected as: c00f575e-d706-4f6b-b62c-e7a14850571b to database: databricks_postgres

ユーザーが手順 1 のサービス プリンシパル クライアント ID と一致する場合、OAuth トークンのローテーションが機能しています。

メモ: Lakebase Autoscaling が 0 からコンピューティングを開始すると、アイドル後の最初の接続に時間がかかる場合があります。

トラブルシューティング

エラー 修正
"ワークスペース アクセス権を持たないユーザーに対して API が無効になっています" サービス プリンシパルの "ワークスペース アクセス" を有効にします (手順 1)。
"ロールが存在しない" か、認証が失敗する UI ではなく、SQL (手順 2) を使用して OAuth ロールを作成します。
"接続が拒否されました" または "エンドポイントが見つかりません" ENDPOINT_NAME形式projects/<id>/branches/<id>/endpoints/<id>使用します。エンドポイント ID はホスト内にあります。
"無効なユーザー" または "ユーザーが見つかりません" PGUSER表示名ではなく、サービス プリンシパル クライアント ID (UUID) に設定します。