The code looked like this:
val futureSet: Future[Set[ProductId]] = noIndexRepository.findAll()
val noIndexSource: Source[Set[ProductId], NotUsed] = Source.fromFuture(futureSet) // basically a Source.single()
val productsSource: Source[product.Product, NotUsed] = productRepository.getAllProducts(tenant)
After looking at the implementation of zipWithIndex, I came up with this solution:
val source: Source[(product.Product, Set[ProductId]), NotUsed] = noIndexSource
  .flatMapConcat(x => productsSource.statefulMapConcat { () =>
    // pair every product with the single Set emitted by noIndexSource
    elem => {
      val zipped = (elem, x)
      immutable.Iterable[(product.Product, Set[ProductId])](zipped)
    }
  })
val filteredSource = source.filterNot(x => x._2.contains(x._1.id)).map(_._1)
val f = filteredSource.via(flow).runWith(Sink.ignore)(materializer)
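Since the tuple is only built to be filtered and then thrown away, one possibly simpler variant is to filter directly inside flatMapConcat, where the Set is already in scope. The sketch below is not the code from the project: ProductId and Product are hypothetical stand-ins for the real types, the excluding and demo helpers are made up for illustration, and it assumes Akka 2.5 or 2.6 (in newer versions Source.future replaces the deprecated Source.fromFuture).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object FilterByFutureSet {
  // Hypothetical stand-ins for the post's ProductId and product.Product.
  final case class ProductId(value: Int)
  final case class Product(id: ProductId)

  // Emits only the products whose id is NOT in the future Set.
  // The outer source emits the Set exactly once, so the inner
  // products source is materialized exactly once as well.
  def excluding(
      futureSet: Future[Set[ProductId]],
      products: Source[Product, NotUsed]
  ): Source[Product, NotUsed] =
    Source
      .fromFuture(futureSet)
      .flatMapConcat(noIndex => products.filterNot(p => noIndex.contains(p.id)))

  // Small in-memory demo; the real sources would come from the repositories.
  def demo(): Seq[Int] = {
    val system = ActorSystem("demo")
    val materializer = ActorMaterializer()(system)
    try {
      val products = Source((1 to 5).map(i => Product(ProductId(i))))
      val noIndex = Future.successful(Set(ProductId(2), ProductId(4)))
      val result = excluding(noIndex, products)
        .map(_.id.value)
        .runWith(Sink.seq)(materializer)
      Await.result(result, 5.seconds) // products 2 and 4 are dropped
    } finally {
      system.terminate()
    }
  }
}
```

This skips both the intermediate tuple and statefulMapConcat, which is not keeping any state in the solution above anyway.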
Do you know of an easier way to accomplish this? Let me know in the comments!