module Octo::Message::MessageParser

Parsing kafka messages for octo consumer

Public Instance Methods

parse(msg) click to toggle source

Parsing Message hash in Octo compatible form @param [Hash] Message Hash @return [Hash] Hash in Octo form

# File lib/octocore/message_parser.rb, line 13
def parse(msg)
  msg = JSON.parse(msg)
  m = { event_name: msg['event_name'] }
  case msg['event_name']
  when 'funnel_update'
    m.merge!({
              rediskey: msg['rediskey']
            })
  when 'update.profile'
    m.merge!({
              profileDetails: msg['profileDetails']
            })
  when 'page.view'
    m.merge!({
              routeUrl:     msg['routeUrl'],
              categories:   msg.fetch('categories', []),
              tags:         msg.fetch('tags', [])
            })
  when 'productpage.view'
    m.merge!({
              routeUrl:     msg['routeUrl'],
              categories:   msg.fetch('categories', []),
              tags:         msg.fetch('tags', []),
              productId:    msg['productId'],
              productName:  msg['productName'],
              price:        msg['price']
            })
  when 'update.push_token'
    m.merge!({
              pushType:     msg['notificationType'],
              pushKey:      msg['pushKey'],
              pushToken:    msg['pushToken']
            })
  end
  enterprise = msg['enterprise']
  raise StandardError, 'Parse Error' if enterprise.nil?

  eid = if enterprise.has_key?'custom_id'
          enterprise['custom_id']
        elsif enterprise.has_key?'customId'
          enterprise['customId']
        end

  ename = if enterprise.has_key?'user_name'
            enterprise['user_name']
          elsif enterprise.has_key?'userName'
            enterprise['userName']
          else
            nil
          end
  m.merge!({
    id:             msg.fetch('uuid', nil),
    enterpriseId:   eid,
    enterpriseName: ename,
    phone:          msg.fetch('phoneDetails', nil),
    browser:        msg.fetch('browserDetails', nil),
    userId:         msg.fetch('userId', -1),
    created_at:     Time.now
  })

  m
end