MIT 6.5830(6.830) Lab4

简介

在Lab4之中需要为SimpleDB加入事务功能,主要实现严格二阶段锁。说实话,从这里开始实验变得难多了,和前面的真不是一个难度的。并且我这里还只是实现简单的页级锁,如果要实现行级锁的话更难,同时对于死锁检测也只是使用了最简单的超时重试,不知道后面的B+树和回滚恢复有多难。。希望能坚持做完吧。

在SimpleDB中,每个事务都会有一个Transaction对象,我们用TransactionId来唯一标识一个事务,TransactionIdTransaction对象创建时自动获取。事务开始前,会创建一个Transaction对象,TrasactionId 会被传入到 sql 执行树的每一个 operator 算子中,加锁时根据加锁页面、锁的类型、加锁的事务id去进行加锁。

二阶段锁

在系统中的每一个事务遵从封锁协议,封锁协议的一组规则规定事务何时可以对数据项们进行加锁、解锁。

对于两阶段封锁协议:两阶段封锁协议要求每个事务分两个节点提出加锁和解锁申请:

  • 增长阶段:事务可以获得锁,但不能释放锁;
  • 缩减阶段:事务可以释放锁,但不能获得新锁。

最初,事务处于增长阶段,事务根据需要获得锁。一旦该事务释放了锁,它就进入了缩减阶段,并且不能再发出加锁请求。

严格两阶段封锁协议不仅要求封锁是两阶段,还要求事务持有的所有排他锁必须在事务提交后方可释放。这个要求保证未提交事务所写的任何数据在该事务提交之前均已排他方式加锁,防止了其他事务读这些数据。

强两阶段封锁协议。它要求事务提交之前不释放任何锁。在该条件下,事务可以按其提交的顺序串行化。

锁转换:在两阶段封锁协议中,我们允许进行锁转换。我们用升级表示从共享到排他的转换,用降级表示从排他到共享的转换。锁升级只能发送在增长阶段,锁降级只能发生在缩减阶段。

Exercises

Exercise1 Granting Locks

练习一是整个事务实现的基础,之后所有的实验都基于这里所定义的锁获取与释放的方法。在二阶段封锁协议中,有两种锁分别是shared共享锁和exclusive排它锁,在文档中对什么时候需要获得什么锁进行了相关规定

  • 当事务需要读一个对象时,需要获得它的共享锁;
  • 当事务需要写一个对象时,需要获得它的排它锁;
  • 一个对象上能具有多个对象的共享锁;
  • 对于一个对象,只有一个事务能获得它的排它锁;
  • 如果在一个对象上只有一个事务具有共享锁,则它可以被升级为排它锁。

具体在实现时,为了方便,可以定义LockManager类来管理每一个Page对应的事务和锁,相关定义如下,使用一个Map来保存每一页所具有的事务,而对于事务和锁的关系同样使用一个Map去存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private enum LockType {
SHARED,
EXCLUSIVE
}
private static class Lock {
TransactionId tid;
LockType lockType;
Lock(TransactionId tid, LockType type) {
this.tid = tid;
this.lockType = type;
}
}
private class LockManager {
ConcurrentHashMap<PageId, ConcurrentHashMap<TransactionId, Lock>> pageLocks;
LockManager() {
this.pageLocks = new ConcurrentHashMap<>();
}
}

接下来实现获取锁acquireLock方法,按照之前的要求和具体代码实现,可以写出如下判断流程

  1. 如果该页上还没有锁,则新建事务-锁对应表并插入;
  2. 如果对应页已包含该事务的锁
    • 请求锁是共享锁的话,直接返回true
    • 请求锁是排它锁的话,若之前存的就是排它锁,则返回true;否则尝试进行升级,即看是否只有这一个锁
  3. 该页面上没有该事务的锁,遍历锁,若存在排它锁,则直接返回false,此时无法加锁;没有排它锁的话,此时只有共享锁可以加。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public synchronized boolean aquirLock(TransactionId tid, PageId pid, LockType lockType) {
String name = Thread.currentThread().getName();
if (!pageLocks.containsKey(pid)) {
ConcurrentHashMap<TransactionId, Lock> map = new ConcurrentHashMap<>();
map.put(tid, new Lock(tid, lockType));
pageLocks.put(pid, map);
System.out.println(name + ":current page " + pid.getPageNumber() +
" has no lock, create and add " + lockType + ", tid: " + tid.getId());
return true;
}
ConcurrentHashMap<TransactionId, Lock> map = pageLocks.get(pid);
if (map.containsKey(tid)) {
Lock lock = map.get(tid);
if (lockType == LockType.SHARED) {
System.out.println(name + ":current page " + pid.getPageNumber() +
" has SHARED lock, add " + lockType + ", tid: " + tid.getId());
return true;
}
if (lockType == LockType.EXCLUSIVE) {
if (lock.lockType == LockType.EXCLUSIVE) {
System.out.println(name + ":current page " + pid.getPageNumber() +
" has EXCLUSIVE lock, add " + lockType + ", tid: " + tid.getId());
return true;
}
if (map.size() == 1) {
lock.lockType = LockType.EXCLUSIVE;
map.put(tid, lock);
pageLocks.put(pid, map);
System.out.println(name + ":current page " + pid.getPageNumber() +
" upgrade lock " + lockType + ", tid: " + tid.getId());
return true;
} else {
//System.out.println(name + ":current page " + pid.getPageNumber() +
" cant upgrade lock " + lockType + ", tid: " + tid.getId());
return false;
}
}
}
for (Map.Entry<TransactionId, Lock> entry : map.entrySet()) {
if (entry.getValue().lockType == LockType.EXCLUSIVE) {
//System.out.println(name + ":current page " + pid.getPageNumber() +
" already has " + LockType.EXCLUSIVE + ", tid: " + tid.getId());
return false;
}
}
if (lockType == LockType.SHARED) {
map.put(tid, new Lock(tid, LockType.SHARED));
pageLocks.put(pid, map);
System.out.println(name + ":current page " + pid.getPageNumber() +
" has serval SHARED, add " + LockType.SHARED + ", tid: " + tid.getId());
return true;
}
if (lockType == LockType.EXCLUSIVE) {
if (map.size() != 0) return false;
System.out.println(name + ":current page " + pid.getPageNumber() +
" has no lock so " + LockType.EXCLUSIVE + ", tid: " + tid.getId());
map.put(tid, new Lock(tid, LockType.EXCLUSIVE));
pageLocks.put(pid, map);
return true;
}
return false;
}

