Published on

Cloud Schedulerで定時にジョブを実行する

Authors
  • avatar
    Name
    Nomad Dev Life
    Twitter
2025-07-21-title

Google Cloudで指定時刻に毎日ジョブを実行する

それぞれのユーザーが、指定した時刻に毎日ジョブを実行したい、という要件はよくあります。

しかし、スケジューラーライブラリなどを使ってその機能を実装した場合、cronなどの外部スケジューラを別途設定したり、スケジューラをserviceとしてbackgroundで実行しておくなど、スケジューラの管理が面倒です。

さらに、負荷上昇や一時的な障害などにより、ジョブの実行が失敗した場合のリトライなども考える必要があります。

今回は、Google Cloud の Cloud RunCloud SchedulerCloud Tasks を組み合わせて、スケーラブルで信頼性の高いジョブ実行システムを構築してみます。

3つのサービスの役割

それぞれのサービスが持つ特性を活かすことで、堅牢なシステムが実現します。

Cloud Scheduler

定期実行をトリガーするサービスです。毎日特定の時刻にジョブを実行したい場合に利用します。Linuxで言えばcronのようなものです。APIを呼び出すことで、各ユーザーの実行スケジュールを管理することができます。

Cloud Schedulerから任意のHTTP End PointやCloud Runを直接呼び出すことも可能ですが、後述のCloud Tasksを介することで、より信頼性の高い実行が実現します。

Cloud Tasks

タスクキューイングサービスです。Cloud Schedulerからのリクエストを受け取り、Cloud Runへのリクエストをキューイングします。これにより、Cloud Run側の急な負荷上昇や一時的な障害があっても、リクエストが失われることなく再試行されるため、システムの信頼性が向上します。

Cloud Run

サーバーレスでコンテナを実行できるため、バッチジョブの実行環境として最適です。リクエストがあった時だけ起動し、処理が終了すれば停止するため、コスト効率にも優れています。

アーキテクチャの概要

以下の流れでジョブが実行されます。

  1. Cloud Scheduler が毎日指定した時刻にトリガーされ、Cloud Tasks にタスクをプッシュします。
  2. Cloud Tasks は、プッシュされたタスクを Cloud Run のサービスエンドポイントへ HTTP リクエストとして送信します。
  3. Cloud Run はリクエストを受け取り、デプロイされたコンテナイメージに基づいてバッチジョブを実行します。
2025-07-21-title

ステップバイステップの実装例

1. Cloud Run サービスの準備

まず、実行したいバッチ処理をコンテナイメージとして作成し、Cloud Run にデプロイします。ここでは例として、FastAPI を使用した簡単なスクリプトを使用します。

バッチ処理コード (main.py):

import os
import datetime
from fastapi import FastAPI, Request

app = FastAPI()

@app.post("/")
async def run_batch_job(request: Request):
    """
    Cloud Tasks からのPOSTリクエストを受け取り、バッチジョブを実行します。
    """
    now = datetime.datetime.now()
    print(f"[{now}] Batch job started!")
    
    # リクエストボディをJSONとして読み込む (Cloud Tasks からのペイロード)
    try:
        request_body = await request.json()
        print(f"Received payload: {request_body}")
    except Exception as e:
        print(f"Could not parse request body as JSON: {e}")
        request_body = {} # JSONでない場合は空の辞書とする
    
    # ここに実際のバッチ処理ロジックを記述します。
    # 例: データベースの更新、データ処理、外部APIコールなど
    print(f"[{now}] Executing actual batch logic with message: {request_body.get('message', 'No message')}")
    
    print(f"[{now}] Batch job finished!")
    return {"message": "Batch job executed successfully", "timestamp": now.isoformat()}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", 8080)))

Dockerfile:

FROM python:3.12-slim

COPY --from=ghcr.io/astral-sh/uv:latest /uv /bin/uv

COPY . /app

WORKDIR /app
RUN uv sync --frozen --no-cache

# Run the application.
CMD ["/app/.venv/bin/fastapi", "run", "app/main.py", "--port", "8080", "--host", "0.0.0.0"]

uv:

uv add fastapi[standard]

これらのファイルを準備したら、コンテナイメージをビルドして Artifact Registry (または Container Registry) にプッシュし、Cloud Run にデプロイします。

gcloud auth configure-docker asia-northeast1-docker.pkg.dev
docker build -t asia-northeast1-docker.pkg.dev/[PROJECT_ID]/gcp-sched-tasks-run-sample/image1:0.0.1 .
docker push asia-northeast1-docker.pkg.dev/[PROJECT_ID]/gcp-sched-tasks-run-sample/image1:0.0.1

gcloud run deploy gcp-sched-tasks-run-sample \
--image asia-northeast1-docker.pkg.dev/[PROJECT_ID]/gcp-sched-tasks-run-sample/image1:0.0.1 \
--platform managed \
--region asia-northeast1 \
--allow-unauthenticated # Cloud Tasks からのアクセスを許可するため

注意: --allow-unauthenticated は、認証なしでCloud Runへのアクセスを許可するオプションです。本番環境では、Cloud TasksのサービスアカウントにCloud Run Invokerロールを付与し、認証を設定することが推奨されます。

デプロイ後、Cloud Run サービスのURLを控えておきましょう。これは後で Cloud Functions のコード内で Cloud Tasks のターゲットURLとして使用します。

2. Cloud Tasks キューの作成

