class LogStash::Inputs::GooglePubSub
This is a github.com/elastic/logstash[Logstash] input plugin for cloud.google.com/pubsub/[Google Pub/Sub]. The plugin can subscribe to a topic and ingest messages.
The main motivation behind the development of this plugin was to ingest cloud.google.com/logging/[Stackdriver Logging] messages via the cloud.google.com/logging/docs/export/using_exported_logs[Exported Logs] feature of Stackdriver Logging.
Prerequisites
You must first create a Google Cloud Platform project and enable the Google Pub/Sub API. If you intend to use the plugin to ingest Stackdriver Logging messages, you must also enable the Stackdriver Logging API and configure log exporting to Pub/Sub. There is plentiful information on cloud.google.com/ to get started:
- Google Cloud Platform Projects and cloud.google.com/docs/overview/[Overview]
- Google Cloud Pub/Sub cloud.google.com/pubsub/[documentation]
- Stackdriver Logging cloud.google.com/logging/[documentation]
Cloud Pub/Sub
Currently, this module requires you to create a `topic` manually and specify it in the logstash config file. You must also specify a `subscription`, but the plugin will attempt to create the pull-based `subscription` on its own.
All messages received from Pub/Sub will be converted to a logstash `event` and added to the processing pipeline queue. All Pub/Sub messages will be `acknowledged` and removed from the Pub/Sub `topic` (please see more about cloud.google.com/pubsub/overview#concepts[Pub/Sub concepts]).
It is generally assumed that incoming messages will be in JSON and added to the logstash `event` as-is. However, if a plain text message is received, the plugin will return the raw text as `raw_message` in the logstash `event`.
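The JSON-or-raw-text behavior described above can be illustrated with a minimal standalone sketch. It is not the plugin's code: the plugin delegates decoding to its configured codec, and the helper name `payload_to_fields` is hypothetical, introduced only for this example.

```ruby
require "json"

# Hypothetical helper (not part of the plugin): turn a Pub/Sub payload
# string into a Hash of event fields.
def payload_to_fields(payload)
  parsed = JSON.parse(payload)
  # Only a JSON object maps cleanly onto event fields; anything else
  # (a bare number, an array, ...) is treated like plain text here.
  parsed.is_a?(Hash) ? parsed : { "raw_message" => payload }
rescue JSON::ParserError
  # Not JSON at all: keep the original text under raw_message.
  { "raw_message" => payload }
end

p payload_to_fields('{"severity":"INFO","textPayload":"started"}')
p payload_to_fields("plain text line")
```

Treating non-object JSON as raw text is a choice made for this sketch; the plugin's codec may handle such payloads differently.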
Authentication
You have two options for authentication depending on where you run Logstash.
- If you are running Logstash outside of Google Cloud Platform, then you will need to create a Google Cloud Platform Service Account and specify the full path to the JSON private key file in your config. You must assign sufficient roles to the Service Account to create a subscription and to pull messages from the subscription. Learn more about GCP Service Accounts and IAM roles here:
  - Google Cloud Platform IAM https://cloud.google.com/iam/[overview]
  - Creating Service Accounts https://cloud.google.com/iam/docs/creating-managing-service-accounts[overview]
  - Granting Roles https://cloud.google.com/iam/docs/granting-roles-to-service-accounts[overview]
- If you are running Logstash on a Google Compute Engine instance, you may opt to use Application Default Credentials. In this case, you will not need to specify a JSON private key file in your config.
Stackdriver Logging (optional)
If you intend to use the logstash plugin for Stackdriver Logging message ingestion, you must first manually set up the Export option to Cloud Pub/Sub and then manually create the `topic`. Please see the more detailed instructions at cloud.google.com/logging/docs/export/using_exported_logs[Exported Logs] and ensure that the cloud.google.com/logging/docs/export/configure_export#manual-access-pubsub[necessary permissions] have also been manually configured.
Logging messages from Stackdriver Logging exported to Pub/Sub are received as JSON and converted to a logstash `event` as-is in cloud.google.com/logging/docs/export/using_exported_logs#log_entries_in_google_pubsub_topics[this format].
Sample Configuration
Below is a copy of the included `example.conf-tmpl` file that shows a basic configuration for this plugin.
- source,ruby
input {
  google_pubsub {
    # Your GCP project id (name)
    project_id => "my-project-1234"

    # The topic name below is currently hard-coded in the plugin. You
    # must first create this topic by hand and ensure you are exporting
    # logging to this pubsub topic.
    topic => "logstash-input-dev"

    # The subscription name is customizable. The plugin will attempt to
    # create the subscription (but use the hard-coded topic name above).
    subscription => "logstash-sub"

    # If you are running logstash within GCE, it will use
    # Application Default Credentials and use GCE's metadata
    # service to fetch tokens. However, if you are running logstash
    # outside of GCE, you will need to specify the service account's
    # JSON key file below.
    #json_key_file => "/home/erjohnso/pkey.json"
  }
}
output { stdout { codec => rubydebug } }
Metadata and Attributes
The original Pub/Sub message is preserved in the special Logstash `[@metadata]` field so you can fetch:
- Message attributes
- The original base64 data
- Pub/Sub message ID for de-duplication
- Publish time
You MUST extract any fields you want in a filter prior to the data being sent to an output because Logstash deletes `@metadata` fields otherwise.
See the PubsubMessage cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage[documentation] for a full description of the fields.
Example to get the message ID:
- source,ruby
input {google_pubsub {…}}
filter {
mutate { add_field => { "messageId" => "%{[@metadata][pubsub_message][messageId]}" } }
}
output {…}
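Pub/Sub delivers messages at least once, so the same message ID can arrive more than once. As a rough illustration of how the message ID supports de-duplication, the sketch below filters a list of hashes shaped like the metadata stored under `[@metadata][pubsub_message]` (keys `data`, `attributes`, `messageId`, `publishTime`). The sample values and the helper name `drop_redeliveries` are made up for this example, and a real pipeline would more likely de-duplicate in the output stage than in memory.

```ruby
require "set"

# Sample metadata hashes; the keys mirror the plugin's metadata fields,
# the values are invented for illustration.
messages = [
  { messageId: "1001", publishTime: "2020-01-01T00:00:00Z", attributes: { "origin" => "app" }, data: "first" },
  { messageId: "1001", publishTime: "2020-01-01T00:00:00Z", attributes: { "origin" => "app" }, data: "first" },
  { messageId: "1002", publishTime: "2020-01-01T00:00:05Z", attributes: {}, data: "second" }
]

# Hypothetical helper: keep only the first occurrence of each message ID.
def drop_redeliveries(messages)
  seen = Set.new
  # Set#add? returns nil when the ID was already present, so select
  # drops every repeated delivery.
  messages.select { |m| seen.add?(m[:messageId]) }
end

unique = drop_redeliveries(messages)
p unique.map { |m| m[:messageId] }   # prints ["1001", "1002"]
```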
Constants
- BATCHED_RECORD_SEPARATOR
- COMPRESSION_ALGORITHM_ZLIB
Public Instance Methods
# File lib/logstash/inputs/google_pubsub.rb, line 325
def extract_metadata(java_message)
  {
    data: java_message.getData().toStringUtf8(),
    attributes: java_message.getAttributesMap(),
    messageId: java_message.getMessageId(),
    publishTime: Timestamps.toString(java_message.getPublishTime())
  }
end

# File lib/logstash/inputs/google_pubsub.rb, line 228
def register
  @logger.debug("Registering Google PubSub Input: project_id=#{@project_id}, topic=#{@topic}, subscription=#{@subscription}")
  @subscription_id = "projects/#{@project_id}/subscriptions/#{@subscription}"

  if @json_key_file
    @credentialsProvider = FixedCredentialsProvider.create(
      ServiceAccountCredentials.fromStream(java.io.FileInputStream.new(@json_key_file))
    )
  end
  @topic_name = ProjectTopicName.of(@project_id, @topic)
  @subscription_name = ProjectSubscriptionName.of(@project_id, @subscription)
end
# File lib/logstash/inputs/google_pubsub.rb, line 245
def run(queue)
  # Attempt to create the subscription
  if @create_subscription
    @logger.debug("Creating subscription #{@subscription_id}")
    subscriptionAdminClient = SubscriptionAdminClient.create
    begin
      subscriptionAdminClient.createSubscription(@subscription_name, @topic_name, PushConfig.getDefaultInstance(), 0)
    rescue
      @logger.info("Subscription already exists")
    end
  end

  @logger.debug("Pulling messages from sub '#{@subscription_id}'")
  handler = MessageReceiver.new do |message|
    # handle incoming message, then ack/nack the received message
    algorithm = message.getAttributesMap()["compression_algorithm"]

    case algorithm
    when nil
      data = message.getData().toStringUtf8()
      @codec.decode(data) do |event|
        event.set("host", event.get("host") || @host)
        event.set("[@metadata][pubsub_message]", extract_metadata(message)) if @include_metadata
        decorate(event)
        queue << event
      end
    when COMPRESSION_ALGORITHM_ZLIB
      begin
        data = message.getData().toByteArray()

        # decompress batch
        bais = java.io.ByteArrayInputStream.new(data)
        iis = java.util.zip.InflaterInputStream.new(bais)

        result = ""
        buf = Java::byte[@inflate_buffer_size].new
        rlen = -1

        while (rlen = iis.read(buf)) != -1 do
          result += java.lang.String.new(java.util.Arrays.copyOf(buf, rlen), "UTF-8")
        end

        # split into multiple events
        lines = result.split(BATCHED_RECORD_SEPARATOR)
        lines.each do |line|
          event = LogStash::Event.new("message" => line)
          event.set("host", event.get("host") || @host)
          event.set("[@metadata][pubsub_message]", extract_metadata(message)) if @include_metadata
          decorate(event)
          queue << event
        end
      rescue java.util.zip.DataFormatException, java.util.zip.ZipException => e
        @logger.error(e.backtrace.join("\n"))
        raise
      end
    else
      @logger.error("unknown compression algorithm: '#{algorithm}'")
      raise ArgumentError, "unknown compression algorithm: '#{algorithm}'"
    end
  end

  listener = SubscriberListener.new do |from, failure|
    @logger.error("#{failure}")
    raise failure
  end

  flowControlSettings = FlowControlSettings.newBuilder().setMaxOutstandingElementCount(@max_messages).build()
  executorProvider = InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build()
  subscriberBuilder = Subscriber.newBuilder(@subscription_name, handler)
    .setFlowControlSettings(flowControlSettings)
    .setExecutorProvider(executorProvider)
    .setParallelPullCount(1)

  if @credentialsProvider
    subscriberBuilder.setCredentialsProvider(@credentialsProvider)
  end
  @subscriber = subscriberBuilder.build()
  @subscriber.addListener(listener, MoreExecutors.directExecutor())
  @subscriber.startAsync()
  @subscriber.awaitTerminated()
end
# File lib/logstash/inputs/google_pubsub.rb, line 241
def stop
  @subscriber.stopAsync().awaitTerminated() if @subscriber != nil
end
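The zlib branch of `run` inflates a compressed batch and splits it on `BATCHED_RECORD_SEPARATOR`, producing one event per record. The following plain-Ruby sketch mimics that round trip with the standard `zlib` library instead of the Java classes. The values of the plugin's constants are not shown on this page, so `RECORD_SEPARATOR` below is an assumed stand-in (the ASCII record separator character), and `pack_batch` and `unpack_batch` are hypothetical helper names.

```ruby
require "zlib"

# ASSUMPTION: stand-in for BATCHED_RECORD_SEPARATOR, whose real value is
# not shown in this documentation.
RECORD_SEPARATOR = 30.chr

# Hypothetical publisher side: join records and deflate them (zlib format).
def pack_batch(lines)
  Zlib::Deflate.deflate(lines.join(RECORD_SEPARATOR))
end

# Hypothetical consumer side: inflate the whole payload, then split it
# into one string per record.
def unpack_batch(bytes)
  Zlib::Inflate.inflate(bytes).force_encoding("UTF-8").split(RECORD_SEPARATOR)
end

batch = pack_batch(["first event", "second event"])
p unpack_batch(batch)   # prints ["first event", "second event"]
```

A publisher following this scheme would presumably also set the `compression_algorithm` message attribute so that `run` takes the zlib branch. Decoding to UTF-8 only after the full payload has been inflated, as the sketch does, avoids splitting a multi-byte character across buffer reads.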