class LogStash::Inputs::GooglePubSub

This is a https://github.com/elastic/logstash[Logstash] input plugin for https://cloud.google.com/pubsub/[Google Pub/Sub]. The plugin can subscribe to a topic and ingest messages.

The main motivation behind the development of this plugin was to ingest https://cloud.google.com/logging/[Stackdriver Logging] messages via the https://cloud.google.com/logging/docs/export/using_exported_logs[Exported Logs] feature of Stackdriver Logging.

Prerequisites

You must first create a Google Cloud Platform project and enable the Google Pub/Sub API. If you intend to use the plugin to ingest Stackdriver Logging messages, you must also enable the Stackdriver Logging API and configure log exporting to Pub/Sub. There is plentiful information on https://cloud.google.com/ to get started:

Cloud Pub/Sub

Currently, this module requires you to create a `topic` manually and specify it in the logstash config file. You must also specify a `subscription`, but the plugin will attempt to create the pull-based `subscription` on its own.
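For illustration, here is a minimal sketch of creating the topic ahead of time with the `google-cloud-pubsub` Ruby gem. The gem usage, project id, and topic name here are assumptions for the example; you can equally create the topic in the Cloud Console or with the gcloud CLI.

[source,ruby]
----
# Sketch: create the topic the plugin will read from ahead of time.
# Assumes the google-cloud-pubsub gem is installed and credentials are
# available (ADC or a service account key); names are illustrative.
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new project_id: "my-project-1234"
pubsub.create_topic "logstash-input-dev" unless pubsub.topic "logstash-input-dev"
----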

All messages received from Pub/Sub will be converted to a logstash `event` and added to the processing pipeline queue. All Pub/Sub messages will be `acknowledged` and removed from the Pub/Sub `topic` (see https://cloud.google.com/pubsub/overview#concepts[Pub/Sub concepts] for more).

It is generally assumed that incoming messages will be JSON and will be added to the logstash `event` as-is. However, if a plain text message is received, the plugin will return the raw text as `raw_message` in the logstash `event`.
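For illustration, roughly what the two cases look like (field values here are assumed for the example):

[source,ruby]
----
# A JSON message {"severity":"INFO","msg":"hello"} decodes into
# top-level event fields:
#   { "severity" => "INFO", "msg" => "hello", ... }
#
# A plain text message "hello world" is kept whole:
#   { "raw_message" => "hello world", ... }
----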

Authentication

You have two options for authentication depending on where you run Logstash.

1. If you are running Logstash outside of Google Cloud Platform, then you will need to create a Google Cloud Platform Service Account and specify the full path to the JSON private key file in your config. You must assign sufficient roles to the Service Account to create a subscription and to pull messages from the subscription. Learn more about GCP Service Accounts and IAM roles here:

- Google Cloud Platform IAM https://cloud.google.com/iam/[overview]
- Creating Service Accounts https://cloud.google.com/iam/docs/creating-managing-service-accounts[overview]
- Granting Roles https://cloud.google.com/iam/docs/granting-roles-to-service-accounts[overview]
2. If you are running Logstash on a Google Compute Engine instance, you may opt to use Application Default Credentials. In this case, you will not need to specify a JSON private key file in your config. A minimal configuration sketch covering both cases follows this list.
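Here is the sketch for each case; all option names appear in the sample configuration below, and the values are placeholders:

[source,ruby]
----
# Outside GCP: point the plugin at a service account JSON key file.
input {
  google_pubsub {
    project_id    => "my-project-1234"
    topic         => "logstash-input-dev"
    subscription  => "logstash-sub"
    json_key_file => "/path/to/service-account-key.json"
  }
}

# On GCE: omit json_key_file and Application Default Credentials are used.
input {
  google_pubsub {
    project_id   => "my-project-1234"
    topic        => "logstash-input-dev"
    subscription => "logstash-sub"
  }
}
----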

Stackdriver Logging (optional)

If you intend to use the logstash plugin for Stackdriver Logging message ingestion, you must first manually set up the Export option to Cloud Pub/Sub and manually create the `topic`. Please see the more detailed instructions at https://cloud.google.com/logging/docs/export/using_exported_logs[Exported Logs] and ensure that the https://cloud.google.com/logging/docs/export/configure_export#manual-access-pubsub[necessary permissions] have also been manually configured.
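As an illustrative sketch, the export sink can also be created programmatically with the `google-cloud-logging` Ruby gem (the gem usage and all names here are assumptions; the Console instructions linked above are the canonical path, and the sink's writer identity must still be granted the necessary permissions):

[source,ruby]
----
# Sketch: create a Stackdriver Logging sink that exports log entries to
# the Pub/Sub topic the plugin reads from. Names are illustrative.
require "google/cloud/logging"

logging = Google::Cloud::Logging.new project_id: "my-project-1234"
logging.create_sink "logstash-sink",
                    "pubsub.googleapis.com/projects/my-project-1234/topics/logstash-input-dev"
----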

Logging messages from Stackdriver Logging exported to Pub/Sub are received as JSON and converted to a logstash `event` as-is in https://cloud.google.com/logging/docs/export/using_exported_logs#log_entries_in_google_pubsub_topics[this format].

Sample Configuration

Below is a copy of the included `example.conf-tmpl` file that shows a basic configuration for this plugin.

[source,ruby]
----
input {
  google_pubsub {
    # Your GCP project id (name)
    project_id => "my-project-1234"

    # The topic name below is currently hard-coded in the plugin. You
    # must first create this topic by hand and ensure you are exporting
    # logging to this pubsub topic.
    topic => "logstash-input-dev"

    # The subscription name is customizable. The plugin will attempt to
    # create the subscription (but use the hard-coded topic name above).
    subscription => "logstash-sub"

    # If you are running logstash within GCE, it will use
    # Application Default Credentials and use GCE's metadata
    # service to fetch tokens.  However, if you are running logstash
    # outside of GCE, you will need to specify the service account's
    # JSON key file below.
    #json_key_file => "/home/erjohnso/pkey.json"
  }
}

output { stdout { codec => rubydebug } }
----
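If you save this as `example.conf`, you can try it with `bin/logstash -f example.conf`; incoming messages will be decoded and printed to stdout by the `rubydebug` codec.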


Metadata and Attributes

The original Pub/Sub message is preserved in the special Logstash `[@metadata]` field so you can fetch:

- Message attributes
- The original base64 data
- The Pub/Sub message ID for de-duplication
- The publish time

You MUST extract any fields you want to keep in a filter, before the event reaches an output, because Logstash drops `@metadata` fields when events are sent to outputs.

See the PubsubMessage https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage[documentation] for a full description of the fields.
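Note that the plugin only attaches `[@metadata][pubsub_message]` to events when the `include_metadata` option is enabled, as seen in the `run` method below.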

Example to get the message ID:

[source,ruby]
----
input { google_pubsub {…} }

filter {
  mutate {
    add_field => { "messageId" => "%{[@metadata][pubsub_message][messageId]}" }
  }
}

output {…}
----


Constants

BATCHED_RECORD_SEPARATOR
COMPRESSION_ALGORITHM_ZLIB

Public Instance Methods

extract_metadata(java_message)
# File lib/logstash/inputs/google_pubsub.rb, line 325
def extract_metadata(java_message)
  {
    data: java_message.getData().toStringUtf8(),
    attributes: java_message.getAttributesMap(),
    messageId: java_message.getMessageId(),
    publishTime: Timestamps.toString(java_message.getPublishTime())
  }
end
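This hash is what appears under `[@metadata][pubsub_message]`, so the fields available to filters are `data` (the message payload decoded as a UTF-8 string), `attributes` (the attribute map), `messageId`, and `publishTime`.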
register()
# File lib/logstash/inputs/google_pubsub.rb, line 228
def register
  @logger.debug("Registering Google PubSub Input: project_id=#{@project_id}, topic=#{@topic}, subscription=#{@subscription}")
  @subscription_id = "projects/#{@project_id}/subscriptions/#{@subscription}"

  if @json_key_file
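    # Wrap the key file in an explicit credentials provider; when this is
    # not set, the client library falls back to Application Default
    # Credentials.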
    @credentialsProvider = FixedCredentialsProvider.create(
      ServiceAccountCredentials.fromStream(java.io.FileInputStream.new(@json_key_file))
    )
  end
  @topic_name = ProjectTopicName.of(@project_id, @topic)
  @subscription_name = ProjectSubscriptionName.of(@project_id, @subscription)
end
run(queue)
# File lib/logstash/inputs/google_pubsub.rb, line 245
def run(queue)
  # Attempt to create the subscription
  if @create_subscription
    @logger.debug("Creating subscription #{@subscription_id}")
    subscriptionAdminClient = SubscriptionAdminClient.create
    begin
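      # An empty PushConfig makes this a pull subscription; the final
      # argument is the ack deadline in seconds (0 applies the service
      # default).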
      subscriptionAdminClient.createSubscription(@subscription_name, @topic_name, PushConfig.getDefaultInstance(), 0)
    rescue
      @logger.info("Subscription already exists")
    end
  end

  @logger.debug("Pulling messages from sub '#{@subscription_id}'")
  handler = MessageReceiver.new do |message|
    # handle incoming message, then ack/nack the received message
    algorithm = message.getAttributesMap()["compression_algorithm"]

    case algorithm
    when nil
      data = message.getData().toStringUtf8()
      @codec.decode(data) do |event|
        event.set("host", event.get("host") || @host)
        event.set("[@metadata][pubsub_message]", extract_metadata(message)) if @include_metadata
        decorate(event)
        queue << event
      end
    when COMPRESSION_ALGORITHM_ZLIB
      begin
        data = message.getData().toByteArray()

        # decompress batch
        bais = java.io.ByteArrayInputStream.new(data)
        iis = java.util.zip.InflaterInputStream.new(bais)

        result = ""
        buf = Java::byte[@inflate_buffer_size].new
        rlen = -1

        while (rlen = iis.read(buf)) != -1 do
          result += java.lang.String.new(java.util.Arrays.copyOf(buf, rlen), "UTF-8")
        end

        # split into multiple events
        lines = result.split(BATCHED_RECORD_SEPARATOR)
        lines.each do |line|
          event = LogStash::Event.new("message" => line)
          event.set("host", event.get("host") || @host)
          event.set("[@metadata][pubsub_message]", extract_metadata(message)) if @include_metadata
          decorate(event)
          queue << event
        end
      rescue java.util.zip.DataFormatException, java.util.zip.ZipException => e
        @logger.error(e.backtrace.join("\n"))
        raise
      end
    else
      @logger.error("unknown compression algorithm: '#{algorithm}'")
      raise ArgumentError, "unknown compression algorithm: '#{algorithm}'"
    end
  end
  listener = SubscriberListener.new do |from, failure|
    @logger.error("#{failure}")
    raise failure
  end
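  # Flow control: cap the number of outstanding (unacknowledged) messages
  # buffered in memory at @max_messages, pulled on a single executor thread.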
  flowControlSettings = FlowControlSettings.newBuilder().setMaxOutstandingElementCount(@max_messages).build()
  executorProvider = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()
  subscriberBuilder = Subscriber.newBuilder(@subscription_name, handler)
    .setFlowControlSettings(flowControlSettings)
    .setExecutorProvider(executorProvider)
    .setParallelPullCount(1)

  if @credentialsProvider
    subscriberBuilder.setCredentialsProvider(@credentialsProvider)
  end
  @subscriber = subscriberBuilder.build()
  @subscriber.addListener(listener, MoreExecutors.directExecutor())
  @subscriber.startAsync()
  @subscriber.awaitTerminated()
end
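For reference, here is a minimal publisher-side sketch that produces a batch the zlib branch above can decode, using the `google-cloud-pubsub` Ruby gem. The gem usage, the record separator, and the attribute value are assumptions; check the plugin's `BATCHED_RECORD_SEPARATOR` and `COMPRESSION_ALGORITHM_ZLIB` constants for the real values.

[source,ruby]
----
require "google/cloud/pubsub"
require "zlib"

# Assumed values -- the plugin's BATCHED_RECORD_SEPARATOR and
# COMPRESSION_ALGORITHM_ZLIB constants define the real ones.
SEPARATOR      = "\n"
ZLIB_ALGORITHM = "zlib"

pubsub = Google::Cloud::PubSub.new project_id: "my-project-1234"
topic  = pubsub.topic "logstash-input-dev"

records = ['{"msg":"one"}', '{"msg":"two"}']

# Zlib::Deflate.deflate emits the zlib format that the plugin's
# InflaterInputStream expects.
payload = Zlib::Deflate.deflate(records.join(SEPARATOR))

# The plugin dispatches on the "compression_algorithm" message attribute.
topic.publish payload, "compression_algorithm" => ZLIB_ALGORITHM
----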
stop()
# File lib/logstash/inputs/google_pubsub.rb, line 241
def stop
  @subscriber.stopAsync().awaitTerminated() if @subscriber != nil
end