MIT 6.5830 (6.830) Lab 2

Introduction

Lab 2 requires implementing a set of operators, including join, filter, insert, and delete, and building aggregation on top of them. It also requires adding methods to `HeapFile` and `BufferPool` for modifying page contents and writing them back to disk. Finally, the way pages are admitted to the buffer pool must change: previously, pages were simply dropped once the pool exceeded its maximum capacity, whereas this lab requires a real page replacement (eviction) policy.

As the final Exercise of the previous lab explained:

Every operator in SimpleDB implements `OpIterator`. The operators are wired into an execution plan that works like a tree: lower-level operators are passed into the constructors of higher-level ones. A program interacting with SimpleDB simply calls `getNext()` on the root operator; that operator in turn calls `getNext()` on its children, and so on, until the calls reach the leaf operators. The leaves fetch tuples from disk and pass them up the tree (as the return value of `getNext()`); tuples propagate up the plan this way until they are emitted at the root or combined by another operator along the way.

The same pattern shows up in how a test file invokes the operators: the constructor calls are nested layer by layer.

```java
SeqScan ss = new SeqScan(tid, table.getId(), "");
Filter filter = new Filter(predicate, ss);
Delete deleteOperator = new Delete(tid, filter);
// Query q = new Query(deleteOperator, tid);

// q.start();
deleteOperator.open();
```

Once this is understood, the operators are fairly straightforward to implement.
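The pull model can also be illustrated outside SimpleDB with a toy operator interface. The `Op`, `Scan`, and `Filter` classes below are hypothetical stand-ins for the real SimpleDB classes, shrunk to integers instead of tuples; the point is only the shape of the `next()` chain:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.IntPredicate;

// Minimal sketch of SimpleDB's pull-based operator tree:
// each operator pulls values from its child via next().
public class PullModelSketch {
    interface Op { boolean hasNext(); int next(); }

    // Leaf operator: scans an in-memory list (stands in for SeqScan).
    static class Scan implements Op {
        private final Iterator<Integer> it;
        Scan(List<Integer> data) { this.it = data.iterator(); }
        public boolean hasNext() { return it.hasNext(); }
        public int next() { return it.next(); }
    }

    // Filter operator: pulls from its child and keeps matching values.
    static class Filter implements Op {
        private final Op child;
        private final IntPredicate pred;
        private Integer buffered;
        Filter(IntPredicate pred, Op child) { this.pred = pred; this.child = child; }
        public boolean hasNext() {
            while (buffered == null && child.hasNext()) {
                int t = child.next();
                if (pred.test(t)) buffered = t;
            }
            return buffered != null;
        }
        public int next() {
            if (!hasNext()) throw new NoSuchElementException();
            int t = buffered;
            buffered = null;
            return t;
        }
    }

    public static List<Integer> run() {
        // Root of the plan: Filter(Scan). Caller only talks to the root.
        Op root = new Filter(x -> x % 2 == 0, new Scan(Arrays.asList(1, 2, 3, 4)));
        List<Integer> out = new ArrayList<>();
        while (root.hasNext()) out.add(root.next());
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Calling `run()` drains the whole plan and yields only the even values, exactly as a `Filter` over a `SeqScan` would in SimpleDB.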

Exercises

Exercise 1 Filter and Join

This exercise implements filtering and multi-table joins. As the handout notes, the already-implemented `Project` and `OrderBy` operators can serve as references. Start with `Predicate` and `JoinPredicate`: these classes use a given comparison operator and `Field` to decide whether a tuple belongs in the result set, and, for joins, whether a pair of tuples satisfies the join condition. With those in place, implement `Join` and `Filter`. For filtering, it suffices to repeatedly read the next tuple and test the condition. For joins, the simplest implementation is a nested loop that tests every pair of tuples from the two tables; many other strategies exist, such as index, hash, and block-nested joins, which reduce time and space costs.
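As a sketch of the hash-based alternative mentioned above, the idea for an equi-join is to build a hash table on the left input's join column and probe it with each right tuple, replacing the O(n·m) nested loop with roughly O(n + m) work. The `int[]` arrays here are hypothetical stand-ins for SimpleDB tuples, not the real `Tuple` class:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Minimal equality hash-join sketch over int[] "tuples".
public class HashJoinSketch {
    public static List<int[]> join(List<int[]> left, int lcol,
                                   List<int[]> right, int rcol) {
        // Build phase: index left tuples by their join-column value.
        Map<Integer, List<int[]>> table = new HashMap<>();
        for (int[] l : left)
            table.computeIfAbsent(l[lcol], k -> new ArrayList<>()).add(l);

        // Probe phase: look up each right tuple and emit concatenated matches.
        List<int[]> out = new ArrayList<>();
        for (int[] r : right) {
            for (int[] l : table.getOrDefault(r[rcol], List.of())) {
                int[] joined = new int[l.length + r.length];
                System.arraycopy(l, 0, joined, 0, l.length);
                System.arraycopy(r, 0, joined, l.length, r.length);
                out.add(joined);
            }
        }
        return out;
    }
}
```

Note this only works for equality predicates; the nested loop remains the fallback for the other comparison operators.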

```java
// Filter.java
protected Tuple fetchNext() throws NoSuchElementException,
        TransactionAbortedException, DbException {
    while (child.hasNext()) {
        Tuple t = child.next();
        // Copy the child tuple into this operator's schema before testing it.
        Tuple newTuple = new Tuple(this.td);
        newTuple.setRecordId(t.getRecordId());
        for (int i = 0; i < td.numFields(); i++) {
            newTuple.setField(i, t.getField(i));
        }
        if (this.p.filter(newTuple)) return newTuple;
    }
    return null;
}
```
```java
// Join.java
protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    while (child1.hasNext() || curLeft != null) {
        // curLeft holds the current outer tuple across calls to fetchNext.
        if (curLeft == null) curLeft = child1.next();
        while (child2.hasNext()) {
            Tuple t1 = curLeft;
            Tuple t2 = child2.next();
            if (p.filter(t1, t2)) {
                TupleDesc td = getTupleDesc();
                Tuple t = new Tuple(td);
                for (int i = 0; i < t1.getTupleDesc().numFields(); i++) {
                    t.setField(i, t1.getField(i));
                }
                for (int i = 0; i < t2.getTupleDesc().numFields(); i++) {
                    t.setField(i + t1.getTupleDesc().numFields(), t2.getField(i));
                }
                return t;
            }
        }
        // Inner relation exhausted: advance to the next outer tuple
        // and rewind the inner child.
        curLeft = null;
        child2.rewind();
    }
    return null;
}
```

Exercise 2 Aggregates

This exercise implements aggregation. SimpleDB has only two data types, int and string, so int fields must support MAX, MIN, AVG, COUNT, and SUM, while string fields only need COUNT. Following the handout, two helper classes are implemented: `IntegerAggregator` and `StringAggregator`. The core method is `mergeTupleIntoGroup`, which merges each incoming tuple into the proper group according to the grouping field and the aggregate operator; `iterator` then returns an iterator over the per-group results. The handout also specifies the output format: with GROUP BY, each result tuple has two fields `(groupValue, aggregateValue)`; without grouping, there is a single result tuple over all input tuples with one field `(aggregateValue)`.
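The bookkeeping that `mergeTupleIntoGroup` performs can be sketched standalone with plain `(String group, int value)` pairs instead of SimpleDB tuples (the class and method names here are hypothetical). The key point is that AVG needs two maps, a running sum and a per-group count, dividing only when a result is read out:

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of per-group SUM/COUNT/AVG bookkeeping.
public class GroupBySketch {
    private final Map<String, Integer> sums = new HashMap<>();
    private final Map<String, Integer> counts = new HashMap<>();

    // Merge one (group, value) pair, like mergeTupleIntoGroup.
    public void merge(String group, int value) {
        sums.merge(group, value, Integer::sum);
        counts.merge(group, 1, Integer::sum);
    }

    public int sum(String group) { return sums.get(group); }

    public int count(String group) { return counts.get(group); }

    // Integer division, matching SimpleDB's int-only field arithmetic.
    public int avg(String group) { return sums.get(group) / counts.get(group); }
}
```

For example, merging (a,1), (a,2), (b,5) yields sum(a)=3, count(a)=2, and avg(a)=1 (3/2 truncates under integer division, which is also what the lab's tests expect).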

```java
// IntegerAggregator.java
private static final long serialVersionUID = 1L;
private int gbfield;
private Type gbfieldtype;
private int afield;
private Op op;
private Map<Field, Integer> group;
private Map<Field, Integer> avgcnt;
private Integer nogroup;
private Integer nogroupCnt;

public void mergeTupleIntoGroup(Tuple tup) {
    // Only read the group-by field when grouping is enabled; with
    // NO_GROUPING, gbfield is -1 and must not be dereferenced.
    Field field = gbfieldtype == null ? null : tup.getField(gbfield);
    IntField af = (IntField) tup.getField(afield);
    int v = af.getValue();
    if (op == Op.MIN) {
        if (gbfieldtype == null) {
            nogroup = nogroup == null ? v : Math.min(nogroup, v);
        } else if (group.containsKey(field)) {
            group.put(field, Math.min(group.get(field), v));
        } else {
            group.put(field, v);
        }
    }
    if (op == Op.MAX) {
        if (gbfieldtype == null) {
            nogroup = nogroup == null ? v : Math.max(nogroup, v);
        } else if (group.containsKey(field)) {
            group.put(field, Math.max(group.get(field), v));
        } else {
            group.put(field, v);
        }
    }
    if (op == Op.COUNT) {
        if (gbfieldtype == null) {
            nogroup = nogroup == null ? 1 : nogroup + 1;
        } else {
            group.put(field, group.getOrDefault(field, 0) + 1);
        }
    }
    if (op == Op.SUM) {
        if (gbfieldtype == null) {
            nogroup = nogroup == null ? v : nogroup + v;
        } else {
            group.put(field, group.getOrDefault(field, 0) + v);
        }
    }
    if (op == Op.AVG) {
        // AVG keeps a running sum plus a separate count per group.
        if (gbfieldtype == null) {
            nogroup = nogroup == null ? v : nogroup + v;
            nogroupCnt++;
        } else {
            group.put(field, group.getOrDefault(field, 0) + v);
            avgcnt.put(field, avgcnt.getOrDefault(field, 0) + 1);
        }
    }
}

public OpIterator iterator() {
    return new OpIterator() {
        private TupleDesc td;
        private Iterator<Map.Entry<Field, Integer>> itgroup;
        private Iterator<Map.Entry<Field, Integer>> itcnt;
        private boolean isNoGroupRead;

        @Override
        public void open() throws DbException, TransactionAbortedException {
            itgroup = group.entrySet().iterator();
            itcnt = avgcnt.entrySet().iterator();
            isNoGroupRead = false;
            if (gbfieldtype == null) {
                td = new TupleDesc(new Type[]{Type.INT_TYPE},
                        new String[]{"aggregateVal"});
            } else {
                td = new TupleDesc(new Type[]{gbfieldtype, Type.INT_TYPE},
                        new String[]{"groupVal", "aggregateVal"});
            }
        }

        @Override
        public boolean hasNext() throws DbException, TransactionAbortedException {
            return gbfieldtype == null ? !isNoGroupRead : itgroup.hasNext();
        }

        @Override
        public Tuple next() throws DbException, TransactionAbortedException, NoSuchElementException {
            if (!hasNext()) throw new NoSuchElementException();
            Tuple tuple = new Tuple(td);
            if (op == Op.AVG) {
                // Divide only when a result is emitted, not while merging.
                if (gbfieldtype == null) {
                    tuple.setField(0, new IntField(nogroup / nogroupCnt));
                    isNoGroupRead = true;
                } else {
                    Map.Entry<Field, Integer> groupNext = itgroup.next();
                    int cnt = avgcnt.get(groupNext.getKey());
                    tuple.setField(0, groupNext.getKey());
                    tuple.setField(1, new IntField(groupNext.getValue() / cnt));
                }
            } else {
                if (gbfieldtype == null) {
                    tuple.setField(0, new IntField(nogroup));
                    isNoGroupRead = true;
                } else {
                    Map.Entry<Field, Integer> groupNext = itgroup.next();
                    tuple.setField(0, groupNext.getKey());
                    tuple.setField(1, new IntField(groupNext.getValue()));
                }
            }
            return tuple;
        }

        @Override
        public void rewind() throws DbException, TransactionAbortedException {
            close();
            open();
        }

        @Override
        public TupleDesc getTupleDesc() {
            return td;
        }

        @Override
        public void close() {
            td = null;
            itgroup = null;
            itcnt = null;
            isNoGroupRead = true;
        }
    };
}
```

`StringAggregator` is similar, just with COUNT as the only supported operator.

```java
// Aggregate.java
private static final long serialVersionUID = 1L;
private OpIterator child;
private Aggregator aggtor;
private int afield;
private int gfield;
private Aggregator.Op op;
private TupleDesc td;
private TupleDesc rtd;
private OpIterator oit;

public Aggregate(OpIterator child, int afield, int gfield, Aggregator.Op aop) {
    this.child = child;
    this.afield = afield;
    this.gfield = gfield;
    this.op = aop;
    this.td = child.getTupleDesc();
    // Pick the helper based on the aggregated field's type.
    if (td.getFieldType(afield) == Type.INT_TYPE)
        this.aggtor = new IntegerAggregator(gfield, gfield == -1 ? null : td.getFieldType(gfield), afield, op);
    if (td.getFieldType(afield) == Type.STRING_TYPE)
        this.aggtor = new StringAggregator(gfield, gfield == -1 ? null : td.getFieldType(gfield), afield, op);
    this.oit = null;
    if (gfield == -1) {
        rtd = new TupleDesc(new Type[]{Type.INT_TYPE},
                new String[]{td.getFieldName(afield)});
    } else {
        rtd = new TupleDesc(new Type[]{td.getFieldType(gfield), Type.INT_TYPE},
                new String[]{td.getFieldName(gfield), td.getFieldName(afield)});
    }
}

public void open() throws NoSuchElementException, DbException,
        TransactionAbortedException {
    // Eagerly drain the child and merge every tuple before exposing results.
    child.open();
    while (child.hasNext()) {
        aggtor.mergeTupleIntoGroup(child.next());
    }
    oit = aggtor.iterator();
    oit.open();
    super.open();
}

protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    if (oit.hasNext()) return oit.next();
    return null;
}
```

Exercise 3 HeapFile Mutability

This exercise implements tuple insertion and deletion. Both operations must be implemented across `HeapPage`, `HeapFile`, and `BufferPool`. `HeapFile`'s `writePage` method is also implemented here; it writes a given `Page` back to the file on disk at the offset corresponding to its page number. The call chain works as follows: `BufferPool` calls `HeapFile`'s `insertTuple` and `deleteTuple`, `HeapFile` delegates to the corresponding `Page`'s methods, and the modified pages are returned to `BufferPool` so its cache can be updated.

For `insertTuple`, the task is to find a free slot on some page, which can be located via the page header bitmap; if no page has room, a new empty page must be appended to the `HeapFile`.

For `deleteTuple`, the tuple is located via its `RecordId` and removed, clearing the corresponding bit in the header.
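The header bookkeeping assumed above can be sketched standalone: slot `i`'s used/free state lives in bit `i % 8` of header byte `i / 8` (least-significant bit first, which is the convention the lab's page format uses). The method names mirror SimpleDB's `isSlotUsed`/`markSlotUsed`, but this version operates on a raw `byte[]` rather than a real page:

```java
// Standalone sketch of the HeapPage header bitmap operations.
public class SlotBitmapSketch {
    // True if slot i's bit is set in the header.
    public static boolean isSlotUsed(byte[] header, int i) {
        return (header[i / 8] & (1 << (i % 8))) != 0;
    }

    // Set or clear slot i's bit.
    public static void markSlotUsed(byte[] header, int i, boolean used) {
        if (used) header[i / 8] |= (1 << (i % 8));
        else header[i / 8] &= ~(1 << (i % 8));
    }
}
```

With this layout, scanning for a free slot in `insertTuple` is just a loop over slot indices calling `isSlotUsed` until it returns false.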

```java
// HeapPage.java
public void deleteTuple(Tuple t) throws DbException {
    if (getNumUnusedSlots() == getNumTuples()) throw new DbException("empty");
    int tupleNumber = t.getRecordId().getTupleNumber();
    if (tuples[tupleNumber] != null) {
        markSlotUsed(tupleNumber, false);
        tuples[tupleNumber] = null;
        return;
    }
    throw new DbException("no tuple");
}

public void insertTuple(Tuple t) throws DbException {
    if (getNumUnusedSlots() == 0) throw new DbException("full");
    if (t.getTupleDesc() == null) throw new DbException("no desc");
    // Scan the header bitmap for the first free slot.
    for (int i = 0; i < getNumTuples(); i++) {
        if (!isSlotUsed(i)) {
            t.setRecordId(new RecordId(pid, i));
            tuples[i] = t;
            markSlotUsed(i, true);
            return;
        }
    }
}
```
```java
// HeapFile.java
public void writePage(Page page) throws IOException {
    int pageNumber = page.getId().getPageNumber();
    if (pageNumber > numPages()) throw new IOException();
    RandomAccessFile r = new RandomAccessFile(file, "rw");
    r.seek((long) pageNumber * BufferPool.getPageSize());
    r.write(page.getPageData());
    r.close();
}

public List<Page> insertTuple(TransactionId tid, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    List<Page> list = new ArrayList<>();
    // Try each existing page for a free slot.
    for (int i = 0; i < numPages(); i++) {
        HeapPage page = (HeapPage) Database.getBufferPool().getPage(
                tid, new HeapPageId(getId(), i), Permissions.READ_WRITE);
        try {
            page.insertTuple(t);
        } catch (DbException e) {
            continue;  // page is full, try the next one
        }
        list.add(page);
        return list;
    }
    // No page has room: append an empty page to the file on disk.
    BufferedOutputStream bw = new BufferedOutputStream(new FileOutputStream(file, true));
    bw.write(HeapPage.createEmptyPageData());
    bw.close();
    // Load the new page into the cache and insert into it.
    HeapPage p = (HeapPage) Database.getBufferPool().getPage(tid,
            new HeapPageId(getId(), numPages() - 1), Permissions.READ_WRITE);
    p.insertTuple(t);
    list.add(p);
    return list;  // was `return null`, which made BufferPool.insertTuple throw
}

public List<Page> deleteTuple(TransactionId tid, Tuple t) throws DbException,
        TransactionAbortedException {
    HeapPage page = (HeapPage) Database.getBufferPool().getPage(
            tid, t.getRecordId().getPageId(), Permissions.READ_WRITE);
    try {
        page.deleteTuple(t);
    } catch (DbException e) {
        e.printStackTrace();
        return null;
    }
    List<Page> list = new ArrayList<>();
    list.add(page);
    return list;
}
```
```java
// BufferPool.java
public void insertTuple(TransactionId tid, int tableId, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    HeapFile databaseFile = (HeapFile) Database.getCatalog().getDatabaseFile(tableId);
    List<Page> list = databaseFile.insertTuple(tid, t);
    if (list == null || list.size() == 0) throw new DbException("insert failed");
    updatePage(list, tid);
}

public void deleteTuple(TransactionId tid, Tuple t)
        throws DbException, IOException, TransactionAbortedException {
    List<Page> list = Database.getCatalog()
            .getDatabaseFile(t.getRecordId().getPageId().getTableId())
            .deleteTuple(tid, t);
    if (list == null || list.size() == 0) throw new DbException("delete failed");
    updatePage(list, tid);
}

// Mark the returned pages dirty and (re)install them in the cache,
// evicting first if the pool is full.
public void updatePage(List<Page> list, TransactionId tid) throws DbException {
    if (list != null) {
        for (Page p : list) {
            p.markDirty(true, tid);
            if (pages.size() == maxSize) evictPage();
            pages.put(p.getId().hashCode(), p);
            queue.offer(p.getId());
        }
    }
}
```

Exercise 4 Insertion and deletion

With these building blocks in place, the full `Insert` and `Delete` operators can be implemented. The structure is similar to `Join` and `Filter`, with one caveat: `fetchNext` must not do its work twice; a second call returns null. The result is a single `Tuple` recording the number of affected rows.

```java
// Insert.java
private static final long serialVersionUID = 1L;
private TransactionId tid;
private OpIterator child;
private int tableId;
private TupleDesc td;
private int cnt;
private boolean called;

public Insert(TransactionId t, OpIterator child, int tableId)
        throws DbException {
    if (!child.getTupleDesc().equals(Database.getCatalog().getTupleDesc(tableId)))
        throw new DbException("not match");
    this.tid = t;
    this.child = child;
    this.tableId = tableId;
    this.td = new TupleDesc(new Type[]{Type.INT_TYPE}, new String[]{"unnamed"});
    this.cnt = 0;
    this.called = false;
}

protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    // Only the first call does any work; subsequent calls return null.
    if (called) return null;
    called = true;
    Tuple ans = new Tuple(td);
    while (child.hasNext()) {
        Tuple tuple = child.next();
        try {
            Database.getBufferPool().insertTuple(tid, tableId, tuple);
            cnt++;
        } catch (Exception e) {
            ans.setField(0, new IntField(cnt));
            return ans;
        }
    }
    ans.setField(0, new IntField(cnt));
    return ans;
}
```
```java
// Delete.java
private static final long serialVersionUID = 1L;
private TransactionId tid;
private OpIterator child;
private TupleDesc td;
private int cnt;
private boolean called;

protected Tuple fetchNext() throws TransactionAbortedException, DbException {
    // Only the first call does any work; subsequent calls return null.
    if (called) return null;
    called = true;
    Tuple ans = new Tuple(td);
    while (child.hasNext()) {
        Tuple next = child.next();
        try {
            Database.getBufferPool().deleteTuple(tid, next);
            cnt++;
        } catch (Exception e) {
            ans.setField(0, new IntField(cnt));
            return ans;
        }
    }
    ans.setField(0, new IntField(cnt));
    return ans;
}
```

Exercise 5 Page eviction

The last task is the page eviction policy in `BufferPool`. Before eviction can work, `flushPage` and `flushAllPages` must be completed; these write pages marked dirty back to disk, guaranteeing a page is current before it is replaced. Many replacement policies exist, just as with paging in operating systems; for simplicity, this implementation uses a queue for the simplest policy, FIFO.

```java
// BufferPool.java
private Map<Integer, Page> pages;
private Queue<PageId> queue;

private synchronized void flushPage(PageId pid) throws IOException {
    Page page = pages.get(pid.hashCode());
    TransactionId tid = page.isDirty();
    if (tid != null) {
        Database.getCatalog().getDatabaseFile(pid.getTableId()).writePage(page);
        page.markDirty(false, null);
    }
}

public synchronized void flushAllPages() throws IOException {
    for (Map.Entry<Integer, Page> entry : pages.entrySet()) {
        flushPage(entry.getValue().getId());
    }
}

private synchronized void evictPage() throws DbException {
    // FIFO: evict the page that entered the pool earliest.
    PageId pid = queue.poll();
    try {
        assert pid != null;
        flushPage(pid);
    } catch (IOException e) {
        e.printStackTrace();
    }
    pages.remove(pid.hashCode());
}
```
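An LRU alternative to the FIFO policy above can be sketched with `LinkedHashMap`'s access-order mode: `removeEldestEntry` evicts the least-recently-accessed key once capacity is exceeded. This is a standalone sketch, not the real `BufferPool`; a real implementation would flush the evicted page to disk first, which the hypothetical `onEvict` hook stands in for:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// LRU cache sketch built on LinkedHashMap's access-order mode.
public class LruCacheSketch<K, V> extends LinkedHashMap<K, V> {
    private final int capacity;

    public LruCacheSketch(int capacity) {
        super(16, 0.75f, true);  // true = iterate in access order, not insertion order
        this.capacity = capacity;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        // Called after every put; evict once we exceed capacity.
        if (size() > capacity) {
            onEvict(eldest.getKey(), eldest.getValue());
            return true;
        }
        return false;
    }

    // Hypothetical hook: a BufferPool would call flushPage here.
    protected void onEvict(K key, V value) { }
}
```

With capacity 2, after `put(1)`, `put(2)`, `get(1)`, `put(3)`, key 2 is evicted because it is the least recently accessed, which is exactly where FIFO and LRU diverge.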

Testing

All tests pass. Test order:

```shell
ant runtest -Dtest=PredicateTest
ant runtest -Dtest=JoinPredicateTest
ant runtest -Dtest=FilterTest
ant runtest -Dtest=JoinTest
ant runsystest -Dtest=FilterTest
ant runsystest -Dtest=JoinTest
ant runtest -Dtest=IntegerAggregatorTest
ant runtest -Dtest=StringAggregatorTest
ant runtest -Dtest=AggregateTest
ant runsystest -Dtest=AggregateTest
ant runtest -Dtest=HeapPageWriteTest
ant runtest -Dtest=HeapFileWriteTest
ant runtest -Dtest=BufferPoolWriteTest
ant runtest -Dtest=InsertTest
ant runsystest -Dtest=InsertTest
ant runsystest -Dtest=DeleteTest
ant runsystest -Dtest=EvictionTest
```

Time Log

Started: 2022-11-28

Finished: 2022-11-29

Elapsed: 2 days