MIT 6.5830(6.830) Lab6

简介

lab6为实现完整的日志系统,主要利用日志来进行回滚与恢复。对于遗弃的事务要进行回滚,而当系统崩溃时,利用检查点,对于未提交的事务进行回滚,已提交的事务进行重做。

在日志系统实现时,最关键的是弄清日志的存储逻辑。在该系统中,所有的记录存储在一个文件之中,在LogFile.java中给出了如下说明,按照这些规定,我们可以画出图,根据这个图的话,使用RandomAccessFile就可随意的读取想要的数据,并根据给定的事务id进行回滚与恢复。

  • The first long integer of the file represents the offset of the last written checkpoint, or -1 if there are no checkpoints
  • All additional data in the log consists of log records. Log records are variable length.
  • Each log record begins with an integer type and a long integer transaction id.
  • Each log record ends with a long integer file offset representing the position in the log file where the record began.
  • There are five record types: ABORT, COMMIT, UPDATE, BEGIN, and CHECKPOINT
  • ABORT, COMMIT, and BEGIN records contain no additional data
  • UPDATE RECORDS consist of two entries, a before image and an after image. These images are serialized Page objects, and can be accessed with the LogFile.readPageData() and LogFile.writePageData() methods. See LogFile.print() for an example.
  • CHECKPOINT records consist of active transactions at the time the checkpoint was taken and their first log record on disk. The format of the record is an integer count of the number of transactions, as well as a long integer transaction id and a long integer first record offset for each active transaction.

image-20221213185051791

Exercises

按照手册,在BufferPool指定的位置加上日志操作。

Exercise 1 Rollback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public void rollback(TransactionId tid)
throws NoSuchElementException, IOException {
synchronized (Database.getBufferPool()) {
synchronized (this) {
preAppend();
// TODO: some code goes here
Long firstLogRecord = tidToFirstLogRecord.get(tid.getId());
raf.seek(firstLogRecord);
Set<PageId> set = new HashSet<>();
while (true) {
try {
int type = raf.readInt();
long txid = raf.readLong();
switch (type) {
case UPDATE_RECORD :
Page beforeImage = readPageData(raf);
Page afterImage = readPageData(raf);
PageId pageId = beforeImage.getId();
if (txid == tid.getId() && !set.contains(pageId)) {
set.add(pageId);
Database.getBufferPool().removePage(pageId);
Database.getCatalog().getDatabaseFile(pageId.getTableId()).writePage(beforeImage);
}
break;
case CHECKPOINT_RECORD:
int txCnt = raf.readInt();
while (txCnt -- > 0) {
raf.readLong();
raf.readLong();
}
break;
default:
//others
break;
}
raf.readLong();
} catch (EOFException e) {
break;
}
}
}
}
}

Exercise 2 Recovery

未提交的事务全部回滚,而检查点之后提交的事务进行重做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
public void recover() throws IOException {
synchronized (Database.getBufferPool()) {
synchronized (this) {
recoveryUndecided = false;
// TODO: some code goes here
raf = new RandomAccessFile(logFile, "rw");
long ckp = raf.readLong();
Set<Long> committedId = new HashSet<>();
Map<Long, List<Page>> beforePages = new HashMap<>();
Map<Long, List<Page>> afterPages = new HashMap<>();
// if (ckp != -1L) {
// raf.seek(ckp);
// int type = raf.readInt();
// long txid = raf.readLong();
// System.out.println(type == CHECKPOINT_RECORD);
// int numTxs = raf.readInt();
// while (numTxs -- > 0) {
// raf.readLong();
// raf.readLong();
// }
// raf.readLong();
// }
while (true) {
try {
int type = raf.readInt();
long txid = raf.readLong();
System.out.println("ckp : " + ckp + ", txid : " + txid + ", type :" + type + ", curPos = " + (raf.getFilePointer() - 12));
switch (type) {
case UPDATE_RECORD:
Page beforeImage = readPageData(raf);
Page afterImage = readPageData(raf);
List<Page> l1 = beforePages.getOrDefault(txid, new ArrayList<>());
l1.add(beforeImage);
beforePages.put(txid, l1);
List<Page> l2 = afterPages.getOrDefault(txid, new ArrayList<>());
l2.add(afterImage);
afterPages.put(txid, l2);
break;
case COMMIT_RECORD:
committedId.add(txid);
break;
case CHECKPOINT_RECORD:
int numTxs = raf.readInt();
while (numTxs -- > 0) {
raf.readLong();
raf.readLong();
}
break;
default:
break;
}
//end
raf.readLong();

} catch (EOFException e) {
break;
}
}
for (long txid :beforePages.keySet()) {
if (!committedId.contains(txid)) {
List<Page> pages = beforePages.get(txid);
for (Page p : pages) {
Database.getCatalog().getDatabaseFile(p.getId().getTableId()).writePage(p);
}
}
}
for (long txid : committedId) {
if (afterPages.containsKey(txid)) {
List<Page> pages = afterPages.get(txid);
for (Page page : pages) {
Database.getCatalog().getDatabaseFile(page.getId().getTableId()).writePage(page);
}
}
}
}
}
}

测试

只有一个测试

1
ant runsystest -Dtest=LogTest

时间记录

开始时间:2022.12.14

结束时间:2022.12.14

耗时:1天