次の方法で共有


時系列予測モデルをトレーニングし評価する

このノートブックでは、季節的なサイクルを含む時系列データを予測するプログラムを作成します。 NYC Property Sales データセットを使用し、2003年から2015年までのデータが収録された、ニューヨーク市財務局が公開したこのデータセットをNYC Open Data Portalで利用します。

前提条件

ノートブックで作業を進める

ノートブックを次の2つの方法で使用できます。

  • 組み込みのノートブックを開いて実行します。
  • GitHubからノートブックをアップロードします。

ビルトインのノートブックを開きます

このチュートリアルには、サンプルの時系列ノートブックが付属しています。

  1. このチュートリアルのサンプルノートブックを開くには、「データサイエンス用にシステムを準備する」という指示に従ってください

  2. コードの実行を開始する前に、必ずレイクハウスをノートブックにアタッチしてください。

GitHubからノートブックをインポートする

AIsample - Time Series Forecasting.ipynb はこのチュートリアルに付属するノートブックです。

手順 1: カスタム ライブラリをインストールする

machine learning モデルを開発したり、アドホック データ分析を処理したりする場合は、Apache Spark セッション用のカスタム ライブラリ (このノートブックに prophet など) をすばやくインストールすることが必要になる場合があります。 このタスクを行うには、2つの選択肢があります。

  1. インラインインストール機能 (%pip%conda など) を使用して、新しいライブラリをすばやく始めることができます。 このメソッドは、ワークスペースではなく、現在のノートブックにのみカスタム ライブラリをインストールします。
# Use pip to install libraries
%pip install <library name>

# Use conda to install libraries
%conda install <library name>
  1. または、Fabric 環境の作成、パブリック ソースからのライブラリのインストール、またはカスタム ライブラリのアップロードを行います。 ワークスペース管理者は、ワークスペースの既定値として環境をアタッチできます。 環境内のすべてのライブラリは、ワークスペース内のすべてのノートブックと Spark ジョブ定義で使用できるようになります。 環境の詳細については、「 Microsoft Fabric で環境を作成、構成、および使用する方法を参照してください。

このノートブックでは、 %pip install を使用して prophet ライブラリをインストールします。 PySpark カーネルは、 %pip install後に再起動します。 この操作は、他のセルを実行する前にライブラリをインストールする必要があることを意味します。

# Use pip to install Prophet
%pip install prophet

手順 2: データを読み込む

データセット

このノートブックでは、NYC Property Sales データのデータセットを使用します。 2003 年から 2015 年までのデータが取り上げNYC オープン データ ポータルで NYC Finance部門によって公開されています。

このデータセットには、13 年間ニューヨーク市の不動産市場で販売されたすべての建物の販売記録が含まれています。 データセット内の列の定義については、「 Property Sales Files の用語集を参照してください。

近隣 建物クラスカテゴリー 税区分 ブロック 多く イーストメント 現在の建物クラス 住所 アパート番号 zip_code 住宅ユニット 商業単位 総単位数 土地平方フィート 総平方フィート 建設年 販売時の税区分 販売時の建物クラス 販売価格 販売日
Manhattan アルファベット市区町村 07 バケーションレンタル - ウォークアップ アパートメンツ 0.0 384.0 17.0 C4 225 イースト 2ND ストリート 10009.0 10.0 0.0 10.0 2145.0 6670.0 1900.0 2.0 C4 275000.0 2007-06-19
Manhattan アルファベット市区町村 07 バケーションレンタル - ウォークアップ アパートメンツ 2.0 405.0 12.0 C7 508 EAST 12TH STREET 10009.0 28.0 2.0 30.0 3872.0 15428.0 1930.0 2.0 C7 7794005.0 2007-05-21

目標は、履歴データに基づいて月間総売上を予測するモデルを構築することです。 このためには、Facebook によって開発されたopen source予測ライブラリである Prophet を使用します。 Prophet は加法モデルに基づいており、非線形傾向は毎日、毎週、毎年の季節性、休日の影響に適合しています。 Prophet は、強い季節的影響や複数の季節の履歴データを持つ時系列データセットに最適です。 さらに、Prophet は不足しているデータとデータの外れ値を堅牢に処理します。

