One of the projects I'm working on involves scraping information from Amazon product listings. It currently uses Delayed Job, but we've had problems with long-running processes dying off.
To deal with this we're considering a switch to Amazon's Simple Workflow Service (SWF). SWF keeps the task queue on Amazon's servers, and client programs poll it for tasks.
For the purposes of this article I'll build a simplified version of the application that just pulls down prices. The data structures for that are pretty straightforward:
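In SWF the work queue (a *task list*) lives on Amazon's side, and workers pull tasks from it instead of having jobs pushed to them. The toy below models that pull loop in plain Ruby, with Thread::Queue standing in for the hosted task list. It only illustrates the shape of the model: ToyTaskList and run_worker are hypothetical names, not part of the SWF or aws-flow APIs.

```ruby
# Toy, in-memory model of SWF's pull-based task lists. Hypothetical sketch:
# ToyTaskList and run_worker are illustrative names, not part of any AWS API.
require "thread"

class ToyTaskList
  def initialize
    @queue = Queue.new
  end

  # Stand-in for SWF placing a task on a named task list.
  def schedule(task)
    @queue << task
  end

  # Stand-in for a worker's long poll: blocks until a task is available.
  def poll
    @queue.pop
  end
end

# A worker loops: poll, process, repeat. :stop is a sentinel for shutting down.
def run_worker(task_list, results)
  loop do
    task = task_list.poll
    break if task == :stop
    results << "scraped #{task}"
  end
end

task_list = ToyTaskList.new
results = Queue.new
workers = Array.new(2) { Thread.new { run_worker(task_list, results) } }

%w[B000000001 B000000002 B000000003].each { |asin| task_list.schedule(asin) }
workers.size.times { task_list.schedule(:stop) }
workers.each(&:join)

scraped = []
scraped << results.pop until results.empty?
puts scraped.sort
```

In this model a worker that dies simply stops polling; tasks nobody has picked up yet stay on the list for the remaining workers.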
rails new swf_scraper
rails generate scaffold Product asin:string
rails generate scaffold Record price:float product_id:integer
Connect the records to the products by adding has_many :records to the Product model and belongs_to :product to the Record model.
Accessing the workflow service requires adding the aws-sdk and aws-flow gems to the Gemfile. The scraper itself also uses HTTParty and Nokogiri.
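A sketch of the Gemfile lines, assuming the 1.x AWS SDK. The code uses the AWS::SimpleWorkflow namespace from version 1 of the SDK (later major versions use Aws:: instead), so an unpinned aws-sdk would pull in an incompatible release. The aws-flow constraint matches the 1.0.0 release this post was written with; the exact version constraints are an assumption.

```ruby
# Gemfile additions (a sketch, not the post's exact Gemfile).
# AWS::SimpleWorkflow is the v1 SDK namespace, so pin the SDK to 1.x.
gem 'aws-sdk', '~> 1.0'
gem 'aws-flow', '~> 1.0.0'
# Used by ScrapeActivity to fetch and parse product pages.
gem 'httparty'
gem 'nokogiri'
```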
SWF uses workflows to define the order of activity execution. The workflow for this project is:
class ScrapeWorkflow
  extend AWS::Flow::Workflows
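  # Workflow type registration options. SWF types are immutable once
  # registered, so bump :version whenever these change.
  workflow :queue_scrape do
    {
      :version => "1.1",
      :task_list => SWF_WORKFLOW_TASK_LIST,
      # Each execution must finish within 10 minutes.
      :execution_start_to_close_timeout => 10 * 60,
    }
  end
  # Client the decider uses to schedule ScrapeActivity's activities.
  activity_client(:activity){ {:from_class => "ScrapeActivity"} }
  def queue_scrape(asin)
    # Schedule the scrape without blocking; send_async returns a Future.
    scrape_future = activity.send_async(:scrape, asin)
    # wait_for_all(scrape_future)
  end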
end
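The workflow worker acts as what SWF calls a decider: whenever an execution's state changes (it starts, an activity completes, and so on), SWF hands the worker that execution's event history and the worker replies with decisions. aws-flow produces those decisions by replaying queue_scrape, so you never write them by hand. The plain-Ruby sketch below only illustrates the decision logic for a one-activity workflow like this one; the event and decision type strings follow SWF's names, but the hash layout and the decide function are simplified, hypothetical stand-ins.

```ruby
# Toy decider for a one-activity workflow (hypothetical sketch; real SWF
# histories and decisions carry many more fields).

# Returns the decisions to send back to SWF for the given event history.
def decide(history)
  types = history.map { |event| event[:type] }
  if types.include?("ActivityTaskCompleted")
    # The scrape finished, so the workflow is done.
    [{ type: "CompleteWorkflowExecution" }]
  elsif types.include?("ActivityTaskFailed") || types.include?("ActivityTaskTimedOut")
    [{ type: "FailWorkflowExecution" }]
  elsif types.include?("ActivityTaskScheduled")
    # The scrape is queued or running; nothing new to decide yet.
    []
  else
    # First decision: schedule the scrape with the ASIN the execution started with.
    asin = history.find { |event| event[:type] == "WorkflowExecutionStarted" }[:input]
    [{ type: "ScheduleActivityTask", activity: "scrape", input: asin }]
  end
end

started = { type: "WorkflowExecutionStarted", input: "B000000001" }
p decide([started])
p decide([started, { type: "ActivityTaskScheduled" }])
p decide([started, { type: "ActivityTaskScheduled" }, { type: "ActivityTaskCompleted" }])
```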
There is just one task in this workflow: scrape the ASIN. The activity is where the actual processing takes place:
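class ScrapeActivity
  extend AWS::Flow::Activities
  # Activity type registration options. A queued scrape may wait up to
  # 10 minutes for a worker, which then has 30 seconds to finish it.
  activity :scrape do
    {
      :version => "1.1",
      :default_task_list => SWF_ACTIVITY_TASK_LIST,
      :default_task_schedule_to_start_timeout => 10 * 60,
      :default_task_start_to_close_timeout => 30,
    }
  end

  # Matches the first number in the price text, e.g. "1,299.99" in "$1,299.99".
  # Requiring a leading digit stops a stray "." or "," from parsing as 0.0.
  PRICE_PATTERN = /\d[\d,]*(?:\.\d+)?/

  def initialize
    @count = 0
  end

  def scrape(asin)
    @count += 1
    # ASINs are alphanumeric, so the URL needs no escaping
    # (URI.encode was deprecated and removed in Ruby 3.0).
    url = "http://www.amazon.com/dp/#{asin}"
    response = HTTParty.get(url)
    doc = Nokogiri::HTML(response.body)

    # Try the two price selectors in turn.
    price = extract_price(doc, '.priceLarge') ||
            extract_price(doc, '.a-color-price.a-size-large')

    if price
      product = Product.find_by(asin: asin)
      product.records.create(price: price) if product
    end
    puts "#{@count} Scraped: #{asin}: #{price}"
  rescue => e
    # Errors are logged and swallowed, so SWF sees the task as completed.
    puts "Error: #{e.message}"
  end

  private

  # Returns the price inside the first node matching selector as a Float, or nil.
  def extract_price(doc, selector)
    node = doc.at_css(selector)
    return nil unless node
    match = node.text[PRICE_PATTERN]
    match && match.delete(',').to_f
  end
end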
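The price parsing in scrape can be checked without hitting Amazon by pulling it out into plain functions over strings. The sketch below does that; parse_price, first_price and PRICE_PATTERN are hypothetical names. It requires a leading digit in the match, because a looser pattern such as /[0-9\.,]+/ also matches the lone period in text like "Currently unavailable." and turns it into a price of 0.0.

```ruby
# Hypothetical standalone version of the price extraction, operating on plain
# strings so it can be checked without HTTP or Nokogiri.

# First number in the text, e.g. "1,299.99" in "$1,299.99". A leading digit is
# required so a stray "." or "," is never mistaken for a price.
PRICE_PATTERN = /\d[\d,]*(?:\.\d+)?/

# Returns the price in the text as a Float, or nil when there is no number.
def parse_price(text)
  return nil if text.nil?
  match = text[PRICE_PATTERN]
  match && match.delete(",").to_f
end

# Returns the first price found among candidate texts, mirroring the fallback
# from one CSS selector to the next (nil stands for a selector that matched nothing).
def first_price(candidates)
  candidates.each do |text|
    price = parse_price(text)
    return price if price
  end
  nil
end

puts parse_price("$1,299.99").inspect              # prints 1299.99
puts parse_price("Currently unavailable.").inspect # prints nil
puts first_price([nil, "  $24.95 "]).inspect       # prints 24.95
```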
The last piece of the puzzle is queuing the jobs and running the workflow and activity workers. This is done with a set of rake tasks:
require "#{Rails.root}/app/helpers/application_helper"
include ApplicationHelper
# Load the Flow framework by its library path rather than a hard-coded gem directory.
require "aws/decider"
require "#{Rails.root}/config/initializers/swf.rb"
require "#{Rails.root}/lib/scrape_activity.rb"
require "#{Rails.root}/lib/scrape_workflow.rb"

namespace :swf do
  desc 'Start activity worker'
  task :activity => :environment do
    swf, domain = swf_domain
    # Poll the activity task list and run ScrapeActivity in this process (no forking).
    activity_worker = AWS::Flow::ActivityWorker.new(swf.client, domain, SWF_ACTIVITY_TASK_LIST, ScrapeActivity) { {:use_forking => false} }
    activity_worker.start
  end

  desc 'Start workflow worker'
  task :workflow => :environment do
    swf, domain = swf_domain
    # Poll the workflow task list and make decisions for ScrapeWorkflow.
    worker = AWS::Flow::WorkflowWorker.new(swf.client, domain, SWF_WORKFLOW_TASK_LIST, ScrapeWorkflow)
    worker.start
  end

  desc 'Queue activities'
  task :scrape => :environment do
    swf, domain = swf_domain
    workflow_client = AWS::Flow.workflow_client(swf.client, domain) { {:from_class => "ScrapeWorkflow"} }
    # Start one workflow execution per product; each schedules a single scrape.
    Product.find_each do |product|
      workflow_client.start_execution(product.asin)
    end
  end
end
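The rake file requires config/initializers/swf.rb, which the post doesn't show. Judging by the code, it has to define SWF_DOMAIN, SWF_WORKFLOW_TASK_LIST and SWF_ACTIVITY_TASK_LIST. A hypothetical version, with made-up default names that can be overridden from the environment, might look like this:

```ruby
# config/initializers/swf.rb (hypothetical contents; the post does not show this file).
# AWS credentials are not set here; this sketch assumes the SDK picks them up
# from its usual configuration.
# Names can be overridden from the environment so that, for example, staging
# and production use separate SWF domains and task lists.
SWF_DOMAIN             = ENV.fetch("SWF_DOMAIN", "swf_scraper")
SWF_WORKFLOW_TASK_LIST = ENV.fetch("SWF_WORKFLOW_TASK_LIST", "swf_scraper_workflow")
SWF_ACTIVITY_TASK_LIST = ENV.fetch("SWF_ACTIVITY_TASK_LIST", "swf_scraper_activity")
```

Keeping the workflow and activity task lists separate means decision tasks and scrape tasks never end up with the wrong kind of worker.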
The SWF client and domain are set up by a helper method, which registers the domain the first time it runs:
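module ApplicationHelper
  # Returns an SWF client and the scraper's domain, registering the domain
  # (with a 10-day execution history retention period) on first use.
  def swf_domain
    swf = AWS::SimpleWorkflow.new
    begin
      domain = swf.domains.create(SWF_DOMAIN, "10")
    rescue AWS::SimpleWorkflow::Errors::DomainAlreadyExistsFault
      # Registered on an earlier run, so look it up instead.
      domain = swf.domains[SWF_DOMAIN]
    end

    return swf, domain
  end
end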