Class: ActiveJob::QueueAdapters::AsyncAdapter::Scheduler
- Defined in:
 - activejob/lib/active_job/queue_adapters/async_adapter.rb
 
Overview
:nodoc:
Constant Summary collapse
- DEFAULT_EXECUTOR_OPTIONS =
 { min_threads: 0, max_threads: ENV.fetch("RAILS_MAX_THREADS", 5).to_i, auto_terminate: true, idletime: 60, # 1 minute max_queue: 0, # unlimited fallback_policy: :caller_runs # shouldn't matter -- 0 max queue }.freeze
Instance Attribute Summary collapse
- 
  
    
      #immediate  ⇒ Object 
    
    
  
  
  
  
    
    
  
  
  
  
  
  
    
Returns the value of attribute immediate.
 
Instance Method Summary collapse
- #enqueue(job, queue_name:) ⇒ Object
 - #enqueue_at(job, timestamp, queue_name:) ⇒ Object
 - #executor ⇒ Object
 - 
  
    
      #initialize(**options)  ⇒ Scheduler 
    
    
  
  
  
    constructor
  
  
  
  
  
  
  
    
A new instance of Scheduler.
 - #shutdown(wait: true) ⇒ Object
 
Constructor Details
#initialize(**options) ⇒ Scheduler
Returns a new instance of Scheduler.
      86 87 88 89 90  | 
    
      # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 86 def initialize(**) self.immediate = false @immediate_executor = Concurrent::ImmediateExecutor.new @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge()) end  | 
  
Instance Attribute Details
#immediate ⇒ Object
Returns the value of attribute immediate.
      84 85 86  | 
    
      # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 84 def immediate @immediate end  | 
  
Instance Method Details
#enqueue(job, queue_name:) ⇒ Object
      92 93 94  | 
    
      # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 92 def enqueue(job, queue_name:) executor.post(job, &:perform) end  | 
  
#enqueue_at(job, timestamp, queue_name:) ⇒ Object
      96 97 98 99 100 101 102 103  | 
    
      # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 96 def enqueue_at(job, , queue_name:) delay = - Time.current.to_f if !immediate && delay > 0 Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform) else enqueue(job, queue_name: queue_name) end end  | 
  
#executor ⇒ Object
      110 111 112  | 
    
      # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 110 def executor immediate ? @immediate_executor : @async_executor end  | 
  
#shutdown(wait: true) ⇒ Object
      105 106 107 108  | 
    
      # File 'activejob/lib/active_job/queue_adapters/async_adapter.rb', line 105 def shutdown(wait: true) @async_executor.shutdown @async_executor.wait_for_termination if wait end  |