Note to self: Akka-Streams: Zip first element of one source with every element of other source

28 Feb 2018

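I had two sources and wanted to pair the single element of the first one with every element of the second one. The sources looked like this: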

val noIndexSource: Source[Set[ProductId], NotUsed] = Source.fromFuture(noIndexRepository.findAll()) // basically a Source.single()
val productsSource: Source[product.Product, NotUsed] = productRepository.getAllProducts(tenant)

After looking at the implementation of zipWithIndex, I came up with this solution:

// For the single Set emitted by noIndexSource, stream all products
// and pair each product with that Set.
val source: Source[(product.Product, Set[ProductId]), NotUsed] = noIndexSource
  .flatMapConcat(x => productsSource.statefulMapConcat { () =>
    elem => {
      val zipped = (elem, x)
      immutable.Iterable[(product.Product, Set[ProductId])](zipped)
    }
  })

val filteredSource = source.filterNot(x => x._2.contains(x._1.id)).map(_._1)

val f = filteredSource.via(flow).runWith(Sink.ignore)(materializer)
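
A possibly simpler variant, as a rough sketch: since the set is already in scope inside flatMapConcat, the filter can be applied to the products right there, without building tuples first. The sketch assumes Akka 2.5 (Source.fromFuture and ActorMaterializer, as in the code above). The Product case class, the ProductId alias and the sample data are hypothetical stand-ins for the real repositories, which are not shown in this post.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object ZipFirstSketch {
  // Hypothetical stand-ins for ProductId and product.Product from the post.
  type ProductId = Int
  final case class Product(id: ProductId, name: String)

  def remainingProducts(): Seq[Product] = {
    implicit val system: ActorSystem = ActorSystem("zip-first-sketch")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Emits exactly one element: the ids that must be skipped (sample data).
      val noIndexSource: Source[Set[ProductId], NotUsed] =
        Source.fromFuture(Future.successful(Set(2)))
      // Emits every product (sample data).
      val productsSource: Source[Product, NotUsed] =
        Source(List(Product(1, "a"), Product(2, "b"), Product(3, "c")))

      // For the single set, stream all products and drop the ones listed in it.
      val filteredSource: Source[Product, NotUsed] =
        noIndexSource.flatMapConcat { noIndex =>
          productsSource.filterNot(p => noIndex.contains(p.id))
        }

      // Collect the stream into a sequence; products 1 and 3 remain.
      Await.result(filteredSource.runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit =
    println(remainingProducts())
}
```

If the tuples are still needed downstream, productsSource.map(p => (p, noIndex)) inside the same flatMapConcat should produce the same pairs as the statefulMapConcat version, because no state is kept between elements.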

Do you know of an easier way to accomplish this? Let me know in the comments!