Prophet は、次の 3 つのコンポーネントで構成される分解可能な時系列モデルを使用します。

  • トレンド: Prophet は自動変化点を選択して、区分ごとの一定の成長率を推測します。
  • 季節性: 既定では、Prophet はフーリエ級数を使用して週単位と年単位の季節性をモデル化します。
  • 祝日: Prophetは過去と将来のすべての祝日を必要とします。 将来繰り返されない休日の場合、Prophet の予測には含まれません。

このノートブックは月単位でデータを集計するため、休日は無視されます。

Prophet のモデリング手法の詳細については、公式の論文を参照してください。

データセットをダウンロードし、レイクハウスにアップロードする

データ ソースは 15 個の .csv ファイルで構成されます。 これらのファイルには、ニューヨークにある 5 つの自治区における 2003 年から 2015 年までの不動産販売記録が含まれています。 便宜上、nyc_property_sales.tar ファイルにこれらの .csv ファイルをすべて保持し、1 つのファイルに圧縮されています。 ブロブストレージはこの.tarファイルをホストしており、公開されています。

ヒント

このコード セルに示されているパラメーターを使用すると、このノートブックをさまざまなデータセットに簡単に適用できます。

URL = "https://synapseaisolutionsa.z13.web.core.windows.net/data/NYC_Property_Sales_Dataset/"
TAR_FILE_NAME = "nyc_property_sales.tar"
DATA_FOLDER = "Files/NYC_Property_Sales_Dataset"
TAR_FILE_PATH = f"/lakehouse/default/{DATA_FOLDER}/tar/"
CSV_FILE_PATH = f"/lakehouse/default/{DATA_FOLDER}/csv/"

EXPERIMENT_NAME = "aisample-timeseries" # MLflow experiment name

このコードは、一般公開されているバージョンのデータセットをダウンロードし、Fabric Lakehouse に格納します。

重要

ノートブックを実行する前に、必ずレイクハウスをノートブックに追加してください。 そうしないと、エラーが発生します。

import os

if not os.path.exists("/lakehouse/default"):
    # Add a lakehouse if the notebook has no default lakehouse
    # A new notebook will not link to any lakehouse by default
    raise FileNotFoundError(
        "Default lakehouse not found, please add a lakehouse for the notebook."
    )
else:
    # Verify whether or not the required files are already in the lakehouse, and if not, download and unzip
    if not os.path.exists(f"{TAR_FILE_PATH}{TAR_FILE_NAME}"):
        os.makedirs(TAR_FILE_PATH, exist_ok=True)
        os.system(f"wget {URL}{TAR_FILE_NAME} -O {TAR_FILE_PATH}{TAR_FILE_NAME}")

    os.makedirs(CSV_FILE_PATH, exist_ok=True)
    os.system(f"tar -zxvf {TAR_FILE_PATH}{TAR_FILE_NAME} -C {CSV_FILE_PATH}")

このノートブックの実行時間の記録を開始します。

# Record the notebook running time
import time

ts = time.time()

MLflow 実験追跡を設定する

MLflow ログ機能を拡張するために、自動ログは、トレーニング中にmachine learning モデルの入力パラメーターと出力メトリックの値を自動的にキャプチャします。 その後、この情報はワークスペースに記録されます。ここで、MLflow API またはワークスペース内の対応する実験にアクセスして視覚化することができます。 自動ログ記録の詳細については、「Microsoft Fabricを参照してください。

# Set up the MLflow experiment
import mlflow

mlflow.set_experiment(EXPERIMENT_NAME)
mlflow.autolog(disable=True)  # Disable MLflow autologging

注記

ノートブック セッションMicrosoft Fabric自動ログ記録を無効にするには、mlflow.autolog() を呼び出し、disable=Trueを設定します。

レイクハウスから日付の未加工データを読み取る

df = (
    spark.read.format("csv")
    .option("header", "true")
    .load("Files/NYC_Property_Sales_Dataset/csv")
)

手順 3: 探索的データ分析の開始

データセットを確認するには、データのサブセットを手動で調べて理解を深める必要があります。 display関数を使用して DataFrame を出力します。 また、チャート ビューを表示すると、データセットのサブセットを簡単に視覚化できます。

display(df)

