I spent a couple of hours yesterday debugging what I thought was a Protobuf serialization issue, which turned out to be an unlawful, Monoid-like use of aggregateByKey in Scio.
The Problem
Both Scio and Spark have aggregate and aggregateByKey transformations that look like this:
// on SCollection[V]
def aggregate[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[U]
// on SCollection[(K, V)]
def aggregateByKey[U](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): SCollection[(K, U)]
And we have some business logic that looks like this:
case class Count(id: String, count: Int)
val z = Count("", 0) // zeroValue
def seqOp(acc: Count, v: Count) = Count(v.id, acc.count + v.count) // fold one input value into the accumulator
def combOp(x: Count, y: Count) = Count(x.id, x.count + y.count) // merge two accumulators
sc.parallelize(Seq(Count("a", 10), Count("a", 100), Count("b", 5), Count("b", 50)))
  .keyBy(_.id) // SCollection[(String, Count)]
  .aggregateByKey(z)(seqOp, combOp)
This code, however, only works correctly locally with the DirectRunner; when running on the Dataflow service with the DataflowRunner it always produces results with id == "". Can you spot the bug?
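For reference, with the input above the per-key sums are 110 for "a" and 55 for "b". Roughly, the two runners then behave like this (a sketch, not captured output):
// DirectRunner (expected):
//   ("a", Count("a", 110))
//   ("b", Count("b", 55))
// DataflowRunner (observed): the ids in the values come out empty
//   ("a", Count("", 110))
//   ("b", Count("", 55))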
Monoid laws
You might notice that zeroValue and combOp together resemble a Monoid, which should satisfy the identity law:
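Roughly, the law says that for every x, combOp(z, x) == x and combOp(x, z) == x. A quick check with the definitions above shows that the left identity is already broken, because combOp takes its id from the left argument (a minimal sketch):
combOp(z, Count("a", 110)) // Count("", 110) != Count("a", 110), left identity violated
combOp(Count("a", 110), z) // Count("a", 110), right identity happens to hold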