Condividi tramite


Guida al Lakehouse: preparare e trasformare i dati nel Lakehouse

In questa esercitazione si usano notebook con il runtimeSpark per trasformare e preparare dati non elaborati nel lakehouse.

Prerequisiti

Prima di iniziare, è necessario completare le esercitazioni precedenti in questa serie:

  1. Creare un lakehouse
  2. Inserire i dati nel lakehouse
  3. Assicurarsi che gli schemi lakehouse siano abilitati nel lakehouse.

Preparazione dei dati

Nei passaggi precedenti del tutorial, hai fatto inserire dati non elaborati dall'origine nella sezione File del lakehouse. Ora è possibile trasformare i dati e prepararli per la creazione di tabelle Delta.

  1. Scarica i notebook dalla cartella Codice sorgente dell’esercitazione Lakehouse.

  2. Nel browser, vai all'area di lavoro Fabric nel portale Fabric.

  3. Selezionare Importa>notebook>da questo computer.

    Screenshot che mostra l'opzione import notebook nel portale di Fabric.

  4. Selezionare Carica dal riquadro Stato importazione che si apre sul lato destro della schermata.

  5. Selezionare solo il notebook corrispondente al linguaggio di codifica preferito.

    • PySpark (Prepare and transform data - PySpark.ipynb)
    • Spark SQL (Prepare and transform data - Spark SQL.ipynb)
  6. Selezionare Apri. Viene visualizzata una notifica che indica lo stato dell'importazione nell'angolo superiore destro della finestra del browser.

  7. Al termine dell'importazione, passare alla visualizzazione elementi dell'area di lavoro per verificare il notebook importato.

    Screenshot che mostra l'elenco dei notebook importati e la posizione in cui selezionare il lakehouse.

  8. Seleziona il lakehouse wwilakehouse per aprirlo, in modo che il prossimo notebook che apri sia collegato ad esso.

  9. Nel menu di spostamento in alto selezionare Apri notebook>esistente.

    Screenshot che mostra l'elenco dei notebook importati correttamente.

  10. Selezionare il notebook importato per PySpark o Spark SQL e selezionare Apri. Il notebook è già collegato al Lakehouse aperto, come mostrato in Lakehouse Explorer.

A questo punto è possibile eseguire le celle del notebook che creano e trasformano le tabelle Delta.

Nelle sezioni seguenti eseguire le celle del notebook in sequenza. Per eseguire una cella, selezionare l'icona Esegui visualizzata a sinistra della cella al passaggio del mouse. È anche possibile selezionare Esegui tutto sulla barra multifunzione superiore (Home) per eseguire tutte le celle in sequenza.

Importante

Per questa esercitazione è necessario abilitare gli schemi lakehouse. Se gli schemi non sono abilitati, il codice in questa esercitazione non funzionerà come previsto.

Nel notebook importato vengono visualizzate entrambe le sezioni Path 1 e Path 2 . Per questa esercitazione, usare Il percorso 1 (schemi lakehouse abilitati) e ignorare il percorso 2 (schemi lakehouse non abilitati).

Creare tabelle Delta

In questa sezione, esegui le celle del notebook per creare tabelle Delta dai dati non elaborati.

Le tabelle seguono uno schema star, che è un modello comune per organizzare i dati analitici:

  • Una tabella dei fatti (fact_sale) contiene gli eventi misurabili dell'azienda, in questo caso singole transazioni di vendita con quantità, prezzi e profitto.
  • Le tabelle delle dimensioni (dimension_city, dimension_customer, dimension_datedimension_employee, , ) dimension_stock_itemcontengono gli attributi descrittivi che danno contesto ai fatti, ad esempio dove si è verificata una vendita, chi l'ha fatto e quando.

