关于php的共享内存的使用和研究之深入剖析swoole table

话说回来,究竟swoole的底层是怎么做到了使用行锁,来实现进程访问冲突解决与高性能的呢?这里确实值得研究一下。

首先来看一下swooletable中用来存储的基本数据结构swTableRow:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
typedef struct _swTableRow
{
sw_atomic_t lock;// 原子锁,所谓的效率更高的行锁,这个要等下看看了。
/**
* 1:used, 0:empty
*/
uint8_t active;//是否启用状态
/**
* next slot
*/
struct _swTableRow *next;//链表结构
/**
* Hash Key
*/
char key[SW_TABLE_KEY_SIZE];//大小64,意味着单哈希key的长度
char data[0];//真实数据
} swTableRow;

然后是用来遍历行的索引数据结构swTable_iterator:

1
2
3
4
5
6
typedef struct
{
uint32_t absolute_index;
uint32_t collision_index;
swTableRow *row;
} swTable_iterator;

然后是包含了多行内容的swTable:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
typedef struct
{
swHashMap *columns;// 一个table,包含多列中的列信息
uint16_t column_num;
swLock lock;
uint32_t size;
uint32_t mask;
uint32_t item_size;
/**
* total rows that in active state(shm)
*/
sw_atomic_t row_num;
swTableRow **rows;// 一列包含多行,所以是个二维的数组
swMemoryPool *pool;
uint32_t compress_threshold;
swTable_iterator *iterator;
void *memory;
} swTable;

用来存储swooletable中每一列信息的swTableColumn:

1
2
3
4
5
6
7
8
9
10
typedef struct
{
uint8_t type; // 结构类型,可选是int、浮点、字符串
uint32_t size; // 声明的大小,
swString* name;
uint16_t index;
} swTableColumn;
// 此结构体即为执行$_swooleTable->column('ip',\swoole_table::TYPE_STRING, 64)时设置结构体中内容

几个对外暴露的api如下:

1
2
3
4
5
6
swTable* swTable_new(uint32_t rows_size);
int swTable_create(swTable *table);
void swTable_free(swTable *table);
int swTableColumn_add(swTable *table, char *name, int len, int type, int size);
swTableRow* swTableRow_set(swTable *table, char *key, int keylen, sw_atomic_t **rowlock);
swTableRow* swTableRow_get(swTable *table, char *key, int keylen, sw_atomic_t **rowlock);

先来看看创建swooletable的时候会发生什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
swTable* swTable_new(uint32_t rows_size)
{
// 隐含限制,单个swoole table 最大128M,还是挺狠的
if (rows_size >= 0x80000000)
{
rows_size = 0x80000000;
}
// 16进制转换,这应该也是文档里面说的,创建需要2的倍数的原因,比较好处理一些
else
{
uint32_t i = 10;
while ((1U << i) < rows_size)
{
i++;
}
rows_size = 1 << i;
}
// 统一申请内存
swTable *table = SwooleG.memory_pool->alloc(SwooleG.memory_pool, sizeof(swTable));
if (table == NULL)
{
return NULL;
}
// 给table创建锁,独一无二
if (swMutex_create(&table->lock, 1) < 0)
{
swWarn("mutex create failed.");
return NULL;
}
// 预创建迭代器
table->iterator = sw_malloc(sizeof(swTable_iterator));
if (!table->iterator)
{
swWarn("malloc failed.");
return NULL;
}
// 预创建存储列信息的哈希表,这里同样隐含了,最多32列的限制条件,同时制定了析构函数
table->columns = swHashMap_new(SW_HASHMAP_INIT_BUCKET_N, (swHashMap_dtor)swTableColumn_free);
if (!table->columns)
{
return NULL;
}
// 结构体变量初始化
table->size = rows_size;
table->mask = rows_size - 1;
bzero(table->iterator, sizeof(swTable_iterator));
table->memory = NULL;
return table;
}

