Freigeben über


Lakehouse-Tutorial: Aufbereiten und Transformieren von Daten im Lakehouse

In diesem Tutorial verwenden Sie Notebooks mit der Spark-Runtime, um Rohdaten in Ihrem Lakehouse zu transformieren und aufzubereiten.

Voraussetzungen

Bevor Sie beginnen, müssen Sie die vorherigen Lernprogramme in dieser Reihe abschließen:

  1. Erstellen eines Lakehouse
  2. Daten in das Lakehouse einlesen
  3. Stellen Sie sicher, dass in Ihrem Lakehouse die Lakehouse-Schemas aktiviert sind.

Vorbereiten von Daten

Aus den vorherigen Tutorial-Schritten haben Sie Rohdaten aus der Quelle in den Abschnitt "Dateien" des Lakehouse importiert. Jetzt können Sie diese Daten transformieren und für die Erstellung von Deltatabellen aufbereiten.

  1. Laden Sie die Notebooks aus dem Ordner Lakehouse Tutorial Source Code herunter.

  2. Wechseln Sie in Ihrem Browser zum Fabric-Arbeitsbereich im Fabric-Portal.

  3. Wählen Sie "Notizbuch>von diesem Computer>" aus.

    Screenshot der Option

  4. Wählen Sie im Bereich Importstatus, der auf der rechten Seite des Bildschirms geöffnet wird, die Option Hochladen aus.

  5. Wählen Sie nur das Notizbuch aus, das Ihrer bevorzugten Codierungssprache entspricht.

    • PySpark (Prepare and transform data - PySpark.ipynb)
    • Spark SQL (Prepare and transform data - Spark SQL.ipynb)
  6. Klicken Sie auf Öffnen. In der oberen rechten Ecke des Browserfensters wird eine Benachrichtigung zum Importstatus angezeigt.

  7. Nachdem der Import erfolgreich war, wechseln Sie zur Elementansicht des Arbeitsbereichs, um das importierte Notizbuch zu überprüfen.

    Screenshot mit der Liste der importierten Notizbücher und der Option, das Lakehouse auszuwählen.

  8. Wählen Sie das wwilakehouse-Lakehouse aus, um es zu öffnen, damit das Notizbuch, das Sie als Nächstes öffnen, damit verknüpft ist.

  9. Wählen Sie im oberen Navigationsmenü " Notizbuch>vorhandenes Notizbuch öffnen" aus.

    Screenshot der Liste der erfolgreich importierten Notizbücher.

  10. Wählen Sie Ihr importiertes Notizbuch für PySpark oder Spark SQL aus, und wählen Sie "Öffnen" aus. Das Notizbuch ist bereits mit Ihrem geöffneten Seehaus verknüpft, wie im Seehaus Explorer gezeigt.

Jetzt können Sie die Notizbuchzellen ausführen, die Ihre Delta-Tabellen erstellen und transformieren.

Führen Sie in den folgenden Abschnitten die Notizbuchzellen sequenziell aus. Um eine Zelle auszuführen, wählen Sie das Symbol "Ausführen " aus, das links neben der Zelle beim Daraufzeigen angezeigt wird. Sie können auch " Alle ausführen" im oberen Menüband (Start) auswählen, um alle Zellen in Sequenz auszuführen.

Von Bedeutung

Dieses Lernprogramm erfordert, dass Lakehouse-Schemas aktiviert werden. Wenn Schemas nicht aktiviert sind, funktioniert der Code in diesem Lernprogramm nicht wie beabsichtigt.

Im importierten Notizbuch sehen Sie sowohl Pfad 1- als auch Pfad 2-Abschnitte. Verwenden Sie für dieses Lernprogramm Path 1 (Lakehouse-Schemas aktiviert) und ignorieren Sie Path 2 (Lakehouse-Schemas nicht aktiviert).

Erstellen von Deltatabellen

In diesem Abschnitt führen Sie die Notizbuchzellen aus, um Delta-Tabellen aus den Rohdaten zu erstellen.

Die Tabellen folgen einem Sternschema, bei dem es sich um ein gängiges Muster zum Organisieren analytischer Daten handelt:

  • Eine Faktentabelle (fact_sale) enthält die messbaren Ereignisse des Unternehmens – in diesem Fall einzelne Verkaufstransaktionen mit Mengen, Preisen und Gewinn.
  • Dimensionstabellen (dimension_city, dimension_customer, dimension_date, dimension_employee, dimension_stock_item) enthalten die beschreibenden Attribute, die Kontext zu den Fakten geben, z. B. wo ein Verkauf stattgefunden hat, wer es gemacht hat und wann.

