Ciro Santilli OurBigBook.com $£ Sponsor €¥ 中国独裁统治 China Dictatorship 新疆改造中心、六四事件、法轮功、郝海东、709大抓捕、2015巴拿马文件 邓家贵、低端人口、西藏骚乱
In this example, posts have tags. When a post is deleted, we check to see if there are now any empty tags, and now we want to delete any empty tags that the post deletion may have created.
If we are creating and deleting posts concurrently, a naive implementation might wrongly delete the tags of a newly created post.
This could be due to a concurrency issue of the following types.
Failure case 1:
  • thread 2: delete old post
  • thread 2: find all tags with 0 posts. Finds tag0 from the deleted old post which is now empty.
  • thread 1: create new post, which we want to have tag tag0
  • thread 1: try to create a new tag tag0, but don't because it already exists, this is done using SQLite's INSERT OR IGNORE INTO or PostgreSQL's INSERT ... ON CONFLICT DO NOTHING
  • thread 1: assign tag0 to the new post by adding an entry to the join table
  • thread 2: delete all tags with 0 posts. It still sees from its previous search that tag0 is empty, and deletes it, which then cascades into the join table
which would result in the new post incorrectly not having the tag0.
Failure case 2:
  • thread 2: delete old post
  • thread 2: find all tags with 0 posts
  • thread 1: create new post
  • thread 1: try to create a new tag tag0, but don't because it already exists
  • thread 2: delete all tags with 0 posts. It still sees from its previous search that tag0 is empty, and deletes it
  • thread 1: assign tag0 to the new post
which leads to a foreign key failure, because the tag does not exist anymore when the assignment happens.
Failure case 3:
  • thread 2: delete old post
  • thread 1: create new post, which we want to have tag tag0
  • thread 1: try to create a new tag tag0, and succeed because it wasn't present
  • thread 2: find all tags with 0 posts, finds the tag that was just created
  • thread 2: delete all tags with 0 posts, deleting the new tag
  • thread 1: assign tag0 to the new post
which leads to a foreign key failure, because the tag does not exist anymore when the assignment happens.
Sample executions:
  • node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'READ COMMITTED': PostgreSQL, 9 tags, DELETE/CREATE the tag0 test tag 1000 times, use READ COMMITTED
    Execution often fails, although not always. The failure is always:
    error: insert or update on table "PostTag" violates foreign key constraint "PostTag_tagId_fkey"
    because the:
    INSERT INTO "PostTag"
    tries to insert a tag that was deleted in the other thread, as it didn't have any corresponding posts, so this is the foreign key failure.
    TODO: we've never managed to observe the failure case in which tag0 is deleted. Is it truly possible? And if not, by which guarantee?
  • node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'READ COMMITTED' 'FOR UPDATE': do a SELECT ... FOR UPDATE before trying to INSERT.
    This is likely correct and the fastest correct method according to our quick benchmarking, about 20% faster than REPEATABLE READ.
    We are just now 100% sure it is corret becase we can't find out if the SELECT in the DELETE subquery could first select some rows, which are then locked by the tag creator, and only then locked by DELETE after selection. Or does it re-evaludate the SELECT even though it is in a subquery?
  • node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'REPEATABLE READ': repeatable read
    We've never observed any failures with this level. This should likely fix the foreign key issue according to the PostgreSQL docs, since:
    • the DELETE "Post" commit cannot start to be seen only in the middle of the thread 1 transaction
    • and then if DELETE happened, the thread 1 transaction will detect it, ROLLBACK, and re-run. TODO how does it detect the need rollback? Is it because of the foreign key? It is very hard to be sure about this kind of thing, just can't find the information. Related: postgreSQL serialization failure.
  • node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'SERIALIZABLE': serializable
  • node --unhandled-rejections=strict ./parallel_create_delete_empty_tag.js p 9 1000 'NONE': magic value, don't use any transaction. Can blow up of course, since even less restrictions than READ COMMITTED
All executions use 2 threads.
Some theoretical notes:
  • Failure case 3 is averted by a READ COMMITTED transaction, because thread 2 won't see the uncommitted tag that thread 1 created, and therefore won't be able to delete it
stackoverflow.com/questions/10935850/when-to-use-select-for-update from SELECT FOR UPDATE also talks about a similar example, and has relevant answers.
nodejs/sequelize/raw/parallel_create_delete_empty_tag.js
#!/usr/bin/env node

// https://cirosantilli.com/file/sequelize/raw/parallel_create_delete_empty_tag.js

const assert = require('assert')
const { Worker, isMainThread, parentPort, workerData } = require('worker_threads')
const { DataTypes, Op } = require('sequelize')
const common = require('../common')
const sequelizes = common.sequelizes(2, __filename, process.argv[2])
const sequelize = sequelizes[0]
const sequelize2 = sequelizes[1]

let n
if (process.argv.length > 3) {
  n = parseInt(process.argv[3], 10)
} else {
  n = 10
}
let iters
if (process.argv.length > 4) {
  iters = parseInt(process.argv[4], 10)
} else {
  iters = 10
}
const isolation = process.argv[5]
const for_update = process.argv[6]

