class ConceptQL::Operators::Operator

Attributes

validations[RW]
errors[RW]
nodifier[RW]

Public Class Methods

codes_should_match(format) click to toggle source
# File lib/conceptql/operators/operator.rb, line 128
# Stores +format+ in the @codes_regexp class-level instance variable and then
# calls +validate_codes_match+ on the receiver.
#
# NOTE(review): +validate_codes_match+ is invoked here in class context; it is
# presumably a class-level validation macro defined outside this listing —
# confirm.
def codes_should_match(format)
  @codes_regexp = format
  validate_codes_match
end
default_query_columns() click to toggle source
# File lib/conceptql/operators/operator.rb, line 96
# Class macro: gives instances a +query_cols+ method that simply reports the
# operator's +dynamic_columns+.
def default_query_columns
  define_method(:query_cols) { dynamic_columns }
end
inherited(subclass) click to toggle source
Calls superclass method
# File lib/conceptql/operators/operator.rb, line 133
# Inheritance hook: after calling the superclass hook, gives +subclass+ its own
# copy of this class's validations (and of the codes regexp, when one is set)
# by writing them into the subclass's @validations / @codes_regexp variables.
# Copies are made with +dup+, so the subclass does not share the same
# container object as its parent.
def inherited(subclass)
  super
  subclass.instance_variable_set(:@validations, validations.dup)
  subclass.instance_variable_set(:@codes_regexp, codes_regexp.dup) if codes_regexp
end
new(*) click to toggle source
Calls superclass method
# File lib/conceptql/operators/operator.rb, line 139
# Builds the operator through the superclass constructor. When the new
# operator carries a label and reports no +errors+, it is registered with its
# scope and a Recall operator referring to that label is returned in its
# place, so all references to the label use the same code.
def new(*)
  built = super
  return built unless built.label && !built.errors

  built.scope.add_operator(built)
  Operators::Recall.new(built.nodifier, built.label, replaced: true)
end
new(nodifier, *args) click to toggle source
# File lib/conceptql/operators/operator.rb, line 153
# Sets up an operator from its raw statement pieces.
#
# Trailing Hash arguments are peeled off one by one and merged (after
# +deep_rekey+) into @options. Nil and empty-string arguments are dropped.
# What remains is split into @upstreams (Arrays and Operators) and
# @arguments (everything else); @values keeps the filtered list as a whole.
# Upstream statements are then turned into operators inside +scope.nest+.
def initialize(nodifier, *args)
  @nodifier = nodifier
  @options = {}
  @options = @options.merge(args.extract_options!.deep_rekey) while args.last.is_a?(Hash)

  args.reject! { |arg| arg.nil? || arg == '' }
  @upstreams, @arguments = args.partition { |arg| arg.is_a?(Array) || arg.is_a?(Operator) }
  @values = args

  scope.nest(self) { create_upstreams }
end
query_columns(*tables) click to toggle source
# File lib/conceptql/operators/operator.rb, line 90
# Class macro: gives instances a +query_cols+ method that returns
# +table_columns+ for the given +tables+.
def query_columns(*tables)
  define_method(:query_cols) { table_columns(*tables) }
end
register(file, *data_models) click to toggle source
# File lib/conceptql/operators/operator.rb, line 83
# Registers this class in OPERATORS under the basename of +file+ (without a
# trailing ".rb") for each of +data_models+, or for every data model known to
# OPERATORS when none are given.
def register(file, *data_models)
  targets = data_models.empty? ? OPERATORS.keys : data_models
  targets.each do |data_model|
    OPERATORS[data_model][File.basename(file).sub(/\.rb\z/, '')] = self
  end
end
require_column(column) click to toggle source
# File lib/conceptql/operators/operator.rb, line 102
# Appends +column+ to the @required_columns list (created on first use) and
# returns that list.
def require_column(column)
  (@required_columns ||= []) << column
end

Public Instance Methods

