PostgreSQL 源码解读(234)- 查询#127(NOT IN实现#5)
本节简单解释了PostgreSQL NOT IN在执行时写入临时表空间的实现。
测试数据如下:
[local]:5432 pg12@testdb=# select count(*) from tbl;
count
-------
1
(1 row)
Time: 6.009 ms
[local]:5432 pg12@testdb=# select count(*) from t_big_null;
count
----------
10000001
(1 row)
[local]:5432 pg12@testdb=#
一、数据结构
Tuplestorestate
Tuplestore相关操作的私有状态。
/*
* Possible states of a Tuplestore object. These denote the states that
* persist between calls of Tuplestore routines.
*/
typedef enum
{
/* All tuples are reachable through the in-memory memtuples pointer array. */
TSS_INMEM, /* Tuples still fit in memory */
/* Data is going to the temp file; the file's current seek position is the write point. */
TSS_WRITEFILE, /* Writing to temp file */
/* Reading back from the temp file; the write position is remembered in writepos_file/writepos_offset. */
TSS_READFILE /* Reading from temp file */
} TupStoreStatus;
/*
* Private state of a Tuplestore operation.
*/
struct Tuplestorestate
{
/* ---- overall mode, options and memory accounting ---- */
TupStoreStatus status; /* current TSS_xxx state; enumerated value as shown above */
int eflags; /* capability flags (OR of pointers' flags) */
bool backward; /* store extra length words in file? */
bool interXact; /* keep open through transactions? */
bool truncated; /* tuplestore_trim has removed tuples? */
int64 availMem; /* remaining memory available, in bytes; LACKMEM() tests whether it is below zero */
int64 allowedMem; /* total memory allowed, in bytes */
int64 tuples; /* number of tuples added */
BufFile *myfile; /* underlying file, or NULL if none */
MemoryContext context; /* memory context for holding tuples */
ResourceOwner resowner; /* resowner for holding temp files */
/* ---- per-tuple-kind callbacks (invoked via COPYTUP/WRITETUP/READTUP) ---- */
/*
* These function pointers decouple the routines that must know what kind
* of tuple we are handling from the routines that don't need to know it.
* They are set up by the tuplestore_begin_xxx routines.
*
* (Although tuplestore.c currently only supports heap tuples, I've copied
* this part of tuplesort.c so that extension to other kinds of objects
* will be easy if it's ever needed.)
*
* Function to copy a supplied input tuple into palloc'd space. (NB: we
* assume that a single pfree() is enough to release the tuple later, so
* the representation must be "flat" in one palloc chunk.) state->availMem
* must be decreased by the amount of space used.
*/
void *(*copytup) (Tuplestorestate *state, void *tup);
/*
* Function to write a stored tuple onto tape. The representation of the
* tuple on tape need not be the same as it is in memory; requirements on
* the tape representation are given below. After writing the tuple,
* pfree() it, and increase state->availMem by the amount of memory space
* thereby released.
*/
void (*writetup) (Tuplestorestate *state, void *tup);
/*
* Function to read a stored tuple from tape back into memory. 'len' is
* the already-read length of the stored tuple. Create and return a
* palloc'd copy, and decrease state->availMem by the amount of memory
* space consumed.
*/
void *(*readtup) (Tuplestorestate *state, unsigned int len);
/* ---- in-memory tuple array ---- */
/*
* This array holds pointers to tuples in memory if we are in state INMEM.
* In states WRITEFILE and READFILE it's not used.
*
* When memtupdeleted > 0, the first memtupdeleted pointers are already
* released due to a tuplestore_trim() operation, but we haven't expended
* the effort to slide the remaining pointers down. These unused pointers
* are set to NULL to catch any invalid accesses. Note that memtupcount
* includes the deleted pointers.
*/
void **memtuples; /* array of pointers to palloc'd tuples */
int memtupdeleted; /* the first N slots are currently unused */
int memtupcount; /* number of tuples currently present */
int memtupsize; /* allocated length of memtuples array */
bool growmemtuples; /* memtuples' growth still underway? */
/* ---- read pointers and the remembered write position ---- */
/*
* These variables are used to keep track of the current positions.
*
* In state WRITEFILE, the current file seek position is the write point;
* in state READFILE, the write position is remembered in writepos_xxx.
* (The write position is the same as EOF, but since BufFileSeek doesn't
* currently implement SEEK_END, we have to remember it explicitly.)
*/
TSReadPointer *readptrs; /* array of read pointers */
int activeptr; /* index of the active read pointer */
int readptrcount; /* number of pointers currently valid */
int readptrsize; /* allocated length of readptrs array */
int writepos_file; /* file# (valid if READFILE state) */
off_t writepos_offset; /* offset (valid if READFILE state) */
};
/* Dispatch through the per-tuple-kind callbacks stored in the Tuplestorestate. */
#define COPYTUP(state,tup) ((*(state)->copytup) (state, tup))
#define WRITETUP(state,tup) ((*(state)->writetup) (state, tup))
#define READTUP(state,len) ((*(state)->readtup) (state, len))
/*
* Memory accounting on state->availMem: USEMEM subtracts, FREEMEM adds back,
* and LACKMEM is true once the balance has dropped below zero.
*/
#define LACKMEM(state) ((state)->availMem < 0)
#define USEMEM(state,amt) ((state)->availMem -= (amt))
#define FREEMEM(state,amt) ((state)->availMem += (amt))
TSReadPointer
tuplestore读指针
/*
* Possible states of a Tuplestore object. These denote the states that
* persist between calls of Tuplestore routines.
*/
typedef enum
{
TSS_INMEM, /* Tuples still fit in memory */
TSS_WRITEFILE, /* Writing to temp file */
TSS_READFILE /* Reading from temp file */
} TupStoreStatus;
/*
* State for a single read pointer. If we are in state INMEM then all the
* read pointers' "current" fields denote the read positions. In state
* WRITEFILE, the file/offset fields denote the read positions. In state
* READFILE, inactive read pointers have valid file/offset, but the active
* read pointer implicitly has position equal to the temp file's seek position.
*
* Special case: if eof_reached is true, then the pointer's read position is
* implicitly equal to the write position, and current/file/offset aren't
* maintained. This way we need not update all the read pointers each time
* we write.
*/
typedef struct
{
int eflags; /* capability flags */
/* When true, the position is implicitly the write position and the three fields below are not maintained. */
bool eof_reached; /* read has reached EOF */
/* Read position while the tuplestore is in state INMEM. */
int current; /* next array index to read */
/* Read position while the tuplestore is in state WRITEFILE or READFILE (filled in via BufFileTell). */
int file; /* temp file# */
off_t offset; /* byte offset in file */
} TSReadPointer;
BufFile
该数据结构表示包含一个或多个物理文件的buffered file(每一个通过fd.c管理的虚拟文件描述符进行访问)
/*
* We break BufFiles into gigabyte-sized segments, regardless of RELSEG_SIZE.
* The reason is that we'd like large BufFiles to be spread across multiple
* tablespaces when available.
* BufFiles会被拆分为1GB大小的segments(MAX_PHYSICAL_FILESIZE = 0x40000000),而不管RELSEG_SIZE的大小.
* 原因是我们倾向于在可用时把很大的BufFiles在多个表空间中分布.
*/
/* 0x40000000 bytes = 1 GB: the size of every physical segment file except possibly the last. */
#define MAX_PHYSICAL_FILESIZE 0x40000000
/* The same segment size expressed as a count of BLCKSZ-sized units. */
#define BUFFILE_SEG_SIZE (MAX_PHYSICAL_FILESIZE / BLCKSZ)
/*
* This data structure represents a buffered file that consists of one or
* more physical files (each accessed through a virtual file descriptor
* managed by fd.c).
* 该数据结构表示包含一个或多个物理文件的buffered file(每一个通过fd.c管理的虚拟文件描述符进行访问)
*/
struct BufFile
{
// Number of physical segment files that currently make up this BufFile.
int numFiles; /* number of physical files in set */
/* all files except the last have length exactly MAX_PHYSICAL_FILESIZE */
// One entry per physical file; the array has numFiles entries.
File *files; /* palloc'd array with numFiles entries */
// Whether the file stays open across transactions.
bool isInterXact; /* keep open over transactions? */
// Whether the buffer holds data that still has to be written out.
bool dirty; /* does buffer need to be written? */
// Whether the file has been switched to read-only.
bool readOnly; /* has the file been set to read only? */
// Shared case only: the SharedFileSet in which the segment files live (a pointer to the set, not a size).
SharedFileSet *fileset; /* space for segment files if shared */
// Shared case only: the name of this BufFile.
const char *name; /* name of this BufFile if shared */
/*
* resowner is the ResourceOwner to use for underlying temp files. (We
* don't need to remember the memory context we're using explicitly,
* because after creation we only repalloc our arrays larger.)
*/
ResourceOwner resowner;
/*
* "current pos" is position of start of buffer within the logical file.
* Position as seen by user of BufFile is (curFile, curOffset + pos).
*/
// Which physical file the buffer currently maps to.
int curFile; /* file index (0..n) part of current pos */
// Offset of the start of the buffer within that file.
off_t curOffset; /* offset part of current pos */
// Next read/write position, relative to the start of the buffer.
int pos; /* next read/write position in buffer */
// How many bytes of the buffer currently hold valid data.
int nbytes; /* total # of valid bytes in buffer */
// The buffer that pos and nbytes refer to.
PGAlignedBlock buffer;
};
二、源码解读
tuplestore_puttupleslot
把接收到的tuple放到tuplestore中
/*
* Accept one tuple and append it to the tuplestore.
* 把接收到的tuple放到tuplestore中
*
* Note that the input tuple is always copied; the caller need not save it.
* 要注意的是输入元组总是会被拷贝,调用者无需自行保存该元组。
*
* If the active read pointer is currently "at EOF", it remains so (the read
* pointer implicitly advances along with the write pointer); otherwise the
* read pointer is unchanged. Non-active read pointers do not move, which
* means they are certain to not be "at EOF" immediately after puttuple.
* This curious-seeming behavior is for the convenience of nodeMaterial.c and
* nodeCtescan.c, which would otherwise need to do extra pointer repositioning
* steps.
* 如果活动的读指针当前正处于EOF位置,那么它仍保持在EOF(读指针隐式地随写指针一起前进)。
* 否则的话,读指针保持不变。非活动读指针不会移动,这意味着在puttuple之后它们一定不处于EOF状态。
* 这种看似奇怪的行为是便于nodeMaterial.c和nodeCtescan.c的处理,否则需要额外的指针重定位。
*
* tuplestore_puttupleslot() is a convenience routine to collect data from
* a TupleTableSlot without an extra copy operation.
* tuplestore_puttupleslot()是一个便捷例程,可直接从TupleTableSlot中收集数据,而无需额外的拷贝操作。
*/
void
tuplestore_puttupleslot(Tuplestorestate *state,
						TupleTableSlot *slot)
{
	MemoryContext callercxt;
	MinimalTuple mtup;

	/* Work inside the tuplestore's own memory context for the whole call. */
	callercxt = MemoryContextSwitchTo(state->context);

	/*
	 * Build a MinimalTuple copy of the slot's contents and charge the space
	 * of its memory chunk against the tuplestore's budget.
	 */
	mtup = ExecCopySlotMinimalTuple(slot);
	USEMEM(state, GetMemoryChunkSpace(mtup));

	/* Hand the copy to the common insertion path. */
	tuplestore_puttuple_common(state, (void *) mtup);

	/* Restore the caller's memory context. */
	MemoryContextSwitchTo(callercxt);
}
tuplestore_puttuple_common
tuplestore_puttupleslot调用的内部公共函数:根据tuplestore当前状态,把元组放入内存数组或写入临时文件
/*
* Common insertion path, called by tuplestore_puttupleslot() with a tuple
* whose memory has already been charged via USEMEM.
*
* What happens depends on state->status:
*   TSS_INMEM     - store the pointer in memtuples[]; if the array is full or
*                   memory is exhausted afterwards, create a temp file,
*                   switch to TSS_WRITEFILE and call dumptuples().
*   TSS_WRITEFILE - write the tuple to the temp file via WRITETUP.
*   TSS_READFILE  - save the active read position, seek back to the
*                   remembered write position, switch to TSS_WRITEFILE and
*                   write the tuple.
* In every state, non-active read pointers that were "at EOF" are first
* pinned to the pre-insert write position so they do not move.
*
* Raises ERROR if the seek fails or the state is not one of the three above.
*/
static void
tuplestore_puttuple_common(Tuplestorestate *state, void *tuple)
{
TSReadPointer *readptr;
int i;
ResourceOwner oldowner;
/* Count every tuple added, regardless of where it ends up. */
state->tuples++;
switch (state->status)
{
case TSS_INMEM:
/*
* Update read pointers as needed; see API spec above.
*/
readptr = state->readptrs;
for (i = 0; i < state->readptrcount; readptr++, i++)
{
if (readptr->eof_reached && i != state->activeptr)
{
/*
* A non-active pointer that was at EOF: clear the flag and pin
* it to the index the new tuple is about to occupy.
*/
readptr->eof_reached = false;
readptr->current = state->memtupcount;
}
}
/*
* Grow the array as needed. Note that we try to grow the array
* when there is still one free slot remaining --- if we fail,
* there'll still be room to store the incoming tuple, and then
* we'll switch to tape-based operation.
*/
if (state->memtupcount >= state->memtupsize - 1)
{
(void) grow_memtuples(state);
Assert(state->memtupcount < state->memtupsize);
}
/* Stash the tuple in the in-memory array */
state->memtuples[state->memtupcount++] = tuple;
/*
* Done if we still fit in available memory and have array slots.
*/
if (state->memtupcount < state->memtupsize && !LACKMEM(state))
return;
/* Otherwise fall through to spilling to disk. */
/*
* Nope; time to switch to tape-based operation. Make sure that
* the temp file(s) are created in suitable temp tablespaces.
*/
PrepareTempTablespaces();
/* associate the file with the store's resource owner */
/* CurrentResourceOwner is swapped only around the create call and then restored. */
oldowner = CurrentResourceOwner;
CurrentResourceOwner = state->resowner;
state->myfile = BufFileCreateTemp(state->interXact);
CurrentResourceOwner = oldowner;
/*
* Freeze the decision about whether trailing length words will be
* used. We can't change this choice once data is on tape, even
* though callers might drop the requirement.
*/
state->backward = (state->eflags & EXEC_FLAG_BACKWARD) != 0;
state->status = TSS_WRITEFILE;
/* NOTE(review): dumptuples() is not shown here; presumably it writes the in-memory tuples to the file -- confirm. */
dumptuples(state);
break;
case TSS_WRITEFILE:
/*
* Update read pointers as needed; see API spec above. Note:
* BufFileTell is quite cheap, so not worth trying to avoid
* multiple calls.
*/
readptr = state->readptrs;
for (i = 0; i < state->readptrcount; readptr++, i++)
{
if (readptr->eof_reached && i != state->activeptr)
{
/* Pin the pointer to the file position where the new tuple will be written. */
readptr->eof_reached = false;
BufFileTell(state->myfile,
&readptr->file,
&readptr->offset);
}
}
/* WRITETUP dispatches to the state->writetup callback. */
WRITETUP(state, tuple);
break;
case TSS_READFILE:
/*
* Switch from reading to writing.
*/
/* Save the active pointer's read position first, because the seek below moves the file position. */
if (!state->readptrs[state->activeptr].eof_reached)
BufFileTell(state->myfile,
&state->readptrs[state->activeptr].file,
&state->readptrs[state->activeptr].offset);
/* Go back to the remembered write position. */
if (BufFileSeek(state->myfile,
state->writepos_file, state->writepos_offset,
SEEK_SET) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in tuplestore temporary file: %m")));
state->status = TSS_WRITEFILE;
/*
* Update read pointers as needed; see API spec above.
*/
readptr = state->readptrs;
for (i = 0; i < state->readptrcount; readptr++, i++)
{
if (readptr->eof_reached && i != state->activeptr)
{
/* Pin the pointer to the remembered write position, where the new tuple will start. */
readptr->eof_reached = false;
readptr->file = state->writepos_file;
readptr->offset = state->writepos_offset;
}
}
/* WRITETUP dispatches to the state->writetup callback. */
WRITETUP(state, tuple);
break;
default:
elog(ERROR, "invalid tuplestore state");
break;
}
}
/*
 * BufFileTell --- report the current position of a BufFile.
 *
 * On return, *fileno holds the index of the current physical file and
 * *offset holds the byte position within it: the offset of the start of
 * the buffer plus the read/write position inside the buffer.
 */
void
BufFileTell(BufFile *file, int *fileno, off_t *offset)
{
	/* which physical file the buffer currently maps to */
	*fileno = file->curFile;

	/* position inside the buffer, added to the buffer's starting offset */
	*offset = file->pos + file->curOffset;
}
三、跟踪分析
执行SQL:
[local]:5432 pg12@testdb=# select * from tbl a where a.id not in (select b.id from t_big_null b);
启动gdb,进入断点
(gdb) b tuplestore_puttupleslot
Breakpoint 1 at 0xab9134: file tuplestore.c, line 712.
(gdb) c
Continuing.
Breakpoint 1, tuplestore_puttupleslot (state=0x1efec78, slot=0x1efd4e0) at tuplestore.c:712
712 MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
(gdb)
输入参数
(gdb) n
717 tuple = ExecCopySlotMinimalTuple(slot);
(gdb)
718 USEMEM(state, GetMemoryChunkSpace(tuple));
(gdb)
720 tuplestore_puttuple_common(state, (void *) tuple);
(gdb) p *state
$1 = {status = TSS_INMEM, eflags = 2, backward = false, interXact = false, truncated = false,
availMem = 4177840, allowedMem = 4194304, tuples = 0, myfile = 0x0, context = 0x1efce00, resowner = 0x1e5d308,
copytup = 0xaba7bd <copytup_heap>, writetup = 0xaba811 <writetup_heap>, readtup = 0xaba9d9 <readtup_heap>,
memtuples = 0x1f18ed0, memtupdeleted = 0, memtupcount = 0, memtupsize = 2048, growmemtuples = true,
readptrs = 0x1f056a0, activeptr = 0, readptrcount = 1, readptrsize = 8, writepos_file = 0, writepos_offset = 0}
(gdb) p *slot
$2 = {type = T_TupleTableSlot, tts_flags = 16, tts_nvalid = 0, tts_ops = 0xc3e780 <TTSOpsBufferHeapTuple>,
tts_tupleDescriptor = 0x7f16f33f5378, tts_values = 0x1efd550, tts_isnull = 0x1efd558, tts_mcxt = 0x1efce00,
tts_tid = {ip_blkid = {bi_hi = 0, bi_lo = 0}, ip_posid = 1}, tts_tableOid = 49155}
(gdb) p slot->tts_values[0]
$3 = 0
(gdb)
进入tuplestore_puttuple_common
(gdb) step
tuplestore_puttuple_common (state=0x1efec78, tuple=0x1f05ce8) at tuplestore.c:771
771 state->tuples++;
(gdb)
当前状态TSS_INMEM
(gdb) p state->status
$4 = TSS_INMEM
(gdb)
如需要,更新读指针(无需更新)
(gdb) n
773 switch (state->status)
(gdb)
780 readptr = state->readptrs;
(gdb)
781 for (i = 0; i < state->readptrcount; readptr++, i++)
(gdb) p *readptr
$5 = {eflags = 2, eof_reached = true, current = 0, file = 2139062143, offset = 9187201950435737471}
(gdb) n
783 if (readptr->eof_reached && i != state->activeptr)
(gdb) p state->readptrcount
$6 = 1
(gdb) p state->activeptr
$7 = 0
(gdb) n
781 for (i = 0; i < state->readptrcount; readptr++, i++)
(gdb)
如需要,扩展数组(实际不需要)
(gdb)
796 if (state->memtupcount >= state->memtupsize - 1)
(gdb) p state->memtupcount
$8 = 0
(gdb) p state->memtupsize - 1
$9 = 2047
(gdb) n
803 state->memtuples[state->memtupcount++] = tuple;
(gdb)
放入到内存中,返回
(gdb) n
808 if (state->memtupcount < state->memtupsize && !LACKMEM(state))
(gdb)
809 return;
(gdb)
退出函数
(gdb)
892 }
(gdb)
tuplestore_puttupleslot (state=0x1efec78, slot=0x1efd4e0) at tuplestore.c:722
722 MemoryContextSwitchTo(oldcxt);
(gdb)
723 }
(gdb)
ExecMaterial (pstate=0x1efd1b8) at nodeMaterial.c:149
149 ExecCopySlot(slot, outerslot);
(gdb)
使用ignore命令跳过若干次断点后继续执行,此时state->status状态已变为TSS_WRITEFILE
(gdb) ignore 4 4194303
Will ignore next 4194303 crossings of breakpoint 4.
(gdb) c
Continuing.
Breakpoint 3, tuplestore_puttuple_common (state=0x160ba38, tuple=0x7f2cd90cc0b0) at tuplestore.c:771
771 state->tuples++;
(gdb)
...
tuplestore_puttupleslot (state=0x160ba38, slot=0x160a2a0) at tuplestore.c:722
722 MemoryContextSwitchTo(oldcxt);
(gdb) c
Continuing.
Breakpoint 3, tuplestore_puttuple_common (state=0x160ba38, tuple=0x7f2cd90cc0e8) at tuplestore.c:771
771 state->tuples++;
(gdb) p *state
$9 = {status = TSS_WRITEFILE, eflags = 2, backward = false, interXact = false, truncated = false,
availMem = 3669944, allowedMem = 4194304, tuples = 4192545, myfile = 0x162ad80, context = 0x1609bc0,
resowner = 0x1579170, copytup = 0xaba7bd <copytup_heap>, writetup = 0xaba811 <writetup_heap>,
readtup = 0xaba9d9 <readtup_heap>, memtuples = 0x7f2cd914a050, memtupdeleted = 0, memtupcount = 0,
memtupsize = 65535, growmemtuples = false, readptrs = 0x1627590, activeptr = 0, readptrcount = 1,
readptrsize = 8, writepos_file = 0, writepos_offset = 0}
(gdb) n
773 switch (state->status)
(gdb)
841 readptr = state->readptrs;
(gdb)
842 for (i = 0; i < state->readptrcount; readptr++, i++)
(gdb)
844 if (readptr->eof_reached && i != state->activeptr)
(gdb)
842 for (i = 0; i < state->readptrcount; readptr++, i++)
(gdb)
853 WRITETUP(state, tuple);
(gdb)
854 break;
(gdb) p *state->myfile
$10 = {numFiles = 1, files = 0x7f2cd934c008, isInterXact = false, dirty = true, readOnly = false, fileset = 0x0,
name = 0x0, resowner = 0x1579170, curFile = 0, curOffset = 58687488, pos = 8156, nbytes = 8156, buffer = {
data = "\000\t\030\000\335\366?\000\016\000\000\000\001\000\000\t\030\000\336\366?\000\016\000\000\000\001\000\000\t\030\000\337\366?\000\016\000\000\000\001\000\000\t\030\000\340\366?\000\016\000\000\000\001\000\000\t\030\000\341\366?\000\016\000\000\000\001\000\000\t\030\000\342\366?\000\016\000\000\000\001\000\000\t\030\000\343\366?\000\016\000\000\000\001\000\000\t\030\000\344\366?\000\016\000\000\000\001\000\000\t\030\000\345\366?\000\016\000\000\000\001\000\000\t\030\000\346\366?\000\016\000\000\000\001\000\000\t\030\000\347\366?\000\016\000\000\000\001\000\000\t\030\000\350\366?\000\016\000\000\000\001\000\000\t\030\000\351\366?\000\016\000\000\000\001\000\000\t\030\000\352\366?\000\016\000\000\000\001\000\000\t\030\000"..., force_align_d = 1.7780737478550286e-307,
force_align_i64 = 18004352582551808}}
...
DONE
四、参考资料
N/A