データセットを手動で確認することで、いくつかの初期の知見が得られます。

  • 0.00 ドルの販売価格のインスタンス。 用語集によると、この値は現金を考慮しない所有権の譲渡を意味します。 つまり、トランザクションにキャッシュ フローはありません。 $0.00 sales_price 値の売上をデータセットから削除します。

  • データセットには、さまざまなビルド クラスが含まれます。 ただし、このノートブックは、用語集>に従って"A" とマークされている住宅用建物に焦点を当てます。 データセットをフィルター処理して、住宅の建物のみを含めます。 これを行うためには、building_class_at_time_of_sale 列または building_class_at_present 列のいずれかを含めます。 building_class_at_time_of_sale データのみを含めます。

  • データセットには、total_units 値が 0、または gross_square_feet 値が 0 のインスタンスが含まれます。 total_units値またはgross_square_units値が 0 であるすべてのインスタンスを削除します。

  • 一部の列 ( apartment_numbertax_classbuild_class_at_presentなど) には、欠損値または NULL 値があります。 不足しているデータに事務的なエラーまたは存在しないデータが含まれているとします。 分析はこれらの欠損値に依存しないため、無視できます。

  • sale_price 列は文字列として格納され、先頭に "$" 文字が付加されます。 分析を続行するために、この列を数値として表します。 sale_price列を整数としてキャストします。

型変換とフィルター処理

特定された問題の一部を解決するには、必要なライブラリをインポートします。

# Import libraries
import pyspark.sql.functions as F
from pyspark.sql.types import *

売上データを文字列から整数にキャストする

正規表現を使用して、文字列の数値部分をドル記号から区切り (たとえば、文字列 $300,000$300,000 を分割)、数値部分を整数としてキャストします。

次に、次の条件をすべて満たすインスタンスのみを含むようにデータをフィルター処理します。

  1. sales_priceが 0 より大きい。
  2. total_unitsが 0 より大きい。
  3. gross_square_feetが 0 より大きい。
  4. building_class_at_time_of_saleは A 型です。
df = df.withColumn(
    "sale_price", F.regexp_replace("sale_price", "[$,]", "").cast(IntegerType())
)
df = df.select("*").where(
    'sale_price > 0 and total_units > 0 and gross_square_feet > 0 and building_class_at_time_of_sale like "A%"'
)

月単位での集計

データ リソースはプロパティの売上を日ごとに追跡しますが、この方法では、このノートブックでは細かすぎます。 代わりに、月単位でデータを集計します。

まず、月と年のデータのみを表示するように日付値を変更します。 日付の値には、年データが含まれます。 2005 年 12 月と 2006 年 12 月など、引き続き区別できます。

さらに、分析に関連する列のみを保持します。 これらの列には、 sales_pricetotal_unitsgross_square_feet、および sales_dateが含まれます。 また、sales_datemonth にリネームする必要があります。

monthly_sale_df = df.select(
    "sale_price",
    "total_units",
    "gross_square_feet",
    F.date_format("sale_date", "yyyy-MM").alias("month"),
)
display(monthly_sale_df)

sale_pricetotal_unitsgross_square_feetの値を月別に集計します。 次に、データを month でグループ化し、各グループ内のすべての値を合計します。

summary_df = (
    monthly_sale_df.groupBy("month")
    .agg(
        F.sum("sale_price").alias("total_sales"),
        F.sum("total_units").alias("units"),
        F.sum("gross_square_feet").alias("square_feet"),
    )
    .orderBy("month")
)

display(summary_df)

Pyspark から Pandas への換算

Pyspark DataFrames は、大規模なデータセットを適切に処理します。 ただし、データ集計のため、DataFrame のサイズは小さくなります。 この変更により、pandas DataFrames を使用できるようになりました。

このコードは、pyspark DataFrame から pandas DataFrame にデータセットをキャストします。

import pandas as pd

df_pandas = summary_df.toPandas()
display(df_pandas)

ビジュアル化

ニューヨーク市の不動産取引の傾向を調べて、データをより深く理解することができます。 この調査は、潜在的なパターンと季節性の傾向に関する洞察につながります。 Microsoft Fabricのデータ視覚化の詳細については、「Notebook の視覚化 リソース」を参照してください。

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

