sqldump

(coffee) => code

File Uploads in Scala via Apache HttpComponents

In order to upload photos to Facebook from the server-side, the Graph API requires you to make a POST request with the image attached as multipart/form-data. Since I couldn’t find any native Scala libraries that make this task easy or intuitive, I went with the battle-tested Apache HttpComponents library.

Since the app was going to upload photos to Facebook on demand, I went with an HttpClient that could be reused and was able to handle multiple requests concurrently.

val httpClient = {
  val connManager = new PoolingHttpClientConnectionManager()
  HttpClients.custom().setConnectionManager(connManager).build()
}

The next step was to create an entity that contains all the POST data:

val accessToken = "MY_FACEBOOK_ACCESS_TOKEN"
val file = new File("/absolute/path/to/file.jpg") // FileBody expects a java.io.File, not a String
val reqEntity = MultipartEntityBuilder.create()
reqEntity.addPart("source", new FileBody(file))
reqEntity.addPart("access_token", new StringBody(accessToken, ContentType.TEXT_PLAIN))

In this case source is the name of the form field whose value is the image file. The next step is to create a POST request and execute it using our httpClient.

val uri = new URI("https://graph.facebook.com/me/photos")

val httpPost = new HttpPost(uri)
httpPost.setEntity(reqEntity.build())
val response = httpClient.execute(httpPost, HttpClientContext.create())

Use EntityUtils to read the response text:

val result = try {
  EntityUtils.toString(response.getEntity)
} finally {
  response.close() // always release the connection back to the pool
}
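Closing a resource even when reading from it fails can be factored into a small helper. The sketch below is one way to do that; `Loan`, `withResource` and `LoanDemo` are hypothetical names, not part of HttpComponents, and an in-memory stream stands in for the HTTP response so the example needs nothing beyond the JDK.

```scala
import java.io.{ByteArrayInputStream, Closeable}

object Loan {
  // Runs `body` with the resource and always closes it afterwards,
  // whether `body` returns normally or throws.
  def withResource[R <: Closeable, A](resource: R)(body: R => A): A =
    try body(resource)
    finally resource.close()
}

object LoanDemo {
  // An in-memory stream stands in for the HTTP response here.
  def firstByte(): Int =
    Loan.withResource(new ByteArrayInputStream(Array[Byte](42)))(_.read())
}
```

Since CloseableHttpResponse is a Closeable, the same helper could wrap the response: `Loan.withResource(response)(r => EntityUtils.toString(r.getEntity))`.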

Remember to close the response after you’re done reading from it. Here’s a full example including all the imports:

import java.io.File
import java.net.URI
import org.apache.http.client.methods.{HttpPost, CloseableHttpResponse}
import org.apache.http.client.protocol.HttpClientContext
import org.apache.http.entity.mime.MultipartEntityBuilder
import org.apache.http.entity.mime.content.{StringBody, FileBody}
import org.apache.http.entity.ContentType
import org.apache.http.util.EntityUtils
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager
import org.apache.http.impl.client.HttpClients
import scala.util.Try

object Uploader {

  lazy val httpClient = {
    val connManager = new PoolingHttpClientConnectionManager()
    HttpClients.custom().setConnectionManager(connManager).build()
  }

  def uploadToFacebook(file: File, accessToken: String): Try[String] = {
    val uri = new URI("https://graph.facebook.com/me/photos")

    Try({
      // Create the entity
      val reqEntity = MultipartEntityBuilder.create()

      // Attach the file
      reqEntity.addPart("source", new FileBody(file))

      // Attach the access token as plain text
      val tokenBody = new StringBody(accessToken, ContentType.TEXT_PLAIN)
      reqEntity.addPart("access_token", tokenBody)

      // Create POST request
      val httpPost = new HttpPost(uri)
      httpPost.setEntity(reqEntity.build())

      // Execute the request in a new HttpContext
      val ctx = HttpClientContext.create()
      val response: CloseableHttpResponse = httpClient.execute(httpPost, ctx)

      // Read the response, closing it even if reading fails.
      // The value of this try block is the result of the upload.
      try {
        EntityUtils.toString(response.getEntity)
      } finally {
        response.close()
      }
    })
  }
}
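Because uploadToFacebook returns a Try[String], the caller decides what to do with a failure. A minimal sketch of that handling follows; `UploadResult` and `describe` are hypothetical names used only for illustration.

```scala
import scala.util.{Failure, Success, Try}

object UploadResult {
  // Turns the outcome of an upload into a log-friendly message.
  def describe(result: Try[String]): String = result match {
    case Success(body) => s"Upload succeeded: $body"
    case Failure(e)    => s"Upload failed: ${e.getMessage}"
  }
}
```

It could be used as `UploadResult.describe(Uploader.uploadToFacebook(file, accessToken))`.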

Akka and Back-pressure

Two steps are needed in order to correctly apply back-pressure in an Akka system:

Step 1: Bounded Mailboxes and Push Timeouts

The default mailbox for an actor is an UnboundedMailbox backed by Java’s ConcurrentLinkedQueue. As the name indicates, this mailbox grows without bound and will end up crashing the JVM with an OutOfMemoryError if the consumer is significantly slower than the producer. If we want to be able to signal the producer to slow down, the first step is to switch to a BoundedMailbox backed by Java’s LinkedBlockingQueue, which blocks the producer when the mailbox is full. The Akka documentation on mailboxes covers the other mailbox types.

Blocking the producer forever is not a good solution, because Rule #1 of Akka is: don’t block inside actors. Akka solves this problem with a push timeout for an actor’s mailbox. A push timeout is exactly what it sounds like: when you try to push a message to an actor’s mailbox and the mailbox is full, the push waits for at most the timeout. If no space frees up in that time, the push times out and the message gets routed to the DeadLetterActorRef.
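The push-timeout behaviour can be seen in miniature with the JDK queue on its own. The sketch below is an analogy, not Akka’s mailbox code: `PushTimeoutDemo` and `push` are hypothetical names, nothing consumes from the queue, and the rejected messages play the role of dead letters.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

object PushTimeoutDemo {
  // Offers each message to a bounded queue, waiting at most `timeoutMs` for space.
  // Returns the messages that could not be enqueued in time.
  def push(capacity: Int, timeoutMs: Long, messages: Seq[String]): Seq[String] = {
    val mailbox = new LinkedBlockingQueue[String](capacity)
    // offer(...) returns false when the timeout elapses before space is available
    messages.filterNot(m => mailbox.offer(m, timeoutMs, TimeUnit.MILLISECONDS))
  }
}
```

With a capacity of 2 and four messages, the first two fit and the last two are rejected after waiting out the timeout.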

Configuring an actor to use a bounded mailbox with a 1000 message capacity and a push timeout of 100ms requires the following addition to the application.conf:

bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000
  mailbox-push-timeout-time = 100ms
}

The actor can then be initialized as follows:

val actor = system.actorOf(Props[MyActorClass].withMailbox("bounded-mailbox"))

Step 2: DeadLetter Watcher

When an actor’s mailbox is full and sent messages start timing out, they get routed to the DeadLetterActorRef via the Event Stream of the actor system. Akka allows actors to subscribe to event streams and listen in on all, or a filtered subset of, the messages flying around in the actor system. Since the dead letters service also utilizes the event stream infrastructure, we can subscribe to all DeadLetter messages being published in the stream and signal the producer to slow down.

The following snippet can be used to subscribe an actor to all the DeadLetter messages in a system:

system.eventStream.subscribe(listeningActor, classOf[DeadLetter])

Tying it all together with an Example

In this example, a fast producer sends messages to a slow consumer. The slow consumer has a bounded mailbox of size 10 and a push timeout of 10ms.

SlowReceiver

The SlowReceiver blocks for 100ms after printing each message it receives. The blocking is only present to ensure that its mailbox fills up.

import akka.actor._

class SlowReceiver extends Actor with ActorLogging {
  override def postStop(): Unit = {
    log.info("SlowReceiver#postStop")
  }

  def receive: Actor.Receive = {
    case msg: String =>
      log.info(s"Received: $msg")
      Thread.sleep(100)
  }
}

FastSender

The FastSender waits for a kickoff message, sends 15 messages to the SlowReceiver, and then sends a PoisonPill to itself. When it stops, its postStop hook schedules a PoisonPill to be sent to the SlowReceiver 2 seconds later.

import akka.actor._
import scala.concurrent.duration._

class FastSender(slow: ActorRef) extends Actor with ActorLogging {
  override def postStop(): Unit = {
    log.info("FastSender#postStop")
    context.system.scheduler.scheduleOnce(2.seconds, slow, PoisonPill)(context.dispatcher)
  }

  def receive: Actor.Receive = {
    case _ =>
      for(i <- 1 to 15) {
        slow ! s"[$i]"
      }
      log.info("Done sending all")
      self ! PoisonPill
  }
}

Watcher

The Watcher watches for and prints DeadLetters being sent to the SlowReceiver. It also context.watches the SlowReceiver and terminates the actor system when the SlowReceiver is killed.

import akka.actor._

class Watcher(target: ActorRef) extends Actor with ActorLogging {
  private val targetPath = target.path

  override def preStart(): Unit = {
    context.watch(target)
  }

  def receive: Actor.Receive = {
    case d: DeadLetter =>
      if(d.recipient.path.equals(targetPath)) {
        log.info(s"Timed out message: ${d.message.toString}")
      }

    case Terminated(`target`) =>
      context.system.shutdown()
  }
}

BackPressureTest App

The App that ties it all together.

import akka.actor._

object BackPressureTest extends App {
  case object Ping

  val sys = ActorSystem("testSys")
  val slow = sys.actorOf(Props[SlowReceiver].withMailbox("bounded-mailbox"), "slow")
  val fast = sys.actorOf(Props(classOf[FastSender], slow), "fast")
  val watcher = sys.actorOf(Props(classOf[Watcher], slow), "watcher")
  sys.eventStream.subscribe(watcher, classOf[DeadLetter])

  fast ! Ping
}

Output

Running the BackPressureTest app gives the following output:

[INFO] [01/13/2014 14:00:56.303] [akka://testSys/user/slow] Received: [1]
[INFO] [01/13/2014 14:00:56.315] [akka://testSys/user/watcher] Timed out message: [12]
[INFO] [01/13/2014 14:00:56.326] [akka://testSys/user/watcher] Timed out message: [13]
[INFO] [01/13/2014 14:00:56.337] [akka://testSys/user/watcher] Timed out message: [14]
[INFO] [01/13/2014 14:00:56.347] [akka://testSys/user/fast] Done sending all
[INFO] [01/13/2014 14:00:56.347] [akka://testSys/user/watcher] Timed out message: [15]
[INFO] [01/13/2014 14:00:56.350] [akka://testSys/user/fast] FastSender#postStop
[INFO] [01/13/2014 14:00:56.403] [akka://testSys/user/slow] Received: [2]
[INFO] [01/13/2014 14:00:56.504] [akka://testSys/user/slow] Received: [3]
[INFO] [01/13/2014 14:00:56.605] [akka://testSys/user/slow] Received: [4]
[INFO] [01/13/2014 14:00:56.705] [akka://testSys/user/slow] Received: [5]
[INFO] [01/13/2014 14:00:56.807] [akka://testSys/user/slow] Received: [6]
[INFO] [01/13/2014 14:00:56.907] [akka://testSys/user/slow] Received: [7]
[INFO] [01/13/2014 14:00:57.008] [akka://testSys/user/slow] Received: [8]
[INFO] [01/13/2014 14:00:57.109] [akka://testSys/user/slow] Received: [9]
[INFO] [01/13/2014 14:00:57.209] [akka://testSys/user/slow] Received: [10]
[INFO] [01/13/2014 14:00:57.310] [akka://testSys/user/slow] Received: [11]
[INFO] [01/13/2014 14:00:58.367] [akka://testSys/user/slow] SlowReceiver#postStop
[DEBUG] [01/13/2014 14:00:58.373] [EventStream] shutting down: StandardOutLogger started
[DEBUG] [01/13/2014 14:00:58.373] [EventStream] shutting down: StandardOutLogger started
[DEBUG] [01/13/2014 14:00:58.375] [EventStream] all default loggers stopped

Process finished with exit code 0

Back-pressure Strategy

While this example doesn’t actually implement back-pressure, it provides the infrastructure for applying a back-pressure strategy. A possible strategy would be to send FastSender a SlowDown message from within the Watcher for each dead letter received. The SlowDown case class could be defined as

case class SlowDown(dl: DeadLetter)

When FastSender receives a SlowDown message, it could either throttle itself or tell its upstream systems to slow down. The SlowDown message also encapsulates the relevant DeadLetter object to allow for retry logic.
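One way FastSender could throttle itself is to keep a delay between sends that grows whenever a SlowDown arrives and shrinks again when sends go through. The sketch below only models that delay. `Throttle`, `slowDown`, `speedUp`, the 10ms starting delay and the doubling/halving policy are all assumptions for illustration, and wiring it into the actor (for example through the scheduler) is left out.

```scala
import scala.concurrent.duration._

// Hypothetical throttle state a producer could keep between sends.
final case class Throttle(delay: FiniteDuration, max: FiniteDuration) {
  // Called when a SlowDown arrives: start at 10ms, then double, capped at `max`.
  def slowDown: Throttle = {
    val next = if (delay == Duration.Zero) 10.millis else delay * 2L
    copy(delay = next.min(max))
  }

  // Called after a send that did not bounce: halve the delay.
  def speedUp: Throttle = copy(delay = delay / 2L)
}
```

Keeping the state immutable means the actor can hold it in a single var and replace it on each message, without any shared mutable data.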

ExecutionContext for Long Running Tasks

Rule #1 of Akka: don’t block inside actors. If you do have blocking / high latency calls, wrap them in a future and toss them into a different execution context specifically meant for high latency tasks.

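import java.util.concurrent.Executors
import akka.actor.Actor
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

class ClassyActor extends Actor {
  val numThreads = 10
  val pool = Executors.newFixedThreadPool(numThreads)
  val ctx = ExecutionContext.fromExecutorService(pool)

  // Release the pool's threads when the actor stops
  override def postStop(): Unit = ctx.shutdown()

  def receive: Actor.Receive = {
    // A message that needs some high latency work done
    // (Message stands in for your own message type)
    case m: Message =>
      val future = Future {
        // do something with m: Message
      } (ctx)

      // These callbacks run on a pool thread, not inside the actor,
      // so they must not touch the actor's mutable state
      future.onComplete({
        case Success(s) =>
          // do something when the task successfully completed
        case Failure(f) =>
          // do something when the task failed
      }) (ctx)
  }
}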
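The same idea can be tried without Akka. The sketch below runs a blocking task on a dedicated pool and reports which thread did the work; `BlockingPoolDemo`, the pool size of 2 and the thread name "blocking-io" are assumptions made for the demo.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingPoolDemo {
  // Name the pool's threads so it is easy to see where the work ran.
  private val factory = new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "blocking-io")
      t.setDaemon(true)
      t
    }
  }

  def run(): String = {
    val blockingCtx =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2, factory))
    try {
      val work = Future {
        Thread.sleep(50)                // stands in for a slow, blocking call
        Thread.currentThread().getName  // report which thread did the work
      }(blockingCtx)
      // Await is used only to keep the demo short; an actor would use onComplete instead
      Await.result(work, 2.seconds)
    } finally blockingCtx.shutdown()
  }
}
```

Calling `BlockingPoolDemo.run()` returns the name of the pool thread, which shows the blocking call never ran on the caller’s thread.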

Getting the Titan-0.3.2 Java Driver to Work in Scala

Attempting to use the Java driver for Titan-0.3.2 (the current stable Titan release) with Cassandra v1.2.6 in a Scala project throws the following Astyanax error:

java.lang.NoSuchMethodError: 
org.apache.cassandra.thrift.TBinaryProtocol: 
method (Lorg/apache/thrift/transport/TTransport;)V not found

This can be traced back to a bug in Astyanax v1.56.37 which was fixed in v1.56.43. To work around it, add an explicit dependency on Astyanax v1.56.43 and make sure it is listed above the Titan driver in your pom.xml or build.sbt.
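In a build.sbt, the fix might look like the fragment below. The Maven coordinates (`com.netflix.astyanax` / `astyanax` and `com.thinkaurelius.titan` / `titan-cassandra`) are my assumption of the usual artifact names and should be checked against your repository; only the version numbers come from the text above.

```scala
// build.sbt fragment: the fixed Astyanax release is declared before the Titan driver
libraryDependencies ++= Seq(
  "com.netflix.astyanax"    % "astyanax"        % "1.56.43", // assumed coordinates
  "com.thinkaurelius.titan" % "titan-cassandra" % "0.3.2"    // assumed coordinates
)
```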

Setup Ruby (2.x) on Rails (4.x) in 10 Lines or Less

As a first-time user of Ruby and Rails, I was able to set up a sane environment on Ubuntu Server 12.04 (x64) in fewer than 10 commands:

cd $HOME
curl -L https://get.rvm.io | bash
source $HOME/.rvm/scripts/rvm
rvm requirements
rvm install ruby
rvm use ruby --default
rvm rubygems current
gem install rails

And, if you already have a rails app to run, continue along with:

cd myapp
bundle install
rails server

This will get you up and running with a dev server.

Prevent Facebook From Tracking You on the Web

A simple search for “facebook like button tracking you” turns up a bunch of articles and blog posts about how Facebook is creeping on you wherever you go, even if you’re not signed in or actively clicking their ubiquitous like button. I’ve come up with a simple solution to the problem.

Caveat: This will prevent you from being able to use Facebook.

Step 1: Add the following entries to your hosts file

127.0.0.1 facebook.com
127.0.0.1 www.facebook.com
127.0.0.1 connect.facebook.net

If you’re not familiar with editing your hosts file, here’s a tutorial.

Step 2: Reboot

And done!

Joda Time With Scala

When trying to use Joda-Time in a Scala project, I encountered a rather cryptic error:

scala: error while loading Instant, class file '/Users/asdf/.m2/repository/joda-time/joda-time/2.3/joda-time-2.3.jar(org/joda/time/Instant.class)' is broken
(class java.lang.RuntimeException/bad constant pool tag 9 at byte 48)

This can be solved by adding the joda-convert dependency into Maven/SBT.
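For SBT, the addition might look like the fragment below. The joda-time version comes from the path in the error message; the joda-convert version shown is an assumption, so pick whichever release matches your joda-time version.

```scala
// build.sbt fragment: joda-convert supplies classes the Scala compiler
// needs in order to read joda-time's class files
libraryDependencies ++= Seq(
  "joda-time" % "joda-time"    % "2.3",
  "org.joda"  % "joda-convert" % "1.2" // version is an assumption
)
```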

Pip Install Scipy on Ubuntu

On Ubuntu 12.04 Server, a pip install scipy barfs with

library dfftpack has Fortran sources but no Fortran compiler found

because it wants you to

sudo apt-get install libamd2.2.0 libblas3gf libc6 libgcc1 \
libgfortran3 liblapack3gf libumfpack5.4.0 libstdc++6 \
build-essential gfortran python-all-dev \
libatlas-base-dev

and,

pip install numpy

before you try to

pip install scipy