次の方法で共有


Azure Functions用 RedisStreamTrigger

RedisStreamTrigger はストリームから新規エントリを読み取り、それらの要素を関数に提示します。

関数トリガーの可用性のスコープ

トリガーの種類 Managed Redis のAzure Azure Cache for Redis
ストリーム イエス イエス

重要

Azure Managed Redis または Azure Cache for Redis の Enterprise レベルを使用する場合は、ポート 6380 または 6379 ではなくポート 10000 を使用します。

重要

Redis トリガーは、従量課金プランまたは Flex 従量課金プランで実行されている関数では現在サポートされていません。

重要

Functions の Node.js v4 モデルは、Azure Cache for Redis拡張機能ではまだサポートされていません。 v4 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。 v3 と v4 の違いの詳細については、移行ガイドを参照してください。

重要

Functions の Python v2 モデルは、Azure Cache for Redis拡張機能ではまだサポートされていません。 v2 モデルの動作の詳細については、Azure Functions Python 開発者ガイドを参照してください。

重要

.NET関数の場合は、isolated worker モデルをin-process モデルよりも使用することをお勧めします。 in-process モデルと isolated worker モデルの比較については、 Azure Functionsの.NETのisolated worker モデルと in-process モデルの違いを参照してください。

実行モデル 説明
分離ワーカー モデル 関数コードは、別の.NETワーカー プロセスで実行されます。 .NET と .NET Framework> のサポートされているバージョン<使用します。 詳細については、分離ワーカー モデルで C# Azure Functionsを実行するための Guide を参照してください。
インプロセス モデル 関数コードは、Functions ホスト プロセスと同じプロセスで実行されます。 .NET の Long Term Support (LTS) バージョンのみをサポートします。 詳細については、Azure Functions を使用した Develop C# クラス ライブラリ関数に関するページを参照してください。
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples.RedisStreamTrigger
{
    internal class SimpleStreamTrigger
    {
        private readonly ILogger<SimpleStreamTrigger> logger;

        public SimpleStreamTrigger(ILogger<SimpleStreamTrigger> logger)
        {
            this.logger = logger;
        }

        [Function(nameof(SimpleStreamTrigger))]
        public void Run(
            [RedisStreamTrigger(Common.connectionStringSetting, "streamKey")] string entry)
        {
            logger.LogInformation(entry);
        }
    }
}

package com.function.RedisStreamTrigger;

import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.redis.annotation.*;

public class SimpleStreamTrigger {
    @FunctionName("SimpleStreamTrigger")
    public void run(
            @RedisStreamTrigger(
                name = "req",
                connection = "redisConnectionString",
                key = "streamTest",
                pollingIntervalInMs = 1000,
                maxBatchSize = 1)
                String message,
            final ExecutionContext context) {
            context.getLogger().info(message);
    }
}

このサンプルでは、同じ index.js ファイルと、function.json ファイル内のバインディング データを使用します。

index.js ファイルは次のとおりです。

module.exports = async function (context, entry) {
    context.log(entry);
}

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "index.js"
}

このサンプルでは、同じ run.ps1 ファイルと、function.json ファイル内のバインディング データを使用します。

run.ps1 ファイルは次のとおりです。

param($entry, $TriggerMetadata)
Write-Host ($entry | ConvertTo-Json)

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "run.ps1"
}

Python v1 プログラミング モデルでは、関数フォルダー内の別の function.json ファイルにバインドを定義する必要があります。 詳細については、Python 開発者ガイドを参照してください。

このサンプルでは、同じ __init__.py ファイルと、function.json ファイル内のバインディング データを使用します。

__init__.py ファイルは次のとおりです。

import logging

def main(entry: str):
    logging.info(entry)

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "__init__.py"
}

属性

パラメーター 説明 必要 既定値
Connection キャッシュ connection stringを含むアプリケーション設定の名前。例: <cacheName>.redis.cache.windows.net:6380,password... イエス
Key 読み取り元のキー。 イエス
PollingIntervalInMs Redis サーバーをポーリングする頻度 (ミリ秒単位)。 省略可能 1000
MessagesPerWorker 各関数のワーカーが処理する必要のあるメッセージの数。 関数がスケーリングする必要のあるワーカーの数を決めるために使われます。 省略可能 100
Count Redis から一度にプルする要素の数。 省略可能 10
DeleteAfterProcess 関数が処理後にストリーム エントリを削除するかどうかを示します。 省略可能 false

注釈

パラメーター 説明 必要 既定値
name entry イエス
connection キャッシュ connection stringを含むアプリケーション設定の名前。例: <cacheName>.redis.cache.windows.net:6380,password... イエス
key 読み取り元のキー。 イエス
pollingIntervalInMs Redis をポーリングする頻度 (ミリ秒単位)。 省略可能 1000
messagesPerWorker 各関数のワーカーが処理する必要のあるメッセージの数。 関数をスケーリングするワーカーの数を決定するために使用されます。 省略可能 100
count Redis から一度に読み取るエントリの数。 エントリは並列で処理されます。 省略可能 10
deleteAfterProcess 関数の実行後にストリーム エントリを削除するかどうか。 省略可能 false

構成

次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。

function.json のプロパティ 説明 必要 既定値
type イエス
deleteAfterProcess 省略可能 false
connection キャッシュ connection stringを含むアプリケーション設定の名前。例: <cacheName>.redis.cache.windows.net:6380,password... イエス
key 読み取り元のキー。 イエス
pollingIntervalInMs Redis をポーリングする頻度 (ミリ秒単位)。 省略可能 1000
messagesPerWorker (省略可能) 各関数のワーカーが処理する必要のあるメッセージの数。 関数がスケーリングする必要のあるワーカーの数を決めるために使用されます。 省略可能 100
count Redis から一度に読み取るエントリの数。 これらは並列で処理されます。 省略可能 10
name イエス
direction イエス

完全な例については、セクションの例を参照してください。

使用方法

RedisStreamTrigger Azure 関数は、ストリームから新しいエントリを読み取り、それらのエントリを関数に表示します。

トリガーは、構成可能な固定間隔で Redis をポーリングし、XREADGROUP を使ってストリームから要素を読み取ります。

関数のすべてのインスタンスのコンシューマー グループは、関数の名前(StreamTrigger サンプルSimpleStreamTriggerです。

各関数インスタンスは、 WEBSITE_INSTANCE_ID を使用するか、グループ内のコンシューマー名として使用するランダム GUID を生成して、関数のスケールアウトされたインスタンスがストリームから同じメッセージを読み取らないようにします。

タイプ 説明
byte[] チャネルからのメッセージ。
string チャネルからのメッセージ。
Custom トリガーは Json.NET シリアル化を使用して、チャネルからのメッセージを string からカスタム型にマップします。