In this example, posts have tags. When a post is deleted, we want to also delete any tags that the deletion has left empty, i.e. tags that no longer have any posts.
If we are creating and deleting posts concurrently, a naive implementation might wrongly delete the tags of a newly created post.
This can happen through any of the following interleavings of the two threads.
Failure case 1, which would result in the new post incorrectly not having the tag `tag0`:
- thread 2: delete old post
- thread 2: find all tags with 0 posts. Finds `tag0` from the deleted old post, which is now empty.
- thread 1: create new post, which we want to have tag `tag0`
- thread 1: try to create a new tag `tag0`, but don't because it already exists. This is done using SQLite's `INSERT OR IGNORE INTO` or PostgreSQL's `INSERT ... ON CONFLICT DO NOTHING`.
- thread 1: assign `tag0` to the new post by adding an entry to the join table
- thread 2: delete all tags with 0 posts. It still sees from its previous search that `tag0` is empty, and deletes it, which then cascades into the join table

Failure case 2, which leads to a foreign key failure, because the tag does not exist anymore when the assignment happens:
- thread 2: delete old post
- thread 2: find all tags with 0 posts
- thread 1: create new post
- thread 1: try to create a new tag `tag0`, but don't because it already exists
- thread 2: delete all tags with 0 posts. It still sees from its previous search that `tag0` is empty, and deletes it
- thread 1: assign `tag0` to the new post
Failure case 3, which leads to a foreign key failure, because the tag does not exist anymore when the assignment happens:
- thread 2: delete old post
- thread 1: create new post, which we want to have tag `tag0`
- thread 1: try to create a new tag `tag0`, and succeed because it wasn't present
- thread 2: find all tags with 0 posts, finds the tag that was just created
- thread 2: delete all tags with 0 posts, deleting the new tag
- thread 1: assign `tag0` to the new post
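
To make the interleavings more concrete, here is a minimal sketch that replays the steps of failure case 2 sequentially against a toy in-memory model. The `posts`, `tags` and `postTags` structures and the `insertPostTag` helper are hypothetical stand-ins for the real tables and for the foreign key check; no database is involved, and the error message is made up for illustration.

```javascript
// Toy in-memory model of the three tables. This is NOT a real database,
// just an illustration of the ordering of operations in failure case 2.
const posts = new Set(['post-old'])
const tags = new Map([['tag0', 0]]) // tag name -> tag id
let postTags = [{ post: 'post-old', tagId: 0 }] // join table rows

// Hypothetical stand-in for the FOREIGN KEY ("tagId") REFERENCES "Tag"(id) check.
function insertPostTag(post, tagId) {
  if (![...tags.values()].includes(tagId)) {
    throw new Error(`foreign key violation: tag id ${tagId} does not exist`)
  }
  postTags.push({ post, tagId })
}

// thread 2: delete old post (ON DELETE CASCADE also removes its join rows)
posts.delete('post-old')
postTags = postTags.filter(row => row.post !== 'post-old')

// thread 2: find all tags with 0 posts, and remember the result
const emptyTagIds = [...tags.values()].filter(
  id => !postTags.some(row => row.tagId === id)
)

// thread 1: create new post
posts.add('post-new')
// thread 1: "INSERT OR IGNORE": tag0 already exists, so nothing is inserted
if (!tags.has('tag0')) tags.set('tag0', 1)
const tagId = tags.get('tag0')

// thread 2: delete all tags with 0 posts, using its now stale search result
for (const [name, id] of [...tags]) {
  if (emptyTagIds.includes(id)) tags.delete(name)
}

// thread 1: assign tag0 to the new post; the tag is gone by now
let failure = null
try {
  insertPostTag('post-new', tagId)
} catch (e) {
  failure = e.message
}
console.log(failure)
```

Moving the "assign" step before thread 2's delete in this sketch reproduces failure case 1 instead: the join row is inserted and then removed again by the cascade.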
Sample executions: All executions use 2 threads.
- `node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'READ COMMITTED'`: PostgreSQL, 9 tags, DELETE/CREATE the `tag0` test tag 1000 times, use `READ COMMITTED`.
  Execution often fails, although not always. The failure is always: `error: insert or update on table "PostTag" violates foreign key constraint "PostTag_tagId_fkey"` because the `INSERT INTO "PostTag"` tries to insert a tag that was deleted in the other thread, as it didn't have any corresponding posts, so this is the foreign key failure.
  TODO: we've never managed to observe the failure case in which `tag0` is deleted. Is it truly possible? And if not, by which guarantee?
- `node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'READ COMMITTED' 'FOR UPDATE'`: do a `SELECT ... FOR UPDATE` before trying to `INSERT`.
  This is likely correct and the fastest correct method according to our quick benchmarking, about 20% faster than `REPEATABLE READ`.
  We are just not 100% sure it is correct because we can't find out if the `SELECT` in the `DELETE` subquery could first select some rows, which are then locked by the tag creator, and only then locked by `DELETE` after selection. Or does it re-evaluate the `SELECT` even though it is in a subquery?
- `node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'REPEATABLE READ'`: repeatable read.
  We've never observed any failures with this level. This should likely fix the foreign key issue according to the PostgreSQL docs, since:
  - the `DELETE "Post"` commit cannot start to be seen only in the middle of the thread 1 transaction
  - and then if DELETE happened, the thread 1 transaction will detect it, ROLLBACK, and re-run. TODO how does it detect the need to rollback? Is it because of the foreign key? It is very hard to be sure about this kind of thing, just can't find the information. Related: PostgreSQL serialization failure.
- `node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'SERIALIZABLE'`: serializable
- `node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'NONE'`: magic value, don't use any transaction. Can blow up of course, since it has even fewer restrictions than `READ COMMITTED`
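
The "ROLLBACK, and re-run" behaviour mentioned for `REPEATABLE READ` has to be implemented by the application. The source of `common.transaction` is not shown here, so the following is only a sketch of what such a re-run loop could look like. `withRetry` is a hypothetical helper, and reading the error code from `e.code` or `e.original.code` is an assumption about how the driver or ORM exposes it. SQLSTATE `40001` is PostgreSQL's `serialization_failure` code. The demo uses a fake transaction body that fails twice rather than a real database.

```javascript
// SQLSTATE 40001 is PostgreSQL's serialization_failure error code.
const SERIALIZATION_FAILURE = '40001'

// Hypothetical helper: run `work`, and re-run it from scratch if it fails
// with a serialization failure. In real use, `work` would be expected to
// BEGIN, run its queries, then COMMIT, or ROLLBACK before rethrowing.
async function withRetry(work, maxAttempts = 5) {
  for (let attempt = 1; ; attempt++) {
    try {
      return await work(attempt)
    } catch (e) {
      // Assumption: the SQLSTATE is on `e.code`, or on `e.original.code`
      // when an ORM wraps the driver error.
      const code = e.code ?? e.original?.code
      if (code !== SERIALIZATION_FAILURE || attempt >= maxAttempts) throw e
    }
  }
}

// Stand-in for a transaction body: fails twice with 40001, then succeeds.
const demo = withRetry(async attempt => {
  if (attempt < 3) {
    const e = new Error('could not serialize access due to concurrent update')
    e.code = SERIALIZATION_FAILURE
    throw e
  }
  return attempt
})
demo.then(attempts => console.log(`succeeded on attempt ${attempts}`))
```

Any other error, such as the foreign key violation seen under `READ COMMITTED`, is rethrown immediately instead of being retried.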
Some theoretical notes:
- Failure case 3 is averted by a `READ COMMITTED` transaction, because thread 2 won't see the uncommitted tag that thread 1 created, and therefore won't be able to delete it
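
The visibility argument above can be sketched with a toy model in which rows written inside a transaction only become visible to others on commit. `ToyDb` and its methods are hypothetical and model nothing beyond "no dirty reads"; real PostgreSQL behaviour (locking, unique constraint waits, snapshots) is far richer than this.

```javascript
// Toy model: rows written inside a transaction stay in `pending` and only
// move to `committed` on commit(). This only illustrates "no dirty reads".
class ToyDb {
  constructor() {
    this.committed = { tags: new Set(), postTags: [] }
  }

  begin() {
    const db = this
    const pending = { tags: new Set(), postTags: [] }
    return {
      insertTag(name) { pending.tags.add(name) },
      insertPostTag(post, tag) { pending.postTags.push({ post, tag }) },
      commit() {
        pending.tags.forEach(t => db.committed.tags.add(t))
        db.committed.postTags.push(...pending.postTags)
      },
    }
  }

  // What a concurrent statement from another transaction would see in this
  // model: committed rows only.
  emptyTags() {
    return [...this.committed.tags].filter(
      t => !this.committed.postTags.some(row => row.tag === t)
    )
  }
}

const db = new ToyDb()
const tx1 = db.begin()                 // thread 1 starts its transaction
tx1.insertTag('tag0')                  // thread 1: create tag0 (uncommitted)
const seenByThread2 = db.emptyTags()   // thread 2: find all tags with 0 posts
tx1.insertPostTag('post-new', 'tag0')  // thread 1: assign tag0 to the new post
tx1.commit()
const emptyAfterCommit = db.emptyTags()
console.log(seenByThread2, emptyAfterCommit)
```

In this model thread 2 never observes `tag0` without its post, so it has nothing to delete, which matches the reasoning for failure case 3. It says nothing about failure cases 1 and 2, where the tag was already committed before thread 1 started.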
stackoverflow.com/questions/10935850/when-to-use-select-for-update (from SELECT FOR UPDATE) also discusses a similar example, and has relevant answers.
nodejs/sequelize/raw/parallel_create_delete_empty_tag.js
#!/usr/bin/env node
// https://cirosantilli.com/file/sequelize/raw/parallel_create_delete_empty_tag.js
const assert = require('assert')
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads')
const { DataTypes, Op } = require('sequelize')
const common = require('../common')
const sequelizes = common.sequelizes(2, __filename, process.argv[2])
const sequelize = sequelizes[0]
const sequelize2 = sequelizes[1]
let n
if (process.argv.length > 3) {
  n = parseInt(process.argv[3], 10)
} else {
  n = 10
}
let iters
if (process.argv.length > 4) {
  iters = parseInt(process.argv[4], 10)
} else {
  iters = 10
}
const isolation = process.argv[5]
const for_update = process.argv[6]
;(async () => {
  try {
    await common.drop(sequelize, 'PostTag')
    await common.drop(sequelize, 'Tag')
    await common.drop(sequelize, 'Post')
    await Promise.all([
      `CREATE TABLE "Post" (id INTEGER PRIMARY KEY, title TEXT NOT NULL UNIQUE)`,
      `CREATE TABLE "Tag" (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)`,
    ].map(s => sequelize.query(s)))
    await sequelize.query(`
      CREATE TABLE "PostTag" (
        "postId" INTEGER NOT NULL,
        "tagId" INTEGER NOT NULL,
        PRIMARY KEY ("postId", "tagId"),
        FOREIGN KEY ("postId") REFERENCES "Post"(id) ON DELETE CASCADE,
        FOREIGN KEY ("tagId") REFERENCES "Tag"(id) ON DELETE CASCADE
      )
    `)

    // Initialize database with postN <-> tagN, so exactly one tag per post.
    // Except for tag0, which has no posts to start with.
    let arr = []
    for (let i = 0; i < n; i++) {
      arr.push(sequelize.query(`INSERT INTO "Post" VALUES (${i}, 'post${i}')`))
      arr.push(sequelize.query(`INSERT INTO "Tag" VALUES (${i}, 'tag${i}')`))
    }
    await Promise.all(arr)
    arr = []
    for (let i = 1; i < n; i++) {
      arr.push(sequelize.query(`INSERT INTO "PostTag" VALUES (${i}, ${i})`))
    }
    await Promise.all(arr)

    // This thread will create a bunch of new posts with tag0
    let rows, meta, done = false
    ;(async () => {
      for (let i = 0; i < iters; i++) {
        const postId = i + n
        const newTagId = postId
        const tagName = `tag0`
        const postTitle = `post${postId}`
        await sequelize.query(`INSERT INTO "Post" VALUES (${postId}, '${postTitle}')`)
        await common.transaction(sequelize, isolation, async sequelize => {
          let tagId;
          if (for_update !== undefined) {
            // If the tag is present, this SELECT will prevent it from being deleted in another thread.
            ;[rows, meta] = await sequelize.query(`SELECT id FROM "Tag" WHERE name = '${tagName}' ${for_update}`)
            if (rows.length === 0) {
              // If we insert a new tag, it cannot be seen from the DELETE thread until COMMIT,
              // so it cannot be deleted either, and we are fine.
              await sequelize.query(`INSERT INTO "Tag" VALUES (${newTagId}, '${tagName}')`)
              tagId = newTagId
            } else {
              tagId = rows[0].id
            }
          } else {
            if (sequelize.options.dialect === 'sqlite') {
              await sequelize.query(`INSERT OR IGNORE INTO "Tag" VALUES (${newTagId}, '${tagName}')`)
            } else {
              await sequelize.query(`INSERT INTO "Tag" VALUES (${newTagId}, '${tagName}') ON CONFLICT DO NOTHING`)
            }
            // Now we need to get the tag id, because we don't know if a new one was inserted or not, and there's no
            // way to get it back with the INSERT yet:
            // https://stackoverflow.com/questions/13244393/sqlite-insert-or-ignore-and-return-original-rowid
            ;[rows, meta] = await sequelize.query(`SELECT id FROM "Tag" WHERE name = '${tagName}'`)
            tagId = rows[0].id
          }
          await sequelize.query(`INSERT INTO "PostTag" VALUES (${postId}, ${tagId})`)
        })

        // Now check that the tag wasn't deleted by the other thread.
        ;[rows, meta] = await sequelize.query(`
          SELECT
            "Post".id AS id,
            "Post".title AS title
          FROM "Post"
          INNER JOIN "PostTag"
            ON "Post"."id" = "PostTag"."postId"
          INNER JOIN "Tag"
            ON "PostTag"."tagId" = "Tag".id
            AND "Tag".name = '${tagName}'
          ORDER BY "Post".id ASC
        `)
        assert.strictEqual(rows[0].title, postTitle)

        // DELETE this new post just to keep its count at 0. This would be done from the other thread in the real example,
        // but then it would be harder to synchronize things to get a large number of such conflict cases happening.
        // We could do it by DELETEing all posts with a given tag from that thread however.
        await sequelize.query(`DELETE FROM "Post" WHERE id = ${postId}`)
      }
    })().finally(() => { done = true })

    // This thread will repeatedly delete all tags with 0 posts.
    while (!done) {
      await common.transaction(sequelize2, isolation, async sequelize2 => {
        // PostgreSQL says that this acquires the SELECT FOR UPDATE lock.
        // But it is hard to decide if this is part of the SQL standard or not.
        await sequelize2.query(`
          DELETE FROM "Tag"
          WHERE "Tag".id IN (
            SELECT
              "Tag".id
            FROM "Tag"
            LEFT OUTER JOIN "PostTag"
              ON "Tag".id = "PostTag"."tagId"
            LEFT OUTER JOIN "Post"
              ON "PostTag"."postId" = "Post".id
            GROUP BY "Tag".id
            HAVING COUNT("Post".id) = 0
          )
        `)
      })
    }
  } catch (e) {
    console.error(e)
  }
  await Promise.all(sequelizes.map(s => s.close()))
})();