暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

PostgreSQL行级安全策略探究

原创 NickYoung 2024-09-11
216

前言

最近和朋友讨论oracle行级安全策略(VPD)时,查看了下官方文档,看起来VPD的原理是针对应用了Oracle行级安全策略的表、视图或同义词发出的 SQL 语句动态添加where子句。通俗理解就是将行级安全策略动态添加为where 条件。那么PG中的行级安全策略是怎么处理的呢?

行级安全简介

行级安全策略(Row Level Security)是更细粒度的数据安全控制策略。行级策略可以根据每个用户限制哪些行可以通过常规查询返回,哪些行可以通过数据修改命令插入、更新或删除。默认情况下,表没有任何行级安全策略,因此如果用户根据 SQL 权限系统具有表的访问权限,则其中的所有行都可以平等地用于查询或更新。

在PG中我们可以创建行级策略,在SQL执行时行级策略表达式将作为查询的一部分运行。
https://www.postgresql.org/docs/16/ddl-rowsecurity.html

行级安全演示

创建3个用户

postgres=# create user admin; CREATE ROLE postgres=# create user peter; CREATE ROLE postgres=# create user bob; CREATE ROLE
复制

创建一个rlsdb数据库

postgres=# create database rlsdb owner admin; CREATE DATABASE
复制

在rlsdb中使用admin用户创建表employee,并插入3个用户对应的数据

postgres=# \c rlsdb admin You are now connected to database "rlsdb" as user "admin". rlsdb=> create table employee ( empno int, ename text, address text, salary int, account_number text ); CREATE TABLE rlsdb=> insert into employee values (1, 'admin', '2 down str', 80000, 'no0001' ); INSERT 0 1 rlsdb=> insert into employee values (2, 'peter', '132 south avn', 60000, 'no0002' ); INSERT 0 1 rlsdb=> insert into employee values (3, 'bob', 'Down st 17th', 60000, 'no0003' ); INSERT 0 1 rlsdb=>
复制

授权后,三个用户都能看到employee表的所有数据

rlsdb=> grant select on table employee to peter; GRANT rlsdb=> grant select on table employee to bob; GRANT rlsdb=> select * from employee; empno | ename | address | salary | account_number -------+-------+---------------+--------+---------------- 1 | admin | 2 down str | 80000 | no0001 2 | peter | 132 south avn | 60000 | no0002 3 | bob | Down st 17th | 60000 | no0003 (3 rows) rlsdb=> rlsdb=> \c rlsdb peter You are now connected to database "rlsdb" as user "peter". rlsdb=> select * from employee; empno | ename | address | salary | account_number -------+-------+---------------+--------+---------------- 1 | admin | 2 down str | 80000 | no0001 2 | peter | 132 south avn | 60000 | no0002 3 | bob | Down st 17th | 60000 | no0003 (3 rows) rlsdb=> \c rlsdb bob You are now connected to database "rlsdb" as user "bob". rlsdb=> select * from employee; empno | ename | address | salary | account_number -------+-------+---------------+--------+---------------- 1 | admin | 2 down str | 80000 | no0001 2 | peter | 132 south avn | 60000 | no0002 3 | bob | Down st 17th | 60000 | no0003 (3 rows)
复制

使用admin用户创建行级安全策略,对于peter和bob就只能看到自己的数据了。

rlsdb=> \c rlsdb admin You are now connected to database "rlsdb" as user "admin". rlsdb=> CREATE POLICY emp_rls_policy ON employee FOR ALL TO PUBLIC USING (ename=current_user); CREATE POLICY rlsdb=> ALTER TABLE employee ENABLE ROW LEVEL SECURITY; ALTER TABLE rlsdb=> \c rlsdb peter You are now connected to database "rlsdb" as user "peter". rlsdb=> select * from employee; empno | ename | address | salary | account_number -------+-------+---------------+--------+---------------- 2 | peter | 132 south avn | 60000 | no0002 (1 row) rlsdb=> \c rlsdb bob You are now connected to database "rlsdb" as user "bob". rlsdb=> select * from employee; empno | ename | address | salary | account_number -------+-------+--------------+--------+---------------- 3 | bob | Down st 17th | 60000 | no0003 (1 row) rlsdb=>
复制

行级安全原理

先看下行级安全策略在数据库中的呈现是什么样的。
查看pg_policy表,可以看到我们创建的emp_rls_policy这个策略,具体的策略polqual是一串字符,熟悉parsetree结构的朋友能关注到这是一个OPEXPR node。我们常见的where 条件也是类似的结构。

我们可以使用函数让polqual以更适合人阅读的方式来展示。

创建策略时,其实是将策略转换为where子句存到pg_policy表中。

