Dataflow-Pipeline mit Python erstellen
In diesem Dokument erfahren Sie, wie Sie das Apache Beam SDK für Python verwenden, um ein Programm zu erstellen, das eine Pipeline definiert. Anschließend führen Sie die Pipeline mit einem direkten lokalen Runner oder einem cloudbasierten Runner wie Dataflow aus. Eine Einführung in die WordCount-Pipeline finden Sie im Video How to use WordCount in Apache Beam.
Eine detaillierte Anleitung dazu finden Sie direkt in der Google Cloud Console. Klicken Sie dazu einfach auf Anleitung:
Hinweise
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
-
Install the Google Cloud CLI.
-
Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.
-
Führen Sie folgenden Befehl aus, um die gcloud CLI zu initialisieren:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
Replace the following:
PROJECT_ID
: your project ID.USER_IDENTIFIER
: the identifier for your user account—for example,myemail@example.com
.ROLE
: the IAM role that you grant to your user account.
Weisen Sie Ihrem Compute Engine-Standarddienstkonto Rollen zu. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
- Ersetzen Sie
PROJECT_ID
durch Ihre Projekt-ID. - Ersetzen Sie
PROJECT_NUMBER
durch die Projekt-ID. Ihre Projektnummer finden Sie unter Projekte identifizieren oder verwenden Sie den Befehlgcloud projects describe
. - Ersetzen Sie
SERVICE_ACCOUNT_ROLE
durch jede einzelne Rolle.
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard). -
Legen Sie als Speicherort Folgendes fest:
US
(USA). -
Ersetzen Sie
BUCKET_NAME
durch einen eindeutigen Bucket-Namen. Der Bucket-Name darf keine vertraulichen Informationen enthalten, da der Bucket-Namespace global und öffentlich sichtbar ist. - Kopieren Sie die Google Cloud Projekt-ID und den Namen des Cloud Storage-Bucket. Sie benötigen diese Werte später in diesem Dokument.
- Prüfen Sie, ob Python 3 und
pip
in Ihrem System ausgeführt werden:python --version python -m pip --version
- Installieren Sie gegebenenfalls Python 3 und richten Sie dann eine virtuelle Python-Umgebung ein. Folgen Sie dazu der Anleitung in den Abschnitten Python installieren und venv einrichten auf der Seite Python-Entwicklungsumgebung einrichten.
- Prüfen Sie, ob Sie sich in der virtuellen Python-Umgebung befinden, die Sie im vorherigen Abschnitt erstellt haben.
Die Eingabeaufforderung beginnt mit
<env_name>
, wobeienv_name
der Name der virtuellen Umgebung ist. - Installieren Sie die neueste Version des Apache Beam SDK für Python:
Sie nimmt eine Textdatei als Eingabe an.
Sie finden die Textdatei in einem Cloud Storage-Bucket mit dem Ressourcennamen
gs://dataflow-samples/shakespeare/kinglear.txt
.- Sie parst jede Zeile und unterteilt sie in Wörter.
- Sie misst die Häufigkeit der tokenisierten Wörter.
- Führen Sie auf Ihrem lokalen Terminal das Beispiel
wordcount
aus:python -m apache_beam.examples.wordcount \ --output outputs
- Sehen Sie sich die Ausgabe der Pipeline an:
more outputs*
- Drücken Sie zum Beenden q.
- Führen Sie die Pipeline aus:
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Ersetzen Sie Folgendes:
DATAFLOW_REGION
: Die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B.europe-west1
Das Flag
--region
überschreibt die Standardregion, die auf dem Metadatenserver, auf Ihrem lokalen Client oder in Umgebungsvariablen festgelegt ist.BUCKET_NAME
: der Name des Cloud Storage-Bucket, den Sie zuvor kopiert habenPROJECT_ID
: die Google Cloud Projekt-ID, die Sie zuvor kopiert haben
- Rufen Sie in der Google Cloud Console die Dataflow-Seite Jobs auf.
Auf der Seite Jobs werden Details zum
wordcount
-Job angezeigt, z. B. der Status Aktiv und dann Erfolgreich. - Rufen Sie die Seite Cloud Storage-Buckets auf:
Klicken Sie in der Liste der Buckets in Ihrem Projekt auf den Storage-Bucket, den Sie zuvor erstellt haben.
Im Verzeichnis
wordcount
werden die von Ihrem Job erstellten Ausgabedateien angezeigt.- Verwenden Sie den Befehl
gcloud storage ls
, um die Ausgabedateien aufzulisten:gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- Verwenden Sie den Befehl
gcloud storage cat
, um die Ergebnisse in den Ausgabedateien aufzurufen:gcloud storage cat gs://BUCKET_NAME/results/outputs*
- Laden Sie auf Ihrem lokalen Computer die neueste Kopie des
wordcount
-Codes aus dem Apache Beam GitHub-Repository herunter. - Führen Sie die Pipeline über das lokale Terminal aus:
python wordcount.py --output outputs
- Rufen Sie die Ergebnisse auf:
more outputs*
- Drücken Sie zum Beenden q.
- Öffnen Sie die Datei
wordcount.py
in einem Editor Ihrer Wahl. - Sehen Sie sich die Pipelineschritte in der Funktion
run
an:counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
Nach
split
werden die Zeilen in Wörter als Strings unterteilt. - Wenn Sie die Strings in Kleinbuchstaben darstellen möchten, ändern Sie die Zeile nach
split
: Durch diese Änderung wird die Funktioncounts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lower
jedem Wort zugeordnet. Diese Zeile entsprichtbeam.Map(lambda word: str.lower(word))
. - Speichern Sie die Datei und führen Sie den geänderten Job
wordcount
aus:python wordcount.py --output outputs
- Sehen Sie sich die Ergebnisse der geänderten Pipeline an:
more outputs*
- Drücken Sie zum Beenden q.
- Führen Sie die geänderte Pipeline im Dataflow-Dienst aus:
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
Ersetzen Sie Folgendes:
DATAFLOW_REGION
: die Region, in der Sie den Dataflow-Job bereitstellen möchtenBUCKET_NAME
: Name Ihres Cloud Storage-Buckets.PROJECT_ID
: Ihre Google Cloud -Projekt-ID
-
Löschen Sie den Bucket:
gcloud storage buckets delete BUCKET_NAME
Wenn Sie Ihr Projekt beibehalten, widerrufen Sie die Rollen, die Sie dem Compute Engine-Standarddienstkonto zugewiesen haben. Führen Sie den folgenden Befehl für jede der folgenden IAM-Rollen einmal aus:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
Umgebung einrichten
In diesem Abschnitt richten Sie über die Eingabeaufforderung eine isolierte virtuelle Python-Umgebung ein, um Ihr Pipeline-Projekt mit venv auszuführen. Auf diese Weise können Sie die Abhängigkeiten eines Projekts von den Abhängigkeiten anderer Projekte isolieren.
Wenn Ihnen im Moment keine Eingabeaufforderung zur Verfügung steht, können Sie Cloud Shell verwenden. Der Paketmanager für Python 3 ist in Cloud Shell bereits installiert, sodass Sie mit dem Erstellen einer virtuellen Umgebung fortfahren können.
So installieren Sie Python und erstellen dann eine virtuelle Umgebung:
Nachdem Sie die Kurzanleitung durchgearbeitet haben, können Sie die virtuelle Umgebung mit dem Befehl
deactivate
deaktivieren.Apache Beam SDK abrufen
Das Apache Beam SDK ist ein Open-Source-Programmiermodell für Datenpipelines. Sie definieren eine Pipeline mit einem Apache Beam-Programm und wählen dann einen Runner wie Dataflow aus, um Ihre Pipeline auszuführen.
So laden Sie das Apache Beam SDK herunter und installieren es:
pip install apache-beam[gcp]
Pipeline lokal ausführen
Wenn Sie sehen möchten, wie eine Pipeline lokal ausgeführt wird, verwenden Sie ein fertiges Python-Modul für das Beispiel
wordcount
, das im Paketapache_beam
enthalten ist.Das Pipeline-Beispiel
wordcount
führt Folgendes aus:Führen Sie die folgenden Schritte aus, um die Pipeline
wordcount
lokal bereitzustellen:wordcount.py
auf dem GitHub für Apache Beam ansehen.Pipeline im Dataflow-Dienst ausführen
In diesem Abschnitt führen Sie diewordcount
-Beispielpipeline aus dem Paketapache_beam
im Dataflow-Dienst aus. In diesem Beispiel wirdDataflowRunner
als Parameter für--runner
angegeben.Ergebnisse ansehen
Wenn Sie eine Pipeline in Dataflow ausführen, werden die Ergebnisse in einem Cloud Storage-Bucket gespeichert. Prüfen Sie in diesem Abschnitt, ob die Pipeline mit der Google Cloud Console oder dem lokalen Terminal ausgeführt wird.
Google Cloud console
So rufen Sie Ihre Ergebnisse in der Google Cloud Console auf:
Lokales Terminal
Sehen Sie sich die Ergebnisse über Ihr Terminal oder mithilfe von Cloud Shell an.
Ersetzen Sie
BUCKET_NAME
durch den Namen des Cloud Pipeline-Buckets, der im Pipelineprogramm verwendet wird.Pipelinecode ändern
Diewordcount
-Pipeline in den vorherigen Beispielen unterscheidet zwischen groß- und kleingeschriebenen Wörtern. In den folgenden Schritten wird gezeigt, wie Sie die Pipeline so ändern, dass die Groß- und Kleinschreibung bei derwordcount
-Pipeline nicht berücksichtigt wird.Bereinigen
Löschen Sie das Google Cloud -Projekt mit den Ressourcen, damit Ihrem Google Cloud -Konto die auf dieser Seite verwendeten Ressourcen nicht in Rechnung gestellt werden.
Nächste Schritte
-
Set the storage class to