Compartir a través de


Inferencia de ONNX en Spark

En este ejemplo, entrenará un modelo LightGBM y convertirá ese modelo al formato ONNX . Una vez convertido, use el modelo para deducir algunos datos de prueba en Spark.

En este ejemplo se usan estos paquetes y versiones de Python:

  • onnxmltools==1.7.0
  • lightgbm==3.2.1

Prerrequisitos

  • Adjunte su cuaderno a un almacén de lago de datos. En el lado izquierdo, seleccione Añadir para añadir un almacén de lago de datos existente o crear uno.
  • Es posible que tenga que instalar onnxmltools. Para ello, agregue !pip install onnxmltools==1.7.0 en una celda de código de un cuaderno y, a continuación, ejecute esa celda.
  • Es posible que tenga que instalar lightgbm. Para ello, agregue !pip install lightgbm==3.2.1 en una celda de código del cuaderno y luego ejecute esa celda.

Carga de los datos de ejemplo

Para cargar los datos de ejemplo, agregue estos ejemplos de código a las celdas del cuaderno y, a continuación, ejecute esas celdas:

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)

display(df)

La salida debe ser similar a la tabla siguiente. Las columnas específicas que se muestran, el número de filas y los valores reales de la tabla pueden diferir:

Ratio de Cobertura de Intereses Indicador de ingresos netos Equidad en responsabilidad
0.5641 1,0 0.0165
0.5702 1,0 0.0208
0.5673 1,0 0.0165

Uso de LightGBM para entrenar un modelo

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")

train_data = featurizer.transform(df)["Bankrupt?", "features"]

model = (
    LightGBMClassifier(featuresCol="features", labelCol="Bankrupt?", dataTransferMode="bulk")
    .setEarlyStoppingRound(300)
    .setLambdaL1(0.5)
    .setNumIterations(1000)
    .setNumThreads(-1)
    .setMaxDeltaStep(0.5)
    .setNumLeaves(31)
    .setMaxDepth(-1)
    .setBaggingFraction(0.7)
    .setFeatureFraction(0.7)
    .setBaggingFreq(2)
    .setObjective("binary")
    .setIsUnbalance(True)
    .setMinSumHessianInLeaf(20)
    .setMinGainToSplit(0.01)
)

model = model.fit(train_data)

Convertir el modelo en formato ONNX

El código siguiente exporta el modelo entrenado a un refuerzo LightGBM y, a continuación, convierte el modelo en el formato ONNX:

import lightgbm as lgb
from lightgbm import Booster, LGBMClassifier


def convertModel(lgbm_model: LGBMClassifier or Booster, input_size: int) -> bytes:
    from onnxmltools.convert import convert_lightgbm
    from onnxconverter_common.data_types import FloatTensorType

    initial_types = [("input", FloatTensorType([-1, input_size]))]
    onnx_model = convert_lightgbm(
        lgbm_model, initial_types=initial_types, target_opset=9
    )
    return onnx_model.SerializeToString()


booster_model_str = model.getLightGBMBooster().modelStr().get()
booster = lgb.Booster(model_str=booster_model_str)
model_payload_ml = convertModel(booster, len(feature_cols))

Después de la conversión, cargue la carga de ONNX en un ONNXModel y inspeccione las entradas y salidas del modelo:

from synapse.ml.onnx import ONNXModel

onnx_ml = ONNXModel().setModelPayload(model_payload_ml)

print("Model inputs:" + str(onnx_ml.getModelInputs()))
print("Model outputs:" + str(onnx_ml.getModelOutputs()))

Asigne la entrada del modelo al nombre de columna (FeedDict) del dataframe de entrada, y asigne los nombres de las columnas del dataframe de salida a las salidas del modelo (FetchDict).

onnx_ml = (
    onnx_ml.setDeviceType("CPU")
    .setFeedDict({"input": "features"})
    .setFetchDict({"probability": "probabilities", "prediction": "label"})
    .setMiniBatchSize(5000)
)

Uso del modelo para la inferencia

Para realizar inferencias con el modelo, el código siguiente crea datos de prueba y transforma los datos a través del modelo ONNX:

from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np

n = 1000 * 1000
m = 95
test = np.random.rand(n, m)
testPdf = pd.DataFrame(test)
cols = list(map(str, testPdf.columns))
testDf = spark.createDataFrame(testPdf)
testDf = testDf.union(testDf).repartition(200)
testDf = (
    VectorAssembler()
    .setInputCols(cols)
    .setOutputCol("features")
    .transform(testDf)
    .drop(*cols)
    .cache()
)

display(onnx_ml.transform(testDf))

La salida debe ser similar a la tabla siguiente, aunque los valores y el número de filas pueden diferir:

Índice Características Predicción Probabilidad
1 "{"type":1,"values":[0.105... 0 "{"0":0.835...
2 "{"type":1,"values":[0.814... 0 "{"0":0.658...