ObjectAddress CreatePolicy(CreatePolicyStmt *stmt) { /*省略部分代码行*/ /*将策略转化为where子句*/ qual = transformWhereClause(qual_pstate, stmt->qual, EXPR_KIND_POLICY, "POLICY"); with_check_qual = transformWhereClause(with_check_pstate, stmt->with_check, EXPR_KIND_POLICY, "POLICY"); /* Fix up collation information */ assign_expr_collations(qual_pstate, qual); assign_expr_collations(with_check_pstate, with_check_qual); /* 将转换后的子句写入pg_policy*/ /* Open pg_policy catalog */ pg_policy_rel = table_open(PolicyRelationId, RowExclusiveLock); /* Set key - policy's relation id. */ ScanKeyInit(&skey[0], Anum_pg_policy_polrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(table_id)); /* Set key - policy's name. */ ScanKeyInit(&skey[1], Anum_pg_policy_polname, BTEqualStrategyNumber, F_NAMEEQ, CStringGetDatum(stmt->policy_name)); sscan = systable_beginscan(pg_policy_rel, PolicyPolrelidPolnameIndexId, true, NULL, 2, skey); policy_tuple = systable_getnext(sscan); /* Complain if the policy name already exists for the table */ if (HeapTupleIsValid(policy_tuple)) ereport(ERROR, (errcode(ERRCODE_DUPLICATE_OBJECT), errmsg("policy \"%s\" for table \"%s\" already exists", stmt->policy_name, RelationGetRelationName(target_table)))); policy_id = GetNewOidWithIndex(pg_policy_rel, PolicyOidIndexId, Anum_pg_policy_oid); values[Anum_pg_policy_oid - 1] = ObjectIdGetDatum(policy_id); values[Anum_pg_policy_polrelid - 1] = ObjectIdGetDatum(table_id); values[Anum_pg_policy_polname - 1] = DirectFunctionCall1(namein, CStringGetDatum(stmt->policy_name)); values[Anum_pg_policy_polcmd - 1] = CharGetDatum(polcmd); values[Anum_pg_policy_polpermissive - 1] = BoolGetDatum(stmt->permissive); values[Anum_pg_policy_polroles - 1] = PointerGetDatum(role_ids); /* Add qual if present. */ if (qual) values[Anum_pg_policy_polqual - 1] = CStringGetTextDatum(nodeToString(qual)); else isnull[Anum_pg_policy_polqual - 1] = true; /* Add WITH CHECK qual if present */ if (with_check_qual) values[Anum_pg_policy_polwithcheck - 1] = CStringGetTextDatum(nodeToString(with_check_qual)); else isnull[Anum_pg_policy_polwithcheck - 1] = true; policy_tuple = heap_form_tuple(RelationGetDescr(pg_policy_rel), values, isnull); CatalogTupleInsert(pg_policy_rel, policy_tuple); /* Record Dependencies */ target.classId = RelationRelationId; target.objectId = table_id; target.objectSubId = 0; myself.classId = PolicyRelationId; myself.objectId = policy_id; myself.objectSubId = 0; recordDependencyOn(&myself, &target, DEPENDENCY_AUTO); recordDependencyOnExpr(&myself, qual, qual_pstate->p_rtable, DEPENDENCY_NORMAL); recordDependencyOnExpr(&myself, with_check_qual, with_check_pstate->p_rtable, DEPENDENCY_NORMAL); /* Register role dependencies */ target.classId = AuthIdRelationId; target.objectSubId = 0; for (i = 0; i < nitems; i++) { target.objectId = DatumGetObjectId(role_oids[i]); /* no dependency if public */ if (target.objectId != ACL_ID_PUBLIC) recordSharedDependencyOn(&myself, &target, SHARED_DEPENDENCY_POLICY); } InvokeObjectPostCreateHook(PolicyRelationId, policy_id, 0); /* Invalidate Relation Cache */ CacheInvalidateRelcache(target_table); /* Clean up. */ heap_freetuple(policy_tuple); free_parsestate(qual_pstate); free_parsestate(with_check_pstate); systable_endscan(sscan); relation_close(target_table, NoLock); table_close(pg_policy_rel, RowExclusiveLock); return myself; }
复制

在SQL执行时,查询重写阶段会将对应的安全策略拼接到parsetree里,最后生成执行计划去执行。
从执行计划来看SQL没有where条件,但是执行计划中存在 Filter: (ename = CURRENT_USER),证明了这个过程。

rlsdb=> explain analyze select * from employee ; QUERY PLAN ----------------------------------------------------------------------------------------------------- Seq Scan on employee (cost=0.00..19.15 rows=3 width=104) (actual time=0.010..0.012 rows=1 loops=1) Filter: (ename = CURRENT_USER) Rows Removed by Filter: 2 Planning Time: 0.416 ms Execution Time: 0.036 ms (5 rows) rlsdb=>
复制

再debug验证下这个过程。
给fireRIRrules函数设置断点,进入断点后从stack可以看到目前是在QueryRewrite阶段,结合一些规则进行查询重写。

观察这个时候的parsetree,可以看到还没有将安全策略对应的OPEXPR拼接进来。

等执行到get_row_security_policies函数已获取到表对应安全策略securityQuals。
打印securityQuals可以看到和我们查询pg_policy中的OPEXPR是一致的。

接着将securityQuals加入到rte的list中,这样我们再去打印parsetree就可以看到安全策略securityQuals对应的OPEXPR已经被拼接进来。

然后就是去生成执行计划并执行。

小结

PG的RLS也是将对应的策略动态转换为where子句,在查询重写阶段将安全策略拼接到parsetree,生成执行计划去执行。

行级安全策略,可以提供更精细粒度的表数据权限管理,在一定的场景下,比如只让用户看到自己对应的数据,能做到更安全的权限把控。

「喜欢这篇文章,您的关注和赞赏是给作者最好的鼓励」
关注作者
【版权声明】本文为墨天轮用户原创内容,转载时必须标注文章的来源(墨天轮),文章链接,文章作者等基本信息,否则作者和墨天轮有权追究责任。如果您发现墨天轮中有涉嫌抄袭或者侵权的内容,欢迎发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论

目录
  • 前言
  • 行级安全简介
  • 行级安全演示
  • 行级安全原理
  • 小结