;(async () => {
try {
await common.drop(sequelize, 'PostTag')
await common.drop(sequelize, 'Tag')
await common.drop(sequelize, 'Post')
await Promise.all([
  `CREATE TABLE "Post" (id INTEGER PRIMARY KEY, title TEXT NOT NULL UNIQUE)`,
  `CREATE TABLE "Tag" (id INTEGER PRIMARY KEY, name TEXT NOT NULL UNIQUE)`,
].map(s => sequelize.query(s)))
await sequelize.query(`
CREATE TABLE "PostTag" (
  "postId" INTEGER NOT NULL,
  "tagId" INTEGER NOT NULL,
  PRIMARY KEY ("postId", "tagId"),
  FOREIGN KEY ("postId") REFERENCES "Post"(id) ON DELETE CASCADE,
  FOREIGN KEY ("tagId") REFERENCES "Tag"(id) ON DELETE CASCADE
)
`)

// Initialize database with post0 <-> tag0, so exactly one tag per post.
// Except for tag0, which has no posts to start with.
let arr = []
for (let i = 0; i < n; i++) {
  arr.push(sequelize.query(`INSERT INTO "Post" VALUES (${i}, 'post${i}')`))
  arr.push(sequelize.query(`INSERT INTO "Tag" VALUES (${i}, 'tag${i}')`))
}
await Promise.all(arr)
arr = []
for (let i = 1; i < n; i++) {
  arr.push(sequelize.query(`INSERT INTO "PostTag" VALUES (${i}, ${i})`))
}
await Promise.all(arr)

// This thread will create a bunch of new posts with tag0
let rows, meta, done = false
;(async () => {
  for (let i = 0; i < iters; i++) {
    const postId = i + n
    const newTagId = postId
    const tagName = `tag0`
    const postTitle = `post${postId}`
    await sequelize.query(`INSERT INTO "Post" VALUES (${postId}, '${postTitle}')`)
    await common.transaction(sequelize, isolation, async sequelize => {
      let tagId;
      if (for_update !== undefined) {
        // If the tag is present, this SELECT will prevent it from being deleted in another thread.
        ;[rows, meta] = await sequelize.query(`SELECT id FROM "Tag" WHERE name = '${tagName}' ${for_update}`)
        if (rows.length === 0) {
          // If we insert a new tag, it cannot be seen from the DELETE thread until COMMIT,
          // so it cannot be deleted either, and we are fine.
          await sequelize.query(`INSERT INTO "Tag" VALUES (${newTagId}, '${tagName}')`)
          tagId = newTagId
        } else {
          tagId = rows[0].id
        }
      } else {
        if (sequelize.options.dialect === 'sqlite') {
          await sequelize.query(`INSERT OR IGNORE INTO "Tag" VALUES (${newTagId}, '${tagName}')`)
        } else {
          await sequelize.query(`INSERT INTO "Tag" VALUES (${newTagId}, '${tagName}') ON CONFLICT DO NOTHING`)
        }
        // Now we need to get the tag id, because we don't know if a new one was inserted or not, and there's no
        // way to get it back with the INSERT yet:
        // https://stackoverflow.com/questions/13244393/sqlite-insert-or-ignore-and-return-original-rowid
        ;[rows, meta] = await sequelize.query(`SELECT id FROM "Tag" WHERE name = '${tagName}'`)
        tagId = rows[0].id
      }
      await sequelize.query(`INSERT INTO "PostTag" VALUES (${postId}, ${tagId})`)
    })

    // Now check that the tag wasn't deleted by the other thread.
    ;[rows, meta] = await sequelize.query(`
SELECT
  "Post".id AS id,
  "Post".title AS title
FROM "Post"
INNER JOIN "PostTag"
  ON "Post"."id" = "PostTag"."postId"
INNER JOIN "Tag"
  ON "PostTag"."tagId" = "Tag".id
  AND "Tag".name = '${tagName}'
ORDER BY "Post".id ASC
`)
    assert.strictEqual(rows[0].title, postTitle)

    // DELETE this new post just to keep its count at 0. This would be done from the other thread in the real example,
    // but then it would be harder to synchronize things to get a large number of such conflict cases happening.
    // We could do it by DELETEing all posts with a given tag from that thread however.
    await sequelize.query(`DELETE FROM "Post" WHERE id = ${postId}`)
  }
})().finally(() => { done = true })

// This thread will repeatedly delete all tags with 0 posts.
while (!done) {
  await common.transaction(sequelize2, isolation, async sequelize2 => {
    // PostgreSQL says that this acquires the SELECT FOR UPDATE lock.
    // But it is hard to decide if this is part of the SQL standard or not.
    await sequelize2.query(`
DELETE FROM "Tag"
WHERE "Tag".id IN (
  SELECT
    "Tag".id
  FROM "Tag"
  LEFT OUTER JOIN "PostTag"
    ON "Tag".id = "PostTag"."tagId"
  LEFT OUTER JOIN "Post"
    ON "PostTag"."postId" = "Post".id
  GROUP BY "Tag".id
  HAVING COUNT("Post".id) = 0
)
`)
  })
}
} catch (e) {
  console.error(e)
}
await Promise.all(sequelizes.map(s => s.close()))
})();

Ancestors

  1. SQL parallel update example
  2. SQL isolation level example
  3. SQL transaction isolation level
  4. SQL transaction
  5. SQL feature
  6. SQL
  7. Relational database management system
  8. Relational database
  9. Database
  10. Software
  11. Computer
  12. Information technology
  13. Area of technology
  14. Technology
  15. Ciro Santilli's Homepage