Akka-Streams: Zip first element of one source with every element of other source

28 Feb 2018

I had two sources: one that emits a single set of product IDs and one that emits every product. The code looked like this:

val futureSet: Future[Set[ProductId]] = noIndexRepository.findAll()
val noIndexSource: Source[Set[ProductId], NotUsed] = Source.fromFuture(futureSet) // basically a Source.single()
val productsSource: Source[product.Product, NotUsed] = productRepository.getAllProducts(tenant)

After looking at the implementation of zipWithIndex, I came up with this solution:

val source: Source[(product.Product, Set[ProductId]), NotUsed] = noIndexSource
  .flatMapConcat(x => productsSource.statefulMapConcat { () ⇒
    elem ⇒ {
      val zipped = (elem, x)
      immutable.Iterable[(product.Product, Set[ProductId])](zipped)
    }
  })

val filteredSource = source.filterNot(x => x._2.contains(x._1.id)).map(_._1)

val f = filteredSource.via(flow).runWith(Sink.ignore)(materializer)
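
Since the statefulMapConcat above keeps no state, a plain operator inside flatMapConcat might already be enough. The following is only a sketch under a few assumptions: it targets the Akka 2.5-era API used in this post (Source.fromFuture, ActorMaterializer), the ProductId and Product case classes are hypothetical stand-ins for the real domain types, and the sample data and the ZipFirstWithAll object are made up so the example runs on its own.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object ZipFirstWithAll {
  // Hypothetical stand-ins for ProductId and product.Product
  final case class ProductId(value: Int)
  final case class Product(id: ProductId, name: String)

  def run(): Seq[Product] = {
    implicit val system: ActorSystem = ActorSystem("zip-first-with-all")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // Made-up sample data: product 2 is on the "no index" list
      val noIndexSource: Source[Set[ProductId], NotUsed] =
        Source.fromFuture(Future.successful(Set(ProductId(2))))
      val productsSource: Source[Product, NotUsed] =
        Source(List(
          Product(ProductId(1), "a"),
          Product(ProductId(2), "b"),
          Product(ProductId(3), "c")))

      // The single emitted set is captured by the closure, so every
      // product can be checked against it without building tuples first.
      val filtered: Source[Product, NotUsed] =
        noIndexSource.flatMapConcat { noIndex =>
          productsSource.filterNot(p => noIndex.contains(p.id))
        }

      // Keeps the products with ids 1 and 3
      Await.result(filtered.runWith(Sink.seq), 5.seconds)
    } finally {
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = println(run())
}
```

If the pairs themselves are needed downstream, productsSource.map(p => (p, noIndex)) inside the same flatMapConcat should produce the same tuples as the statefulMapConcat version.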

Do you know of an easier way to accomplish this? Let me know in the comments!
