Akka-Streams: Zip first element of one source with every element of other source

28 Feb 2018

The code looked like this:

val futureSet: Future[Set[ProductId]] = noIndexRepository.findAll()
val noIndexSource: Source[Set[ProductId], NotUsed] = Source.fromFuture(futureSet) // basically a Source.single()
val productsSource: Source[product.Product, NotUsed] = productRepository.getAllProducts(tenant)

After looking at the implementation of zipWithIndex, I came up with this solution:

val source: Source[(product.Product, Set[ProductId]), NotUsed] = noIndexSource
  .flatMapConcat(x => productsSource.statefulMapConcat { () ⇒
    elem ⇒ {
      val zipped = (elem, x)
      immutable.Iterable[(product.Product, Set[ProductId])](zipped)
    }
  })

val filteredSource = source.filterNot(x => x._2.contains(x._1.id)).map(_._1)

val f = filteredSource.via(flow).runWith(Sink.ignore)(materializer)
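To try the pattern in isolation, here is a minimal self-contained sketch. It is not the production code: ProductId and Product are hypothetical stand-in case classes, the repositories are replaced by in-memory sources, and the object and method names are made up for illustration. It assumes Akka 2.5.x (ActorMaterializer and Source.fromFuture are deprecated in later versions). It also uses a plain map inside flatMapConcat where the snippet above uses statefulMapConcat, which should behave the same here because the stage keeps no state and emits exactly one element per input.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ZipFirstWithAll {
  // Hypothetical stand-ins for the post's ProductId and product.Product.
  final case class ProductId(value: Int)
  final case class Product(id: ProductId, name: String)

  // Pair every product with the single emitted set,
  // then drop the products whose id is contained in that set.
  def withoutNoIndex(
      noIndexSource: Source[Set[ProductId], NotUsed],
      productsSource: Source[Product, NotUsed]
  ): Source[Product, NotUsed] =
    noIndexSource
      .flatMapConcat(noIndex => productsSource.map(p => (p, noIndex)))
      .filterNot { case (p, noIndex) => noIndex.contains(p.id) }
      .map(_._1)

  // Runs the stream once with in-memory data and returns the surviving product names.
  def demo(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("zip-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Single-element source backed by a Future, as in the post.
      val noIndexSource = Source.fromFuture(Future.successful(Set(ProductId(2))))
      val productsSource = Source(List(
        Product(ProductId(1), "a"),
        Product(ProductId(2), "b"),
        Product(ProductId(3), "c")
      ))
      val names = withoutNoIndex(noIndexSource, productsSource)
        .map(_.name)
        .runWith(Sink.seq)
      Await.result(names, 5.seconds)
    } finally {
      system.terminate()
    }
  }
}
```

Calling ZipFirstWithAll.demo() should return the names "a" and "c", since the product with id 2 is in the no-index set. Note that productsSource is only materialized once the set has arrived, because flatMapConcat creates the inner source per upstream element.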

Do you know of an easier way to accomplish this? Let me know in the comments!