f, (ax1, ax2) = plt.subplots(2, 1, figsize=(35, 10))
plt.sca(ax1)
plt.xticks(np.arange(0, 15 * 12, step=12))
plt.ticklabel_format(style="plain", axis="y")
sns.lineplot(x="month", y="total_sales", data=df_pandas)
plt.ylabel("Total Sales")
plt.xlabel("Time")
plt.title("Total Property Sales by Month")

plt.sca(ax2)
plt.xticks(np.arange(0, 15 * 12, step=12))
plt.ticklabel_format(style="plain", axis="y")
sns.lineplot(x="month", y="square_feet", data=df_pandas)
plt.ylabel("Total Square Feet")
plt.xlabel("Time")
plt.title("Total Property Square Feet Sold by Month")
plt.show()

探索的データ分析からの観測の概要

  • データは、年単位の周期で明確な定期的なパターンを示しています。つまり、データの 季節性は年単位です。
  • 夏の月は、冬の月に比べて販売量が多いようです。
  • 売上が高い年と売上の低い年を比較すると、高い販売月と高い売上年の低い販売月の収益差が、高い売上月と低販売年の低い販売月の収益差を絶対に超えていることがわかります。

たとえば、2004 年の売上が最も高い月と最も低い販売月の収益差は次のようになります。

$900,000,000 - $500,000,000 = $400,000,000

2011 年の場合、その収益差異の計算は次のようになります。

$400,000,000 - $300,000,000 = $100,000,000

この観測は後で重要になります。 多重度加法 の季節性の影響を決定する必要がある場合です。

手順 4: モデルのトレーニングと追跡

モデル フィッティング

Prophet は常に 2 列の DataFrame を入力として受け取ります。 1 つの入力列は ds という名前の時刻列で、もう 1 つの入力列は y という名前の値列です。 時間列には、日付、時刻、または datetime データ形式 (たとえば YYYY_MM) を指定する必要があります。 このデータセットは、その条件を満たしています。 値列は数値データ形式である必要があります。

モデルフィッティングの場合は、時間列の名前をds に変更し、値列の名前をy に変更します。 次に、データを預言者に渡します。 詳細については、Prophet Python API のドキュメントを参照してください。

df_pandas["ds"] = pd.to_datetime(df_pandas["month"])
df_pandas["y"] = df_pandas["total_sales"]

Prophetは、scikit-learn 規約に従います。 まず、Prophet の新しいインスタンスを作成し、特定のパラメーター (たとえば seasonality_mode) を設定し、そのインスタンスをデータセットに合わせて調整します。

  • 一定の加法係数は、Prophet でのデフォルトの季節性効果ですが、季節性効果パラメーターには "乗法" 季節性 を使用します。 前のセクションの分析では、季節性の振幅の変化により、単純な加法季節性がデータにまったく適合しないことを示しました。

  • データは月単位で集計されるため、 weekly_seasonality パラメーターを off に設定します。 これにより、週単位のデータは使用できなくなります。

  • Markov Chain Monte Carlo (MCMC) メソッドを使用して、季節性の不確実性の推定をキャプチャします。 既定では、Prophet から傾向と観測ノイズに関する不確実性の推定を得られますが、季節性に関しては得られません。 MCMC はより多くの処理時間を必要としますが、これにより、アルゴリズムから季節性と傾向と観測ノイズに関する不確実性の推定を得られます。 詳細については、Prophet Uncertainty Intervals のドキュメントを参照してください。

  • changepoint_prior_scale パラメーターを使用して、自動変更ポイント検出の感度を調整します。 Prophet アルゴリズムは、軌道が突然変化するデータ内のインスタンスを自動的に見つけようとします。 正しい値を見つけるのが難しくなる可能性があります。 この問題を解決するには、さまざまな値を試してから、最適なパフォーマンスのモデルを選択します。 詳細については、Prophet Trend Changepoints のドキュメントを参照してください。

from prophet import Prophet

def fit_model(dataframe, seasonality_mode, weekly_seasonality, chpt_prior, mcmc_samples):
    m = Prophet(
        seasonality_mode=seasonality_mode,
        weekly_seasonality=weekly_seasonality,
        changepoint_prior_scale=chpt_prior,
        mcmc_samples=mcmc_samples,
    )
    m.fit(dataframe)
    return m

クロス検証