释放锁,只用删除相应项就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public synchronized boolean releaseLock(TransactionId tid, PageId pid) {
String name = Thread.currentThread().getName();
if (!pageLocks.containsKey(pid) || !pageLocks.get(pid).containsKey(tid)) return false;
ConcurrentHashMap<TransactionId, Lock> map = pageLocks.get(pid);
System.out.println(name + ":current trx release lock on " +
pid.getPageNumber() + " type is " + map.get(tid).lockType + ", tid: " + tid.getId());
map.remove(tid);
if (map.size() == 0) {
pageLocks.remove(pid);
} else {
pageLocks.put(pid, map);
}
this.notifyAll();
return true;
}

此时已经可以给页上锁了,而给页上锁的操作只需要在BufferPool.getPage()中实现即可,因为按照设计,无论什么操作的第一部都是调用这个方法。所以修改加上获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Page getPage(TransactionId tid, PageId pid, Permissions perm)
throws TransactionAbortedException, DbException {
// TODO: some code goes here
LockType lockType = LockType.SHARED;
if (perm == Permissions.READ_WRITE) lockType = LockType.EXCLUSIVE;
long l = System.currentTimeMillis();
long timeout = new Random().nextInt(200) + 100;
while (!lockManager.aquirLock(tid, pid, lockType)) {
if (System.currentTimeMillis() - l > timeout) {
transactionComplete(tid, false);
throw new TransactionAbortedException();
}
}
//.....
}

Exercise 2 Lock Lifetime

这个练习主要是确保之前所说的,那就是无论什么操作即插入、删除、遍历等都要先访问BufferPool.getPage()方法,注意这里要修改HeapFile中的insertTuple方法,因为之前每次访问一个页时都上了锁,但有时这个页满了就不需要再用了,但应该释放加上的锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public List<Page> insertTuple(TransactionId tid, Tuple t)
throws DbException, IOException, TransactionAbortedException {
// TODO: some code goes here

List<Page> list = new ArrayList<>();
// System.out.println("total pages :" + numPages());
for (int i = 0; i < numPages(); i++) {
HeapPageId heapPageId = new HeapPageId(getId(), i);
HeapPage page = (HeapPage) Database.getBufferPool().getPage
(tid, heapPageId, Permissions.READ_WRITE);
//System.out.println("this is page " + page.getId().getPageNumber() +
", has " + page.getNumUnusedSlots() + " slots left, pages's dirty is " + page.isDirty());
if (page.getNumUnusedSlots() == 0) {
Database.getBufferPool().unsafeReleasePage(tid, heapPageId);
continue;
}
page.insertTuple(t);
list.add(page);
return list;
}
BufferedOutputStream bw = new BufferedOutputStream(new FileOutputStream(file, true));
byte[] emptyData = HeapPage.createEmptyPageData();
bw.write(emptyData);
bw.close();
// load into cache
HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid,
new HeapPageId(getId(), numPages() - 1), Permissions.READ_WRITE);
p.insertTuple(t);
list.add(p);
return null;
// not necessary for lab1
}

Exercise 3 Implementing NO STEAL

