About Pulumi:
Build and ship infrastructure faster using languages you know and love. Use Pulumi’s open source SDK to provision infrastructure on any cloud, and securely and collaboratively build and manage infrastructure using Pulumi Cloud.
Why Pulumi?
To be able to develop in a programming language we love using the features of that language.
Aim of Article
To build an end-to-end infrastructure that writes events
sent over HTTP
from any application (a game, an e-commerce site, etc.) to BigQuery.
Before we start, you have to complete the following steps:
Once they are done, you will have a Pulumi project and a stack.
:memo: Note: Although this article focuses on GCP, I have created a project that can also build a similar architecture on AWS. It reads its settings from a YAML file, and I will use that project throughout the explanations.
To build a data pipeline, we need an endpoint for HTTP requests. We can create a Cloud Function or an API Gateway to handle these HTTP requests.
💡 Tip: In this article we will continue with
Google Cloud API Gateway
First we need to set up Storage:
We need a function that writes data to a topic in Google Cloud Pub/Sub. The function is deployed from a zip file stored in a Google Cloud Storage bucket.
The Cloud Function source code can be reached from this link.
This code sends the incoming data to Google Cloud Pub/Sub.
To create a zip file from the source code, you can run the following command:
make function-zip
// createBucketForFunction creates bucket for function
// it is used to get zipped function
func (g *Gcp) createBucketForFunction() (*storage.Bucket, *storage.BucketObject, error) {
bucket, err := storage.NewBucket(g.ctx, g.config.Function.Build.Source.Storage.Bucket.Name, &storage.BucketArgs{
Name: pulumi.String(g.config.Function.Build.Source.Storage.Bucket.Name), // "ptemplate-gcf-source"
Location: pulumi.String(g.region), // "europe-west3"
UniformBucketLevelAccess: pulumi.Bool(true),
ForceDestroy: pulumi.Bool(g.config.Function.Build.Source.Storage.ForceDestroy), // true
g.functionSourceBucket = bucket
object, err := storage.NewBucketObject(g.ctx, g.config.Function.Build.Source.Storage.Bucket.Object.Name, &storage.BucketObjectArgs{
Name: pulumi.String(g.config.Function.Build.Source.Storage.Bucket.Object.Name), // ""
Bucket: bucket.Name,
Source: pulumi.NewFileAsset(g.config.Function.Build.Source.Storage.Bucket.Object.Path), // "assets/cloudfunction/"
}, pulumi.DependsOn([]pulumi.Resource{bucket}))
g.functionSourceBucketObj = object
return bucket, object, err
To create a BigQuery
dataset, we need to set parameters like the ones below. You can find sample values in config.yaml.
After that, we need a table in which to store the events, so we also need to define a schema.
// CreateDWH creates BigQuery Dataset and Table according to given values
// Initial schema will be created
func (g *Gcp) CreateDWH() error {
dataset, err := bigquery.NewDataset(g.ctx, g.config.Dwh.BigQuery.Dataset, &bigquery.DatasetArgs{
DatasetId: pulumi.String(g.config.Dwh.BigQuery.Dataset), // "ptemplate_dataset"
Location: pulumi.String(g.region), // "europe-west2"
table, err := bigquery.NewTable(g.ctx, g.config.Dwh.BigQuery.TableId, &bigquery.TableArgs{
DeletionProtection: pulumi.Bool(g.config.Dwh.BigQuery.DeletionProtection), // false
TableId: pulumi.String(g.config.Dwh.BigQuery.TableId), // "ptemplate_events"
DatasetId: dataset.DatasetId,
"name": "game_name",
"type": "STRING",
"mode": "NULLABLE"
"name": "event_name",
"type": "STRING",
"mode": "NULLABLE"
"name": "event_data",
"type": "JSON",
"mode": "NULLABLE"
Schema: pulumi.String(g.config.Dwh.BigQuery.Schema),
g.table = table
return err
With Google Cloud Pub/Sub
, we can create a topic, write data to it, and then create a subscription that transfers this data to another Google Cloud service.
Below you can find how to create subscriptions for Google Cloud Storage
and BigQuery
// CreateStream create Pubsub topic and subscription according to given values
// You can set the destination such as "cloudstorage,bigquery"
func (g *Gcp) CreateStream() error {
var err error
topic, err := pubsub.NewTopic(g.ctx, g.config.Stream.PubSubConf.Topic.Name, &pubsub.TopicArgs{
Name: pulumi.String(g.config.Stream.PubSubConf.Topic.Name), // "events_topic"
g.topic = topic
subsArgs := &pubsub.SubscriptionArgs{
Name: pulumi.String(g.config.Stream.PubSubConf.Subscription.Name), // "subscription"
Topic: topic.ID(),
AckDeadlineSeconds: pulumi.Int(20),
resources := []pulumi.Resource{}
if g.config.Stream.Destination == "cloudstorage" {
subsArgs.CloudStorageConfig = &pubsub.SubscriptionCloudStorageConfigArgs{
Bucket: g.bucket.ID(),
MaxDuration: pulumi.String(g.config.Stream.PubSubConf.Subscription.CloudStorageConf.Duration),
} else if g.config.Stream.Destination == "bigquery" {
subsArgs.BigqueryConfig = &pubsub.SubscriptionBigqueryConfigArgs{
Table: pulumi.All(g.table.Project, g.table.DatasetId, g.table.TableId).ApplyT(func(_args []interface{}) (string, error) {
project := _args[0].(string)
datasetId := _args[1].(string)
tableId := _args[2].(string)
return fmt.Sprintf("%v.%v.%v", project, datasetId, tableId), nil
UseTableSchema: pulumi.Bool(true),
resources = append(resources, g.table)
_, err = pubsub.NewSubscription(g.ctx, g.config.Stream.PubSubConf.Subscription.Name, subsArgs, pulumi.DependsOn(resources))
return err
The cloudfunctionsv2 service needs a service account, so we create that first. The function uses the zip file in the bucket as its source.
Requests coming through the API Gateway
are sent directly to the Cloud Run service behind the function.
💡 cloudfunctionsv2 is the new version of cloudfunctions. You can check the link for details.
// createServiceAccount creates a service account for cloud function
func (g *Gcp) createServiceAccount() (*serviceaccount.Account, error) {
account, err := serviceaccount.NewAccount(g.ctx, g.config.Iam.ServiceAcc.AccountID, &serviceaccount.AccountArgs{
AccountId: pulumi.String(g.config.Iam.ServiceAcc.AccountID), // "ptemplate-svc-acc"
DisplayName: pulumi.String(g.config.Iam.ServiceAcc.DisplayName), // "Ptemplate Service Account"
Project: pulumi.String(g.config.Iam.ServiceAcc.Project), // "pulumitemplate"
CreateIgnoreAlreadyExists: pulumi.Bool(true),
g.serviceAcc = account
return account, err
// CreateFunction creates Cloud function/Cloud Run according to given values
// You can upload zip to lambda
// If lambda function creation operation is successful URL will be exported.
// cloudfunctionsv2 is new version for cloudfunctions You can check the link below
func (g *Gcp) CreateFunction() error {
if g.config.Iam.ServiceAcc != nil {
_, _ = g.createServiceAccount()
buildArgs := &cloudfunctionsv2.FunctionBuildConfigArgs{
Runtime: pulumi.String(g.config.Function.Build.Runtime), // "go122"
EntryPoint: pulumi.String(g.config.Function.Build.EntryPoint), // "PubsubProducer"
funcArgs := &cloudfunctionsv2.FunctionArgs{
Name: pulumi.String(g.config.Function.Name), // ptemplate-proxy
Location: pulumi.String(g.region), // "europe-west2"
Project: pulumi.String(g.project), // "pulumitemplate"
BuildConfig: buildArgs,
ServiceConfig: &cloudfunctionsv2.FunctionServiceConfigArgs{
MaxInstanceCount: pulumi.Int(g.config.Function.ServiceConf.MaxInstance), // 1
AvailableMemory: pulumi.String(g.config.Function.ServiceConf.AvailableMem), // "256M"
TimeoutSeconds: pulumi.Int(g.config.Function.ServiceConf.Timeout), // 60
ServiceAccountEmail: g.serviceAcc.Email,
EnvironmentVariables: pulumi.StringMap{
"PROJECT_ID": pulumi.String(g.project), // "pulumitemplate"
"TOPIC_ID": pulumi.String(g.config.Stream.PubSubConf.Topic.Name)}, // "events_topic"
if g.config.Function.Trigger != nil {
triggerArgs := &cloudfunctionsv2.FunctionEventTriggerArgs{
EventType: pulumi.String(g.config.Function.Trigger.EventType),
RetryPolicy: pulumi.String("RETRY_POLICY_RETRY"),
TriggerRegion: pulumi.String(g.config.Function.Trigger.Region),
if g.config.Template.Name == "data-pipeline" {
triggerArgs.PubsubTopic = pulumi.String(g.config.Function.Trigger.EventType)
funcArgs.EventTrigger = triggerArgs
if g.config.Function.Build.Source != nil {
bucket, object, _ := g.createBucketForFunction()
buildArgs.Source = &cloudfunctionsv2.FunctionBuildConfigSourceArgs{
StorageSource: &cloudfunctionsv2.FunctionBuildConfigSourceStorageSourceArgs{
Bucket: bucket.Name,
Object: object.Name,
function, err := cloudfunctionsv2.NewFunction(g.ctx, g.config.Function.Name, funcArgs, pulumi.DependsOn([]pulumi.Resource{g.functionSourceBucket, g.functionSourceBucketObj, g.serviceAcc}))
g.function = function
g.ctx.Export("function_url", function.Url)
return err
Creating the IAM “bindings” and “members” needed by the services on GCP
// configureRolesForFunction creates/binds a member according to given values.
func (g *Gcp) configureRolesForFunction() error {
var err error
for _, role := range g.config.Iam.Roles {
if role.Type == "cloudfuncv2member" {
_, err = cloudfunctionsv2.NewFunctionIamMember(g.ctx, role.Name, &cloudfunctionsv2.FunctionIamMemberArgs{
Project: pulumi.String(g.project), // "pulumitemplate"
Location: pulumi.String(g.region), // "europe-west2"
CloudFunction: pulumi.String(g.config.Function.Name), // ptemplate-proxy
Role: pulumi.String(role.Role),
Member: g.serviceAcc.Email.ApplyT(func(email string) (string, error) {
return fmt.Sprintf("serviceAccount:%v", email), nil
}, pulumi.DependsOn([]pulumi.Resource{g.function}))
} else if role.Type == "pubsubmember" {
_, err = pubsub.NewTopicIAMMember(g.ctx, role.Name, &pubsub.TopicIAMMemberArgs{
Project: pulumi.String(g.project), // "pulumitemplate"
Topic: pulumi.String(g.config.Stream.PubSubConf.Topic.Name), // "events_topic"
Role: pulumi.String(role.Role),
Member: g.serviceAcc.Email.ApplyT(func(email string) (string, error) {
return fmt.Sprintf("serviceAccount:%v", email), nil
}, pulumi.DependsOn([]pulumi.Resource{g.topic}))
} else if role.Type == "cloudrunbinding" {
_, err = cloudrun.NewIamBinding(g.ctx, role.Name, &cloudrun.IamBindingArgs{
Project: pulumi.String(g.project),
Service: pulumi.String(g.config.Function.Name), // ptemplate-proxy
Location: pulumi.String(g.region), // "europe-west2"
Role: pulumi.String(role.Role),
Members: pulumi.StringArray{
g.serviceAcc.Email.ApplyT(func(email string) (string, error) {
return fmt.Sprintf("serviceAccount:%v", email), nil
}, pulumi.DependsOn([]pulumi.Resource{g.function}))
return err
// ConfigureIAM configures the IAM member according to given values.
// You can create the multiple role.
func (g *Gcp) ConfigureIAM() error {
project, _ := organizations.LookupProject(g.ctx, &organizations.LookupProjectArgs{ProjectId: pulumi.StringRef(g.config.Iam.ServiceAcc.Project)}, nil)
var err error
for _, role := range g.config.Iam.Roles {
if role.Type == "bucketmember" {
_, _ = storage.NewBucketIAMMember(g.ctx, role.Name, &storage.BucketIAMMemberArgs{
Bucket: g.bucket.Name,
Role: pulumi.String(role.Role),
Member: pulumi.String(fmt.Sprintf(role.Member, project.Number)),
}, pulumi.DependsOn([]pulumi.Resource{g.bucket}))
} else if role.Type == "projectmember" {
_, _ = projects.NewIAMMember(g.ctx, role.Name, &projects.IAMMemberArgs{
Project: pulumi.String(*project.ProjectId),
Role: pulumi.String(role.Role),
Member: pulumi.String(fmt.Sprintf(role.Member, project.Number)),
return err
Identity Platform
You can create an OAuth client via this link. After that, you need to create a Google provider for authentication. You can add the provider via this link.
Api Gateway
To create a gateway in the Google Cloud API Gateway service, we need an OpenAPI schema. We generate this schema from the data in config.yaml and then pass it to the gateway as a spec file.
represents the Cloud run/function address
represents Google provider for authentication
represents Google Cloud Oauth client
After this step, the new endpoint that we defined will be created.
We then log in with Google and add the returned ID token to the Authorization header of each request, which secures the endpoint.
To get a token in the development environment, you can serve the following HTML somewhere and log in with your Google account.
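<script src="https://accounts.google.com/gsi/client" async defer></script>
<script>
  function handleCredentialResponse(response) {
    console.log("Encoded JWT ID token: " + response.credential);
  }
</script>
<div id="g_id_onload"
     data-client_id="YOUR_GOOGLE_CLIENT_ID"
     data-callback="handleCredentialResponse">
</div>
<div class="g_id_signin" data-type="standard"></div>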
// generateSpecFile generates a yaml file according to given values
// Since oauth2 client cannot be created by Gcloud CLI we need to create Oauth2 Client first
// With this yaml you are able to use Google Authentication while using the function URL
func (g *Gcp) generateSpecFile() error {
spec := types.OpenApiSpec{}
spec.Swagger = "2.0"
spec.Info.Title = g.config.APIGateway.Name
spec.Info.Version = "1.0.0"
spec.SecurityDefinitions.GoogleIDToken.AuthorizationURL = ""
spec.SecurityDefinitions.GoogleIDToken.Flow = "implicit"
spec.SecurityDefinitions.GoogleIDToken.Type = "oauth2"
spec.SecurityDefinitions.GoogleIDToken.XGoogleIssuer = ""
spec.SecurityDefinitions.GoogleIDToken.XGoogleJwksURI = ""
spec.SecurityDefinitions.GoogleIDToken.XGoogleAudiences = g.config.Idp.ClientId
spec.Schemes = []string{"https"}
spec.Produces = []string{"application/json"}
spec.Paths.Event.Post.OperationID = fmt.Sprintf("%v-op", g.config.APIGateway.Name) // "ptemplate-api-op"
spec.Paths.Event.Post.XGoogleBackend.Address = fmt.Sprintf("", g.config.Function.Region, g.config.Iam.ServiceAcc.Project, g.config.Function.Name)
securities := []types.Security{}
securities = append(securities, types.Security{GoogleIDToken: make([]interface{}, 0)})
spec.Paths.Event.Post.Security = securities
spec.Paths.Event.Post.Responses.Num200.Description = "OK"
yamlFile, _ := yaml.Marshal(&spec)
err := os.MkdirAll("api/gcp", 0700)
err = os.WriteFile("api/gcp/api.yaml", yamlFile, 0644)
return err
// CreateApiGateway creates Api and Gateway according to given values
// Before create a Gateway you need to generate spec file
func (g *Gcp) CreateApiGateway() error {
err := g.generateSpecFile()
apiGw, err := apigateway.NewApi(g.ctx, g.config.APIGateway.Name, &apigateway.ApiArgs{
ApiId: pulumi.String(g.config.APIGateway.Name), // "ptemplate-api"
}, pulumi.DependsOn([]pulumi.Resource{g.function}))
if err != nil {
return err
invokeFilebase64, err := std.Filebase64(g.ctx, &std.Filebase64Args{
Input: g.config.APIGateway.OpenApiSpec, // "api/gcp/api.yaml"
}, nil)
apiGwApiConfig, err := apigateway.NewApiConfig(g.ctx, fmt.Sprintf("%v-config", g.config.APIGateway.Name), &apigateway.ApiConfigArgs{
Api: apiGw.ApiId,
ApiConfigId: pulumi.String(fmt.Sprintf("%v-config", g.config.APIGateway.Name)),
OpenapiDocuments: apigateway.ApiConfigOpenapiDocumentArray{
Document: &apigateway.ApiConfigOpenapiDocumentDocumentArgs{
Path: pulumi.String(g.config.APIGateway.OpenApiSpec), // "api/gcp/api.yaml"
Contents: pulumi.String(invokeFilebase64.Result),
}, pulumi.DependsOn([]pulumi.Resource{apiGw}))
_, err = apigateway.NewGateway(g.ctx, fmt.Sprintf("%v-gw", g.config.APIGateway.Name), &apigateway.GatewayArgs{
ApiConfig: apiGwApiConfig.ID(),
GatewayId: pulumi.String(fmt.Sprintf("%v-gw", g.config.APIGateway.Name)), // "ptemplate-api-gw"
Region: pulumi.String(g.config.APIGateway.Region), // "europe-west2"
}, pulumi.DependsOn([]pulumi.Resource{apiGwApiConfig}))
return err
If you want to test it, you can run the following commands after cloning the above project to your local machine.
make select-stack stack=datapipeline-pubsub-bigquery-apigateway
make up stack=datapipeline-pubsub-bigquery-apigateway