Wählen Sie auf dieser Lernprogrammseite die Registerkarte aus, die dem importierten Notizbuch entspricht, und verwenden Sie dieselbe Registerkarte für alle Schritte. Die Registerkarten befinden sich in diesem Artikel, nicht im Notizbuch.

  1. Zelle 1 – Spark-Sitzungskonfiguration. Diese Zelle ermöglicht zwei Fabric-Features, die das Schreiben und Lesen von Daten in nachfolgenden Zellen optimieren. V-Order optimiert das Layout der Parquet-Datei für schnellere Lesevorgänge und eine bessere Komprimierung. Durch Optimieren des Schreibvorgangs wird die Anzahl der geschriebenen Dateien reduziert und die Größe der einzelnen Dateien erhöht.

    Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.

    spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
    spark.conf.set("spark.microsoft.delta.optimizeWrite.binSize", "1073741824")
    
  2. Zelle 2 - Fakten - Verkauf. Diese Zelle liest rohe Parkettdaten aus Files/wwi-raw-data/full/fact_sale_1y_full, fügt Datumsteilspalten (Jahr, Quartal und Monat) hinzu und schreibt fact_sale als Delta-Tabelle, die nach Jahr und Quartal partitioniert wird.

    Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.

    from pyspark.sql.functions import col, year, month, quarter
    
    table_name = 'fact_sale'
    
    df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full')
    df = df.withColumn('Year', year(col("InvoiceDateKey")))
    df = df.withColumn('Quarter', quarter(col("InvoiceDateKey")))
    df = df.withColumn('Month', month(col("InvoiceDateKey")))
    
    df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/dbo/" + table_name)
    
  3. Zelle 3 – Dimensionen. Diese Zelle liest die fünf Dimension-Parquet-Datasets und schreibt sie als Delta-Tabellen (dimension_city, dimension_customer, dimension_date, dimension_employee, und dimension_stock_item) unter Tables/dbo/....

    Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.

    def loadFullDataFromSource(table_name):
       df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name)
       df = df.drop("Photo")
       df.write.mode("overwrite").format("delta").save("Tables/dbo/" + table_name)
    
    full_tables = [
       'dimension_city',
       'dimension_customer',
       'dimension_date',
       'dimension_employee',
       'dimension_stock_item'
    ]
    
    for table in full_tables:
       loadFullDataFromSource(table)
    
  4. Um die erstellten Tabellen zu überprüfen, klicken Sie mit der rechten Maustaste auf das Wwilakehouse Lakehouse im Explorer, und wählen Sie dann "Aktualisieren" aus. Die Tabellen werden angezeigt.

    Screenshot, der zeigt, wo Ihre erstellten Tabellen im Lakehouse-Explorer zu finden sind.

Transformieren von Daten für Geschäftsaggregate

In diesem Abschnitt fahren Sie im selben Notizbuch fort und führen die nächsten Zellen aus, um Aggregattabellen aus den Delta-Tabellen zu erstellen, die Sie im vorherigen Abschnitt erstellt haben.

  1. Stellen Sie sicher, dass das Notizbuch noch mit wwilakehouse verknüpft ist.

  2. Zelle 4 – Laden von Quelltabellen für die Transformation (nur PySpark). Wenn Sie das PySpark-Notizbuch verwenden, führen Sie diese Zelle aus, um Delta-Tabellen in DataFrames für die folgenden Aggregationsschritte zu laden.

    Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.

    df_fact_sale = spark.read.format("delta").load("Tables/dbo/fact_sale")
    df_dimension_date = spark.read.format("delta").load("Tables/dbo/dimension_date")
    df_dimension_city = spark.read.format("delta").load("Tables/dbo/dimension_city")
    
  3. Zelle 5 – Erstellen Sie aggregate_sale_by_date_city. Diese Zelle verknüpft Umsatz-, Datums- und Ortsdaten und erstellt dann die Aggregattabelle auf Stadtebene.

    Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.

    sale_by_date_city = (
       df_fact_sale.alias("sale")
       .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner")
       .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner")
       .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")
       .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory")
       .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit")
       .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax")
       .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount")
       .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax")
       .withColumnRenamed("sum(Profit)", "SumOfProfit")
       .orderBy("date.Date", "city.StateProvince", "city.City")
    )
    
    sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dbo/aggregate_sale_by_date_city")
    
  4. Zelle 6 – Erstellen Sie aggregate_sale_by_date_employee. Diese Zelle verknüpft Umsatz-, Datums- und Mitarbeiterdaten und erstellt dann die Aggregattabelle auf Mitarbeiterebene.

    Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.

    spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee
    AS
    SELECT
               DD.Date, DD.CalendarMonthLabel
            , DD.Day, DD.ShortMonth Month, CalendarYear Year
            , DE.PreferredName, DE.Employee
            , SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax
            , SUM(FS.TaxAmount) SumOfTaxAmount
            , SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax
            , SUM(FS.Profit) SumOfProfit
    FROM delta.`Tables/dbo/fact_sale` FS
    INNER JOIN delta.`Tables/dbo/dimension_date` DD ON FS.InvoiceDateKey = DD.Date
    INNER JOIN delta.`Tables/dbo/dimension_employee` DE ON FS.SalespersonKey = DE.EmployeeKey
    GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee
    ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC
    """)
    
    sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee")
    sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dbo/aggregate_sale_by_date_employee")
    
  5. Um die erstellten Tabellen zu überprüfen, klicken Sie mit der rechten Maustaste auf das Wwilakehouse Lakehouse im Explorer, und wählen Sie dann "Aktualisieren" aus. Die Aggregationstabellen werden angezeigt.

    Screenshot des Lakehouse-Explorers, der zeigt, wo die neuen Tabellen angezeigt werden.

In diesem Lernprogramm werden Daten als Delta Lake-Dateien geschrieben. Fabric erkennt und registriert diese Tabellen automatisch im Metastore, sodass Sie keine separaten CREATE TABLE Anweisungen ausführen müssen.

Nächster Schritt