annotate(db, opts = {}) click to toggle source
# File lib/conceptql/operators/operator.rb, line 188
# Builds (once) and returns the annotated form of this operator: an array of
# the operator name followed by +annotate_values+, ending in a Hash that
# carries an :annotation entry (counts, plus errors/warnings when present) and
# a :name entry when the class has a preferred name.
#
# Along the way the operator reports itself, its per-domain counts, and any
# errors/warnings to +scope+. The result is memoized in @annotation.
def annotate(db, opts = {})
  # Memoized: the scope bookkeeping below must only happen once.
  return @annotation if defined?(@annotation)

  # Key under which this operator's counts/errors/warnings are filed in scope.
  scope_key = options[:id]||self.class.just_class_name.underscore
  annotation = {}
  counts = (annotation[:counts] ||= {})
  metadata = {:annotation=>annotation}
  if name = self.class.preferred_name
    metadata[:name] = name
  end
  res = [operator_name, *annotate_values(db, opts)]

  # Only hit the database when this operator and everything upstream is valid
  # and counts were not disabled; otherwise record this operator's errors.
  if upstreams_valid?(db, opts) && scope.valid? && include_counts?(db, opts)
    # One result row per criterion_domain: total rows and distinct person_ids.
    scope.with_ctes(evaluate(db), db)
      .from_self
      .select_group(:criterion_domain)
      .select_append{count{}.*.as(:rows)}
      .select_append{count(:person_id).distinct.as(:n)}
      .each do |h|
        counts[h.delete(:criterion_domain).to_sym] = h
      end
  elsif !errors.empty?
    annotation[:errors] = errors
    scope.add_errors(scope_key, errors)
  end
  scope.add_operators(self)
  # Every domain this operator reports gets a counts entry, zeroed when the
  # query above did not produce one.
  domains(db).each do |domain|
    cur_counts = counts[domain] ||= {:rows=>0, :n=>0}
    scope.add_counts(scope_key, domain, cur_counts)
  end

  if defined?(@warnings) && !warnings.empty?
    annotation[:warnings] = warnings
    scope.add_warnings(scope_key, warnings)
  end

  # NOTE(review): when annotate_values ends with the operator's own options
  # hash, merge! adds the metadata keys to that hash in place — confirm this
  # mutation is intended.
  if res.last.is_a?(Hash)
    res.last.merge!(metadata)
  else
    res << metadata
  end

  @annotation = res
end
cast_column(column, value = nil) click to toggle source
# File lib/conceptql/operators/operator.rb, line 339
# Builds a typed SQL expression for +value+ aliased as +column+.
#
# The column's type is read from Scope::COLUMN_TYPES (+fetch+ raises KeyError
# for an unknown column). Types may be given either as symbols (:String,
# :Date, :Bigint, :Float) or as the Ruby classes String, Date and Float.
#
# column - Symbol column name; also used as the alias.
# value  - expression to cast (defaults to nil).
#
# Raises RuntimeError for a type that is not handled.
def cast_column(column, value = nil)
  type = Scope::COLUMN_TYPES.fetch(column)
  # `when String` evaluates `String === type`, which is false when +type+ is
  # the String class itself (same for Date and Float). Map class constants to
  # their symbol name so they reach the intended branch.
  kind = type.is_a?(Module) ? type.name.to_s.to_sym : type
  case kind
  when String, :String
    Sequel.cast_string(value).as(column)
  when Date, :Date
    Sequel.cast(value, type).as(column)
  when Float, :Bigint, :Float
    Sequel.cast_numeric(value, type).as(column)
  else
    raise "Unexpected type: '#{type.inspect}' for column: '#{column}'"
  end
end
code_list(db) click to toggle source
# File lib/conceptql/operators/operator.rb, line 233
# Collects the code lists of every upstream operator into one list,
# flattening a single level.
def code_list(db)
  @upstreams.flat_map { |upstream| upstream.code_list(db) }