Prophet には、クロス検証ツールが組み込まれています。 このツールでは、予測エラーを見積もり、最適なパフォーマンスでモデルを見つけることができます。

クロス検証手法では、モデルの効率を検証できます。 この手法では、データセットのサブセットに対してモデルをトレーニングし、以前は見えないデータセットのサブセットに対してテストを実行します。 この手法では、統計モデルが独立したデータセットにどの程度一般化されるかをチェックできます。

クロス検証の場合は、トレーニング データセットに含まれていないデータセットの特定のサンプルを予約します。 次に、デプロイの前に、そのサンプルでトレーニング済みのモデルをテストします。 ただし、この方法は時系列データでは機能しません。 モデルに 2005 年 1 月と 2005 年 3 月のデータが表示され、2005 年 2 月の予測を試みると、データ傾向がどこに潜在しているかが分かるため、モデルは基本的に チート を行うことができます。 実際の用途では、未知の地域として将来を予測することが目的です。

この問題に対処し、テストを信頼できるようにするには、日付に基づいてデータセットを分割します。 トレーニングには、特定の日付までのデータセット (たとえば、最初の 11 年間のデータ) を使用してから、残りの未知のデータを予測に使用します。

このシナリオでは、11 年間のトレーニング データから始めて、1 年間の期間を使用して毎月の予測を行います。 具体的には、トレーニング データには、2003 年から 2013 年までのすべてが含まれています。 最初の実行では、2014 年 1 月から 2015 年 1 月までの予測が処理します。 次の実行では、2014 年 2 月から 2015 年 2 月までの予測などを処理します。

トレーニング済みの 3 つのモデルごとにこのプロセスを繰り返して、最適なパフォーマンスを発揮するモデルを確認します。 次に、これらの予測を実際の値と比較して、最適なモデルの予測品質を確立します。

from prophet.diagnostics import cross_validation
from prophet.diagnostics import performance_metrics

def evaluation(m):
    df_cv = cross_validation(m, initial="4017 days", period="30 days", horizon="365 days")
    df_p = performance_metrics(df_cv, monthly=True)
    future = m.make_future_dataframe(periods=12, freq="M")
    forecast = m.predict(future)
    return df_p, future, forecast

MLflow を使用してモデルをログする

モデルをログに記録してパラメーターを追跡し、後で使用できるようにモデルを保存します。 関連するすべてのモデル情報は、ワークスペースで実験名に記録されます。 モデル、パラメーター、メトリック、および MLflow 自動ログ記録項目は、1 回の MLflow 実行に保存されます。

# Setup MLflow
from mlflow.models.signature import infer_signature

実験を実施する

machine learning実験は、関連するすべてのmachine learning実行の組織と制御の主要な単位として機能します。 "実行" はモデル コードの 1 回の実行に対応します。 Machine learning実験追跡とは、すべての異なる実験とそのコンポーネントの管理を指します。 この管理には、パラメーター、メトリック、モデル、およびその他のartifactsが含まれます。 これは、特定のmachine learning実験の必要なコンポーネントを整理するのに役立ちます。 Machine learning実験追跡を使用すると、保存された実験を使用して過去の結果を簡単に複製することもできます。 詳細については、Microsoft Fabric の machine learning 実験を参照してください。 含める手順 (このノートブックでの Prophet モデルのフィッティングと評価など) を決定したら、実験を実行できます。

model_name = f"{EXPERIMENT_NAME}-prophet"

models = []
df_metrics = []
forecasts = []
seasonality_mode = "multiplicative"
weekly_seasonality = False
changepoint_priors = [0.01, 0.05, 0.1]
mcmc_samples = 100

for chpt_prior in changepoint_priors:
    with mlflow.start_run(run_name=f"prophet_changepoint_{chpt_prior}"):
        # init model and fit
        m = fit_model(df_pandas, seasonality_mode, weekly_seasonality, chpt_prior, mcmc_samples)
        models.append(m)
        # Validation
        df_p, future, forecast = evaluation(m)
        df_metrics.append(df_p)
        forecasts.append(forecast)
        # Log model and parameters with MLflow
        mlflow.prophet.log_model(
            m,
            model_name,
            registered_model_name=model_name,
            signature=infer_signature(future, forecast),
        )
        mlflow.log_params(
            {
                "seasonality_mode": seasonality_mode,
                "mcmc_samples": mcmc_samples,
                "weekly_seasonality": weekly_seasonality,
                "changepoint_prior": chpt_prior,
            }
        )
        metrics = df_p.mean().to_dict()
        metrics.pop("horizon")
        mlflow.log_metrics(metrics)