我个人比较关注关于锁的这一块,所以看了下swMutex_create方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int swMutex_create(swLock *lock, int use_in_process)
{
int ret;
bzero(lock, sizeof(swLock));
lock->type = SW_MUTEX;
pthread_mutexattr_init(&lock->object.mutex.attr);
if (use_in_process == 1)
{
pthread_mutexattr_setpshared(&lock->object.mutex.attr, PTHREAD_PROCESS_SHARED);
}
if ((ret = pthread_mutex_init(&lock->object.mutex._lock, &lock->object.mutex.attr)) < 0)
{
return SW_ERR;
}
lock->lock = swMutex_lock;
lock->unlock = swMutex_unlock;
lock->trylock = swMutex_trylock;
lock->free = swMutex_free;
return SW_OK;
}

这里使用了posix thread中的用于线程同步的mutex函数来创建和初始化互斥锁。参照http://blog.sina.com.cn/s/blog_4176c2800100tabf.html 中的说明,这里swoole应该创建的是PTHREAD_MUTEX_TIMED_NP 普通锁,当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。

同时创建锁也给出了一个参数use_in_process, 如果是在进程间使用,那么意味着锁在进程间共享,这也就对应了swooletable的第一种使用方式:在server启动之前创建,否则就是我们上文中的使用方式:在每个进程中单独的使用。

注意,这里swoole table使用了互斥锁,这是阻塞的,当某线程无法获取互斥量时,该线程会被直接挂起,该线程不再消耗CPU时间,当其他线程释放互斥量后,操作系统会激活那个被挂起的线程,让其投入运行。由于table之间加锁的频率比较低,所以使用互斥锁是划算的。

再看下指定了swooletable中的列信息之后,进行swTable_create时发生了什么:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int swTable_create(swTable *table)
{
// 数据初始化
...
// 真正申请了共享内存,计算出了最终需要的大小
void *memory = sw_shm_malloc(memory_size);
if (memory == NULL)
{
return SW_ERR;
}
// 变量初始化
...
}

最后看一下我们最关注的,对于行内容的get、set、del:

先看get方法,每次get,都更新一下自旋锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
swTableRow* swTableRow_get(swTable *table, char *key, int keylen, sw_atomic_t **rowlock)
{
//参数校验
...
// 根据哈希算法获取相应的行
swTableRow *row = swTable_hash(table, key, keylen);
// 获取行中存储的初始的原子锁
sw_atomic_t *lock = &row->lock;
// 对应swSpinLock_create方法,其中调用pthread_spin_init进行自旋锁初始化
sw_spinlock(lock);
// 自旋锁赋值
*rowlock = lock;
// 遍历table,找对应的列中的行
...
}

再看set方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
swTableRow* swTableRow_set(swTable *table, char *key, int keylen, sw_atomic_t **rowlock)
{
//参数校验
...
// 更新自旋锁
swTableRow *row = swTable_hash(table, key, keylen);
sw_atomic_t *lock = &row->lock;
sw_spinlock(lock);
*rowlock = lock;
if (row->active)
{
for (;;)
{
if (strncmp(row->key, key, keylen) == 0)
{
break;
}
else if (row->next == NULL)
{
//!!! 锁住table
table->lock.lock(&table->lock);
swTableRow *new_row = table->pool->alloc(table->pool, 0);
// !!! 创建完成,解锁table
table->lock.unlock(&table->lock);
if (!new_row)
{
return NULL;
}
//add row_num
bzero(new_row, sizeof(swTableRow));
// 多线程全局变量自加,确保行数全局唯一,对应__sync_fetch_and_add方法!!
sw_atomic_fetch_add(&(table->row_num), 1);
row->next = new_row;
row = new_row;
break;
}
else
{
row = row->next;
}
}
}
else
{
// 多线程全局变量自加,确保行数全局唯一,对应__sync_fetch_and_add方法!!
sw_atomic_fetch_add(&(table->row_num), 1);
}
memcpy(row->key, key, keylen);
row->active = 1;
return row;
}

del方法也比较类似的,这里就不讲了,仔细看看还是很有意思。核心点在于:

  • 对互斥锁、自旋锁的灵活使用
  • 对多线程下的全局变量处理
  • 对共享内存的把控与操作
  • 对内存的分配与正确回收

swoole的源码的确有很多可取之处,涉及到了很多系统和存储的基本的只是,非常值得学习。
那么,关于php使用本机存储系列,也就到此为止吧!

热评文章