请教一个关于并发控制的问题
现在有这样一个函数 processBatch ,负责读取数据,执行一些操作后再更新它们,相关的数据库操作都在事务内执行。伪代码如下:
function processBatch():
tx = db.beginTransaction()
// 1. 批量读取:取出最多 N 条“待处理”数据
items = tx.query("SELECT * FROM tasks WHERE status = 'PENDING' LIMIT N")
for item in items:
// 2. 业务处理
doBusinessLogic(item)
// 3. 更新状态
tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
tx.commit()
// 线程 A
spawn threadA:
processBatch()
// 线程 B (几乎同时执行)
spawn threadB:
processBatch()
但由于 processBatch 在多个地方都会被调用,因此存在并发问题。线程 A 和线程 B 执行时可能查询到同一批数据,导致这批数据被处理两次。解决这个问题有两个方案:
方案 A:在 processBatch 的逻辑中增加锁,这样在任意时刻,该函数都不会并发执行
方案 B:调整数据库事务的隔离级别或锁表,即使 processBatch 并发执行了,底层的数据操作不会出现并发的情况
我的问题是:
哪个方案更符合最佳实践?原因是什么
在保持 processBatch 会被多个地方调用不变的前提下,有没有更好的方案?
如果想学习这类并发相关的问题和解决方案,应该搜索什么关键词
感谢各位赐教
感觉是外部事务的颗粒度太大了,
一次查询多条数据,但没有立即更新该批次数据的状态,肯定会导致查询到重复的数据的;
建议增加一层内存或缓存级别的互斥锁,锁任务的 id 以及一个任务一个事务,这样可以保证一个任务每次只有一个线程处理。
噗……不会并发执行的多线程?
互联网公司的习惯一般并发控制都放在业务逻辑上,不太依赖数据库,就是用方案 A ,当然你数据库还是需要设置合理的隔离级别。在你的业务场景里,遇到并发,没抢到锁的线程是等待,还是直接返回报错不处理了,如果是前者的话,可以把所有的处理逻辑放到一个有序队列里,依次执行。
简单的很
for item in items:
//乐观锁
boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)
if(!flag){
// 抛异常
}
// 2. 业务处理
doBusinessLogic(item)
// 3. 更新状态
tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
tx.commit()
接上面的 加一个状态,处理中
tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
-> tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ? and status = 'PENDING' ", item.id)
processBatch
也可以想办法做成串行的
boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)
多线程 只有一个成功。
是否需要抛异常,看业务决定
- 哥们不会还没用过 deepseek 和 chatgpt 吧
不要在数据库玩这个……
1.读写分离,写放队列执行
2.读到的数据可能会重复,处理掉,用 etcd ,zookeeper, rides 之类的
3.没写过后端,轻喷
加锁不够优雅,不如消息队列,不过数据少用不上队列,但你可以用生产者消费者的思维来改写,保证只有一个生产者就行了
悲观锁、乐观锁、队列
除了上面提到的,还可以考虑用某种查询条件对数据分区,让不同线程查询到不同的数据
方案 A 和方案 B 都有问题。
如果 doBusinessLogic 是 IO 密集型,推荐使用 ThreadPoolExecutor 。操作步骤如下:
- 设置 ThreadPoolExecutor 并发 workers ,例如 workers=5 也就是有 5 个并发同时处理;
- 函数 processBatch:将 pending 的任务 ID submit 到 ThreadPoolExecutor 队列里;
- ThreadPoolExecutor workers 处理:先 update tasks set status='RUNNING' where id=? and status='PENDING',如果返回更新记录数=0 ,就直接 return 不处理; (这是一个防冲突的技巧),再 doBusinessLogic(id),然后 update tasks set status='DONE' where id=?
好处是:突发高并发时,任务是加入到队列的,不会挤爆服务器;可以设置并发 workers 同时处理。
这不是并发的问题,是设计的问题。
简单点,给 processBatch 加一个 start_time 和 end_time 参数,保证查到不一样的数据。
两阶段乐观锁吧,
第一阶段先拿一部分,从 Waiting 改成 Running ,拿到了再去执行。
SELECT 取出 id 列表,遍历的时候按照随机顺序(如果业务逻辑允许),用 SELECT FOR UPDATE 锁住每一行,检测状态,把 PENDING 更新为 PROCESSING ,处理完成后再更新为 DONE 。这里要有一个机制,把停留在 PROCESSING 超时的任务重新放回 PENDING 或者标记为 DONE 。
如果类似任务比较多的话,可以引入一个任务调度系统,别自己搞了,要填的坑和细节非常多。
这完全没必要用锁,是设计的问题,流程调整下就好了。
我理解你这块是离线的业务对吧。
首先 select id from tasks where status = 'PENDING' 拿出全量需要处理的数据,做成一个离线文件或者放内存里(看你自己的数据规模决定)
接着实现一个缓冲,简单点可以在内存里面构造一个比如长度为 16 的队列,存放下一批需要处理的数据
然后是设置并发度,比如说 4 ,这一块你用线程/纤程/进程 实现都可以,依次从队列里面取任务,队列空了在获取下一批数据到队列里面。
这一块自己实现细节还挺多的,比如任务失败了如何重跑,需不需要做背压之类的。
我之前做过类似的,用框架实现,对应的代码就很简洁,伪代码类似这样
ids.toStream.buffer(16).mapPar(4)(row => processData(row))
你就不能给任务分一下片吗?
线程 1:WHERE status = 'PENDING' and id % 2 = 0
线程 2:WHERE status = 'PENDING' and id % 2 = 1
说实话, 在互联网干了七八年了,多线程的代码屈指可数,都是多进程
- 使用分布式锁 性能一般
- 使用数据库的乐观锁 加 version 开发成本太高,需要多维护一个 version 字段
推荐方案,将数据分批打散,比如 1000 条数据 2 个现成并行执行,那么 1-500 是线程 A 执行。501-1000 线程 B 执行。经典方案。
话说这种问题直接丢给 ai ,很多方案。以前的 deepseek R1 模型的方案完整 准确率高。现在好像不行啦。
gg 思密达。这类似发短信的系统,很多地方调用发短信接口,都先写入发送记录表,状态为待发送,然后起一个任务循环执行发送并改状态;
如果是在一个进程的前提下,可以用线程安全的先进先出队列,把 processBatch 添加到队列,另起一个线程来消费队列
贴一个 C# 实现的类
利用 lucky 在 openwrt 上,将 nas 和 emby 映射到公网,然后在 cf 上开了个服务接收端口变化,手机上看电影很爽。 另外,将 qbittorrent 的…
想给网站弄一个扫码支付个 0.1 元-0.5 元查看全文,不然真的一年几千的服务器费用都不够。 CSDN 再版? 第三方有几个 比如虎皮椒 面包多 pay 啥网站啊一年…
有个疑问一直想不太明白,java api 服务是提高单虚拟的 cpu 核心数和内存大小,调高最大线程数。还是干脆用 tomcat 的默认配置,部署多台机器,用 nginx 负载…