Kotlin协程buffer缓冲池,调度任务执行
import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.delay import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.runBlocking fun main() { runBlocking { val taskFlow = flow { repeat(10) { it -> val req = Req(it, (Math.random() * 9999).toInt()) emit(req) } } taskFlow.onStart { println("onStart") } .onCompletion { println("onCompletion") } .onEach { it -> println("onEach $it ${System.currentTimeMillis()}") }.buffer(capacity = 3, onBufferOverflow = BufferOverflow.SUSPEND) .collect { it -> it.load() } } } class Req { var id = 0 var priority = 0 var timestamp = 0L constructor(id: Int, priority: Int) { this.id = id this.priority = priority this.timestamp = System.currentTimeMillis() } suspend fun load() { println("$this loading ...") val time = (Math.random() * 100).toLong() delay(time) println("$this loading time cost=$time") } override fun toString(): String { return "Req(id=$id, priority=$priority, timestamp=$timestamp)" } }输出:
onStart
onEach Req(id=0, priority=9545, timestamp=1765433999255) 1765433999263
onEach Req(id=1, priority=6391, timestamp=1765433999265) 1765433999265
onEach Req(id=2, priority=3125, timestamp=1765433999265) 1765433999265
onEach Req(id=3, priority=5451, timestamp=1765433999265) 1765433999265
onEach Req(id=4, priority=9302, timestamp=1765433999265) 1765433999265
Req(id=0, priority=9545, timestamp=1765433999255) loading ...
Req(id=0, priority=9545, timestamp=1765433999255) loading time cost=41
Req(id=1, priority=6391, timestamp=1765433999265) loading ...
onEach Req(id=5, priority=3988, timestamp=1765433999316) 1765433999316
Req(id=1, priority=6391, timestamp=1765433999265) loading time cost=92
Req(id=2, priority=3125, timestamp=1765433999265) loading ...
onEach Req(id=6, priority=7803, timestamp=1765433999424) 1765433999424
Req(id=2, priority=3125, timestamp=1765433999265) loading time cost=82
Req(id=3, priority=5451, timestamp=1765433999265) loading ...
onEach Req(id=7, priority=1557, timestamp=1765433999516) 1765433999516
Req(id=3, priority=5451, timestamp=1765433999265) loading time cost=57
Req(id=4, priority=9302, timestamp=1765433999265) loading ...
onEach Req(id=8, priority=7678, timestamp=1765433999579) 1765433999579
Req(id=4, priority=9302, timestamp=1765433999265) loading time cost=68
Req(id=5, priority=3988, timestamp=1765433999316) loading ...
onEach Req(id=9, priority=3493, timestamp=1765433999655) 1765433999655
Req(id=5, priority=3988, timestamp=1765433999316) loading time cost=97
Req(id=6, priority=7803, timestamp=1765433999424) loading ...
onCompletion
Req(id=6, priority=7803, timestamp=1765433999424) loading time cost=79
Req(id=7, priority=1557, timestamp=1765433999516) loading ...
Req(id=7, priority=1557, timestamp=1765433999516) loading time cost=19
Req(id=8, priority=7678, timestamp=1765433999579) loading ...
Req(id=8, priority=7678, timestamp=1765433999579) loading time cost=57
Req(id=9, priority=3493, timestamp=1765433999655) loading ...
Req(id=9, priority=3493, timestamp=1765433999655) loading time cost=41Process finished with exit code 0
相关:
https://blog.csdn.net/zhangphil/article/details/154843029
https://blog.csdn.net/zhangphil/article/details/154840841
https://blog.csdn.net/zhangphil/article/details/132527122