次に、Cloud Tasks のキューを作成します。このキューが Cloud Scheduler からのタスクを受け取り、Cloud Run へ転送します。

gcloud tasks queues create gcp-sched-tasks-run-sample \
--location asia-northeast1

3. Cloud Scheduler ジョブの作成 (Cloud Functions 経由)

最後に、Cloud Scheduler ジョブを作成します。ここでは、毎日午前9時 (JST) にジョブを実行する例を示します。Cloud Scheduler から直接 Cloud Tasks のプッシュキューをトリガーすることはできないため、間に Cloud Functions を挟みます。Cloud Functions が Cloud Scheduler からのリクエストを受け取り、Cloud Tasks にタスクを追加します。

Cloud Functions (第1世代) のコード (main.py):

import os
import json
import datetime
from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2

# 環境変数からプロジェクトID、ロケーション、キュー名、Cloud Run URLを取得
project = os.environ.get('GCP_PROJECT')
location = os.environ.get('CLOUD_TASKS_LOCATION', 'asia-northeast1') # Cloud Tasks キューの場所
queue = os.environ.get('CLOUD_TASKS_QUEUE_NAME', 'gcp-sched-tasks-run-sample') # 作成したCloud Tasks キューの名前
cloud_run_url = os.environ.get('CLOUD_RUN_SERVICE_URL') # Cloud Run サービスのURL

client = tasks_v2.CloudTasksClient()
parent = client.queue_path(project, location, queue)

def add_task_to_cloud_tasks(request):
    """Cloud Scheduler からのHTTPリクエストを受け取り、Cloud Tasks にタスクを追加する"""
    if not cloud_run_url:
        print("Error: CLOUD_RUN_SERVICE_URL environment variable is not set.")
        return "Internal Server Error: Cloud Run URL not configured", 500

    request_json = request.get_json(silent=True)
    
    # Cloud Run へ渡したいペイロードがあればここで設定
    # Cloud Scheduler からの任意のデータを Cloud Run へ渡すことができます
    payload_data = {"message": "Hello from Cloud Scheduler (via Cloud Functions)!", "triggered_at": datetime.datetime.now().isoformat()}
    
    # タスクの作成
    task = {
        'http_request': {
            'http_method': tasks_v2.HttpMethod.POST,
            'url': cloud_run_url,
            'headers': {
                'Content-Type': 'application/json'
            },
            'body': json.dumps(payload_data).encode('utf-8')
        }
    }

    # タスクをキューに追加
    try:
        response = client.create_task(parent=parent, task=task)
        print(f"Task created: {response.name}")
        return "Task added to Cloud Tasks successfully", 200
    except Exception as e:
        print(f"Error creating task: {e}")
        return f"Error adding task to Cloud Tasks: {e}", 500

requirements.txt

google-cloud-tasks==2.15.0 # Cloud Tasks APIと連携するためのライブラリ
google-cloud-storage # もしCloud Storageを使うなら (今回は必須ではないですが例として) 
Flask # request.get_json() を使う場合は必要 (ただし、Cloud Functionsのトリガーの場合は自動的に提供されることが多い)

この Cloud Functions をデプロイし、そのトリガーURLを Cloud Scheduler の --uri に指定します。

gcloud functions deploy cloud_tasks_http_endpoint_fastapi \
  --runtime python313 \
  --trigger-http \
  --allow-unauthenticated \
  --entry-point add_task_to_cloud_tasks \
  --region asia-northeast1 \
  --set-env-vars GCP_PROJECT=[PROJECT_ID],CLOUD_TASKS_LOCATION=asia-northeast1,CLOUD_TASKS_QUEUE_NAME=gcp-sched-tasks-run-sample,CLOUD_RUN_SERVICE_URL=[YOUR_CLOUD_RUN_SERVICE_URL]

Cloud Functions デプロイ時の注意: [YOUR_CLOUD_RUN_SERVICE_URL] は、上記でデプロイした Cloud Run サービスの実際のURLに置き換えてください。

次に、この Cloud Functions のトリガーURLを使用して Cloud Scheduler ジョブを作成します。

gcloud scheduler jobs create http gcp-sched-tasks-run-sample-job \
  --schedule "0 0 * * *" \
  --uri "https://[REGION]-[PROJECT_ID].cloudfunctions.net/cloud_tasks_http_endpoint_fastapi" \
  --http-method POST \
  --message-body "{}" \
  --headers "Content-Type=application/json" \
  --oauth-service-account-email [CLOUD_SCHEDULER_SERVICE_ACCOUNT_EMAIL] \
  --location asia-northeast1

Cloud Scheduler の --oauth-service-account-email について:

Cloud Scheduler が Cloud Functions を呼び出す際に認証が必要な場合は、適切なサービスアカウントを指定します。Cloud Functions が --allow-unauthenticated でデプロイされていれば不要ですが、本番環境では認証を設定してください。


おわりに

Cloud Scheduler, Cloud Tasks, Cloud Run を組み合わせることで、ユーザーが指定した時刻に毎日ジョブを実行する、スケーラブルで信頼性の高いシステムを構築できます。Cloud Scheduler で定期実行をトリガーし、Cloud Tasks で堅牢なキューイングを実現することで、システム全体の安定性を高めることができます。

今回の例ではCloud Schedulerのジョブを1つしか作成しませんでしたが、ユーザー毎に作成すれば、ユーザーが指定した任意の時刻にジョブを実行することもできます。