From c04fecb4d9f7753e0cbff7edd03ec68f8721cdce Mon Sep 17 00:00:00 2001 From: David Teigland Date: Thu, 10 May 2012 10:18:07 -0500 Subject: dlm: use rsbtbl as resource directory Remove the dir hash table (dirtbl), and use the rsb hash table (rsbtbl) as the resource directory. It has always been an unnecessary duplication of information. This improves efficiency by using a single rsbtbl lookup in many cases where both rsbtbl and dirtbl lookups were needed previously. This eliminates the need to handle cases of rsbtbl and dirtbl being out of sync. In many cases there will be memory savings because the dir hash table no longer exists. Signed-off-by: David Teigland --- fs/dlm/dir.c | 287 ++++++++++++++--------------------------------------------- 1 file changed, 69 insertions(+), 218 deletions(-) (limited to 'fs/dlm/dir.c') diff --git a/fs/dlm/dir.c b/fs/dlm/dir.c index dc5eb598b81..278a75cda44 100644 --- a/fs/dlm/dir.c +++ b/fs/dlm/dir.c @@ -23,50 +23,6 @@ #include "lock.h" #include "dir.h" - -static void put_free_de(struct dlm_ls *ls, struct dlm_direntry *de) -{ - spin_lock(&ls->ls_recover_list_lock); - list_add(&de->list, &ls->ls_recover_list); - spin_unlock(&ls->ls_recover_list_lock); -} - -static struct dlm_direntry *get_free_de(struct dlm_ls *ls, int len) -{ - int found = 0; - struct dlm_direntry *de; - - spin_lock(&ls->ls_recover_list_lock); - list_for_each_entry(de, &ls->ls_recover_list, list) { - if (de->length == len) { - list_del(&de->list); - de->master_nodeid = 0; - memset(de->name, 0, len); - found = 1; - break; - } - } - spin_unlock(&ls->ls_recover_list_lock); - - if (!found) - de = kzalloc(sizeof(struct dlm_direntry) + len, GFP_NOFS); - return de; -} - -void dlm_clear_free_entries(struct dlm_ls *ls) -{ - struct dlm_direntry *de; - - spin_lock(&ls->ls_recover_list_lock); - while (!list_empty(&ls->ls_recover_list)) { - de = list_entry(ls->ls_recover_list.next, struct dlm_direntry, - list); - list_del(&de->list); - kfree(de); - } - spin_unlock(&ls->ls_recover_list_lock); -} - /* * We use the upper 16 bits of the hash value to select the directory node. * Low bits are used for distribution of rsb's among hash buckets on each node. @@ -78,144 +34,53 @@ void dlm_clear_free_entries(struct dlm_ls *ls) int dlm_hash2nodeid(struct dlm_ls *ls, uint32_t hash) { - struct list_head *tmp; - struct dlm_member *memb = NULL; - uint32_t node, n = 0; - int nodeid; - - if (ls->ls_num_nodes == 1) { - nodeid = dlm_our_nodeid(); - goto out; - } + uint32_t node; - if (ls->ls_node_array) { + if (ls->ls_num_nodes == 1) + return dlm_our_nodeid(); + else { node = (hash >> 16) % ls->ls_total_weight; - nodeid = ls->ls_node_array[node]; - goto out; - } - - /* make_member_array() failed to kmalloc ls_node_array... */ - - node = (hash >> 16) % ls->ls_num_nodes; - - list_for_each(tmp, &ls->ls_nodes) { - if (n++ != node) - continue; - memb = list_entry(tmp, struct dlm_member, list); - break; + return ls->ls_node_array[node]; } - - DLM_ASSERT(memb , printk("num_nodes=%u n=%u node=%u\n", - ls->ls_num_nodes, n, node);); - nodeid = memb->nodeid; - out: - return nodeid; } int dlm_dir_nodeid(struct dlm_rsb *r) { - return dlm_hash2nodeid(r->res_ls, r->res_hash); -} - -static inline uint32_t dir_hash(struct dlm_ls *ls, char *name, int len) -{ - uint32_t val; - - val = jhash(name, len, 0); - val &= (ls->ls_dirtbl_size - 1); - - return val; -} - -static void add_entry_to_hash(struct dlm_ls *ls, struct dlm_direntry *de) -{ - uint32_t bucket; - - bucket = dir_hash(ls, de->name, de->length); - list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); + return r->res_dir_nodeid; } -static struct dlm_direntry *search_bucket(struct dlm_ls *ls, char *name, - int namelen, uint32_t bucket) +void dlm_recover_dir_nodeid(struct dlm_ls *ls) { - struct dlm_direntry *de; - - list_for_each_entry(de, &ls->ls_dirtbl[bucket].list, list) { - if (de->length == namelen && !memcmp(name, de->name, namelen)) - goto out; - } - de = NULL; - out: - return de; -} - -void dlm_dir_remove_entry(struct dlm_ls *ls, int nodeid, char *name, int namelen) -{ - struct dlm_direntry *de; - uint32_t bucket; - - bucket = dir_hash(ls, name, namelen); - - spin_lock(&ls->ls_dirtbl[bucket].lock); - - de = search_bucket(ls, name, namelen, bucket); - - if (!de) { - log_error(ls, "remove fr %u none", nodeid); - goto out; - } - - if (de->master_nodeid != nodeid) { - log_error(ls, "remove fr %u ID %u", nodeid, de->master_nodeid); - goto out; - } - - list_del(&de->list); - kfree(de); - out: - spin_unlock(&ls->ls_dirtbl[bucket].lock); -} + struct dlm_rsb *r; -void dlm_dir_clear(struct dlm_ls *ls) -{ - struct list_head *head; - struct dlm_direntry *de; - int i; - - DLM_ASSERT(list_empty(&ls->ls_recover_list), ); - - for (i = 0; i < ls->ls_dirtbl_size; i++) { - spin_lock(&ls->ls_dirtbl[i].lock); - head = &ls->ls_dirtbl[i].list; - while (!list_empty(head)) { - de = list_entry(head->next, struct dlm_direntry, list); - list_del(&de->list); - put_free_de(ls, de); - } - spin_unlock(&ls->ls_dirtbl[i].lock); + down_read(&ls->ls_root_sem); + list_for_each_entry(r, &ls->ls_root_list, res_root_list) { + r->res_dir_nodeid = dlm_hash2nodeid(ls, r->res_hash); } + up_read(&ls->ls_root_sem); } int dlm_recover_directory(struct dlm_ls *ls) { struct dlm_member *memb; - struct dlm_direntry *de; char *b, *last_name = NULL; - int error = -ENOMEM, last_len, count = 0; + int error = -ENOMEM, last_len, nodeid, result; uint16_t namelen; + unsigned int count = 0, count_match = 0, count_bad = 0, count_add = 0; log_debug(ls, "dlm_recover_directory"); if (dlm_no_directory(ls)) goto out_status; - dlm_dir_clear(ls); - last_name = kmalloc(DLM_RESNAME_MAXLEN, GFP_NOFS); if (!last_name) goto out; list_for_each_entry(memb, &ls->ls_nodes, list) { + if (memb->nodeid == dlm_our_nodeid()) + continue; + memset(last_name, 0, DLM_RESNAME_MAXLEN); last_len = 0; @@ -230,7 +95,7 @@ int dlm_recover_directory(struct dlm_ls *ls) if (error) goto out_free; - schedule(); + cond_resched(); /* * pick namelen/name pairs out of received buffer @@ -267,87 +132,71 @@ int dlm_recover_directory(struct dlm_ls *ls) if (namelen > DLM_RESNAME_MAXLEN) goto out_free; - error = -ENOMEM; - de = get_free_de(ls, namelen); - if (!de) + error = dlm_master_lookup(ls, memb->nodeid, + b, namelen, + DLM_LU_RECOVER_DIR, + &nodeid, &result); + if (error) { + log_error(ls, "recover_dir lookup %d", + error); goto out_free; + } + + /* The name was found in rsbtbl, but the + * master nodeid is different from + * memb->nodeid which says it is the master. + * This should not happen. */ + + if (result == DLM_LU_MATCH && + nodeid != memb->nodeid) { + count_bad++; + log_error(ls, "recover_dir lookup %d " + "nodeid %d memb %d bad %u", + result, nodeid, memb->nodeid, + count_bad); + print_hex_dump_bytes("dlm_recover_dir ", + DUMP_PREFIX_NONE, + b, namelen); + } + + /* The name was found in rsbtbl, and the + * master nodeid matches memb->nodeid. */ + + if (result == DLM_LU_MATCH && + nodeid == memb->nodeid) { + count_match++; + } + + /* The name was not found in rsbtbl and was + * added with memb->nodeid as the master. */ + + if (result == DLM_LU_ADD) { + count_add++; + } - de->master_nodeid = memb->nodeid; - de->length = namelen; last_len = namelen; - memcpy(de->name, b, namelen); memcpy(last_name, b, namelen); b += namelen; left -= namelen; - - add_entry_to_hash(ls, de); count++; } } - done: + done: ; } out_status: error = 0; - log_debug(ls, "dlm_recover_directory %d entries", count); + dlm_set_recover_status(ls, DLM_RS_DIR); + + log_debug(ls, "dlm_recover_directory %u in %u new", + count, count_add); out_free: kfree(last_name); out: - dlm_clear_free_entries(ls); return error; } -static int get_entry(struct dlm_ls *ls, int nodeid, char *name, - int namelen, int *r_nodeid) -{ - struct dlm_direntry *de, *tmp; - uint32_t bucket; - - bucket = dir_hash(ls, name, namelen); - - spin_lock(&ls->ls_dirtbl[bucket].lock); - de = search_bucket(ls, name, namelen, bucket); - if (de) { - *r_nodeid = de->master_nodeid; - spin_unlock(&ls->ls_dirtbl[bucket].lock); - if (*r_nodeid == nodeid) - return -EEXIST; - return 0; - } - - spin_unlock(&ls->ls_dirtbl[bucket].lock); - - if (namelen > DLM_RESNAME_MAXLEN) - return -EINVAL; - - de = kzalloc(sizeof(struct dlm_direntry) + namelen, GFP_NOFS); - if (!de) - return -ENOMEM; - - de->master_nodeid = nodeid; - de->length = namelen; - memcpy(de->name, name, namelen); - - spin_lock(&ls->ls_dirtbl[bucket].lock); - tmp = search_bucket(ls, name, namelen, bucket); - if (tmp) { - kfree(de); - de = tmp; - } else { - list_add_tail(&de->list, &ls->ls_dirtbl[bucket].list); - } - *r_nodeid = de->master_nodeid; - spin_unlock(&ls->ls_dirtbl[bucket].lock); - return 0; -} - -int dlm_dir_lookup(struct dlm_ls *ls, int nodeid, char *name, int namelen, - int *r_nodeid) -{ - return get_entry(ls, nodeid, name, namelen, r_nodeid); -} - static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) { struct dlm_rsb *r; @@ -358,10 +207,10 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) bucket = hash & (ls->ls_rsbtbl_size - 1); spin_lock(&ls->ls_rsbtbl[bucket].lock); - rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, 0, &r); + rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r); if (rv) rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss, - name, len, 0, &r); + name, len, &r); spin_unlock(&ls->ls_rsbtbl[bucket].lock); if (!rv) @@ -371,7 +220,7 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, char *name, int len) list_for_each_entry(r, &ls->ls_root_list, res_root_list) { if (len == r->res_length && !memcmp(name, r->res_name, len)) { up_read(&ls->ls_root_sem); - log_error(ls, "find_rsb_root revert to root_list %s", + log_debug(ls, "find_rsb_root revert to root_list %s", r->res_name); return r; } @@ -429,6 +278,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, be_namelen = cpu_to_be16(0); memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); offset += sizeof(__be16); + ls->ls_recover_dir_sent_msg++; goto out; } @@ -437,6 +287,7 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, offset += sizeof(__be16); memcpy(outbuf + offset, r->res_name, r->res_length); offset += r->res_length; + ls->ls_recover_dir_sent_res++; } /* @@ -449,8 +300,8 @@ void dlm_copy_master_names(struct dlm_ls *ls, char *inbuf, int inlen, be_namelen = cpu_to_be16(0xFFFF); memcpy(outbuf + offset, &be_namelen, sizeof(__be16)); offset += sizeof(__be16); + ls->ls_recover_dir_sent_msg++; } - out: up_read(&ls->ls_root_sem); } -- cgit v1.2.3