对一个页的修改只有在事务提交之后才会被写入回磁盘之上,这意味着我们可以从磁盘中读一个页来覆盖当前的脏页来回滚一个事务,而这里需要实现NO STEAL的协议,要求页置换策略不能换掉脏页。修改页面置换算法,如果缓冲池中全是脏页的话,直接抛异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private synchronized void evictPage() throws DbException {
// TODO: some code goes here
// not necessary for lab1
Deque<PageId> tmp =new ArrayDeque<>();
PageId pid = null;
while (!queue.isEmpty()) {
PageId p = queue.pollFirst();
//System.out.println("evit page ,cur page's dirty is " +p.getPageNumber() );
if (pages.get(p.hashCode()).isDirty() == null) {
pid = p;
break;
} else tmp.offerLast(p);
}
if (pid == null) throw new DbException("no page is not dirty");
while (!tmp.isEmpty()) queue.addFirst(tmp.pollLast());
try {
flushPage(pid);
lockManager.releasePageLocks(pid);
pages.remove(pid.hashCode());
} catch (IOException e) {
e.printStackTrace();
}
}

Exercise 4 Transactions

当一个查询计划结束之后,也就意味着事务的结束,此时在BufferPool中的transactionComplete()方法将会被调用,该方法将会处理两种情况,分别是commitabort,前者需要将与事务有关的脏页全部刷到磁盘上并释放锁;而后者则需要从磁盘中重新加载页覆盖脏页来实现回滚,同时释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private class LockManager {
public synchronized void commitPage(TransactionId tid) {
for (Map.Entry<PageId, ConcurrentHashMap<TransactionId, Lock>> next : pageLocks.entrySet()) {
if (next.getValue().containsKey(tid)) {
try {
flushPage(next.getKey());
releaseLock(tid, next.getKey());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
public synchronized void abortPage(TransactionId tid) {
for (Map.Entry<PageId, ConcurrentHashMap<TransactionId, Lock>> next : pageLocks.entrySet()) {
if (next.getValue().containsKey(tid)) {
PageId pid = next.getKey();
Catalog catalog = Database.getCatalog();
Page page = catalog.getDatabaseFile(pid.getTableId()).readPage(pid);
pages.put(pid.hashCode(), page);
releaseLock(tid, pid);
}
}
}
public void transactionComplete(TransactionId tid, boolean commit) {
// TODO: some code goes here
// not necessary for lab1|lab2
if (commit) {
lockManager.commitPage(tid);
} else {
lockManager.abortPage(tid);
// restorePages(tid);
}
}

Exercise 5 Deadlocks and Aborts

最后就是实现死锁的检测,如果出现死锁则直接抛异常。这里最简单的就是实现计时器,超时没有获得锁直接抛异常;而另一种方法就是实现事务的依赖图,当图中出现环时即代表出现死锁。这里只实现了最简单的超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Page getPage(TransactionId tid, PageId pid, Permissions perm)
throws TransactionAbortedException, DbException {
// TODO: some code goes here
LockType lockType = LockType.SHARED;
if (perm == Permissions.READ_WRITE) lockType = LockType.EXCLUSIVE;
long l = System.currentTimeMillis();
long timeout = new Random().nextInt(200) + 100;
while (!lockManager.aquirLock(tid, pid, lockType)) {
if (System.currentTimeMillis() - l > timeout) {
transactionComplete(tid, false);
throw new TransactionAbortedException();
}
}
//.....
}

踩坑记录

这个实验真的坑巨多无比,其中大部分都是因为之前的实验所产生的。因为加上了事务,所以很多之前写过的方法都要改,而改的过程中也会遗漏一些东西,造成隐藏极深的bug。

在我做最后一个实验的测试TransactionTestTwo时,即起两个线程进行事务操作,始终过不了,一直报expected <2> but get <1>的错误,我一开始是以为锁的获取有问题,然后一顿改没用,然后又修改提交和回滚的方法也没有用。后来我去看了测试的文件,发现他所进行的操作是先向页中初始化一个值为0的元组,然后一个线程取出来加1并删除原来的,另一个同样取出来加1并删除原来的,所以最后得到结果2。

为了找到bug在哪,我开始向代码中加入日志进行查看,后来终于发现了一些端倪,那就是有一个线程在死锁并回滚释放锁之后就不再获取锁了,这明显是不对的,同时在测试文件中也是一个死循环,如果捕获到死锁异常的话是会继续尝试获取锁的,再然后我就发现他根本没捕获到死锁异常。。最后我发现了在Insert.java中的fetchNext()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
protected Tuple fetchNext() throws TransactionAbortedException, DbException {
// TODO: some code goes here
if (called) {
return null;
}
called = true;
Tuple ans = new Tuple(td);
while (child.hasNext()) {
Tuple tuple = child.next();
try {
Database.getBufferPool().insertTuple(tid, tableId, tuple);
cnt ++;
} catch (Exception e) {
ans.setField(0, new IntField(cnt));
return ans;
}
}
ans.setField(0, new IntField(cnt));
return ans;
}

之前写的时候直接把所有异常都捕获了。。根本没抛出去,改成IOException就对了。

测试

通过所有测试,测试顺序为

1
2
3
4
5
6
7
8
9
ant runtest -Dtest=LockingTest
TransactionTest
AbortEvictionTest
DeadLockTest
ant runsystest -Dtest=TransactionTestOne
TransactionTestTwo
TransactionTestFive
TransactionTestTen
TransactionAllDirty

时间记录

开始时间:2022.12.3

结束时间:2022.12.4

耗时:2天