Crea una pipeline Dataflow utilizzando Python

Questo documento mostra come utilizzare l'SDK Apache Beam per Python per creare un programma che definisce una pipeline. Dopodiché, esegui la pipeline utilizzando un runner locale diretto o un runner basato su cloud come Dataflow. Per un'introduzione alla pipeline WordCount, guarda il video Come utilizzare WordCount in Apache Beam.


Per seguire le indicazioni dettagliate per questa attività direttamente nella Google Cloud console, fai clic su Procedura guidata:

Procedura guidata


Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

  4. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  10. Install the Google Cloud CLI.

  11. Se utilizzi un provider di identità (IdP) esterno, devi prima accedere alla gcloud CLI con la tua identità federata.

  12. Per inizializzare gcloud CLI, esegui questo comando:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Verify that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • USER_IDENTIFIER: the identifier for your user account—for example, myemail@example.com.
    • ROLE: the IAM role that you grant to your user account.
  18. Concedi ruoli al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Sostituisci PROJECT_ID con l'ID progetto.
    • Sostituisci PROJECT_NUMBER con il numero del tuo progetto. Per trovare il numero del progetto, consulta Identificare i progetti o utilizza il comando gcloud projects describe.
    • Sostituisci SERVICE_ACCOUNT_ROLE con ogni singolo ruolo.
  19. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standard).
    • Imposta la posizione di archiviazione su: US (Stati Uniti).
    • Sostituisci BUCKET_NAME con un nome di bucket univoco. Non includere informazioni sensibili nel nome del bucket perché lo spazio dei nomi dei bucket è globale e visibile pubblicamente.
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Copia l'ID progetto Google Cloud e il nome del bucket Cloud Storage. Ti serviranno questi valori più avanti in questo documento.
    • Configura l'ambiente

      In questa sezione, utilizza il prompt dei comandi per configurare un ambiente virtuale Python isolato per eseguire il progetto della pipeline utilizzando venv. Questo processo ti consente di isolare le dipendenze di un progetto da quelle di altri progetti.

      Se non hai un prompt dei comandi a portata di mano, puoi utilizzare Cloud Shell. Cloud Shell ha già installato il gestore dei pacchetti per Python 3, quindi puoi passare alla creazione di un ambiente virtuale.

      Per installare Python e creare un ambiente virtuale:

      1. Verifica che nel tuo sistema siano in esecuzione Python 3 e pip:
        python --version
        python -m pip --version
      2. Se necessario, installa Python 3 e poi configura un ambiente virtuale Python: segui le istruzioni fornite nelle sezioni Installazione di Python e Configurazione di venv della pagina Configurazione di un ambiente di sviluppo Python.

      Dopo aver completato la guida rapida, puoi disattivare l'ambiente virtuale eseguendo deactivate.

      Scarica l'SDK Apache Beam

      L'SDK Apache Beam è un modello di programmazione open source per pipeline di dati. Definisci una pipeline con un programma Apache Beam e poi scegli un runner, ad esempio Dataflow, per eseguire la pipeline.

      Per scaricare e installare l'SDK Apache Beam:

      1. Verifica di trovarti nell'ambiente virtuale Python che hai creato nella sezione precedente. Assicurati che il prompt inizi con <env_name>, dove env_name è il nome dell'ambiente virtuale.
      2. Installa l'ultima versione dell'SDK Apache Beam per Python:
      3. pip install apache-beam[gcp]

      Esegui la pipeline in locale

      Per vedere come viene eseguita una pipeline in locale, utilizza un modulo Python pronto all'uso per l'esempio wordcount incluso nel pacchetto apache_beam.

      L'esempio di pipeline wordcount esegue le seguenti operazioni:

      1. Accetta un file di testo come input.

        Questo file di testo si trova in un bucket Cloud Storage con il nome della risorsa gs://dataflow-samples/shakespeare/kinglear.txt.

      2. Analizza ogni riga in parole.
      3. Esegue un conteggio della frequenza delle parole tokenizzate.

      Per eseguire lo staging della pipeline wordcount in locale:

      1. Dal terminale locale, esegui l'esempio wordcount:
        python -m apache_beam.examples.wordcount \
          --output outputs
      2. Visualizza l'output della pipeline:
        more outputs*
      3. Per uscire, premi q.
      L'esecuzione della pipeline in locale ti consente di testare ed eseguire il debug del programma Apache Beam. Puoi visualizzare il codice sorgente di wordcount.py su GitHub di Apache Beam.

      Esegui la pipeline sul servizio Dataflow

      In questa sezione, esegui la pipeline di esempio wordcount dal pacchetto apache_beam nel servizio Dataflow. Questo esempio specifica DataflowRunner come parametro per --runner.
      • Esegui la pipeline:
        python -m apache_beam.examples.wordcount \
            --region DATAFLOW_REGION \
            --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://BUCKET_NAME/results/outputs \
            --runner DataflowRunner \
            --project PROJECT_ID \
            --temp_location gs://BUCKET_NAME/tmp/

        Sostituisci quanto segue:

        • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio europe-west1

          Il flag --region sostituisce la regione predefinita impostata nel server dei metadati, nel client locale o nelle variabili di ambiente.

        • BUCKET_NAME: il nome del bucket Cloud Storage copiato in precedenza
        • PROJECT_ID: l'ID progetto Google Cloud che hai copiato in precedenza

      Visualizza i tuoi risultati

      Quando esegui una pipeline utilizzando Dataflow, i risultati vengono archiviati in un bucket Cloud Storage. In questa sezione, verifica che la pipeline sia in esecuzione utilizzando la console Google Cloud o il terminale locale.

      ConsoleGoogle Cloud

      Per visualizzare i risultati nella Google Cloud console:

      1. Nella console Google Cloud , vai alla pagina Job di Dataflow.

        Vai a Job

        La pagina Job mostra i dettagli del job wordcount, incluso lo stato In esecuzione all'inizio e poi Riuscito.

      2. Vai alla pagina Bucket di Cloud Storage.

        Vai a Bucket

      3. Dall'elenco dei bucket nel progetto, fai clic sul bucket di archiviazione creato in precedenza.

        Nella directory wordcount vengono visualizzati i file di output creati dal job.

      Terminale locale

      Visualizza i risultati dal terminale o utilizzando Cloud Shell.

      1. Per elencare i file di output, utilizza il comando gcloud storage ls:
        gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
      2. Sostituisci BUCKET_NAME con il nome del bucket Cloud Storage utilizzato nel programma della pipeline.

      3. Per visualizzare i risultati nei file di output, utilizza il comando gcloud storage cat:
        gcloud storage cat gs://BUCKET_NAME/results/outputs*

      Modifica il codice della pipeline

      La pipeline wordcount negli esempi precedenti distingue tra parole maiuscole e minuscole. I seguenti passaggi mostrano come modificare la pipeline in modo che la pipeline wordcount non faccia distinzione tra maiuscole e minuscole.
      1. Sulla macchina locale, scarica l'ultima copia del codice wordcount dal repository GitHub di Apache Beam.
      2. Dal terminale locale, esegui la pipeline:
        python wordcount.py --output outputs
      3. Visualizza i risultati:
        more outputs*
      4. Per uscire, premi q.
      5. Apri il file wordcount.py in un editor a tua scelta.
      6. All'interno della funzione run, esamina i passaggi della pipeline:
        counts = (
                lines
                | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
                | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                | 'GroupAndSum' >> beam.CombinePerKey(sum))

        Dopo split, le righe vengono suddivise in parole come stringhe.

      7. Per convertire le stringhe in minuscolo, modifica la riga dopo split:
        counts = (
                lines
                | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
                | 'lowercase' >> beam.Map(str.lower)
                | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
                | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
        Questa modifica mappa la funzione str.lower su ogni parola. Questa riga è equivalente a beam.Map(lambda word: str.lower(word)).
      8. Salva il file ed esegui il job wordcount modificato:
        python wordcount.py --output outputs
      9. Visualizza i risultati della pipeline modificata:
        more outputs*
      10. Per uscire, premi q.
      11. Esegui la pipeline modificata sul servizio Dataflow:
        python wordcount.py \
            --region DATAFLOW_REGION \
            --input gs://dataflow-samples/shakespeare/kinglear.txt \
            --output gs://BUCKET_NAME/results/outputs \
            --runner DataflowRunner \
            --project PROJECT_ID \
            --temp_location gs://BUCKET_NAME/tmp/

        Sostituisci quanto segue:

        • DATAFLOW_REGION: la regione in cui vuoi eseguire il deployment del job Dataflow
        • BUCKET_NAME: il nome del bucket Cloud Storage
        • PROJECT_ID: il tuo ID progetto Google Cloud

      Esegui la pulizia

      Per evitare che al tuo account Google Cloud vengano addebitati costi relativi alle risorse utilizzate in questa pagina, elimina il progetto Google Cloud con le risorse.

      1. Elimina il bucket:
        gcloud storage buckets delete BUCKET_NAME
      2. Se mantieni il progetto, revoca i ruoli che hai concesso al account di servizio Compute Engine predefinito. Esegui il seguente comando una volta per ciascuno dei seguenti ruoli IAM:

        • roles/dataflow.admin
        • roles/dataflow.worker
        • roles/storage.objectAdmin
        gcloud projects remove-iam-policy-binding PROJECT_ID \
            --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
            --role=SERVICE_ACCOUNT_ROLE
      3. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

        gcloud auth application-default revoke
      4. Optional: Revoke credentials from the gcloud CLI.

        gcloud auth revoke

      Passaggi successivi