Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Lär dig hur du skapar och distribuerar en pipeline som matar in GPS-data, konverterar koordinater till ursprungliga rumsliga typer och jämför med lagergeofences för att spåra ankomster med Lakeflow Spark Deklarativa Pipelines (SDP) för dataorkestrering och Auto Loader. I den här handledningen används Databricks inbyggda rumsliga datatyper (GEOMETRY, GEOGRAPHY) och inbyggda rumsliga funktioner som ST_Point, ST_GeomFromWKT och ST_Contains, så att du kan köra geospatiala arbetsflöden i stor skala utan externa bibliotek.
I den här handledningen kommer du att:
- Skapa en pipeline och generera exempel på GPS- och geofence-data i en Unity Catalog-volym.
- Ingestera rå GPS-signaler inkrementellt med hjälp av Auto Loader till en bronsströmningstabell.
- Skapa en silver-strömningstabell som konverterar latitud och longitud till en inbyggd
GEOMETRYpunkt. - Skapa en materialiserad vy av lagergeofences från WKT-polygoner.
- Kör en rumslig koppling för att skapa en tabell över ankomster till lagret (vilken enhet som gick in i vilken geofence).
Resultatet är en pipeline i medaljstil: brons (rådata från GPS), silver (punkter som geometri) och guld (geofences och ankomsthändelser). För mer information, se Vad innebär arkitekturen med medallion lakehouse?
Requirements
För att slutföra den här självstudien måste du uppfylla följande krav:
- Loggas in på en Azure Databricks-arbetsyta.
- Låt Unity Catalog vara aktiverat för din arbetsyta.
- Ha serverlös beräkning aktiverad för ditt konto om du vill använda Serverless Lakeflow Spark Deklarativa Pipelines. Om serverlös beräkning inte är aktiverad fungerar stegen med standardberäkningen för din arbetsyta.
- Ha behörighet att skapa en beräkningsresurs eller åtkomst till en beräkningsresurs.
- Ha behörighet att skapa ett nytt schema i en katalog. De behörigheter som krävs är
USE CATALOGochCREATE SCHEMA. - Ha behörighet att skapa en ny volym i ett befintligt schema. De behörigheter som krävs är
USE SCHEMAochCREATE VOLUME. - Använd en körmiljö som stöder inbyggda rumsliga typer och rumsliga funktioner.
Steg 1: Skapa en pipeline
Skapa en ny ETL-pipeline och ange standardkatalogen och schemat för dina tabeller.
På din arbetsyta klickar du på
Nytt i det övre vänstra hörnet.
Klicka på ETL-pipeline.
Ändra rubriken på pipelinen till
Spatial pipeline tutorialeller ett namn som du föredrar.Under rubriken väljer du en katalog och ett schema som du har skrivbehörighet för.
Den här katalogen och schemat används som standard när du inte anger någon katalog eller ett schema i koden. Ersätt
<catalog>och<schema>i följande steg med de värden du väljer här.Från Avancerade alternativ väljer du Starta med en tom fil.
Välj en mapp för koden. Du kan välja Bläddra för att välja en mapp. du kan använda en Git-mapp för versionskontroll.
Välj Python eller SQL som språk för din första fil. Du kan lägga till filer på det andra språket senare.
Klicka på Välj för att skapa pipelinen och öppna Lakeflow Pipelines Editor.
Nu har du en tom pipeline med en standardkatalog och ett schema. Skapa sedan exempeldata för GPS och geofence.
Steg 2: Skapa exempeldata för GPS och geofence
Det här steget genererar exempeldata i en datamängd: råa GPS-pingar (JSON) och lagergeofences (JSON med WKT-polygoner). GPS-punkterna genereras i en avgränsningsruta som överlappar de två lagerpolygonerna, så den rumsliga kopplingen i ett senare steg returnerar ankomstrader. Du kan hoppa över det här steget om du redan har egna data i en volym eller tabell.
Klicka på
i tillgångsläsaren i Lakeflow Pipelines-redigeraren.Lägg till och sedan Utforskning.
Ange Namn till
Setup spatial data, välj Python och lämna standardmålmappen.Klicka på Skapa.
I den nya notebook-filen klistrar du in följande kod. Ersätt
<catalog>och<schema>med standardkatalogen och schemat som du angav i steg 1.Använd följande kod i notebook-filen för att generera GPS- och geofence-data.
from pyspark.sql import functions as F catalog = "<catalog>" # for example, "main" schema = "<schema>" # for example, "default" spark.sql(f"USE CATALOG `{catalog}`") spark.sql(f"USE SCHEMA `{schema}`") spark.sql(f"CREATE VOLUME IF NOT EXISTS `{catalog}`.`{schema}`.`raw_data`") volume_base = f"/Volumes/{catalog}/{schema}/raw_data" # GPS: 5000 rows in a box that overlaps both warehouse geofences (LA area) gps_path = f"{volume_base}/gps" df_gps = ( spark.range(0, 5000) .repartition(10) .select( F.format_string("device_%d", F.col("id").cast("long")).alias("device_id"), F.current_timestamp().alias("timestamp"), (-118.3 + F.rand() * 0.2).alias("longitude"), # -118.3 to -118.1 (34.0 + F.rand() * 0.2).alias("latitude"), # 34.0 to 34.2 ) ) df_gps.write.format("json").mode("overwrite").save(gps_path) print(f"Wrote 5000 GPS rows to {gps_path}") # Geofences: two warehouse polygons (WKT) in the same region geofences_path = f"{volume_base}/geofences" geofences_data = [ ("Warehouse_A", "POLYGON ((-118.35 34.02, -118.25 34.02, -118.25 34.08, -118.35 34.08, -118.35 34.02))"), ("Warehouse_B", "POLYGON ((-118.20 34.05, -118.12 34.05, -118.12 34.12, -118.20 34.12, -118.20 34.05))"), ] df_geo = spark.createDataFrame(geofences_data, ["warehouse_name", "boundary_wkt"]) df_geo.write.format("json").mode("overwrite").save(geofences_path) print(f"Wrote {len(geofences_data)} geofences to {geofences_path}")Kör cellen i anteckningsboken (Shift + Enter).
När körningen är klar innehåller volymen gps (råa pingar) och geofences (polygoner i WKT). I nästa steg matar du in GPS-data i en bronstabell.
Steg 3: Läs in GPS-data i en bronsströmningstabell
Mata in den råa GPS JSON-filen från volymen stegvis med hjälp av Auto Loader och skriv till en bronsuppspelningstabell.
Klicka på
Lägg till och sedan Transformering.
Ange Namn till
gps_bronze, välj SQL eller Python och klicka på Skapa.Ersätt filinnehållet med följande (använd fliken som matchar ditt språk). Ersätt
<catalog>och<schema>med standardkatalogen och schemat.SQL
CREATE OR REFRESH STREAMING TABLE gps_bronze COMMENT "Raw GPS pings ingested from volume using Auto Loader"; CREATE FLOW gps_bronze_ingest_flow AS INSERT INTO gps_bronze BY NAME SELECT * FROM STREAM read_files( "/Volumes/<catalog>/<schema>/raw_data/gps", format => "json", inferColumnTypes => "true" )Python
from pyspark import pipelines as dp path = "/Volumes/<catalog>/<schema>/raw_data/gps" dp.create_streaming_table( name="gps_bronze", comment="Raw GPS pings ingested from volume using Auto Loader", ) @dp.append_flow(target="gps_bronze", name="gps_bronze_ingest_flow") def gps_bronze_ingest_flow(): return ( spark.readStream.format("cloudFiles") .option("cloudFiles.format", "json") .option("cloudFiles.inferColumnTypes", "true") .load(path) )Klicka på
Kör filen eller Kör pipelinen för att köra en uppdatering.
När uppdateringen är klar visar gps_bronze pipelinediagrammet tabellen. Lägg sedan till en silvertabell som konverterar koordinater till en intern geometripunkt.
Steg 4: Lägg till en silverströmnings-tabell med geometripunkter
Skapa en strömmande tabell som läser från bronstabellen och lägger till en GEOMETRY kolumn med hjälp av ST_Point(longitude, latitude).
Klicka på
Lägg till och sedan Transformering.
Ange Namn till
raw_gps_silver, välj SQL eller Python och klicka på Skapa.Klistra in följande kod i den nya filen.
SQL
CREATE OR REFRESH STREAMING TABLE raw_gps_silver COMMENT "GPS pings with native geometry point for spatial joins"; CREATE FLOW raw_gps_silver_flow AS INSERT INTO raw_gps_silver BY NAME SELECT device_id, timestamp, longitude, latitude, ST_Point(longitude, latitude) AS point_geom FROM STREAM(gps_bronze)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F dp.create_streaming_table( name="raw_gps_silver", comment="GPS pings with native geometry point for spatial joins", ) @dp.append_flow(target="raw_gps_silver", name="raw_gps_silver_flow") def raw_gps_silver_flow(): return ( spark.readStream.table("gps_bronze") .select( "device_id", "timestamp", "longitude", "latitude", F.expr("ST_Point(longitude, latitude)").alias("point_geom"), ) )Klicka på
Starta fil eller Starta pipeline.
Pipelinediagrammet visar nu gps_bronze och raw_gps_silver. Lägg sedan till lagergeofences som en materialiserad vy.
Steg 5: Skapa guldtabellen för lagergeofences
Skapa en materialiserad vy som läser geofences från volymen och konverterar WKT-kolumnen till en GEOMETRY kolumn med .ST_GeomFromWKT
Klicka på
Lägg till och sedan Transformering.
Ange Namn till
warehouse_geofences_gold, välj SQL eller Python och klicka på Skapa.Klistra in följande kod. Ersätt
<catalog>och<schema>med standardkatalogen och schemat.SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_geofences_gold AS SELECT warehouse_name, ST_GeomFromWKT(boundary_wkt) AS boundary_geom FROM read_files( "/Volumes/<catalog>/<schema>/raw_data/geofences", format => "json" )Python
from pyspark import pipelines as dp from pyspark.sql import functions as F path = "/Volumes/<catalog>/<schema>/raw_data/geofences" @dp.table(name="warehouse_geofences_gold", comment="Warehouse geofence polygons as geometry") def warehouse_geofences_gold(): return ( spark.read.format("json").load(path).select( "warehouse_name", F.expr("ST_GeomFromWKT(boundary_wkt)").alias("boundary_geom"), ) )Klicka på
Kör fil eller Kör pipeline.
Pipelinen innehåller nu geofences-tabellen. Lägg sedan till den rumsliga kopplingen för att beräkna ankomster till lager.
Steg 6: Skapa lagerankomsttabellen med en geospatial sammanfogning
Lägg till en materialiserad vy som kopplar de silverfärgade GPS-punkterna till geofences med hjälp av ST_Contains(boundary_geom, point_geom) för att avgöra när en enhet finns i en lagerpolygon.
Klicka på
Lägg till och sedan Transformering.
Ange Namn till
warehouse_arrivals, välj SQL eller Python och klicka på Skapa.Klistra in följande kod.
SQL
CREATE OR REPLACE MATERIALIZED VIEW warehouse_arrivals AS SELECT g.device_id, g.timestamp, w.warehouse_name FROM raw_gps_silver g JOIN warehouse_geofences_gold w ON ST_Contains(w.boundary_geom, g.point_geom)Python
from pyspark import pipelines as dp from pyspark.sql import functions as F @dp.table(name="warehouse_arrivals", comment="Devices that have entered a warehouse geofence") def warehouse_arrivals(): g = spark.read.table("raw_gps_silver") w = spark.read.table("warehouse_geofences_gold") return ( g.alias("g") .join(w.alias("w"), F.expr("ST_Contains(w.boundary_geom, g.point_geom)")) .select( F.col("g.device_id").alias("device_id"), F.col("g.timestamp").alias("timestamp"), F.col("w.warehouse_name").alias("warehouse_name"), ) )Klicka på
Kör fil eller Kör pipeline.
När uppdateringen är klar visar pipelinediagrammet alla fyra datauppsättningarna: gps_bronze, raw_gps_silver, warehouse_geofences_goldoch warehouse_arrivals.
Kontrollera den rumsliga kopplingen
Bekräfta att den rumsliga kopplingen skapade rader: punkter från silvertabellen som faller inuti en geofence visas i warehouse_arrivals. Kör något av följande i en notebook- eller SQL-redigerare (använd samma katalog och schema som pipelinemålet).
Räkna ankomster per lager (SQL):
SELECT warehouse_name, COUNT(*) AS arrival_count
FROM warehouse_arrivals
GROUP BY warehouse_name
ORDER BY warehouse_name;
Du bör se antal som inte är noll för Warehouse_A och Warehouse_B (exempel-GPS-data överlappar båda polygonerna). Så här inspekterar du exempelrader:
SELECT device_id, timestamp, warehouse_name
FROM warehouse_arrivals
ORDER BY timestamp DESC
LIMIT 10;
Samma kontroller i Python (notebook-fil):
# Count by warehouse
display(spark.table("warehouse_arrivals").groupBy("warehouse_name").count().orderBy("warehouse_name"))
# Sample rows
display(spark.table("warehouse_arrivals").orderBy("timestamp", ascending=False).limit(10))
Om du ser rader i warehouse_arrivalsST_Contains(boundary_geom, point_geom) fungerar kopplingen korrekt.
Steg 7: Schemalägg pipelinen (valfritt)
Om du vill hålla pipelinen uppdaterad när nya GPS-data hamnar i volymen skapar du ett jobb för att köra pipelinen enligt ett schema.
- Välj knappen Schema överst i redigeraren.
- Om dialogrutan Scheman visas väljer du Lägg till schema.
- Du kan också ge jobbet ett namn.
- Som standard körs schemat en gång per dag. Du kan acceptera detta eller ange ditt eget. Genom att välja Avancerat kan du ange en viss tid. Med fler alternativ kan du lägga till körningsmeddelanden.
- Välj Skapa för att tillämpa schemat.
För mer information om jobbkörningar, se Övervakning och observerbarhet för Lakeflow-jobb.