Important
Lakebase 自動スケールは、ベータ版としてeastus2、westeurope、westusの各リージョンで利用できます。
Lakebase 自動スケーリングは、自動スケール コンピューティング、ゼロへのスケーリング、分岐、インスタント リストアを備えた最新バージョンの Lakebase です。 Lakebase Provisioned との機能の比較については、バージョンの選択を参照してください。
このガイドでは、OAuth トークンローテーションで標準の Postgres ドライバー (psycopg、pgx、JDBC) を使用して、外部アプリケーションを Lakebase 自動スケーリングに接続する方法について説明します。 Azure Databricks SDK は、各新しい接続を開くときに generate_database_credential() を呼び出すサービス プリンシパルと接続プールと共に使用するため、接続するたびに新しいトークン (60 分の有効期間) を取得します。 Python、Java、Go の例を示します。 自動資格情報管理を使用して簡単にセットアップできるように、 代わりに Azure Databricks Apps を検討してください。
ビルドする内容: OAuth トークンローテーションを使用して外部アプリケーションから Lakebase 自動スケーリングに接続し、接続が機能することを確認する接続パターン。
Databricks SDK (Python v0.89.0 以降、Java v0.73.0 以降、または Go v0.109.0 以降) が必要です。 次の手順を実行します。
:::tip その他の言語 Databricks SDK がサポートされていない言語 (Node.js、Ruby、PHP、Elixir、Rust など) については、「 API を使用して外部アプリを Lakebase に接続する」を参照してください。 :::
注
トークンスコープ: データベース資格情報トークンはワークスペース スコープです。
endpoint パラメーターが必要ですが、返されたトークンは、サービス プリンシパルにアクセス許可があるワークスペース内の任意のデータベースまたはプロジェクトにアクセスできます。
動作方法
Databricks SDK は、ワークスペース トークン管理を自動的に処理することで OAuth 認証を簡素化します。
アプリケーションは、エンドポイント パラメーターを使用して generate_database_credential() を呼び出します。 SDK は、ワークスペース OAuth トークンを内部的に取得し (コードは必要ありません)、Lakebase API からデータベース資格情報を要求して、アプリケーションに返します。 その後、Postgres に接続するときに、この資格情報をパスワードとして使用します。
ワークスペース OAuth トークンとデータベース資格情報の両方が 60 分後に期限切れになります。 接続プールは、新しい接続を作成するときに generate_database_credential() を呼び出すことによって自動更新を処理します。
1. OAuth シークレットを使用してサービス プリンシパルを作成する
OAuth シークレットを使用して Azure Databricks サービス プリンシパルを作成します。 詳細については、「 サービス プリンシパル アクセスの承認」を参照してください。 外部アプリをビルドする場合は、次の点に注意してください。
- シークレットを任意の有効期間 (最大 730 日間) に設定します。 これにより、シークレットを更新する必要がある頻度が定義されます。これは、ローテーションを使用してデータベース資格情報を生成するために使用されます。
- サービス プリンシパルに対して "ワークスペース アクセス" を有効にします ([設定] → [ID とアクセス] → [サービス プリンシパル] →
{name}→ [構成] タブ)。 新しいデータベース資格情報を生成するために必要です。 -
クライアント ID (UUID) をメモします。 これは、アプリのセットアップと
PGUSERで一致する Postgres ロールを作成するときに使用します。
2. サービス プリンシパルの Postgres ロールを作成する
Lakebase UI では、パスワードベースのロールのみがサポートされます。 手順 1 のクライアント ID を使用して、Lakebase SQL エディターで OAuth ロールを作成します (表示名ではありません。ロール名では大文字と小文字が区別されます)。
-- Enable the auth extension (if not already enabled)
CREATE EXTENSION IF NOT EXISTS databricks_auth;
-- Create OAuth role using the service principal client ID
SELECT databricks_create_role('{client-id}', 'SERVICE_PRINCIPAL');
-- Grant database permissions
GRANT CONNECT ON DATABASE databricks_postgres TO "{client-id}";
GRANT USAGE ON SCHEMA public TO "{client-id}";
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO "{client-id}";
ALTER DEFAULT PRIVILEGES IN SCHEMA public
GRANT SELECT, INSERT, UPDATE, DELETE ON TABLES TO "{client-id}";
{client-id}をサービス プリンシパル クライアント ID に置き換えます。
OAuth ロールの作成を参照してください。
3. 接続の詳細を取得する
Lakebase コンソールでプロジェクトから [ 接続] をクリックし、[ブランチとエンドポイント] を選択し、 ホスト、 データベース (通常は databricks_postgres)、 エンドポイント名 (形式: projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>) をメモします。
または、CLI を使用します。
databricks postgres list-endpoints projects/<project-id>/branches/<branch-id>
詳細については、 接続文字列 を参照してください。
4. 環境変数を設定する
アプリケーションを実行する前に、次の環境変数を設定します。
# Databricks workspace authentication
export DATABRICKS_HOST="https://your-workspace.databricks.com"
export DATABRICKS_CLIENT_ID="<service-principal-client-id>"
export DATABRICKS_CLIENT_SECRET="<your-oauth-secret>"
# Lakebase connection details (from step 3)
export ENDPOINT_NAME="projects/<project-id>/branches/<branch-id>/endpoints/<endpoint-id>"
export PGHOST="<endpoint-id>.database.<region>.databricks.com"
export PGDATABASE="databricks_postgres"
export PGUSER="<service-principal-client-id>" # Same UUID as step 1
export PGPORT="5432"
export PGSSLMODE="require" # Python only
5. 接続コードを追加する
Python
この例では、psycopg3 をカスタム接続クラスと共に使用し、プールが新しい接続を作成するときに新しいトークンを生成します。
import os
from databricks.sdk import WorkspaceClient
import psycopg
from psycopg_pool import ConnectionPool
# Initialize Databricks SDK
workspace_client = None
def _get_workspace_client():
"""Get or create the workspace client for OAuth."""
global workspace_client
if workspace_client is None:
workspace_client = WorkspaceClient(
host=os.environ["DATABRICKS_HOST"],
client_id=os.environ["DATABRICKS_CLIENT_ID"],
client_secret=os.environ["DATABRICKS_CLIENT_SECRET"],
)
return workspace_client
def _get_endpoint_name():
"""Get endpoint name from environment."""
name = os.environ.get("ENDPOINT_NAME")
if not name:
raise ValueError(
"ENDPOINT_NAME must be set (format: projects/<id>/branches/<id>/endpoints/<id>)"
)
return name
class OAuthConnection(psycopg.Connection):
"""Custom connection class that generates a fresh OAuth token per connection."""
@classmethod
def connect(cls, conninfo="", **kwargs):
endpoint_name = _get_endpoint_name()
client = _get_workspace_client()
# Generate database credential (tokens are workspace-scoped)
credential = client.postgres.generate_database_credential(
endpoint=endpoint_name
)
kwargs["password"] = credential.token
return super().connect(conninfo, **kwargs)
# Create connection pool with OAuth token rotation
def get_connection_pool():
"""Get or create the connection pool."""
database = os.environ["PGDATABASE"]
user = os.environ["PGUSER"]
host = os.environ["PGHOST"]
port = os.environ.get("PGPORT", "5432")
sslmode = os.environ.get("PGSSLMODE", "require")
conninfo = f"dbname={database} user={user} host={host} port={port} sslmode={sslmode}"
return ConnectionPool(
conninfo=conninfo,
connection_class=OAuthConnection,
min_size=1,
max_size=10,
open=True,
)
# Use the pool in your application
pool = get_connection_pool()
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT current_user, current_database()")
print(cur.fetchone())
依存関係:databricks-sdk>=0.89.0、psycopg[binary,pool]>=3.1.0
Go
この例では、新しい接続ごとに新しいトークンを生成する BeforeConnect コールバックで pgxpool を使用します。
package main
import (
"context"
"fmt"
"log"
"os"
"time"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/service/postgres"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
func createConnectionPool(ctx context.Context) (*pgxpool.Pool, error) {
// Initialize Databricks workspace client
w, err := databricks.NewWorkspaceClient(&databricks.Config{
Host: os.Getenv("DATABRICKS_HOST"),
ClientID: os.Getenv("DATABRICKS_CLIENT_ID"),
ClientSecret: os.Getenv("DATABRICKS_CLIENT_SECRET"),
})
if err != nil {
return nil, err
}
// Build connection string
connStr := fmt.Sprintf("host=%s port=%s dbname=%s user=%s sslmode=require",
os.Getenv("PGHOST"),
os.Getenv("PGPORT"),
os.Getenv("PGDATABASE"),
os.Getenv("PGUSER"))
config, err := pgxpool.ParseConfig(connStr)
if err != nil {
return nil, err
}
// Configure pool
config.MaxConns = 10
config.MinConns = 1
config.MaxConnLifetime = 45 * time.Minute
config.MaxConnIdleTime = 15 * time.Minute
// Generate fresh token for each new connection
config.BeforeConnect = func(ctx context.Context, connConfig *pgx.ConnConfig) error {
credential, err := w.Postgres.GenerateDatabaseCredential(ctx,
postgres.GenerateDatabaseCredentialRequest{
Endpoint: os.Getenv("ENDPOINT_NAME"),
})
if err != nil {
return err
}
connConfig.Password = credential.Token
return nil
}
return pgxpool.NewWithConfig(ctx, config)
}
func main() {
ctx := context.Background()
pool, err := createConnectionPool(ctx)
if err != nil {
log.Fatal(err)
}
defer pool.Close()
var user, database string
err = pool.QueryRow(ctx, "SELECT current_user, current_database()").Scan(&user, &database)
if err != nil {
log.Fatal(err)
}
fmt.Printf("Connected as: %s to database: %s\n", user, database)
}
依存 関係: Databricks SDK for Go v0.109.0 以降 (github.com/databricks/databricks-sdk-go)、pgx ドライバー (github.com/jackc/pgx/v5)
メモ:BeforeConnectコールバックにより、新しい接続ごとに新しい OAuth トークンが確保され、実行時間の長いアプリケーションの自動トークン ローテーションが処理されます。
Java
この例では、HikariCP で JDBC を使用し、プールが新しい接続を作成するたびに新しいトークンを生成するカスタム DataSource を使用します。
import java.sql.*;
import javax.sql.DataSource;
import com.databricks.sdk.WorkspaceClient;
import com.databricks.sdk.core.DatabricksConfig;
import com.databricks.sdk.service.postgres.*;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
public class LakebaseConnection {
private static WorkspaceClient workspaceClient() {
String host = System.getenv("DATABRICKS_HOST");
String clientId = System.getenv("DATABRICKS_CLIENT_ID");
String clientSecret = System.getenv("DATABRICKS_CLIENT_SECRET");
return new WorkspaceClient(new DatabricksConfig()
.setHost(host)
.setClientId(clientId)
.setClientSecret(clientSecret));
}
private static DataSource createDataSource() {
WorkspaceClient w = workspaceClient();
String endpointName = System.getenv("ENDPOINT_NAME");
String host = System.getenv("PGHOST");
String database = System.getenv("PGDATABASE");
String user = System.getenv("PGUSER");
String port = System.getenv().getOrDefault("PGPORT", "5432");
String jdbcUrl = "jdbc:postgresql://" + host + ":" + port +
"/" + database + "?sslmode=require";
// DataSource that returns a new connection with a fresh token (tokens are workspace-scoped)
DataSource tokenDataSource = new DataSource() {
@Override
public Connection getConnection() throws SQLException {
DatabaseCredential cred = w.postgres().generateDatabaseCredential(
new GenerateDatabaseCredentialRequest().setEndpoint(endpointName)
);
return DriverManager.getConnection(jdbcUrl, user, cred.getToken());
}
@Override
public Connection getConnection(String u, String p) {
throw new UnsupportedOperationException();
}
// ... other DataSource methods (getLogWriter, etc.)
};
// Wrap in HikariCP for connection pooling
HikariConfig config = new HikariConfig();
config.setDataSource(tokenDataSource);
config.setMaximumPoolSize(10);
config.setMinimumIdle(1);
// Recycle connections before 60-min token expiry
config.setMaxLifetime(45 * 60 * 1000L);
return new HikariDataSource(config);
}
public static void main(String[] args) throws SQLException {
DataSource pool = createDataSource();
try (Connection conn = pool.getConnection();
Statement st = conn.createStatement();
ResultSet rs = st.executeQuery("SELECT current_user, current_database()")) {
if (rs.next()) {
System.out.println("User: " + rs.getString(1));
System.out.println("Database: " + rs.getString(2));
}
}
}
}
依存 関係: Databricks SDK for Java v0.73.0+ (com.databricks:databricks-sdk-java)、PostgreSQL JDBC ドライバー (org.postgresql:postgresql)、HikariCP (com.zaxxer:HikariCP)
6. 接続を実行して確認する
Python
依存関係をインストールします。
pip install databricks-sdk psycopg[binary,pool]
実行:
# Save all the code from step 5 (above) as db.py, then run:
from db import get_connection_pool
pool = get_connection_pool()
with pool.connection() as conn:
with conn.cursor() as cur:
cur.execute("SELECT current_user, current_database()")
print(cur.fetchone())
想定される出力:
('c00f575e-d706-4f6b-b62c-e7a14850571b', 'databricks_postgres')
current_user手順 1 のサービス プリンシパル クライアント ID と一致する場合、OAuth トークンのローテーションが機能しています。
Java
メモ: これは、上記の Java 例の依存関係を pom.xmlに含む Maven プロジェクトがあることを前提としています。
依存関係をインストールします。
mvn install
実行:
mvn exec:java -Dexec.mainClass="com.example.LakebaseConnection"
想定される出力:
User: c00f575e-d706-4f6b-b62c-e7a14850571b
Database: databricks_postgres
ユーザーが手順 1 のサービス プリンシパル クライアント ID と一致する場合、OAuth トークンのローテーションが機能しています。
Go
依存関係をインストールします。
go mod init myapp
go get github.com/databricks/databricks-sdk-go
go get github.com/jackc/pgx/v5
実行:
go run main.go
想定される出力:
Connected as: c00f575e-d706-4f6b-b62c-e7a14850571b to database: databricks_postgres
ユーザーが手順 1 のサービス プリンシパル クライアント ID と一致する場合、OAuth トークンのローテーションが機能しています。
メモ: Lakebase Autoscaling が 0 からコンピューティングを開始すると、アイドル後の最初の接続に時間がかかる場合があります。
トラブルシューティング
| エラー | 修正 |
|---|---|
| "ワークスペース アクセス権を持たないユーザーに対して API が無効になっています" | サービス プリンシパルの "ワークスペース アクセス" を有効にします (手順 1)。 |
| "ロールが存在しない" か、認証が失敗する | UI ではなく、SQL (手順 2) を使用して OAuth ロールを作成します。 |
| "接続が拒否されました" または "エンドポイントが見つかりません" |
ENDPOINT_NAME形式projects/<id>/branches/<id>/endpoints/<id>使用します。エンドポイント ID はホスト内にあります。 |
| "無効なユーザー" または "ユーザーが見つかりません" |
PGUSER表示名ではなく、サービス プリンシパル クライアント ID (UUID) に設定します。 |