end
columns(query, local_domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 291
# Assembles the select list for +query+: person_id, the domain's id column,
# criterion_domain (cast from +local_domain+ when one is given), the date
# columns, the source value and any additional columns.
def columns(query, local_domain)
  domain_expr = local_domain ? cast_column(:criterion_domain, local_domain.to_s) : :criterion_domain

  selected = [:person_id, domain_id(local_domain), domain_expr]
  selected += date_columns(query, local_domain)
  selected += [source_value(query, local_domain)]
  selected + additional_columns(query, local_domain)
end
create_upstreams() click to toggle source
# File lib/conceptql/operators/operator.rb, line 168
# Replaces, in place, every entry of @upstreams with the result of +to_op+.
def create_upstreams
  @upstreams.map! { |statement| to_op(statement) }
end
data_model() click to toggle source
# File lib/conceptql/operators/operator.rb, line 331
# The data model reported by this operator's nodifier.
def data_model
  nodifier.data_model
end
database_type() click to toggle source
# File lib/conceptql/operators/operator.rb, line 335
# The database type reported by this operator's nodifier.
def database_type
  nodifier.database_type
end
domains(db) click to toggle source
# File lib/conceptql/operators/operator.rb, line 278
# The domains this operator yields, computed by +determine_domains+ on first
# use and cached in @domains.
def domains(db)
  @domains ||= determine_domains(db)
end
dup_values(args) click to toggle source
# File lib/conceptql/operators/operator.rb, line 240
# Builds a new operator of the same class, with the same nodifier, from
# +args+.
def dup_values(args)
  self.class.new(nodifier, *args)
end
dynamic_columns() click to toggle source
# File lib/conceptql/operators/operator.rb, line 184
# The query columns currently reported by the scope.
def dynamic_columns
  scope.query_columns
end
evaluate(db) click to toggle source
# File lib/conceptql/operators/operator.rb, line 248
# Builds this operator's query for +db+ and passes it through +select_it+.
def evaluate(db)
  base_query = query(db)
  select_it(base_query)
end
inspect() click to toggle source
# File lib/conceptql/operators/operator.rb, line 244
# Debug representation listing the class, the inspected upstreams and the
# inspected arguments.
def inspect
  upstream_list = upstreams.map(&:inspect).join(', ')
  argument_list = arguments.map(&:inspect).join(', ')
  format('<#%s upstreams=[%s] arguments=[%s]>', self.class, upstream_list, argument_list)
end
label() click to toggle source
# File lib/conceptql/operators/operator.rb, line 306
# The operator's label taken from options[:label].
#
# A label whose string form is blank is removed from the options and treated
# as absent. Values that respond to +strip+ are returned stripped; other
# values are returned as they are. A truthy result is cached in @label.
def label
  @label ||= begin
    given = options[:label]
    options.delete(:label) if given && given.to_s.strip.empty?

    kept = options[:label]
    kept.respond_to?(:strip) ? kept.strip : kept
  end
end
operator_name() click to toggle source
# File lib/conceptql/operators/operator.rb, line 176
# The operator's name: the class's +just_class_name+, underscored.
def operator_name
  self.class.just_class_name.underscore
end
optimized() click to toggle source
# File lib/conceptql/operators/operator.rb, line 256
# Returns a copy of this operator (via +dup_values+) in which every Operator
# among +values+ has been replaced by its own +optimized+ form; other values
# are carried over unchanged.
def optimized
  improved = values.map do |value|
    if value.is_a?(Operator)
      value.optimized
    else
      value
    end
  end
  dup_values(improved)
end
required_columns() click to toggle source
# File lib/conceptql/operators/operator.rb, line 180
# The required columns declared on this operator's class.
def required_columns
  self.class.required_columns
end
scope() click to toggle source
# File lib/conceptql/operators/operator.rb, line 327
# The scope held by this operator's nodifier.
def scope
  nodifier.scope
end
select_it(query, specific_domain = nil) click to toggle source
# File lib/conceptql/operators/operator.rb, line 264
# Turns +query+ into a select over the standard columns.
#
# When no +specific_domain+ is passed and the operator exposes a +domain+ that
# is a key of TABLE_COLUMNS, that domain is used. For operators without
# upstreams, the result is additionally restricted to scope.person_ids (when
# the scope has any) and wrapped with +from_self+.
def select_it(query, specific_domain = nil)
  use_own_domain = specific_domain.nil? && respond_to?(:domain) && TABLE_COLUMNS.keys.include?(domain)
  specific_domain = domain if use_own_domain

  selected = setup_select(query, specific_domain)
  return selected unless scope && scope.person_ids && upstreams.empty?

  selected.where(person_id: scope.person_ids).from_self
end
setup_select(query, local_domain = nil) click to toggle source
# File lib/conceptql/operators/operator.rb, line 286
# Runs +query+ through +modify_query+ and selects the standard +columns+ from
# the modified query.
def setup_select(query, local_domain = nil)
  prepared = modify_query(query, local_domain)
  prepared.select(*columns(prepared, local_domain))
end
sql(db) click to toggle source
# File lib/conceptql/operators/operator.rb, line 252
# The SQL of the evaluated query for +db+.
def sql(db)
  evaluate(db).sql
end
stream() click to toggle source
# File lib/conceptql/operators/operator.rb, line 282
# The first upstream operator, cached in @stream.
def stream
  @stream ||= upstreams.first
end
to_op(stmt) click to toggle source
# File lib/conceptql/operators/operator.rb, line 172
# Returns +stmt+ unchanged when it already is an Operator; otherwise asks the
# nodifier to create one from the statement's elements.
def to_op(stmt)
  return stmt if stmt.is_a?(Operator)

  nodifier.create(*stmt)
end
unionable?(other) click to toggle source
# File lib/conceptql/operators/operator.rb, line 260
# Whether this operator can be unioned with +other+. This implementation
# always answers false.
def unionable?(other)
  false
end
upstreams_valid?(db, opts = {}) click to toggle source
# File lib/conceptql/operators/operator.rb, line 323
# True when this operator is valid and every upstream operator is, in turn,
# valid together with its own upstreams.
def upstreams_valid?(db, opts = {})
  own_result = valid?(db, opts)
  return own_result unless own_result

  upstreams.all? { |upstream| upstream.upstreams_valid?(db, opts) }
end
valid?(db, opts = {}) click to toggle source
# File lib/conceptql/operators/operator.rb, line 315
# Whether validation produced no errors.
#
# The first call resets @errors and @warnings to empty lists and runs
# +validate+; later calls only report whether the recorded error list is
# empty.
def valid?(db, opts = {})
  unless defined?(@errors)
    @errors = []
    @warnings = []
    validate(db, opts)
    return errors.empty?
  end

  @errors.empty?
end

Private Instance Methods

add_error(*args) click to toggle source
# File lib/conceptql/operators/operator.rb, line 733
# Records one error; the error is stored as the array of the given arguments.
def add_error(*args)
  errors.push(args)
end
add_warning(*args) click to toggle source
# File lib/conceptql/operators/operator.rb, line 737
# Records one warning; the warning is stored as the array of the given
# arguments.
def add_warning(*args)
  warnings.push(args)
end
add_warnings?(db, opts = {}) click to toggle source
# File lib/conceptql/operators/operator.rb, line 729
# Whether warnings should be gathered: only when no errors were recorded, a
# database is given, its adapter scheme is not :mock, and the :skip_db option
# is not set.
def add_warnings?(db, opts = {})
  return false unless @errors.empty?
  return db unless db
  return false if db.adapter_scheme == :mock

  !opts[:skip_db]
end
additional_columns(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 436
# Select expressions for the dynamic columns beyond the defaults.
#
# The three "special" columns (provenance_type, provider_id,
# place_of_service_concept_id) are produced by their dedicated methods when
# they are among +dynamic_columns+. Every other non-default dynamic column is
# passed through by name when +query+ already has it, and otherwise emitted as
# a typed placeholder via +cast_column+.
def additional_columns(query, domain)
  special_builders = {
    provenance_type: -> { provenance_type(query, domain) },
    provider_id: -> { provider_id(query, domain) },
    place_of_service_concept_id: -> { place_of_service_concept_id(query, domain) }
  }

  extras = []
  special_builders.each do |column, builder|
    extras << builder.call if dynamic_columns.include?(column)
  end

  remaining = dynamic_columns - Scope::DEFAULT_COLUMNS.keys - special_builders.keys
  remaining.each do |column|
    extras << (query_columns(query).include?(column) ? column : cast_column(column))
  end

  extras
end
annotate_values(db, opts) click to toggle source
# File lib/conceptql/operators/operator.rb, line 355
# The values shown in an annotation: each upstream's own annotation, followed
# by the arguments, followed by the options hash.
def annotate_values(db, opts)
  annotated_upstreams = upstreams.map { |upstream| upstream.annotate(db, opts) }
  (annotated_upstreams + arguments) << options
end
arguments_fix(db, args = nil) click to toggle source
# File lib/conceptql/operators/operator.rb, line 745
# Returns +args+ (defaulting to +arguments+) unchanged unless
# +needs_arguments_cte?+ says otherwise. In that case the values are turned
# into a one-column VALUES dataset, with the first value aliased as :arg, and
# a dataset selecting :arg from a CTE named :args over it is returned.
def arguments_fix(db, args = nil)
  args ||= arguments
  return args unless needs_arguments_cte?(args)

  remaining = args.dup
  # Aliasing the first value names the column of the VALUES list.
  aliased_head = Sequel.expr(remaining.shift).as(:arg)
  rows = [aliased_head, *remaining].map { |value| [value] }
  args_cte = db.values(rows)

  db[:args].with(:args, args_cte).select(:arg)
end
assemble_date(query, *symbols) click to toggle source
# File lib/conceptql/operators/operator.rb, line 606
# Builds a SQL date expression out of separate column +symbols+ (for example
# year, month and day columns).
#
# Each column is cast to a string; columns other than :year_of_birth are
# left-padded to two characters with '0'. A NULL part falls back to '2000' for
# :year_of_birth and to '01' for any other part. The parts are joined with
# '-' and handed to +cast_date+.
def assemble_date(query, *symbols)
  strings = symbols.map do |symbol|
    # Substitute used when the column is NULL: a year by default ...
    sub = '2000'
    col = Sequel.cast_string(symbol)
    if symbol != :year_of_birth
      # ... or '01' for the other (two-character, zero-padded) parts.
      sub = '01'
      col = Sequel.function(:lpad, col, 2, '0')
    end
    Sequel.function(:coalesce, col, Sequel.expr(sub))
  end

  # Interleave the parts with '-' separators (no trailing separator).
  strings_with_dashes = strings.zip(['-'] * (symbols.length - 1)).flatten.compact
  concatted_strings = Sequel.join(strings_with_dashes)

  date = concatted_strings
  # On Impala the parts are joined with concat_ws and cast to DateTime first.
  if query.db.database_type == :impala
    date = Sequel.cast(Sequel.function(:concat_ws, '-', *strings), DateTime)
  end
  cast_date(query.db, date)
end
bad_arguments() click to toggle source
# File lib/conceptql/operators/operator.rb, line 716
# The arguments that do not match the class's codes regexp; always empty when
# the class has no codes regexp. A computed list is cached in @bad_arguments.
def bad_arguments
  if self.class.codes_regexp
    @bad_arguments ||= arguments.select { |code| !(self.class.codes_regexp === code) }
  else
    []
  end
end
cast_date(db, date) click to toggle source
# File lib/conceptql/operators/operator.rb, line 627
# Wraps +date+ in the date conversion suited to +db+'s database type:
# to_date(..., 'YYYY-MM-DD') on Oracle, CONVERT(DATETIME, ...) on MSSQL and a
# plain cast to Date everywhere else.
def cast_date(db, date)
  flavor = db.database_type
  if flavor == :oracle
    Sequel.function(:to_date, date, 'YYYY-MM-DD')
  elsif flavor == :mssql
    Sequel.lit('CONVERT(DATETIME, ?)', date)
  else
    Sequel.cast(date, Date)
  end
end
criterion_id() click to toggle source
# File lib/conceptql/operators/operator.rb, line 359
# Name of the criterion id column.
def criterion_id
  :criterion_id
end
date_columns(query, domain = nil) click to toggle source
# File lib/conceptql/operators/operator.rb, line 481
# The start/end date select expressions for +query+.
#
# When the query already has both :start_date and :end_date, or no +domain+ is
# given, the plain column names are returned. Otherwise the domain's own date
# columns are cast (to DateTime on Impala, Date elsewhere) and aliased as
# :start_date / :end_date; a NULL end date falls back to the start date
# column.
def date_columns(query, domain = nil)
  plain = [:start_date, :end_date]
  return plain if plain.all? { |name| query_columns(query).include?(name) }
  return plain unless domain

  date_klass = query.db.database_type == :impala ? DateTime : Date

  sd = start_date_column(query, domain)
  unless sd == :start_date
    sd = Sequel.cast(Sequel.expr(sd), date_klass).as(:start_date)
  end

  ed = end_date_column(query, domain)
  unless ed == :end_date
    with_fallback = Sequel.function(:coalesce, Sequel.expr(ed), start_date_column(query, domain))
    ed = Sequel.cast(with_fallback, date_klass).as(:end_date)
  end

  [sd, ed]
end
determine_domains(db) click to toggle source
# File lib/conceptql/operators/operator.rb, line 638
# Works out the domains this operator yields.
#
# Without upstreams this is the operator's own +domain+ (when it has one);
# with upstreams it is the de-duplicated union of their domains. [:invalid] is
# returned when nothing can be determined.
def determine_domains(db)
  if upstreams.empty?
    return respond_to?(:domain) ? [domain] : [:invalid]
  end

  found = upstreams.compact.flat_map { |upstream| upstream.domains(db) }.uniq
  found.empty? ? [:invalid] : found
end
domain_id(domain = nil) click to toggle source
# File lib/conceptql/operators/operator.rb, line 363
# The select expression for the criterion id: the plain :criterion_id column
# when no +domain+ is given, otherwise the domain's id column aliased as
# :criterion_id. The :death domain uses the :person id column.
def domain_id(domain = nil)
  return :criterion_id if domain.nil?

  id_source = domain == :death ? :person : domain
  Sequel.expr(make_domain_id(id_source)).as(:criterion_id)
end
end_date_column(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 516
# The column (or expression) holding the end date for +domain+, or nil when
# the domain has none (drug_cost, procedure_cost) or is unknown.
#
# For :person the assembled date of birth is used. That expression is only
# built when :person is actually requested, rather than on every lookup.
def end_date_column(query, domain)
  case domain
  when :condition_occurrence then :condition_end_date
  when :death then :death_date
  when :drug_exposure then :drug_exposure_end_date
  when :payer_plan_period then :payer_plan_period_end_date
  when :person then person_date_of_birth(query)
  when :procedure_occurrence then :procedure_date
  when :observation then :observation_date
  when :observation_period then :observation_period_end_date
  when :visit_occurrence then :visit_end_date
  end
end
impala?() click to toggle source
# File lib/conceptql/operators/operator.rb, line 403
# True when the operator's database type, converted to a symbol, is :impala.
def impala?
  database_type.to_sym == :impala
end
include_counts?(db, opts) click to toggle source
# File lib/conceptql/operators/operator.rb, line 758
# Whether row counts should be fetched: a database must be given and neither
# the :skip_db nor the :skip_counts option may be set.
def include_counts?(db, opts)
  db && !(opts[:skip_db] || opts[:skip_counts])
end
make_domain_id(domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 369
# The id column name for +domain+: the domain name with "_id" appended, as a
# Symbol.
def make_domain_id(domain)
  :"#{domain}_id"
end
make_table_name(table) click to toggle source
# File lib/conceptql/operators/operator.rb, line 373
# The aliased table symbol for +table+: the table name with "___tab" appended.
def make_table_name(table)
  :"#{table}___tab"
end
modify_query(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 586
# Applies the query modifiers whose column is among +dynamic_columns+ to
# +query+, in order, and returns the resulting query. Nothing is applied when
# +domain+ is nil.
def modify_query(query, domain)
  modifiers = {
    place_of_service_concept_id: ConceptQL::QueryModifiers::PoSQueryModifier,
    drug_name: ConceptQL::QueryModifiers::DrugQueryModifier
  }

  modifiers.reduce(query) do |current, (column, modifier_class)|
    if domain.nil? || !dynamic_columns.include?(column)
      current
    else
      modifier_class.new(current, self).modified_query
    end
  end
end
needs_arguments_cte?(args) click to toggle source
# File lib/conceptql/operators/operator.rb, line 741
# Whether +args+ should be moved into a CTE: only on Impala, and only when the
# list holds more than 5000 entries.
#
# The decision is based on the list that was passed in (falling back to the
# operator's own +arguments+ when +args+ is nil), so callers checking a list
# other than +arguments+ get an answer about that list.
def needs_arguments_cte?(args)
  impala? && (args || arguments).length > 5000
end
omopv4?() click to toggle source
# File lib/conceptql/operators/operator.rb, line 399
# True when the operator's data model is :omopv4.
def omopv4?
  data_model == :omopv4
end
omopv4_plus?() click to toggle source
# File lib/conceptql/operators/operator.rb, line 395
# True when the operator's data model is :omopv4_plus.
def omopv4_plus?
  data_model == :omopv4_plus
end
person_date_of_birth(query) click to toggle source
# File lib/conceptql/operators/operator.rb, line 602
# Date-of-birth expression assembled from the :year_of_birth, :month_of_birth
# and :day_of_birth columns.
def person_date_of_birth(query)
  assemble_date(query, :year_of_birth, :month_of_birth, :day_of_birth)
end
place_of_service_concept_id(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 476
# Select expression for place_of_service_concept_id: the plain column when
# +query+ already has it, otherwise a typed cast of the domain's source
# column.
def place_of_service_concept_id(query, domain)
  if query_columns(query).include?(:place_of_service_concept_id)
    :place_of_service_concept_id
  else
    cast_column(:place_of_service_concept_id, place_of_service_concept_id_column(query, domain))
  end
end
place_of_service_concept_id_column(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 580
# :place_of_service_concept_id when the domain's table has a
# :visit_occurrence_id column; nil otherwise (including a nil +domain+).
def place_of_service_concept_id_column(query, domain)
  return nil if domain.nil?

  table_cols(domain).include?(:visit_occurrence_id) ? :place_of_service_concept_id : nil
end
provenance_type(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 466
# Select expression for provenance_type: the plain column when +query+ already
# has it, otherwise a typed cast of the domain's type-concept column.
def provenance_type(query, domain)
  if query_columns(query).include?(:provenance_type)
    :provenance_type
  else
    cast_column(:provenance_type, provenance_type_column(query, domain))
  end
end
provenance_type_column(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 548
# The type-concept column that supplies provenance for +domain+; nil for
# domains without one.
def provenance_type_column(query, domain)
  case domain
  when :condition_occurrence then :condition_type_concept_id
  when :death then :death_type_concept_id
  when :drug_exposure then :drug_type_concept_id
  when :observation then :observation_type_concept_id
  when :procedure_occurrence then :procedure_type_concept_id
  end
end
provider_id(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 471
# Select expression for provider_id: the plain column when +query+ already has
# it, otherwise a typed cast of the domain's provider column.
def provider_id(query, domain)
  if query_columns(query).include?(:provider_id)
    :provider_id
  else
    cast_column(:provider_id, provider_id_column(query, domain))
  end
end
provider_id_column(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 568
# The column that supplies the provider id for +domain+; nil for domains
# without one.
def provider_id_column(query, domain)
  case domain
  when :condition_occurrence, :observation, :procedure_occurrence
    :associated_provider_id
  when :death
    # NOTE(review): this is the same column the provenance lookup uses for
    # death and does not look like a provider id — confirm it is intended.
    :death_type_concept_id
  when :drug_exposure
    :prescribing_provider_id
  when :person, :provider
    :provider_id
  end
end
query_cols() click to toggle source
# File lib/conceptql/operators/operator.rb, line 377
# Default +query_cols+: raises NotImplementedError, passing the operator
# itself as the error message argument.
def query_cols
  raise NotImplementedError, self
end
query_columns(query) click to toggle source
# File lib/conceptql/operators/operator.rb, line 381
# The columns +query+ is expected to return: the query's :force_columns
# option when set, otherwise the operator's +query_cols+.
#
# When the CONCEPTQL_CHECK_COLUMNS environment variable is set, the expected
# columns are compared (order-insensitively) with the query's actual columns
# and a RuntimeError describing the mismatch is raised if they differ.
def query_columns(query)
  cols = query.opts[:force_columns] || query_cols

  if ENV['CONCEPTQL_CHECK_COLUMNS'] && cols.sort != query.columns.sort
    raise "columns don't match:\nclass: #{self.class}\nexpected: #{cols}\nactual: #{query.columns}\nvalues: #{values}\nSQL: #{query.sql}"
  end

  cols
end
source_value(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 461
# Select expression for source_value: the plain column when +query+ already
# has it, otherwise a typed cast of the domain's source-value column.
def source_value(query, domain)
  if query_columns(query).include?(:source_value)
    :source_value
  else
    cast_column(:source_value, source_value_column(query, domain))
  end
end
source_value_column(query, domain) click to toggle source
# File lib/conceptql/operators/operator.rb, line 532
# The source-value column for +domain+. Domains without one (drug_cost,
# procedure_cost, observation_period) and unknown domains yield nil.
def source_value_column(query, domain)
  case domain
  when :condition_occurrence then :condition_source_value
  when :death then :cause_of_death_source_value
  when :drug_exposure then :drug_source_value
  when :payer_plan_period then :payer_plan_period_source_value
  when :person then :person_source_value
  when :procedure_occurrence then :procedure_source_value
  when :observation then :observation_source_value
  when :visit_occurrence then :place_of_service_source_value
  end
end
start_date_column(query, domain) click to toggle source

TODO: Move these hashes into a configuration file. They are OMOP-specific bits of information and need to be abstracted away.

# File lib/conceptql/operators/operator.rb, line 500
# The column (or expression) holding the start date for +domain+, or nil when
# the domain has none (drug_cost, procedure_cost) or is unknown.
#
# For :person the assembled date of birth is used. That expression is only
# built when :person is actually requested, rather than on every lookup.
def start_date_column(query, domain)
  case domain
  when :condition_occurrence then :condition_start_date
  when :death then :death_date
  when :drug_exposure then :drug_exposure_start_date
  when :payer_plan_period then :payer_plan_period_start_date
  when :person then person_date_of_birth(query)
  when :procedure_occurrence then :procedure_date
  when :observation then :observation_date
  when :observation_period then :observation_period_start_date
  when :visit_occurrence then :visit_start_date
  end
end
table_cols(table) click to toggle source
# File lib/conceptql/operators/operator.rb, line 415
# The columns of +table+ according to TABLE_COLUMNS (+fetch+ raises KeyError
# for an unknown table). Under the omopv4_plus data model the table's
# vocabulary id column, when it has one, is appended to a new list.
def table_cols(table)
  key = table_to_sym(table)
  base = TABLE_COLUMNS.fetch(key)
  return base unless omopv4_plus?

  base + Array(table_vocabulary_id(key))
end
table_columns(*tables) click to toggle source
# File lib/conceptql/operators/operator.rb, line 424
# The columns of all +tables+, gathered into one flat list.
def table_columns(*tables)
  per_table = tables.map { |table| table_cols(table) }
  per_table.flatten
end
table_source_value(table) click to toggle source
# File lib/conceptql/operators/operator.rb, line 428
# The source-value column registered for +table+ in TABLE_SOURCE_VALUE_COLUMN
# (+fetch+ raises KeyError for an unknown table).
def table_source_value(table)
  TABLE_SOURCE_VALUE_COLUMN.fetch(table_to_sym(table))
end
table_to_sym(table) click to toggle source
# File lib/conceptql/operators/operator.rb, line 407
# Normalizes a table reference: a Symbol is split with Sequel.split_symbol and
# its second component is returned as a Symbol; anything else is returned
# unchanged.
def table_to_sym(table)
  return table unless table.is_a?(Symbol)

  Sequel.split_symbol(table)[1].to_sym
end
table_vocabulary_id(table) click to toggle source
# File lib/conceptql/operators/operator.rb, line 432
# The vocabulary id column registered for +table+ in
# TABLE_VOCABULARY_ID_COLUMN, or whatever that lookup yields for a table
# without an entry.
def table_vocabulary_id(table)
  TABLE_VOCABULARY_ID_COLUMN[table_to_sym(table)]
end
upstream_operator_names() click to toggle source

Validation Related

# File lib/conceptql/operators/operator.rb, line 653
# The operator names of all upstream operators.
def upstream_operator_names
  @upstreams.map(&:operator_name)
end
validate(db, opts = {}) click to toggle source
# File lib/conceptql/operators/operator.rb, line 657
# Runs the operator's validations: a label that is present but not a String is
# reported as an error, then every entry of the class's validations is
# dispatched with +send+ (method name followed by its arguments).
def validate(db, opts = {})
  if label && !label.is_a?(String)
    add_error("invalid label")
  end

  self.class.validations.each { |validation| send(*validation) }
end
validate_at_least_one_argument() click to toggle source
# File lib/conceptql/operators/operator.rb, line 694
# Reports an error when the operator has no arguments.
def validate_at_least_one_argument
  return unless @arguments.empty?

  add_error("has no arguments")
end
validate_at_least_one_upstream() click to toggle source
# File lib/conceptql/operators/operator.rb, line 677
# Reports an error when the operator has no upstreams.
def validate_at_least_one_upstream
  return unless @upstreams.empty?

  add_error("has no upstream")
end
validate_at_most_one_argument() click to toggle source
# File lib/conceptql/operators/operator.rb, line 690
# Reports an error, listing the arguments, when the operator has more than
# one.
def validate_at_most_one_argument
  return unless @arguments.length > 1

  add_error("has multiple arguments", @arguments)
end
validate_at_most_one_upstream() click to toggle source
# File lib/conceptql/operators/operator.rb, line 673
# Reports an error, listing the upstream operator names, when the operator has
# more than one upstream.
def validate_at_most_one_upstream
  return unless @upstreams.length > 1

  add_error("has multiple upstreams", upstream_operator_names)
end
validate_codes_match() click to toggle source
# File lib/conceptql/operators/operator.rb, line 723
# Adds a warning listing every argument that failed the code-format check.
def validate_codes_match
  add_warning("improperly formatted code", *bad_arguments) unless bad_arguments.empty?
end
validate_no_arguments() click to toggle source
# File lib/conceptql/operators/operator.rb, line 681
# Reports an error, listing the arguments, when the operator has any.
def validate_no_arguments
  return if @arguments.empty?

  add_error("has arguments", @arguments)
end
validate_no_upstreams() click to toggle source
# File lib/conceptql/operators/operator.rb, line 664
# Reports an error, listing the upstream operator names, when the operator has
# any upstreams.
def validate_no_upstreams
  return if @upstreams.empty?

  add_error("has upstreams", upstream_operator_names)
end
validate_one_argument() click to toggle source
# File lib/conceptql/operators/operator.rb, line 685
# Requires exactly one argument by running both the "at least one" and the
# "at most one" argument checks.
def validate_one_argument
  validate_at_least_one_argument
  validate_at_most_one_argument
end
validate_one_upstream() click to toggle source
# File lib/conceptql/operators/operator.rb, line 668
# Requires exactly one upstream by running both the "at least one" and the
# "at most one" upstream checks.
def validate_one_upstream
  validate_at_least_one_upstream
  validate_at_most_one_upstream
end
validate_option(format, *opts) click to toggle source
# File lib/conceptql/operators/operator.rb, line 698
# For each named option that is present, reports an error unless +format+
# matches its value (tested with +===+). Absent options are ignored.
def validate_option(format, *opts)
  opts.each do |name|
    next unless options.has_key?(name)
    next if format === options[name]

    add_error("wrong option format", name.to_s)
  end
end
validate_required_options(*opts) click to toggle source
# File lib/conceptql/operators/operator.rb, line 708
# Reports an error for every named option that is missing from +options+.
def validate_required_options(*opts)
  opts.each do |name|
    next if options.has_key?(name)

    add_error("required option not present", name.to_s)
  end
end