In questa pagina dell'esercitazione selezionare la scheda corrispondente al notebook importato e continuare a usare la stessa scheda per tutti i passaggi. Le tab sono nell'articolo, non nel notebook.

  1. Cella 1 - Configurazione della sessione Spark. Questa cella abilita due funzionalità di Fabric che ottimizzano la modalità di scrittura e lettura dei dati nelle celle successive. L'ordine V ottimizza il layout del file Parquet per letture più veloci e una migliore compressione. Ottimizzare la scrittura riduce il numero di file scritti e aumenta le dimensioni dei singoli file.

    Esegui questa cella e attendi il completamento prima di passare al passaggio successivo.

    spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
    
  2. Cella 2 - Fatto - Vendita. Questa cella legge i dati parquet non elaborati da Files/wwi-raw-data/full/fact_sale_1y_full, aggiunge le colonne della parte di data (Year, Quarter e Month) e scrive fact_sale come tabella Delta partizionata da Year e Quarter.

    Esegui questa cella e attendi che finisca prima di passare al passaggio successivo.

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/dbo/" + table_name)
    
  3. Cella 3 - Dimensioni. Questa cella legge i cinque set di dati Parquet delle cinque dimensioni e li scrive come tabelle Delta (dimension_city, dimension_customer, dimension_date, dimension_employee, e dimension_stock_item) in Tables/dbo/....

    Esegui questa cella e attendi il completamento prima di passare al passaggio successivo.

    def loadFullDataFromSource(table_name):
       df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
       df = df.drop("Photo")
       df.write.mode("overwrite").format("delta").save("Tables/dbo/" + table_name)
    
    full_tables = [
       'dimension_city',
       'dimension_customer',
       'dimension_date',
       'dimension_employee',
       'dimension_stock_item'
    ]
    
    for table in full_tables:
       loadFullDataFromSource(table)
    
  4. Per convalidare le tabelle create, fare clic con il pulsante destro del mouse sul lakehouse wwilakehouse nella finestra di esplorazione e quindi scegliere Aggiorna. Appaiono le tabelle.

    Screenshot che illustra dove trovare le tue tabelle create in Lakehouse Explorer.

Trasformare i dati per le aggregazioni aziendali

In questa sezione si continua nello stesso notebook ed si eseguono le celle successive per creare tabelle di aggregazione dalle tabelle Delta create nella sezione precedente.

  1. Assicurarsi che il notebook sia ancora collegato a wwilakehouse.

  2. Cella 4- Caricare le tabelle di origine per la trasformazione (solo PySpark). Se si usa il notebook PySpark, eseguire questa cella per caricare tabelle Delta in dataframe per i passaggi di aggregazione che seguono.

    Esegui questa cella e attendi il completamento prima di passare al passaggio successivo.

    df_fact_sale = spark.read.format("delta").load("Tables/dbo/fact_sale")
    df_dimension_date = spark.read.format("delta").load("Tables/dbo/dimension_date")
    df_dimension_city = spark.read.format("delta").load("Tables/dbo/dimension_city")
    
  3. Cella 5 - Crea aggregate_sale_by_date_city. Questa cella unisce i dati di vendita, data e città, quindi crea la tabella di aggregazione a livello di città.

    Esegui questa cella e attendi il suo completamento prima di passare al passaggio successivo.

    sale_by_date_city = (
       df_fact_sale.alias("sale")
       .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner")
       .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner")
       .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")
       .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")
       .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")
       .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")
       .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")
       .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")
       .withColumnRenamed("sum(Profit)", "SumOfProfit")
       .orderBy("date.Date", "city.StateProvince", "city.City")
    )
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dbo/aggregate_sale_by_date_city")
    
  4. Cella 6 - Crea aggregate_sale_by_date_employee. Questo campo unisce i dati di vendita, data e dipendente, quindi crea la tabella aggregata per livello di dipendente.

    Esegui questa cella e attendi che finisca prima di passare al passaggio successivo.

    spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
               DD.Date, DD.CalendarMonthLabel
            , DD.Day, DD.ShortMonth Month, CalendarYear Year
            , DE.PreferredName, DE.Employee
            , SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
            , SUM(FS.TaxAmount) SumOfTaxAmount
            , SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
            , SUM(FS.Profit) SumOfProfit
    FROM delta.`Tables/dbo/fact_sale` FS
    INNER JOIN delta.`Tables/dbo/dimension_date` DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN delta.`Tables/dbo/dimension_employee` DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    """)
    
    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dbo/aggregate_sale_by_date_employee")
    
  5. Per convalidare le tabelle create, fare clic con il pulsante destro del mouse sul lakehouse wwilakehouse nella finestra di esplorazione e quindi scegliere Aggiorna. Appaiono le tabelle di aggregazione.

    Screenshot di Lakehouse Explorer che mostra dove vengono visualizzate le nuove tabelle.

Questa guida scrive i dati come file Delta Lake. Fabric individua e registra automaticamente queste tabelle nel metastore, quindi non è necessario eseguire istruzioni separate CREATE TABLE .

Passaggio successivo