プロパティ パネルのスクリーンショット。

Prophet を使用してモデルを視覚化する

Prophet には、モデルの適合結果を示す視覚化関数が組み込まれています。

黒い点は、モデルをトレーニングするデータ ポイントを表します。 青い線は予測を表し、水色の領域は不確実性区間を示します。 changepoint_prior_scale値が異なる 3 つのモデルを構築しました。 この 3 つのモデルの予測は、このコード ブロックの結果に表示されます。

for idx, pack in enumerate(zip(models, forecasts)):
    m, forecast = pack
    fig = m.plot(forecast)
    fig.suptitle(f"changepoint = {changepoint_priors[idx]}")

最初のグラフにおけるchangepoint_prior_scale値が最も小さいと、トレンドの変化に対して適切にフィットせずにアンダーフィッティングが発生します。 3 番目のグラフの最大 changepoint_prior_scale がオーバーフィットの原因となる可能性があります。 そのため、2 番目のグラフが最適な選択肢です。 この結果は、2 番目のモデルが最適であることを意味します。

Prophet では、基になる傾向と季節性を簡単に視覚化することもできます。 このコード ブロックの結果には、2 番目のモデルの視覚化が表示されます。

BEST_MODEL_INDEX = 1  # Set the best model index according to the previous results
fig2 = models[BEST_MODEL_INDEX].plot_components(forecast)

価格データの年単位の傾向グラフのスクリーンショット。

これらのグラフでは、薄い青色のシェーディングが不確定性を反映しています。 上のグラフは、強い、長い期間の揺れる傾向を示しています。 数年周期で、販売量は増加と減少を繰り返しています。 下のグラフは、売上が 2 月と 9 月にピークに達し、その月の年の最大値に達する傾向があることを示しています。 これらの月の直後である 3 月と 10 月は、年の最小値に分類されます。

次に例を示すさまざまなメトリックを使用して、モデルのパフォーマンスを評価します。

  • 平均二乗誤差 (MSE)
  • 二乗平均平方根誤差 (RMSE)
  • 平均絶対誤差 (MAE)
  • 平均絶対パーセント誤差 (MAPE)
  • 絶対パーセント誤差の中央値 (MDAPE)
  • 対称平均絶対誤差率 (sMAPE)

yhat_loweryhat_upperの見積もりを使用してカバレッジを評価します。 将来 1 年を 12 回予測する期間の違いに注意してください。

display(df_metrics[BEST_MODEL_INDEX])

この予測モデルに MAPE メトリックを使用すると、一般的に 1 か月後に拡張される予測には、約 8%のエラーが含まれます。 ただし、1 年後の予測では、誤差は約 10% に増加します。

手順 5: モデルをスコア付けし、予測結果を保存する

モデルをスコア付けし、予測結果を保存します。

Predict Transformer を使用して予測を行う

モデルを読み込み、それを使用して予測を行います。 machine learning モデルを運用化するには、PREDICT を使用します。これは、任意のコンピューティング エンジンでのバッチ スコアリングをサポートするスケーラブルなMicrosoft Fabric関数です。 PREDICTの詳細と、Microsoft Fabric内での使用方法については、このリソースを参照してください。

from synapse.ml.predict import MLFlowTransformer

spark.conf.set("spark.synapse.ml.predict.enabled", "true")

model = MLFlowTransformer(
    inputCols=future.columns.values,
    outputCol="prediction",
    modelName=f"{EXPERIMENT_NAME}-prophet",
    modelVersion=BEST_MODEL_INDEX,
)

test_spark = spark.createDataFrame(data=future, schema=future.columns.to_list())

batch_predictions = model.transform(test_spark)

display(batch_predictions)
# Code for saving predictions into lakehouse
batch_predictions.write.format("delta").mode("overwrite").save(
    f"{DATA_FOLDER}/predictions/batch_predictions"
)
# Determine the entire runtime
print(f"Full run cost {int(time.time() - ts)} seconds.")