One of the projects I'm working on involves scraping information from Amazon product listings. It currently uses Delayed::Job, but long-running worker processes have a habit of dying off.
To deal with this we are considering a switch to Amazon's Simple Workflow Service (SWF). It moves the task queue onto Amazon's servers; client programs then poll Amazon for tasks to execute.
For the purposes of this article I'm going to build a simplified version of the application that just pulls down prices. The data structures for that are pretty straightforward:
rails new swf_scraper
rails generate scaffold Product asin:string
rails generate scaffold Record price:float product_id:integer
Connect the records to the products by adding has_many and belongs_to to the product and record models respectively.
Accessing the workflow service requires adding the aws-sdk and aws-flow gems to the Gemfile.
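In Gemfile form, that looks like the following (the aws-flow pin matches the 1.0.0 require path in the rake task; the aws-sdk constraint is an assumption, since aws-flow 1.x builds on v1 of the SDK):

```ruby
# Gemfile — additions for the workflow service
gem 'aws-sdk', '~> 1.0'   # assumed constraint: aws-flow 1.x uses SDK v1
gem 'aws-flow', '1.0.0'
```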
SWF uses workflows to define the order of activity execution. The workflow for this project is:
class ScrapeWorkflow
  extend AWS::Flow::Workflows

  workflow :queue_scrape do
    {
      :version => "1.1",
      :task_list => SWF_WORKFLOW_TASK_LIST,
      :execution_start_to_close_timeout => 10 * 60,
    }
  end

  # Typed client for invoking activities from workflow code
  activity_client(:activity) { {:from_class => "ScrapeActivity"} }

  def queue_scrape(asin)
    # send_async returns a future; the workflow completes once all
    # outstanding activities have finished.
    scrape_future = activity.send_async(:scrape, asin)
    # wait_for_all(scrape_future)
  end
end
There is just one task in this workflow: scrape the ASIN. The activity is where the actual processing takes place:
class ScrapeActivity
  extend AWS::Flow::Activities

  activity :scrape do
    {
      :version => "1.1",
      :default_task_list => SWF_ACTIVITY_TASK_LIST,
      :default_task_schedule_to_start_timeout => 10 * 60,
      :default_task_start_to_close_timeout => 30,
    }
  end

  def initialize
    @count = 0
  end

  def scrape(asin)
    @count += 1
    url = "http://www.amazon.com/dp/" + asin
    response = HTTParty.get(URI.encode(url))
    doc = Nokogiri::HTML(response)

    # Amazon uses different markup on different listing pages, so
    # fall back to a second selector if the first finds nothing.
    price = parse_price(doc.at_css('.priceLarge')) ||
            parse_price(doc.at_css('.a-color-price.a-size-large'))

    if price
      product = Product.find_by_asin(asin)
      product.records.create(price: price)
    end
    puts "#{@count} Scraped: #{asin}: #{price}"
  rescue => e
    puts "Error: #{e.message}"
  end

  private

  # Pull the first numeric run out of the price element's text and
  # strip thousands separators; returns nil if nothing matched.
  def parse_price(div)
    match = div && div.text[/[0-9.,]+/]
    match && match.gsub(',', '').to_f
  end
end
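The price extraction boils down to a small regex over the element's text. Isolated from Nokogiri, the logic looks like this (a standalone sketch with a hypothetical parse_price helper operating on a plain string):

```ruby
# Extract a numeric price from the text of a price element.
# Amazon renders prices like "$1,234.56"; the regex grabs the first
# run of digits, dots, and commas, then commas are stripped before
# converting to a float.
def parse_price(text)
  match = text[/[0-9.,]+/]
  match && match.gsub(',', '').to_f
end

puts parse_price("$1,234.56")                      # => 1234.56
puts parse_price("Currently unavailable").inspect  # => nil
```

Returning nil instead of 0.0 for unmatched text matters here: it is what lets the activity fall through to the second CSS selector.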
The last piece of the puzzle is actually queuing the jobs and running the workflow and activity workers. This is accomplished with rake tasks:
require "#{Rails.root}/app/helpers/application_helper"
include ApplicationHelper
require "#{ENV['GEM_HOME']}/gems/aws-flow-1.0.0/lib/aws/decider.rb"
require "#{Rails.root}/config/initializers/swf.rb"
require "#{Rails.root}/lib/scrape_activity.rb"
require "#{Rails.root}/lib/scrape_workflow.rb"

namespace :swf do
  desc 'Start activity worker'
  task :activity => :environment do
    swf, domain = swf_domain
    activity_worker = AWS::Flow::ActivityWorker.new(
      swf.client, domain, SWF_ACTIVITY_TASK_LIST, ScrapeActivity) { {:use_forking => false} }
    activity_worker.start
  end

  desc 'Start workflow worker'
  task :workflow => :environment do
    swf, domain = swf_domain
    worker = AWS::Flow::WorkflowWorker.new(
      swf.client, domain, SWF_WORKFLOW_TASK_LIST, ScrapeWorkflow)
    worker.start
  end

  desc 'Queue activities'
  task :scrape => :environment do
    swf, domain = swf_domain
    my_workflow_client = AWS::Flow::workflow_client(swf.client, domain) { {:from_class => "ScrapeWorkflow"} }
    Product.all.each do |product|
      $workflow_execution = my_workflow_client.start_execution(product.asin)
    end
  end
end
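With the tasks in place, running the system takes three terminals (task names from the namespace above; the two worker tasks block while they poll SWF):

```shell
# Terminal 1: poll for workflow decision tasks
rake swf:workflow

# Terminal 2: poll for activity tasks
rake swf:activity

# Terminal 3: queue a scrape execution for every product
rake swf:scrape
```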
Setting up the client and domain is done by a helper method:
module ApplicationHelper
  def swf_domain
    @swf = AWS::SimpleWorkflow.new
    begin
      # The second argument is the workflow execution retention
      # period, in days.
      @domain = @swf.domains.create(SWF_DOMAIN, "10")
    rescue AWS::SimpleWorkflow::Errors::DomainAlreadyExistsFault => e
      # Domain already registered; just look it up.
      @domain = @swf.domains[SWF_DOMAIN]
    end
    return @swf, @domain
  end
end