暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

openGauss数据库源码解析系列文章——安全管理源码解析(二)

openGauss 2023-07-31
139

在上篇openGauss数据库源码解析系列文章——安全管理源码解析(一)我们围绕安全管理整体架构和代码概览、安全认证原理介绍和代码解析进行了简单介绍。本篇将继续角色管理、对象权限管理的学习,全文阅读需要35分钟,欢迎收藏阅读。

三、角色管理

角色是拥有数据库对象和权限的实体,在不同的环境中角色可以认为是一个用户、一个组或者兼顾两者。角色管理包含了角色的创建、修改、删除、权限授予和回收操作。

3.1  角色创建

如果在openGauss上需要创建一个角色,可以使用SQL命令CREATE ROLE,其语法为:

    CREATE ROLE role_name [ [ WITH ] option [ ... ] ] [ ENCRYPTED | UNENCRYPTED ] { PASSWORD | IDENTIFIED BY } { 'password' | DISABLE };

    创建角色是通过函数CreateRole实现的,其函数接口为:

      void CreateRole(CreateRoleStmt* stmt)

      其中,CreateRoleStmt为创建角色时所需的数据结构,具体数据结构代码如下:

        typedef struct CreateRoleStmt {
           NodeTag type;
           RoleStmtType stmt_type;  /* 将要创建的角色类型 ROLE/USER/GROUP  */
           char* role;              /* 角色名 */
           List* options;            /* 角色属性列表 */
        } CreateRoleStmt;

        字段stmt_type是枚举类型,相关代码如下:

          typedef enum RoleStmtType {
          ROLESTMT_ROLE,    /* 代表创建角色 */
          ROLESTMT_USER,    /* 代表创建用户 */
          ROLESTMT_GROUP,  /* 代表创建组用户 */
          } RoleStmtType;

          字段option用来存储角色的属性信息,具体的数据结构为:

            typedef struct DefElem {
               NodeTag type;
               char* defnamespace;     * 节点对应的命名空间  */
               char* defname;          /* 节点对应的角色属性名  */
               Node* arg;              /* 表示值或类型名  */
               DefElemAction defaction;  /* SET/ADD/DROP 等其他未指定的行为  */
            } DefElem;

            在上述的关键数据结构基础之上,完整的创建角色流程如图14所示。

            图14  openGauss角色创建流程

            创建角色时先判断所要创建的角色类型。如果是创建用户,则设置其canlogin属性为true,因为用户默认具有登录权限。而创建角色和创建组时,若角色属性参数没有声明的话,则canlogin属性默认为false。相关代码如下:

              /* 默认值可能因原始语句类型而异 */
              switch (stmt->stmt_type) {
              case ROLESTMT_ROLE:
                     break;
                 case ROLESTMT_USER:
                      canlogin = true;
                      break;
                  case ROLESTMT_GROUP:
                      break;
                  default:
                      break;
              }

              检查完所要创建的角色类型以后,开始循环获取角色属性options中的内容,并将其转换成对应的角色属性值类型。相关代码如下:

                /* 从node tree中获取option */
                foreach (option, stmt->options) {
                   DefElem* defel = (DefElem*)lfirst(option);


                   if (strcmp(defel->defname, "password") == 0 || strcmp(defel->defname, "encryptedPassword") == 0 ||
                       strcmp(defel->defname, "unencryptedPassword") == 0) {
                       if (dpassword != NULL) {
                           clean_role_password(dpassword);
                           ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
                       }
                       dpassword = defel;
                       if (strcmp(defel->defname, "encryptedPassword") == 0)
                           encrypt_password = true;
                       else if (strcmp(defel->defname, "unencryptedPassword") == 0) {
                           clean_role_password(dpassword);
                           ereport(ERROR,
                               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                   errmsg("Permission denied to create role with option UNENCRYPTED.")));
                       }
                   } else if (strcmp(defel->defname, "sysid") == 0) {
                       ereport(NOTICE, (errmsg("SYSID can no longer be specified")));
                   } else if (strcmp(defel->defname, "inherit") == 0) {
                       if (dinherit != NULL) {
                           clean_role_password(dpassword);
                           ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
                       }
                       dinherit = defel;
                   } else if (strcmp(defel->defname, "createrole") == 0) {
                       if (dcreaterole != NULL) {
                           clean_role_password(dpassword);
                           ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
                       }
                       dcreaterole = defel;
                   } else if (strcmp(defel->defname, "createdb") == 0) {
                       if (dcreatedb != NULL) {
                           clean_role_password(dpassword);
                           ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
                       }
                       dcreatedb = defel;
                   } else if (strcmp(defel->defname, "useft") == 0) {
                       if (duseft != NULL) {
                           clean_role_password(dpassword);
                           ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
                       }
                       duseft = defel;
                ……

                根据对应的参数信息转换需要的角色属性值类型,如提取issuper值和createrole值等。相关代码如下:

                  if (dissuper != NULL)
                         issuper = intVal(dissuper->arg) != 0;
                     if (dinherit != NULL)
                         inherit = intVal(dinherit->arg) != 0;
                     if (dcreaterole != NULL)
                         createrole = intVal(dcreaterole->arg) != 0;
                     if (dcreatedb != NULL)
                         createdb = intVal(dcreatedb->arg) != 0;
                    ……

                  在完成了转换以后,将角色属性值以及角色的信息一起构建一个pg_authid的元组,再写回系统表并更新索引。作相关代码如下:

                    /* 检查pg_authid relation,确认该角色没有存在*/
                    Relation pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock);
                       TupleDesc pg_authid_dsc = RelationGetDescr(pg_authid_rel);


                       if (OidIsValid(get_role_oid(stmt->role, true))) {
                           str_reset(password);
                           ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("role \"%s\" already exists", stmt->role)));
                    }
                    ……
                       /* 创建一个插入的tuple */
                       errno_t errorno = memset_s(new_record, sizeof(new_record), 0, sizeof(new_record));
                       securec_check(errorno, "\0", "\0");
                       errorno = memset_s(new_record_nulls, sizeof(new_record_nulls), false, sizeof(new_record_nulls));
                       securec_check(errorno, "\0", "\0");


                       new_record[Anum_pg_authid_rolname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->role));


                       new_record[Anum_pg_authid_rolsuper - 1] = BoolGetDatum(issuper);
                       new_record[Anum_pg_authid_rolinherit - 1] = BoolGetDatum(inherit);
                       new_record[Anum_pg_authid_rolcreaterole - 1] = BoolGetDatum(createrole);
                       new_record[Anum_pg_authid_rolcreatedb - 1] = BoolGetDatum(createdb);
                       
                       new_record[Anum_pg_authid_rolcatupdate - 1] = BoolGetDatum(issuper);
                       new_record[Anum_pg_authid_rolcanlogin - 1] = BoolGetDatum(canlogin);
                       new_record[Anum_pg_authid_rolreplication - 1] = BoolGetDatum(isreplication);
                       new_record[Anum_pg_authid_rolauditadmin - 1] = BoolGetDatum(isauditadmin);
                       new_record[Anum_pg_authid_rolsystemadmin - 1] = BoolGetDatum(issystemadmin);
                    new_record[Anum_pg_authid_rolconnlimit - 1] = Int32GetDatum(connlimit);
                    ……
                       HeapTuple tuple = heap_form_tuple(pg_authid_dsc, new_record, new_record_nulls);


                       if (u_sess->proc_cxt.IsBinaryUpgrade && OidIsValid(u_sess->upg_cxt.binary_upgrade_next_pg_authid_oid)) {
                           HeapTupleSetOid(tuple, u_sess->upg_cxt.binary_upgrade_next_pg_authid_oid);
                           u_sess->upg_cxt.binary_upgrade_next_pg_authid_oid = InvalidOid;
                       }


                       roleid = simple_heap_insert(pg_authid_rel, tuple);


                       if (IsUnderPostmaster) {
                           if (OidIsValid(rpoid) && (rpoid != DEFAULT_POOL_OID))
                               recordDependencyOnRespool(AuthIdRelationId, roleid, rpoid);


                           u_sess->wlm_cxt->wlmcatalog_update_user = true;
                    }
                    ……

                    完成更新以后,将新创建的角色加入指定存在的父角色中。相关代码如下:

                      /* 将新角色添加到指定的现有角色中 */
                         foreach (item, addroleto) {
                             char* oldrolename = strVal(lfirst(item));
                             Oid oldroleid = get_role_oid(oldrolename, false);


                             AddRoleMems(
                                 oldrolename, oldroleid, list_make1(makeString(stmt->role)), list_make1_oid(roleid), GetUserId(), false);
                         }


                         AddRoleMems(stmt->role, roleid, adminmembers, roleNamesToIds(adminmembers), GetUserId(), true);
                         AddRoleMems(stmt->role, roleid, rolemembers, roleNamesToIds(rolemembers), GetUserId(), false);

                      至此就完成了整个角色创建的过程。

                      3.2  角色管理

                      1. 修改角色属性

                      修改一个数据库角色可以使用SQL命令ALTER ROLE。角色属性的修改是通过调用AlterRole函数来实现的,该函数只有一个类型为AlterRoleStmt结构的参数。相关代码如下:

                        typedef struct AlterRoleStmt {
                           NodeTag  type;
                           char*  role; * 角色的名称 */
                           List*  options; * 需要修改的属性列表 */
                           int  action;  /* +1增加成员关系, -1删除成员关系 */
                           RoleLockType  lockstatus; * 角色锁定状态 */
                        } AlterRoleStmt;

                        修改角色的流程如图15所示。

                        图15  openGauss角色管理流程图

                        调用函数AlterRole修改用户角色属性时,首先循环判断options,依次提取要修改的角色属性;然后查看系统表pg_authid判断是否已存在该角色,如果不存在则提示报错;再进行相应的权限判断,检查执行者是否有权限去更改该角色的属性;最后构建一个新的元组,将要更改的属性更新到新元组中,存入系统表pg_authid。同时AlterRole函数也可以用来调整角色的成员关系,结构体中的action字段值设置为1和-1分别表示增加和删除成员关系,该选项将在授予和回收角色章节具体描述。AlterRole函数的具体实现代码如下:

                          void AlterRole(AlterRoleStmt* stmt)
                          {
                             . . .
                             /* 循环提取角色的属性options */
                             foreach (option, stmt->options) {
                                 DefElem* defel = (DefElem*)lfirst(option);


                                 if (strcmp(defel->defname, "password") == 0 || strcmp(defel->defname, "encryptedPassword") == 0 ||
                                     strcmp(defel->defname, "unencryptedPassword") == 0) {
                                     if (dpassword != NULL) {
                                         clean_role_password(dpassword);
                                         ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
                                     }
                                     dpassword = defel;
                                     if (strcmp(defel->defname, "encryptedPassword") == 0)
                                         encrypt_password = true;
                                     else if (strcmp(defel->defname, "unencryptedPassword") == 0) {
                                         clean_role_password(dpassword);
                                         ereport(ERROR,
                                             (errcode(ERRCODE_INVALID_ROLE_SPECIFICATION),
                                                 errmsg("Permission denied to create role with option UNENCRYPTED.")));
                                     }
                                 } else if (strcmp(defel->defname, "createrole") == 0) {
                                     if (dcreaterole != NULL) {
                                         clean_role_password(dpassword);
                                         ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
                                     }
                                     dcreaterole = defel;
                                 } else if (strcmp(defel->defname, "inherit") == 0) {
                                     if (dinherit != NULL) {
                                         clean_role_password(dpassword);
                                         ereport(ERROR, (errcode(ERRCODE_SYNTAX_ERROR), errmsg("conflicting or redundant options")));
                                     }
                                     dinherit = defel;
                                 }
                          . . .
                                 else {
                                     clean_role_password(dpassword);
                                     ereport(ERROR,
                                         (errcode(ERRCODE_UNRECOGNIZED_NODE_TYPE), errmsg("option \"%s\" not recognized", defel->defname)));
                                 }
                             }
                          /* 将提取的属性赋值给对应的变量 */
                             if (dpassword != NULL && dpassword->arg != NULL) {
                                 head = list_head((List*)dpassword->arg);
                                 if (head != NULL) {
                                     pwdargs = (A_Const*)linitial((List*)dpassword->arg);
                                     if (pwdargs != NULL) {
                                         password = strVal(&pwdargs->val);
                                     }
                                     if (lnext(head)) {
                                         pwdargs = (A_Const*)lsecond((List*)dpassword->arg);
                                         if (pwdargs != NULL) {
                                             replPasswd = strVal(&pwdargs->val);
                                         }
                                     }
                                 }
                             }
                             if (dinherit != NULL)
                                 inherit = intVal(dinherit->arg);
                             if (dcreaterole != NULL)
                                 createrole = intVal(dcreaterole->arg);
                          . . .
                             /* 查看要修改的角色是否存在,不存在则提示报错 */
                             Relation pg_authid_rel = heap_open(AuthIdRelationId, RowExclusiveLock);


                             HeapTuple tuple = SearchSysCache1(AUTHNAME, PointerGetDatum(stmt->role));
                             if (!HeapTupleIsValid(tuple)) {
                                 str_reset(password);
                                 str_reset(replPasswd);


                                 if (!have_createrole_privilege())
                                     ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("Permission denied.")));
                                 else
                                     ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("role \"%s\" does not exist", stmt->role)));
                             }
                          roleid = HeapTupleGetOid(tuple);
                          . . .
                          /* 检查是否有权限更改相应角色的属性,权限不足则提示报错 */
                             if (roleid == BOOTSTRAP_SUPERUSERID) {
                                 if (!(issuper < 0 && inherit < 0 && createrole < 0 && createdb < 0 && canlogin < 0 && isreplication < 0 &&
                                         isauditadmin < 0 && issystemadmin < 0 && isvcadmin < 0 && useft < 0 && dconnlimit == NULL &&
                                         rolemembers == NULL && validBegin == NULL && validUntil == NULL && drespool == NULL &&
                                         dparent == NULL && dnode_group == NULL && dspacelimit == NULL && dtmpspacelimit == NULL &&
                                         dspillspacelimit == NULL)) {
                                     str_reset(password);
                                     str_reset(replPasswd);
                                     ereport(ERROR,
                                         (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                             errmsg("Permission denied to change privilege of the initial account.")));
                                 }
                             }
                             if (dpassword != NULL && roleid == BOOTSTRAP_SUPERUSERID && GetUserId() != BOOTSTRAP_SUPERUSERID) {
                                 str_reset(password);
                                 str_reset(replPasswd);
                                 ereport(ERROR,
                                     (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
                                         errmsg("Permission denied to change password of the initial account.")));
                             }
                             . . .
                             } else if (!have_createrole_privilege()) {
                                 if (!(inherit < 0 && createrole < 0 && createdb < 0 && canlogin < 0 && isreplication < 0 && isauditadmin < 0 &&
                                         issystemadmin < 0 && isvcadmin < 0 && useft < 0 && dconnlimit == NULL && rolemembers == NULL &&
                                         validBegin == NULL && validUntil == NULL && !*respool && !OidIsValid(parentid) && dnode_group == NULL &&
                                         !spacelimit && !tmpspacelimit && !spillspacelimit &&
                                         /* if not superuser or have createrole privilege, permission of lock and unlock is denied */
                                         stmt->lockstatus == DO_NOTHING &&
                                         /* if alter password, it will be handled below */
                                         roleid == GetUserId()) ||
                                     (roleid != GetUserId() && dpassword == NULL)) {
                                     str_reset(password);
                                     str_reset(replPasswd);
                                     ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("Permission denied.")));
                                 }
                          }
                          ...
                             /* 将要更改的角色属性值分别更新到新元组中,再将新元组替代旧元组存入系统表pg_authid中 */
                             if (issuper >= 0) {
                                 new_record[Anum_pg_authid_rolsuper - 1] = BoolGetDatum(issuper > 0);
                                 new_record_repl[Anum_pg_authid_rolsuper - 1] = true;


                                 new_record[Anum_pg_authid_rolcatupdate - 1] = BoolGetDatum(issuper > 0);
                                 new_record_repl[Anum_pg_authid_rolcatupdate - 1] = true;
                             }
                             if (inherit >= 0) {
                                 new_record[Anum_pg_authid_rolinherit - 1] = BoolGetDatum(inherit > 0);
                                 new_record_repl[Anum_pg_authid_rolinherit - 1] = true;
                             }
                           . . .
                             HeapTuple new_tuple = heap_modify_tuple(tuple, pg_authid_dsc, new_record, new_record_nulls, new_record_repl);
                             simple_heap_update(pg_authid_rel, &tuple->t_self, new_tuple);


                             CatalogUpdateIndexes(pg_authid_rel, new_tuple);
                            . . .
                          /* 判断成员关系,增加或删除成员 */
                             if (stmt->action == 1)
                                 AddRoleMems(stmt->role, roleid, rolemembers, roleNamesToIds(rolemembers), GetUserId(), false);
                             else if (stmt->action == -1) * drop members FROM role */
                                 DelRoleMems(stmt->role, roleid, rolemembers, roleNamesToIds(rolemembers), false);


                             . . .
                             heap_close(pg_authid_rel, NoLock);
                          }

                          2. 删除角色

                          如果要删除一个角色,可以使用SQL命令DROP ROLE。角色的删除是通过调用DropRole函数来实现的,该函数只有一个类型为DropRoleStmt结构的参数。相关代码如下:

                            typedef struct DropRoleStmt {
                               NodeTagtype;
                               List*roles;               *  要删除的角色列表  */
                               boolmissing_ok;          /*  判断角色是否存在  */
                               boolis_user;             *  要删除的是角色还是用户  */
                               boolinherit_from_parent;  /*  是否继承自父角色*/
                               DropBehavior behavior;            /*  是否级联删除依赖对象  */
                            } DropRoleStmt;

                            删除角色的流程如图16所示。

                            图16  openGauss角色删除流程图

                            角色删除的执行流程为:首先判断当前操作者是否有权限执行该操作,若没有则报错退出;然后检查待删除的角色是否存在,若不存在,则根据missing_ok选择返回ERROR或NOTICE提示信息;再通过扫描系统表pg_authid和pg_auth_members,删除所有涉及待删除角色的元组执行;若behavior取值DROP_CASCADE,则级联删除该角色所拥有的所有数据库对象;最后删除该角色在系统表pg_auth_history和pg_user_status中对应的信息。具体的实现过程代码如下:

                              void DropRole(DropRoleStmt* stmt)
                              {
                                 . . .
                              /*  检查执行者是否有权限删除角色  */
                                 if (!have_createrole_privilege())
                                     ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("Permission denied to drop role.")));
                              /*  循环处理要删除的角色  */
                                 foreach (item, stmt->roles) {
                              . . .
                              /*  检查要删除的角色是否存在,若不存在则提示报错  */
                                     HeapTuple tuple = SearchSysCache1(AUTHNAME, PointerGetDatum(role));
                                     if (!HeapTupleIsValid(tuple)) {
                                         if (!stmt->missing_ok) {
                                             ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT), errmsg("role \"%s\" does not exist", role)));
                                         } else {
                                             ereport(NOTICE, (errmsg("role \"%s\" does not exist, skipping", role)));
                                         }
                                         continue;
                                     }
                                     . . .
                              /*  当前用户不允许删除  */
                                     if (roleid == GetUserId())
                                         ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("current user cannot be dropped")));
                                     if (roleid == GetOuterUserId())
                                         ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("current user cannot be dropped")));
                                     if (roleid == GetSessionUserId())
                                         ereport(ERROR, (errcode(ERRCODE_OBJECT_IN_USE), errmsg("session user cannot be dropped")));
                                     /*  校验执行者和被删除角色的权限,如系统管理员才有权限删除其他系统管理员  */
                                     if((((Form_pg_authid)GETSTRUCT(tuple))->rolsuper|| ((Form_pg_authid)GETSTRUCT(tuple))->rolsystemadmin) &&
                                         !isRelSuperuser())
                                         ereport(ERROR,(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("Permission denied.")));
                                     if ((((Form_pg_authid)GETSTRUCT(tuple))->rolauditadmin) &&
                                         g_instance.attr.attr_security.enablePrivilegesSeparate && !isRelSuperuser())
                                         ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("Permission denied.")));
                                     . . .
                                 /*  针对CASCADE(级联)的情况,删除该角色拥有的对象  */
                                     if (stmt->behavior == DROP_CASCADE) {
                                         char* user = NULL;
                                         CancelQuery(role);
                                         user = (char*)palloc(sizeof(char) * strlen(role) + 1);
                                         errno_t errorno = strncpy_s(user, strlen(role) + 1, role, strlen(role));
                                         securec_check(errorno, "\0", "\0");
                                         drop_objectstmt.behavior = stmt->behavior;
                                         drop_objectstmt.type = T_DropOwnedStmt;
                                         drop_objectstmt.roles = list_make1(makeString(user));


                                         DropOwnedObjects(&drop_objectstmt);
                                         list_free_deep(drop_objectstmt.roles);
                                     }


                                     /*  检查是否有对象依赖于该角色,若还存在依赖,则提示报错  */
                                     if (checkSharedDependencies(AuthIdRelationId, roleid, &detail, &detail_log))
                                         ereport(ERROR,
                                             (errcode(ERRCODE_DEPENDENT_OBJECTS_STILL_EXIST),
                                                 errmsg("role \"%s\" cannot be dropped because some objects depend on it", role),
                                                 errdetail_internal("%s", detail),
                                                 errdetail_log("%s", detail_log)));
                                     /*  从相关系统表中删除涉及待删除角色的元组  */
                                     simple_heap_delete(pg_authid_rel, &tuple->t_self);
                              . . .
                                     while (HeapTupleIsValid(tmp_tuple = systable_getnext(sscan))) {
                                         simple_heap_delete(pg_auth_members_rel, &tmp_tuple->t_self);
                                     }


                                     systable_endscan(sscan);
                                     DropAuthHistory(roleid);
                                     DropUserStatus(roleid);
                                     DeleteSharedComments(roleid, AuthIdRelationId);
                                     DeleteSharedSecurityLabel(roleid, AuthIdRelationId);
                                     DropSetting(InvalidOid, roleid);
                              . . .
                                 heap_close(pg_auth_members_rel, NoLock);
                                 heap_close(pg_authid_rel, NoLock);
                              }

                              3. 授予和回收角色

                              如果要授予或回收角色的成员关系,可以使用SQL命令“GRANT/REVOKE”。如果声明了“WITH ADMIN OPTION”选项,那么被加入的成员角色还可以将其他角色加入到父角色中。角色的授予或回收通过调用GrantRole函数来实现,该函数只有一个类型为GrantRoleStmt结构的参数。相关代码如下:

                                typedef struct GrantRoleStmt {
                                   NodeTag type;
                                   List* granted_roles;/*  被授予或回收的角色集合  */
                                   List* grantee_roles;/*  从granted_roles中增加或删除的角色集合  */
                                   Bool is_grant;/*  true代表授权,false代表回收  */
                                   Bool admin_opt;/*  是否带有with admin option选项  */
                                   char* grantor;/*  授权者  */
                                   Drop Behaviorbehavior;/*  是否级联回收角色  */
                                } GrantRoleStmt;

                                授予角色时,grantee_roles中的角色将被添加到granted_roles,通过调用函数AddRoleMems实现;回收角色时,将grantee_roles中的角色从granted_roles中删除,通过调用函数DelRoleMems实现。

                                函数AddRoleMems的实现流程如图17所示。

                                图17  openGauss增加用户成员流程图

                                函数AddRoleMems的具体实现代码如下,其中:

                                (1) rolename和roleid分别表示要被加入成员的角色的名称和OID。

                                (2) memberNames和memberIds分别是要添加的角色名称和OID的列表。

                                (3) grantorId表示授权者的OID。

                                (4) admin_opt表示是否带有with admin option选项。

                                  static void AddRoleMems(
                                     const char* rolename, Oid roleid, const List* memberNames, List* memberIds, Oid grantorId, bool admin_opt)
                                  {
                                  . . .
                                     /*  校验执行者的权限  */
                                     if (superuser_arg(roleid)) {
                                         if (!isRelSuperuser())
                                             ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("Permission denied.")));
                                     . . .
                                     }
                                  . . .
                                     if (grantorId != GetUserId() && !superuser())
                                         ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), errmsg("must be system admin to set grantor")));
                                  /*  循环处理要添加的角色  */
                                     pg_authmem_rel = heap_open(AuthMemRelationId, RowExclusiveLock);
                                     pg_authmem_dsc = RelationGetDescr(pg_authmem_rel);


                                     forboth(nameitem, memberNames, iditem, memberIds)
                                     {
                                         /*  针对角色和成员信息创建pg_auth_members元组,再将新元组插入到系统表中  */
                                  . . .
                                         new_record[Anum_pg_auth_members_roleid - 1] = ObjectIdGetDatum(roleid);
                                         new_record[Anum_pg_auth_members_member -1] = ObjectIdGetDatum(memberid);
                                         new_record[Anum_pg_auth_members_grantor -  1] = ObjectIdGetDatum(grantorId);
                                         new_record[Anum_pg_auth_members_admin_option - 1] = BoolGetDatum(admin_opt);


                                         if (HeapTupleIsValid(authmem_tuple)) {
                                             new_record_repl[Anum_pg_auth_members_grantor - 1] = true;
                                             new_record_repl[Anum_pg_auth_members_admin_option - 1] = true;
                                             tuple = heap_modify_tuple(authmem_tuple, pg_authmem_dsc, new_record, new_record_nulls, new_record_repl);
                                             simple_heap_update(pg_authmem_rel, &tuple->t_self, tuple);
                                             CatalogUpdateIndexes(pg_authmem_rel, tuple);
                                             ReleaseSysCache(authmem_tuple);
                                         } else {
                                             tuple = heap_form_tuple(pg_authmem_dsc, new_record, new_record_nulls);
                                             (void)simple_heap_insert(pg_authmem_rel, tuple);
                                             CatalogUpdateIndexes(pg_authmem_rel, tuple);
                                         }
                                     }
                                  . . .
                                     heap_close(pg_authmem_rel, NoLock);
                                  }

                                  函数DelRoleMems的实现过程类似。首先对执行者的相关权限进行校验,然后循环处理要删除的角色,删除系统表pg_auth_member中相关的元组。

                                  四、对象权限管理

                                  权限管理是安全管理重要的一环,openGauss权限管理基于访问控制列表(access control list,ACL)实现。

                                  4.1  权限管理

                                  1. 访问控制列表

                                  访问控制列表是实现数据库对象权限管理的基础,每个对象都具有ACL,存储该对象的所有授权信息。当用户访问对象时,只有用户在对象的ACL中并且具有所需的权限才能够访问该对象。

                                  每个ACL是由1个或多个AclItem构成的链表,每1个AclItem由授权者、被授权者和权限位3部分构成,记录着可在对象上进行操作的用户及其权限。

                                  数据结构AclItem的代码如下:

                                    typedef struct AclItem {
                                       Oid ai_grantee;     * 被授权者的OID */
                                       Oid ai_grantor;     * 授权者的OID */
                                      AclMode ai_privs;  /* 权限位:32位的比特位 */
                                    } AclItem;

                                    其中ai_privs字段是AclMode类型。AclMode是一个32位的比特位。其高16位为权限选项位,当该比特位取值为1时,表示AclItem中的ai_grantee对应的用户具有此对象的相应操作的授权权限,否则表示用户没有授权权限;低16位为操作权限位,当该比特位取值为1时,表示AclItem中的ai_grantee对应的用户具有此对象的相应操作权限,否则表示用户没有相应的权限。在AclMode的结构位图18中,Grant Option记录各权限位的权限授予或被转授情况。低16位记录各权限的授予情况,当授权语句使用ALL时,则表示对象的所有权限。

                                    图18  openGauss AclMode结构图

                                    openGauss将执行DML类操作和DDL类操作的权限分别记在两个AclMode结构中,并以第15位的值来区分2者,从而实现对于每一个数据库对象,相同的授权者和被授权者对应两个不同的AclMode,分别表示记录DML类操作权限和DDL类操作权限。实现方式如图19和图20所示。

                                    图19  openGauss记录DML类操作权限的AclMode结构

                                    图20  openGauss记录DDL类操作权限的AclMode结构

                                    每个权限参数代表的权限如表4所示。

                                    表4  权限参数

                                    参数

                                    对象权限

                                    参数

                                    对象权限

                                    a

                                    INSERT

                                    T

                                    TEMPORARY

                                    r

                                    SELECT

                                    c

                                    CONNECT

                                    w

                                    UPDATE

                                    p

                                    COMPUTE

                                    d

                                    DELETE

                                    R

                                    READ

                                    D

                                    TRUNCATE

                                    W

                                    WRITE

                                    x

                                    REFERENCES

                                    A

                                    ALTER

                                    t

                                    TRIGGER

                                    P

                                    DROP

                                    X

                                    EXECUTE

                                    m

                                    COMMENT

                                    U

                                    USAGE

                                    i

                                    INDEX

                                    C

                                    CREATE

                                    v

                                    VACUUM

                                    2. 对象权限管理

                                    数据库对象权限管理主要通过使用SQL命令“GRANT/REVOKE”授予或回收一个或多个角色在对象上的权限。“GRANT/REVOKE”命令都由函数ExecuteGrantStmt实现,该函数只有一个GrantStmt类型的参数,基本执行流程如图21所示。

                                    图21  函数ExecuteGrantStmt执行流程

                                    数据结构GrantStmt定义代码如下:

                                      typedef struct GrantStmt {
                                         NodeTag type;
                                         bool is_grant;            /* true = 授权, false = 回收 */
                                         GrantTargetType targtype;  /*  操作目标的类型  */
                                        GrantObjectType objtype;  /*  被操作对象的类型:表、数据库、模式、函数等  */
                                         List* objects;            /*  被操作对象的集合  */
                                        List* privileges;          /*  要操作权限列表  */
                                         List* grantees;           *  被授权者的集合  */
                                         bool grant_option;       *  true = 再授予权限  */
                                         DropBehavior behavior;   *  回收权限的行为  */
                                      } GrantStmt;

                                      函数ExecuteGrantStmt首先将GrantStmt结构转换为InternalGrant结构,并将权限列表转换为内部的AclMode表示形式。当privileges 取值为NIL时,表示授予或回收所有的权限,此时置InternalGrant的all_privs字段为true,privileges字段为ACL_NO_RIGHTS。

                                      数据结构InternalGrant的代码如下:

                                        typedef struct InternalGrant {
                                           bool is_grant;            /*  true=授权, false=回收  */
                                          GrantObjectType objtype;  /*  被操作对象的类型:表、数据库、模式、函数等  */
                                           List* objects;            /*  被操作对象的集合  */
                                           bool all_privs;           /*  是否授予或回收所有的权限  */
                                        AclMode privileges;      /*  AclMode形式表示的DML类操作对应的权限  */
                                        AclMode ddl_privileges;  /*  AclMode形式表示的DDL类操作对应的权限  */
                                        List* col_privs;          /*  对列执行的DML类操作对应的权限  */
                                        List* col_ddl_privs;      /*  对列执行的DDL类操作对应的权限  */
                                           List* grantees;          /*  被授权者的集合  */
                                           bool grant_option;      /*  true=再授予权限  */
                                           DropBehavior behavior; /*  回收权限的行为  */
                                        } InternalGrant;

                                        函数ExecuteGrantStmt在完成结构转换之后,调用函数ExecGrantStmt_oids,根据对象类型分别调用相应对象的权限管理函数。接下来以表对象的权限管理过程为例介绍权限管理的算法。函数ExecGrant_Relation用来处理表对象权限的授予或回收操作,入参为InternalGrant类型的变量,存储着授权或回收操作的操作对象信息、被授权者信息和权限信息。函数ExecGrant_Relation的处理流程如图22所示。

                                        图22  函数ExecGrant_Relation的处理流程

                                        该函数的处理流程为:

                                        (1) 从系统表pg_class中获取旧ACL。如果不存在旧的ACL,则新建一个ACL,并调用函数acldefault将默认的权限信息赋给该ACL。根据对象的不同,初始的缺省权限含有部分可赋予PUBLIC的权限。如果存在旧的ACL,则将旧的ACL存储为一个副本。

                                        (2) 调用select_best_grantor函数来获取授权者对操作对象所拥有的授权权限avail_goptions;将参数avail_goptions传入函数restrict_and_check_grant,结合SQL命令中给出的操作权限,计算出实际需要授予或回收的权限。

                                        (3) 调用merge_acl_with_grant函数生成新的ACL。如果是授予权限,则将要授予的权限添加到旧ACL中;如果是回收权限,则将要被回收的权限从旧ACL中删除。

                                        (4) 将新的ACL更新到系统表pg_class对应元组的ACL字段,完成授权或回收过程。

                                        该函数的相关代码如下:

                                          static void ExecGrant_Relation(InternalGrant* istmt)
                                          {
                                             . . .
                                          /*  循环处理每一个表对象   */
                                             foreach (cell, istmt->objects) {
                                                 . . .
                                          /*  判断所要操作的表对象是否存在,若不存在则提示报错   */
                                                 tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
                                                 if (!HeapTupleIsValid(tuple))
                                                     ereport(
                                                         ERROR, (errcode(ERRCODE_CACHE_LOOKUP_FAILED), errmsg("cache lookup failed for relation %u", relOid)));
                                                 pg_class_tuple = (Form_pg_class)GETSTRUCT(tuple);
                                          . . .
                                                 /*  系统表pg_class中获取旧ACL。若不存在旧的ACL,则新建一个ACL,若存在旧的ACL,则将旧的ACL存储为一个副本   */
                                                 ownerId = pg_class_tuple->relowner;
                                                 aclDatum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relacl, &isNull);
                                                 if (isNull) {
                                                     switch (pg_class_tuple->relkind) {
                                                         case RELKIND_SEQUENCE:
                                                             old_acl = acldefault(ACL_OBJECT_SEQUENCE, ownerId);
                                                             break;
                                                         default:
                                                             old_acl = acldefault(ACL_OBJECT_RELATION, ownerId);
                                                             break;
                                                     }
                                                     noldmembers = 0;
                                                     oldmembers = NULL;
                                                 } else {
                                                     old_acl = DatumGetAclPCopy(aclDatum);
                                                     noldmembers = aclmembers(old_acl, &oldmembers);
                                                 }
                                                 old_rel_acl = aclcopy(old_acl);


                                                 /*  处理表级别的权限   */
                                                 if (this_privileges != ACL_NO_RIGHTS) {
                                                     AclMode avail_goptions;
                                                     Acl* new_acl = NULL;
                                                     Oid grantorId;
                                                     HeapTuple newtuple = NULL;
                                                     Datum values[Natts_pg_class];
                                                     bool nulls[Natts_pg_class] = {false};
                                                     bool replaces[Natts_pg_class] = {false};
                                                     int nnewmembers;
                                                     Oid* newmembers = NULL;
                                                     AclObjectKind aclkind;


                                                     /*  获取授权者grantorId和授权者对该操作对象所拥有的授权权限avail_goptions   */
                                                     select_best_grantor(GetUserId(), this_privileges, old_acl, ownerId, &grantorId, &avail_goptions);


                                                     switch (pg_class_tuple->relkind) {
                                                         case RELKIND_SEQUENCE:
                                                             aclkind = ACL_KIND_SEQUENCE;
                                                             break;
                                                         default:
                                                             aclkind = ACL_KIND_CLASS;
                                                             break;
                                                     }


                                                     /*  结合参数avail_goptions和SQL命令中给出的操作权限,计算出实际需要授予或回收的权限   */
                                                     this_privileges = restrict_and_check_grant(istmt->is_grant,
                                                         avail_goptions,
                                                         istmt->all_privs,
                                                         this_privileges,
                                                         relOid,
                                                         grantorId,
                                                         aclkind,
                                                         NameStr(pg_class_tuple->relname),
                                                         0,
                                                         NULL);


                                                     /*  生成新的ACL,并更新到系统表pg_class对应元组的ACL字段   */
                                                     new_acl = merge_acl_with_grant(old_acl,
                                                         istmt->is_grant,
                                                         istmt->grant_option,
                                                         istmt->behavior,
                                                         istmt->grantees,
                                                         this_privileges,
                                                         grantorId,
                                                         ownerId);
                                          . . .
                                                     replaces[Anum_pg_class_relacl - 1] = true;
                                                     values[Anum_pg_class_relacl - 1] = PointerGetDatum(new_acl);


                                                     newtuple = heap_modify_tuple(tuple, RelationGetDescr(relation), values, nulls, replaces);


                                                     simple_heap_update(relation, &newtuple->t_self, newtuple);
                                          . . .
                                                 }


                                                 /*  若存在列级授权或回收,则调用ExecGrant_Attribute 函数处理  */
                                          . . .
                                                 if (have_col_privileges) {
                                                     AttrNumber i;


                                                     for (i = 0; i < num_col_privileges; i++) {
                                                         if (col_privileges[i] == ACL_NO_RIGHTS)
                                                             continue;
                                                         ExecGrant_Attribute(istmt,
                                                             relOid,
                                                             NameStr(pg_class_tuple->relname),
                                                             i + FirstLowInvalidHeapAttributeNumber,
                                                             ownerId,
                                                             col_privileges[i],
                                                             attRelation,
                                                             old_rel_acl);
                                                     }
                                                 }
                                             . . .
                                             }


                                             heap_close(attRelation, RowExclusiveLock);
                                             heap_close(relation, RowExclusiveLock);
                                          }

                                          4.2  权限检查

                                          用户在对数据库对象进行访问操作时,数据库会检查用户是否拥有该对象的操作权限。通常数据库对象的所有者和初始用户(superuser)拥有该对象的全部操作权限,其他普通用户需要被授予权限才可以执行相应操作。数据库通过查询数据库对象的访问控制列表检查用户对数据库对象的访问权限,数据库对象的ACL保存在对应的系统表中,当被授予或回收对象权限时,系统表中保存的ACL权限位会被更新。常用的数据库对象权限检查函数、ACL检查函数、ACL所在系统表以及对象所有者检查函数对应关系如表5所示。

                                          表5 数据库对象函数对应关系表。


                                          对象

                                          权限检查

                                          ACL检查

                                          所有者检查

                                          系统表

                                          table

                                          pg_class_aclcheck

                                          pg_class_aclmask

                                          pg_class_ownercheck

                                          pg_class

                                          column

                                          pg_attribute_aclcheck

                                          pg_attribute_aclmask

                                          NA

                                          pg_attribute

                                          database

                                          pg_database_aclcheck

                                          pg_database_aclmask

                                          pg_database_ownercheck

                                          pg_database

                                          function

                                          pg_proc_aclcheck

                                          pg_proc_aclmask

                                          pg_proc_ownercheck

                                          pg_proc

                                          language

                                          pg_language_aclcheck

                                          pg_language_aclmask

                                          pg_language_ownercheck

                                          pg_language

                                          largeobject

                                          pg_largeobject_aclcheck_snapshot

                                          pg_largeobject_aclmask_snapshot

                                          pg_largeobject_ownercheck

                                          pg_largeobject_metadata

                                          namespace

                                          pg_namespace_aclcheck

                                          pg_namespace_aclmask

                                          pg_namespace_ownercheck

                                          pg_namespace

                                          tablespace

                                          pg_tablespace_aclcheck

                                          pg_tablespace_aclmask

                                          pg_tablespace_ownercheck

                                          pg_tablespace

                                          foreign data wrapper

                                          pg_foreign_data_wrapper_aclcheck

                                          pg_foreign_data_wrapper_aclmask

                                          pg_foreign_data_wrapper_ownercheck

                                          pg_foreign_data_wrapper

                                          foreign server

                                          pg_foreign_server_aclcheck

                                          pg_foreign_server_aclmask

                                          pg_foreign_server_ownercheck

                                          pg_foreign_server

                                          type

                                          pg_type_aclcheck

                                          pg_type_aclmask

                                          pg_type_ownercheck

                                          pg_type


                                          下面以表的权限检查为例进行权限检查过程说明。表权限检查函数pg_class_aclcheck的定义代码如下:

                                            AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode, bool check_nodegroup)
                                            {
                                               if (pg_class_aclmask(table_oid, roleid, mode, ACLMASK_ANY, check_nodegroup) != 0)
                                                   return ACLCHECK_OK;
                                               else
                                                   return ACLCHECK_NO_PRIV;
                                            }

                                            pg_class_aclcheck函数有4个入参,其中table_oid用于表示待检查的表,roleid用于表示待检查的用户或角色,mode表示待检查的权限,此权限可以是一种权限也可以是多种权限的组合。第4个参数check_nodegroup用于表示是否检查nodegroup逻辑集群权限,如果调用时不给此参数赋值则默认为true。函数返回值为枚举类型AclResult,如果检查结果有权限返回ACLCHECK_OK,无权限则返回ACLCHECK_NO_PRIV。

                                            pg_class_aclcheck函数通过调用pg_class_aclmask函数实现对象权限检查。pg_class_aclmask函数有5个参数,其中第4个参数how为AclMaskHow枚举类型,包括ACLMASK_ALL和ACLMASK_ANY两种取值;ACLMASK_ALL表示需要满足待检查权限mode中的所有权限,ACLMASK_ANY表示只需满足待检查权限mode中的一种权限即可。pg_class_aclmask函数的其余4个参数table_oid、roleid、mode和check_nodegroup,直接由pg_class_aclcheck函数传入。pg_class_aclmask函数从pg_class系统表中获取ACL权限信息并调用aclmask函数完成权限位校验,通过AclMode数据类型返回权限检查结果。

                                            往期推荐

                                            openGauss数据库源码解析系列文章——安全管理源码解析(一)

                                            文章转载自openGauss,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

                                            评论