建立發送工作

本頁說明如何建立工作並加入發送佇列。當您要處理工作時,必須先建立新的工作物件並加入佇列。您可以明確指定用於處理工作的服務和處理常式,然後選擇將工作特定資料傳送給處理常式;也可以微調工作的設定,例如安排未來的執行時間,或限制工作失敗時的重試次數。

建立新工作

如要建立工作並將工作加入佇列,請呼叫 taskqueue.add() 函式。以下程式碼會建立以 worker 為目標的服務工作,並透過設定網址 /update-counter 來叫用其處理常式:

class EnqueueTaskHandler(webapp2.RequestHandler):
    def post(self):
        amount = int(self.request.get('amount'))

        task = taskqueue.add(
            url='/update_counter',
            target='worker',
            params={'amount': amount})

        self.response.write(
            'Task {} enqueued, ETA {}.'.format(task.name, task.eta))

您也可以建立 Task 物件並呼叫其 add() 方法。

指定工作站服務

工作離開佇列時,工作佇列服務會將工作傳送至工作站服務。每個工作都有「目標」和「網址」,用於判斷最後執行工作的服務和處理常式。

target

目標會指定用於接收 HTTP 要求以執行工作的服務。這是字串,會以任何一種標準格式來指定服務/版本/執行個體。最常使用的格式如下:

    service
    version.service
    instance.version.service

系統會在應用程式的網域名稱前面添加目標字串。為工作設定目標的方法有三種:

  • 在建構工作時宣告目標。您可以使用 taskqueue.add() 函式的 target 參數明確設定目標,請參閱上方的範例。

  • queue.yaml 中定義佇列時加入 target 指令,如 queue-blue定義所示。所有使用 target 加入佇列的工作都會使用這個目標,即使建構工作時指派了不同的目標也是如此。

  • 如果並未以上述任一方法指定目標,則工作的目標即是將工作加入佇列的服務版本。 請注意,如果您以此方式使用預設的服務和版本,將工作加入佇列,而預設版本在工作執行之前有所變更,則工作會以新的預設版本執行。

url

url 會選取目標服務的其中一個處理常式來執行工作。

url 必須與目標服務的其中一個處理常式網址格式相符。如果工作中指定的方法為 GETPULL,則 url 可以包含查詢參數。如果並未指定任何 url,則會使用預設網址 /_ah/queue/[QUEUE_NAME],其中 [QUEUE_NAME] 是工作佇列的名稱。

將資料傳送至處理常式

您可以透過工作網址中的查詢參數,將資料傳送至處理常式,但只有在工作中指定的方法為 GETPULL,才能使用這種方式。

您也可以使用以下任何欄位將資料新增至工作:

  • payload,會在 HTTP 要求的內文中提交工作資料。
  • params

以下三種呼叫方式具相同效果:

taskqueue.add(method=GET, url='/update-counter?key=blue', target='worker')
taskqueue.add(url='/update-counter', params={'key': 'blue'}, target='worker')
taskqueue.add(url='/update-counter', payload="{'key': 'blue'}", target='worker')

命名工作

根據預設,在您建立新工作時,App Engine 會為工作指派不重複的名稱。不過,您可以使用 name 參數自行為工作指派名稱。指派自己的工作名稱有個優點:名稱相同的工作會遭到排除,也就是可以使用工作名稱來確保每項工作僅會加入一次。排除重複作業會在工作完成或刪除後持續 9 天。

請注意,排除重複邏輯會對效能造成大量負擔,加重延遲情形,且可能會提高命名工作相關的錯誤率。如果工作名稱採循序方式 (例如附有時間戳記),則這些成本還可能會大幅增加。因此,如果您自行指派名稱,建議您在工作名稱中使用分佈均勻的字首,例如內容的雜湊。

如果您要自行指派工作名稱,請注意,名稱的長度上限為 500 個字元,名稱可使用大小寫字母、數字、底線和連字號。

taskqueue.add(url='/url/path', name='first-try')

非同步新增工作

預設情況下,新增工作至佇列的呼叫會同步執行。在大多數情況下,同步呼叫皆能正常執行。新增工作至佇列的作業通常很快速。少數新增工作的作業可能需要較長的時間,但新增工作所需時間的中位數少於 5 毫秒。

由於無法批次新增任務至不同佇列,因而 Task Queue API 也提供非同步呼叫,讓使用者得以平行新增工作,進而盡可能縮短延遲時間。如果您建構的應用程式對延遲極其敏感,且需要同時對不同佇列執行新增工作的作業,就很適合。

如果想對同一工作佇列執行非同步呼叫,請使用佇列類別和 RPC 物件提供的非同步方法。請在傳回的 RPC 物件上呼叫 get_result(),以強制完成要求。在同一交易中非同步新增工作時,可在修訂交易前在 RPC 物件上呼叫 get_result(),以確保要求已完成。

在 Cloud Datastore 交易中將工作排入佇列

您可以將工作排入佇列做為 Datastore 交易的一部分,以便只在成功修訂交易時才將工作排入佇列,並確保工作已順利新增。加入到交易中的工作會視為交易的一部分,並且具有相同層級的隔離與一致性

應用程式無法在單一交易期間將超過五個的交易工作插入到工作佇列。交易工作不能有使用者指定的名稱。

以下程式碼範例示範如何將交易工作插入到發送佇列中,成為 Datastore 交易的一部分:

from google.appengine.api import taskqueue
from google.appengine.ext import ndb

@ndb.transactional
def do_something_in_transaction():
  taskqueue.add(url='/path/to/my/worker', transactional=True)
  #...

do_something_in_transaction()

使用延遲工作程式庫而非工作站服務

要為每個不同的工作設定處理常式 (如前幾節所述) 可能相當麻煩,因為可能將工作的複雜引數序列化或取消序列化,特別是當有各式各樣的小工作要在佇列中執行時,更是個大工程。Python SDK 包含一個程式庫 (google.appengine.ext.deferred),可提供簡單的函式,讓您略過設定專屬工作處理常式,以及序列化和反序列化參數的所有工作。

如要使用這個程式庫,您必須將 deferred 內建元件新增至 app.yaml。如需更多資訊,請參閱 app.yaml 參考資料的「內建處理程序」一節。

如要使用 deferred 程式庫,只需將函式及其引數傳遞至 deferred.defer()

import logging

from google.appengine.ext import deferred

def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here

# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, True)

deferred 程式庫會將函式呼叫及其引數打包,然後將其新增至工作佇列。執行工作時,deferred 程式庫會執行 do_something_expensive("Hello, world!", 42, True)

在多租戶應用程式中處理工作

根據預設,發送佇列會使用工作建立時在命名空間管理員中設定的目前命名空間。如果您的應用程式使用多租戶架構,請參閱命名空間 Python 2 API 一文。

後續步驟