Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Tutorial verwenden Sie Notebooks mit der Spark-Runtime, um Rohdaten in Ihrem Lakehouse zu transformieren und aufzubereiten.
Voraussetzungen
Bevor Sie beginnen, müssen Sie die vorherigen Lernprogramme in dieser Reihe abschließen:
- Erstellen eines Lakehouse
- Daten in das Lakehouse einlesen
- Stellen Sie sicher, dass in Ihrem Lakehouse die Lakehouse-Schemas aktiviert sind.
Vorbereiten von Daten
Aus den vorherigen Tutorial-Schritten haben Sie Rohdaten aus der Quelle in den Abschnitt "Dateien" des Lakehouse importiert. Jetzt können Sie diese Daten transformieren und für die Erstellung von Deltatabellen aufbereiten.
Laden Sie die Notebooks aus dem Ordner Lakehouse Tutorial Source Code herunter.
Wechseln Sie in Ihrem Browser zum Fabric-Arbeitsbereich im Fabric-Portal.
Wählen Sie "Notizbuch>von diesem Computer>" aus.
Wählen Sie im Bereich Importstatus, der auf der rechten Seite des Bildschirms geöffnet wird, die Option Hochladen aus.
Wählen Sie nur das Notizbuch aus, das Ihrer bevorzugten Codierungssprache entspricht.
-
PySpark (
Prepare and transform data - PySpark.ipynb) -
Spark SQL (
Prepare and transform data - Spark SQL.ipynb)
-
PySpark (
Klicken Sie auf Öffnen. In der oberen rechten Ecke des Browserfensters wird eine Benachrichtigung zum Importstatus angezeigt.
Nachdem der Import erfolgreich war, wechseln Sie zur Elementansicht des Arbeitsbereichs, um das importierte Notizbuch zu überprüfen.
Wählen Sie das wwilakehouse-Lakehouse aus, um es zu öffnen, damit das Notizbuch, das Sie als Nächstes öffnen, damit verknüpft ist.
Wählen Sie im oberen Navigationsmenü " Notizbuch>vorhandenes Notizbuch öffnen" aus.
Wählen Sie Ihr importiertes Notizbuch für PySpark oder Spark SQL aus, und wählen Sie "Öffnen" aus. Das Notizbuch ist bereits mit Ihrem geöffneten Seehaus verknüpft, wie im Seehaus Explorer gezeigt.
Jetzt können Sie die Notizbuchzellen ausführen, die Ihre Delta-Tabellen erstellen und transformieren.
Führen Sie in den folgenden Abschnitten die Notizbuchzellen sequenziell aus. Um eine Zelle auszuführen, wählen Sie das Symbol "Ausführen " aus, das links neben der Zelle beim Daraufzeigen angezeigt wird. Sie können auch " Alle ausführen" im oberen Menüband (Start) auswählen, um alle Zellen in Sequenz auszuführen.
Von Bedeutung
Dieses Lernprogramm erfordert, dass Lakehouse-Schemas aktiviert werden. Wenn Schemas nicht aktiviert sind, funktioniert der Code in diesem Lernprogramm nicht wie beabsichtigt.
Im importierten Notizbuch sehen Sie sowohl Pfad 1- als auch Pfad 2-Abschnitte. Verwenden Sie für dieses Lernprogramm Path 1 (Lakehouse-Schemas aktiviert) und ignorieren Sie Path 2 (Lakehouse-Schemas nicht aktiviert).
Erstellen von Deltatabellen
In diesem Abschnitt führen Sie die Notizbuchzellen aus, um Delta-Tabellen aus den Rohdaten zu erstellen.
Die Tabellen folgen einem Sternschema, bei dem es sich um ein gängiges Muster zum Organisieren analytischer Daten handelt:
- Eine Faktentabelle (
fact_sale) enthält die messbaren Ereignisse des Unternehmens – in diesem Fall einzelne Verkaufstransaktionen mit Mengen, Preisen und Gewinn. -
Dimensionstabellen (
dimension_city,dimension_customer,dimension_date,dimension_employee,dimension_stock_item) enthalten die beschreibenden Attribute, die Kontext zu den Fakten geben, z. B. wo ein Verkauf stattgefunden hat, wer es gemacht hat und wann.
Wählen Sie auf dieser Lernprogrammseite die Registerkarte aus, die dem importierten Notizbuch entspricht, und verwenden Sie dieselbe Registerkarte für alle Schritte. Die Registerkarten befinden sich in diesem Artikel, nicht im Notizbuch.
Zelle 1 – Spark-Sitzungskonfiguration. Diese Zelle ermöglicht zwei Fabric-Features, die das Schreiben und Lesen von Daten in nachfolgenden Zellen optimieren. V-Order optimiert das Layout der Parquet-Datei für schnellere Lesevorgänge und eine bessere Komprimierung. Durch Optimieren des Schreibvorgangs wird die Anzahl der geschriebenen Dateien reduziert und die Größe der einzelnen Dateien erhöht.
Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.
Zelle 2 - Fakten - Verkauf. Diese Zelle liest rohe Parkettdaten aus
Files/wwi-raw-data/full/fact_sale_1y_full, fügt Datumsteilspalten (Jahr, Quartal und Monat) hinzu und schreibtfact_saleals Delta-Tabelle, die nach Jahr und Quartal partitioniert wird.Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.
from pyspark.sql.functions import col, year, month, quarter table_name = 'fact_sale' df = spark.read.format("parquet").load('Files/wwi-raw-data/full/fact_sale_1y_full') df = df.withColumn('Year', year(col("InvoiceDateKey"))) df = df.withColumn('Quarter', quarter(col("InvoiceDateKey"))) df = df.withColumn('Month', month(col("InvoiceDateKey"))) df.write.mode("overwrite").format("delta").partitionBy("Year","Quarter").save("Tables/dbo/" + table_name)Zelle 3 – Dimensionen. Diese Zelle liest die fünf Dimension-Parquet-Datasets und schreibt sie als Delta-Tabellen (
dimension_city,dimension_customer,dimension_date,dimension_employee, unddimension_stock_item) unterTables/dbo/....Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.
def loadFullDataFromSource(table_name): df = spark.read.format("parquet").load('Files/wwi-raw-data/full/' + table_name) df = df.drop("Photo") df.write.mode("overwrite").format("delta").save("Tables/dbo/" + table_name) full_tables = [ 'dimension_city', 'dimension_customer', 'dimension_date', 'dimension_employee', 'dimension_stock_item' ] for table in full_tables: loadFullDataFromSource(table)Um die erstellten Tabellen zu überprüfen, klicken Sie mit der rechten Maustaste auf das Wwilakehouse Lakehouse im Explorer, und wählen Sie dann "Aktualisieren" aus. Die Tabellen werden angezeigt.
Transformieren von Daten für Geschäftsaggregate
In diesem Abschnitt fahren Sie im selben Notizbuch fort und führen die nächsten Zellen aus, um Aggregattabellen aus den Delta-Tabellen zu erstellen, die Sie im vorherigen Abschnitt erstellt haben.
Stellen Sie sicher, dass das Notizbuch noch mit wwilakehouse verknüpft ist.
Zelle 4 – Laden von Quelltabellen für die Transformation (nur PySpark). Wenn Sie das PySpark-Notizbuch verwenden, führen Sie diese Zelle aus, um Delta-Tabellen in DataFrames für die folgenden Aggregationsschritte zu laden.
Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.
Zelle 5 – Erstellen Sie
aggregate_sale_by_date_city. Diese Zelle verknüpft Umsatz-, Datums- und Ortsdaten und erstellt dann die Aggregattabelle auf Stadtebene.Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.
sale_by_date_city = ( df_fact_sale.alias("sale") .join(df_dimension_date.alias("date"), df_fact_sale.InvoiceDateKey == df_dimension_date.Date, "inner") .join(df_dimension_city.alias("city"), df_fact_sale.CityKey == df_dimension_city.CityKey, "inner") .select("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory", "sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit") .groupBy("date.Date", "date.CalendarMonthLabel", "date.Day", "date.ShortMonth", "date.CalendarYear", "city.City", "city.StateProvince", "city.SalesTerritory") .sum("sale.TotalExcludingTax", "sale.TaxAmount", "sale.TotalIncludingTax", "sale.Profit") .withColumnRenamed("sum(TotalExcludingTax)", "SumOfTotalExcludingTax") .withColumnRenamed("sum(TaxAmount)", "SumOfTaxAmount") .withColumnRenamed("sum(TotalIncludingTax)", "SumOfTotalIncludingTax") .withColumnRenamed("sum(Profit)", "SumOfProfit") .orderBy("date.Date", "city.StateProvince", "city.City") ) sale_by_date_city.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dbo/aggregate_sale_by_date_city")Zelle 6 – Erstellen Sie
aggregate_sale_by_date_employee. Diese Zelle verknüpft Umsatz-, Datums- und Mitarbeiterdaten und erstellt dann die Aggregattabelle auf Mitarbeiterebene.Führen Sie diese Zelle aus, und warten Sie, bis sie abgeschlossen ist, bevor Sie mit dem nächsten Schritt fortfahren.
spark.sql(""" CREATE OR REPLACE TEMPORARY VIEW sale_by_date_employee AS SELECT DD.Date, DD.CalendarMonthLabel , DD.Day, DD.ShortMonth Month, CalendarYear Year , DE.PreferredName, DE.Employee , SUM(FS.TotalExcludingTax) SumOfTotalExcludingTax , SUM(FS.TaxAmount) SumOfTaxAmount , SUM(FS.TotalIncludingTax) SumOfTotalIncludingTax , SUM(FS.Profit) SumOfProfit FROM delta.`Tables/dbo/fact_sale` FS INNER JOIN delta.`Tables/dbo/dimension_date` DD ON FS.InvoiceDateKey = DD.Date INNER JOIN delta.`Tables/dbo/dimension_employee` DE ON FS.SalespersonKey = DE.EmployeeKey GROUP BY DD.Date, DD.CalendarMonthLabel, DD.Day, DD.ShortMonth, DD.CalendarYear, DE.PreferredName, DE.Employee ORDER BY DD.Date ASC, DE.PreferredName ASC, DE.Employee ASC """) sale_by_date_employee = spark.sql("SELECT * FROM sale_by_date_employee") sale_by_date_employee.write.mode("overwrite").format("delta").option("overwriteSchema", "true").save("Tables/dbo/aggregate_sale_by_date_employee")Um die erstellten Tabellen zu überprüfen, klicken Sie mit der rechten Maustaste auf das Wwilakehouse Lakehouse im Explorer, und wählen Sie dann "Aktualisieren" aus. Die Aggregationstabellen werden angezeigt.
In diesem Lernprogramm werden Daten als Delta Lake-Dateien geschrieben. Fabric erkennt und registriert diese Tabellen automatisch im Metastore, sodass Sie keine separaten CREATE TABLE Anweisungen ausführen müssen.