关于 SpringBoot 中的并发请求外部接口的需求解惑
前言
老哥们,现在碰到一个需求,希望大家帮忙看看,有什么方案。
我是 Java 菜鸡,可能提到的某些点很傻很无知,望见谅。
需求
后台有这样一个接口 /demo ,前端请求到 /demo 后,代码需要按照顺序访问多个外部 HTTP 接口
比如外部接口有三个:
/api-a
/api-b
/api-c
前端请求 /demo 后,后端直接返回 response 200 "ok" 就行,不用阻塞。
每个外部接口的一些传参都依赖于前一个请求的返回值(/api-c 的传参依赖于/api-b ),所以顺序是一定的,只能一个个请求。
最终结果(最后一个接口 /api-c 响应后),返回结果存入数据库。
另外,中间可能会出现 timeout 或者其他异常,这些信息也需要存入数据库。
问题在于,请求的并发量略微大了些,大概一秒钟有上百个请求进来(可以简化成每一秒就有 120 个请求进入该接口)。
机器环境、语言、框架、数据库
机器:单个虚拟机,CPU 和内存都可以按需求往上调大,目前是 18 核 48GB 的配置。
语言:Java 21 (抱歉,其他语言不会,只能用这个)
框架:SpringBoot3
数据库:PG 、Redis 、Mongo (随意使用)
我自己的方案
我的方法很直接,把请求的外部方法的代码放在一个 service 函数里,然后加 注解。
然后配置 ThreadPoolTaskExecutor (就是网上都能搜到的那些配置)。
另外,为了追踪每一个任务线程的结果,在线程里,一开始就生成一个 UUID ,然后构造一个对象,每一步都把相应的信息(成功或者失败)存入这个对象,最后以这个 UUID 为主键存储到数据库里。
有更好的解决方案么
按照我自己的观察,如果线程数量给小了,就容易产生队列堆积,给大了,又不确定该给多大,难道只能测试?
我的理解大概是 100 个请求进来,假设外部 3 个接口,每个需要 5 秒,那么全部请求完就是 15 秒(忽略其他时延),100 * 15 = 1500 个线程,如果小于这个值,就会堆积在队列中。
我想知道是否能根据以下的变量,通过某种方法推算出这个接口的理论的上限?
机器配置( CPU 个数,内存大小,上下行带宽等)
请求外部接口的个数,平均每个外部接口的响应时间
其他参数
怎么计算,并且达到这个上限?有什么更好的方法么?
消息队列
换个思路, 用 MQ 去处理, 单独的服务去消费, 开几台消费者丰俭由人. 反正前端不用等结果. 不会影响接收请求的服务
都 Java21 了,直接整虚拟线程了,都不用管线程数。
"Java 21"? 要素察觉,虚拟线程
数据要尽可能不丢失就消息队列
用 java 自带的队列也可以,简单方便
消息队列
使用多个长连接,循环请求这个接口,检查上传带宽占用多少
消息队列/线程池都行
VirtualThreadTaskExecutor
随便怎么写,先加个监控布上去,有大量需求再调
用消息队列也可以,用线程池也行,如果是线程池,你要确定并发数有多少,确定并发时间会持续的时长,三个外部接口,每个请求需要花多少时间,外部接口能不能接受你的高并发要求?你要根据以上几点来考虑如何设置线程池大小.
mq 合适 如果你希望维护这个 MQ 就是用 eventbus 谷歌的 你可以理解为一种不需要部署的内部 MQ 或者就是线程池起线程监听 处理
用队列缓存前端进来的请求,后面开多少线程不是就随便搞
NIO 可以的,比较清晰看到。当然虚拟线程也可以 ,但是要用对才行。
可以试试响应式编程,因为你的后一个 api 请求依赖前一个的结果,这个的吞吐量理论上也很高,关键词 Reactive
直接起个 mq ,接口就直接推消息给 mq 就行了,然后写个消费程序,接受 mq 消息然后处理业务逻辑,如果程序报错就不要 ack ,走重试逻辑,如果请求数量很多并且是不间断地,处理不过来的话,就加消费程序就行了,这是标准处理方式
用虚拟线程最合适 ,虚拟线程依次串行访问 。还能大并发
mq 或者把任务存到数据库, 在通过定时任务/线程池拉取任务执行
这种问题的标准解法是消息队列,如果没有 kafka/rabbit ,那么 redis 的 list 可以一用。
用 reactor 的异步 http 请求或者用虚拟线程
使用 MQ 会不会堆积呢,需求是前端对 /demo 发起请求后,后端逻辑(就是顺序请求外部接口的方法)必须要立即执行,引入中间件会不会增加中间的时延。
如果不想把程序写很大(MQ + 消费者之类的), 直接请求进来存数据库, 对应某个字段为任务是否执行, 然后 scheduler 去找没有执行的数据, 执行并把结果添上, scheduler 后面想加多少线程就随意了.
其实就是相当于用数据库直接当 mq 用, 前台请求只是插入挺简单的速度回很快
这还不上队列么
每秒都进来 120 了,上游就是 360 ,就算上游不限流,响应要是慢一点不得干爆你
java21 ,springboot3.2 使用虚拟线程
用 MQ 主要是为了解耦,可以更灵活。如果写在一起,如果请求外部接口出问题了,可能拖垮对前端接口的响应。所以单独拎出来消费者。担心中间延迟,就多些消费者,保证消息不堆积,基本就不会太多的延迟。(因为你异步请求外部接口,本身就已经是默认有延迟了,不是同步操作)
基本就大家说的 MQ 或者虚拟线程,瓶颈其实在外部接口,不在机器配置上
100 的 qps ,别说 MQ 了,你就是直接落库也没事啊
还有就是一点你要搞清楚挤压不完全时因为你的 线程设置
还要看你调用接口的 策略 好比你现在是 5 秒每个*3 就算你无限资源往上加 人家接口扛不住了变 10 秒每个或者全挂了也不是你想要的结果
mq 更多是解耦 有了 mq 你也要确认消费的并发来调整 mq 消费者的数量或者并发数
- 有段时间会使用 spring flow ,每个请求是个 Mono 或者 Flow ,多个请求将这些流 zip / combine 在一起,再根据业务要求返回不同接口。但无论是响应式还是 ReactiveX ,都有一些自己的问题。
- 现在更多会用 Ktor
如果你的这种请求特别多,且大,我觉得纯异步队列方案可能是最好的
请求进来,丢给线程池去异步处理,如果任务数超出了线程池的队列长度,就暂存数据库,再起一个定时任务,定时消费数据库中的任务。
前面用个 mq 接受数据,
消费数据可以看下 Nio 相关工具,比如说 spring 的 webclient 。把自己当成网关压力给到后方 abc 服务这么高的配置不至于顶不住啊,多几个实例
mq 异步处理+定时任务扫描处理失败的
只要是 mq 不管什么形式的都可能会堆积,具体是看你的业务,能不能忍受堆积,堆积了一段时间的任务是继续处理还是扔掉。
请求量这么大肯定需要 MQ 进行存放, 至于后续你可以使用虚拟线程, 也可以试试 CompletableFuture 的函数编程方式.
非常非常感谢大家的回复,我正在开始了解 Java 21 的虚拟线程,希望能用上
之前碰到了 OOM 的问题:java.lang.OutOfMemoryError: unable to create native thread: possibly out of memory or process/resource limits reached
因为我看程序根本没吃满内存,所以我改了 Xms Xmx Xss ,似乎都没啥用,然后我就在一个虚拟机开了俩实例,nginx 负载均衡,结果就再也没报这个错误了。
你是用 tomcat 吗?可以搜索一下 tomcat 配置优化
Spring webclient 响应式请求
Jdk21 的话虚拟线程也是不错的,这是发挥虚拟线程能力的典型场景
感觉这个 100qps 应该用不上 mq
我只关心顺序调用的中间有失败的,前面成功的接口怎么回滚?🤨
能队列的还是队列,万一重启异步进程的请求信息不是丢了都?🤨
搞个队列啊,前端请求过来就记下来然后直接返回,自己开个线程或者进程慢慢去跑所有记下来的请求
加线程池监控慢慢调就行
你这个不是后面的依赖前面的数据吗?直接一个方法里面挨个请求呀,用得着 么? api/a 的数据没回来,你调 api/b 时怎么传参?接口超时不应该抛出异常么?通过捕获异常来打日志就行。
至于你每秒钟有 120 请求,这就看你的/demo 接口的并发能力了。压测一下就知道 QPS 了,然后根据需要的机器数量,在 nginx 上做个负载均衡就行。
歪个楼,对于会前端的 Java 菜鸡。
先不考虑性能和数据完整性问题,java 有类似前端这样的写法吗?
demoB = async ()=>{
const a = await api_a()
const b = await api_a(a)
const c = await api_a(b)
await save(c)
}
demo = ()=>{
demoB()
return 200
},
全 await 不就是 java 默认的同步调用?
我觉得你的描述是,1 、前端请求后,后端只要接收到请求,即可返回结果,业务操作可异步进行
2 、异步进行时,无法确认自己线程池该给多少线程是最佳方案
3 、链式调用的时延较高,异步等待时间过长,队列堆积
个人想法:当无法确认外部接口的响应时间时,可通过 MQ 进行消息传递。
三个 TOPIC ,和你的思路一致,每当有一个/demo 被请求,则直接发送消息与参数至 TOPIC-A 中,然后 CONSUMER-A 去处理。A 处理完则发消息到 B ,B 消费完则发送消息至 C 并被消费。
通过 MQ 的方式,首先可以保证消息不丢失,且链式不出问题,日志记录、报错回滚与重试都更方便。
至于 MQ 消费者的线程个数,这个没所谓的,基本上都是有则新建线程,等待一段时间后回收线程。
其次如果觉得一个线程同一时间只能消费一条消息太慢的话,可以批量消费,通过 Future.get 来实现异步。
jdk21 的虚拟线程不太懂。
spring5 的 webclient ,基于 reactor 模型的,a 接口的 subscribe 中调 b 接口并结果落库,b 的 subscribe 中调 c 接口并结果落库,以此类推。
这个其实主要看你是每秒都有这么多请求,还是只是偶尔有这么多请求,如果每秒都是的话,你的消费者就得大,无论是线程还是消息队列都一样,不然就会一直堆积;如果只是偶尔这么多请求,线程池小一点也没关系,慢慢消化就行了
我也遇到过类似的需求,只不过那个需求更加消耗资源,直接丢到 serverless 了,想调几百几千次都行,哈哈,把并发的烦恼完全抛掉
demo 接口調用處是異步的,不是阻塞
虚拟线程或者用 Webflux 非阻塞来实现超高并发,当然要求底层数据库访问也要支持
webflux 或是 RXjava ,前端就是 rxjs ,就是专门用来解决这类问题的,建议你了解下响应式编程,或是说函数式编程,不过如果你是习惯了面向对象编程可能不是太习惯,楼上很多回答都极不专业。
赞同。特别是最后一句,我看到这种问题都要上 mq 都血压上升
哥,信号量啊
长话短说. 请教高手解释解释脚本做了什么. 看得出来 zzh 是挖矿的,但是 newinit.sh 里面的一堆操作是啥呀? 昨天就操作了adduser postgres sud…
是不是有锁 指针可以调用所有方法 不是吧 非指针类型也可以调用所有方法啊. go 不是会自动处理吗? 是不是某些情况下返回的类型要给接口赋值的时候. 会遇到什么不支持…
开着 play store 的自动更新,醒来发现图标变了,点进去一直恶意弹广告。如果发现自己的图标也变了,不要点进去,直接卸载掉去 GitHub 下载最新的,并把 play s…