Simple Workflow Service

Will Holcomb

31 July 2013

One of the projects I'm working on involves scraping information from Amazon product listings. Currently it uses delayed job, but there's an issue with long-running processes dying off.

To deal with this we are considering a switch to Amazon's Simple Workflow Service. It moves the task queue to Amazon's servers and client programs then poll for tasks.

For the purpose of this article I'm going to do a simplified version of the application that just pulls down prices. The data structures for that are pretty straightforward:

  1. rails new swf_scraper
  2. rails generate scaffold Product asin:string
  3. rails generate scaffold Record price:float product_id:integer

Connect the records to the products by adding has_many and belongs_to to the product and record models respectively.

Accessing the workflow service requires adding aws-sdk and aws-flow gems to the Gemfile.

SWF uses workflows to define the order of activity execution. The workflow for this project is:

class ScrapeWorkflow
  extend AWS::Flow::Workflows

  workflow :queue_scrape do
    {
      :version => "1.1",
      :task_list => SWF_WORKFLOW_TASK_LIST,
      :execution_start_to_close_timeout => 10 * 60,
    }
  end

  activity_client(:activity){ {:from_class => "ScrapeActivity"} }

  def queue_scrape(asin)
    scrape_future = Future.new.set
    scrape_future = activity.send_async(:scrape, asin)
    # wait_for_all(scrape_future)
  end
end

There is just one task in this workflow: scrape the asin. The activity is where the actual processing takes place:

class ScrapeActivity
  extend AWS::Flow::Activities

  activity :scrape  do
    {
      :version => "1.1",
      :default_task_list => SWF_ACTIVITY_TASK_LIST,
      :default_task_schedule_to_start_timeout => 10 * 60,
      :default_task_start_to_close_timeout => 30,
    }
  end

  def initialize
    @count = 0
  end
    
  def scrape(asin)
    begin
      @count += 1

      url = "http://www.amazon.com/dp/" + asin
      response = HTTParty.get(URI.encode(url))
      doc = Nokogiri::HTML(response)

      price_div = doc.at_css('.priceLarge')
      price = (price_div.nil? or price_div.text[/[0-9\.,]+/].nil?) ? nil : price_div.text[/[0-9\.,]+/].gsub(/,/, '').to_f
      
      unless price
        price_div = doc.at_css('.a-color-price.a-size-large')
        price = (price_div.nil? or price_div.text[/[0-9\.,]+/].nil?) ? nil : price_div.text[/[0-9\.,]+/].gsub(/,/, '').to_f
      end

      if price
        product = Product.find_by_asin(asin)
        product.records.create( price: price )
      end

      puts "#{@count} Scraped: #{asin}: #{price}"
    rescue => e
      puts "Error: #{e.message}"
    end
  end
end

The last piece of the puzzle is actually queuing the jobs and running the workflow and activity. This is accomplished with a rake task:

require "#{Rails.root}/app/helpers/application_helper"
include ApplicationHelper

require "#{ENV['GEM_HOME']}/gems/aws-flow-1.0.0/lib/aws/decider.rb"
require "#{Rails.root}/config/initializers/swf.rb"
require "#{Rails.root}/lib/scrape_activity.rb"
require "#{Rails.root}/lib/scrape_workflow.rb"

namespace :swf do
  desc 'Start activity worker'
  task :activity => :environment do
    swf, domain = swf_domain
    activity_worker = AWS::Flow::ActivityWorker.new(swf.client, domain, SWF_ACTIVITY_TASK_LIST, ScrapeActivity) { {:use_forking => false} }
    activity_worker.start
  end

  desc 'Start workflow worker'
  task :workflow => :environment do
    swf, domain = swf_domain
    worker = AWS::Flow::WorkflowWorker.new(swf.client, domain, SWF_WORKFLOW_TASK_LIST, ScrapeWorkflow)
    worker.start
  end

  desc 'Queue activities'
  task :scrape => :environment do
    swf, domain = swf_domain
    my_workflow_client = AWS::Flow::workflow_client(swf.client, domain) { {:from_class => "ScrapeWorkflow"} }

    Product.all.each do |product|
      $workflow_execution = my_workflow_client.start_execution(product.asin)
    end
  end  
end

Setting up the client and domain is done by a helper method:

module ApplicationHelper
  def swf_domain
    
    @swf = AWS::SimpleWorkflow.new
    begin
      @domain = @swf.domains.create(SWF_DOMAIN, "10")
    rescue AWS::SimpleWorkflow::Errors::DomainAlreadyExistsFault => e
      @domain = @swf.domains[SWF_DOMAIN]
    end
    
    return @swf, @domain
  end
end