暂无图片
暂无图片
暂无图片
暂无图片
暂无图片

PostgreSQL行级安全策略探究

前言

最近和朋友讨论oracle行级安全策略(VPD)时,查看了下官方文档,看起来VPD的原理是针对应用了Oracle行级安全策略的表、视图或同义词发出的 SQL 语句动态添加where子句。通俗理解就是将行级安全策略动态添加为where 条件。那么PG中的行级安全策略是怎么处理的呢?

行级安全简介

行级安全策略(Row Level Security)是更细粒度的数据安全控制策略。行级策略可以根据每个用户限制哪些行可以通过常规查询返回,哪些行可以通过数据修改命令插入、更新或删除。默认情况下,表没有任何行级安全策略,因此如果用户根据 SQL 权限系统具有表的访问权限,则其中的所有行都可以平等地用于查询或更新。

在PG中我们可以创建行级策略,在SQL执行时行级策略表达式将作为查询的一部分运行。
https://www.postgresql.org/docs/16/ddl-rowsecurity.html

行级安全演示

创建3个用户

  1. postgres=# create user admin;

  2. CREATE ROLE

  3. postgres=# create user peter;

  4. CREATE ROLE

  5. postgres=# create user bob;

  6. CREATE ROLE

创建一个rlsdb数据库

  1. postgres=# create database rlsdb owner admin;

  2. CREATE DATABASE

在rlsdb中使用admin用户创建表employee,并插入3个用户对应的数据

  1. postgres=# \c rlsdb admin

  2. You are now connected to database "rlsdb" as user "admin".

  3. rlsdb=> create table employee ( empno int, ename text, address text, salary int, account_number text );

  4. CREATE TABLE

  5. rlsdb=> insert into employee values (1, 'admin', '2 down str', 80000, 'no0001' );

  6. INSERT 0 1

  7. rlsdb=> insert into employee values (2, 'peter', '132 south avn', 60000, 'no0002' );

  8. INSERT 0 1

  9. rlsdb=> insert into employee values (3, 'bob', 'Down st 17th', 60000, 'no0003' );

  10. INSERT 0 1

  11. rlsdb=>

授权后,三个用户都能看到employee表的所有数据

  1. rlsdb=> grant select on table employee to peter;

  2. GRANT

  3. rlsdb=> grant select on table employee to bob;

  4. GRANT

  5. rlsdb=> select * from employee;

  6. empno | ename | address | salary | account_number

  7. -------+-------+---------------+--------+----------------

  8. 1 | admin | 2 down str | 80000 | no0001

  9. 2 | peter | 132 south avn | 60000 | no0002

  10. 3 | bob | Down st 17th | 60000 | no0003

  11. (3 rows)


  12. rlsdb=>

  13. rlsdb=> \c rlsdb peter

  14. You are now connected to database "rlsdb" as user "peter".

  15. rlsdb=> select * from employee;

  16. empno | ename | address | salary | account_number

  17. -------+-------+---------------+--------+----------------

  18. 1 | admin | 2 down str | 80000 | no0001

  19. 2 | peter | 132 south avn | 60000 | no0002

  20. 3 | bob | Down st 17th | 60000 | no0003

  21. (3 rows)


  22. rlsdb=> \c rlsdb bob

  23. You are now connected to database "rlsdb" as user "bob".

  24. rlsdb=> select * from employee;

  25. empno | ename | address | salary | account_number

  26. -------+-------+---------------+--------+----------------

  27. 1 | admin | 2 down str | 80000 | no0001

  28. 2 | peter | 132 south avn | 60000 | no0002

  29. 3 | bob | Down st 17th | 60000 | no0003

  30. (3 rows)

使用admin用户创建行级安全策略,对于peter和bob就只能看到自己的数据了。

  1. rlsdb=> \c rlsdb admin

  2. You are now connected to database "rlsdb" as user "admin".

  3. rlsdb=> CREATE POLICY emp_rls_policy ON employee FOR ALL TO PUBLIC USING (ename=current_user);

  4. CREATE POLICY

  5. rlsdb=> ALTER TABLE employee ENABLE ROW LEVEL SECURITY;

  6. ALTER TABLE

  7. rlsdb=> \c rlsdb peter

  8. You are now connected to database "rlsdb" as user "peter".

  9. rlsdb=> select * from employee;

  10. empno | ename | address | salary | account_number

  11. -------+-------+---------------+--------+----------------

  12. 2 | peter | 132 south avn | 60000 | no0002

  13. (1 row)


  14. rlsdb=> \c rlsdb bob

  15. You are now connected to database "rlsdb" as user "bob".

  16. rlsdb=> select * from employee;

  17. empno | ename | address | salary | account_number

  18. -------+-------+--------------+--------+----------------

  19. 3 | bob | Down st 17th | 60000 | no0003

  20. (1 row)


  21. rlsdb=>

行级安全原理

先看下行级安全策略在数据库中的呈现是什么样的。
查看pg_policy表,可以看到我们创建的emp_rls_policy这个策略,具体的策略polqual是一串字符,熟悉parsetree结构的朋友能关注到这是一个OPEXPR node。我们常见的where 条件也是类似的结构。

我们可以使用函数让polqual以更适合人阅读的方式来展示。

创建策略时,其实是将策略转换为where子句存到pg_policy表中。

  1. ObjectAddress

  2. CreatePolicy(CreatePolicyStmt *stmt)

  3. {

  4. /*省略部分代码行*/

  5. /*将策略转化为where子句*/

  6. qual = transformWhereClause(qual_pstate,

  7. stmt->qual,

  8. EXPR_KIND_POLICY,

  9. "POLICY");

  10. with_check_qual = transformWhereClause(with_check_pstate,

  11. stmt->with_check,

  12. EXPR_KIND_POLICY,

  13. "POLICY");

  14. /* Fix up collation information */

  15. assign_expr_collations(qual_pstate, qual);

  16. assign_expr_collations(with_check_pstate, with_check_qual);

  17. /* 将转换后的子句写入pg_policy*/

  18. /* Open pg_policy catalog */

  19. pg_policy_rel = table_open(PolicyRelationId, RowExclusiveLock);

  20. /* Set key - policy's relation id. */

  21. ScanKeyInit(&skey[0],

  22. Anum_pg_policy_polrelid,

  23. BTEqualStrategyNumber, F_OIDEQ,

  24. ObjectIdGetDatum(table_id));

  25. /* Set key - policy's name. */

  26. ScanKeyInit(&skey[1],

  27. Anum_pg_policy_polname,

  28. BTEqualStrategyNumber, F_NAMEEQ,

  29. CStringGetDatum(stmt->policy_name));

  30. sscan = systable_beginscan(pg_policy_rel,

  31. PolicyPolrelidPolnameIndexId, true, NULL, 2,

  32. skey);

  33. policy_tuple = systable_getnext(sscan);

  34. /* Complain if the policy name already exists for the table */

  35. if (HeapTupleIsValid(policy_tuple))

  36. ereport(ERROR,

  37. (errcode(ERRCODE_DUPLICATE_OBJECT),

  38. errmsg("policy \"%s\" for table \"%s\" already exists",

  39. stmt->policy_name, RelationGetRelationName(target_table))));

  40. policy_id = GetNewOidWithIndex(pg_policy_rel, PolicyOidIndexId,

  41. Anum_pg_policy_oid);

  42. values[Anum_pg_policy_oid - 1] = ObjectIdGetDatum(policy_id);

  43. values[Anum_pg_policy_polrelid - 1] = ObjectIdGetDatum(table_id);

  44. values[Anum_pg_policy_polname - 1] = DirectFunctionCall1(namein,

  45. CStringGetDatum(stmt->policy_name));

  46. values[Anum_pg_policy_polcmd - 1] = CharGetDatum(polcmd);

  47. values[Anum_pg_policy_polpermissive - 1] = BoolGetDatum(stmt->permissive);

  48. values[Anum_pg_policy_polroles - 1] = PointerGetDatum(role_ids);

  49. /* Add qual if present. */

  50. if (qual)

  51. values[Anum_pg_policy_polqual - 1] = CStringGetTextDatum(nodeToString(qual));

  52. else

  53. isnull[Anum_pg_policy_polqual - 1] = true;

  54. /* Add WITH CHECK qual if present */

  55. if (with_check_qual)

  56. values[Anum_pg_policy_polwithcheck - 1] = CStringGetTextDatum(nodeToString(with_check_qual));

  57. else

  58. isnull[Anum_pg_policy_polwithcheck - 1] = true;

  59. policy_tuple = heap_form_tuple(RelationGetDescr(pg_policy_rel), values,

  60. isnull);

  61. CatalogTupleInsert(pg_policy_rel, policy_tuple);

  62. /* Record Dependencies */

  63. target.classId = RelationRelationId;

  64. target.objectId = table_id;

  65. target.objectSubId = 0;

  66. myself.classId = PolicyRelationId;

  67. myself.objectId = policy_id;

  68. myself.objectSubId = 0;

  69. recordDependencyOn(&myself, &target, DEPENDENCY_AUTO);

  70. recordDependencyOnExpr(&myself, qual, qual_pstate->p_rtable,

  71. DEPENDENCY_NORMAL);

  72. recordDependencyOnExpr(&myself, with_check_qual,

  73. with_check_pstate->p_rtable, DEPENDENCY_NORMAL);

  74. /* Register role dependencies */

  75. target.classId = AuthIdRelationId;

  76. target.objectSubId = 0;

  77. for (i = 0; i < nitems; i++)

  78. {

  79. target.objectId = DatumGetObjectId(role_oids[i]);

  80. /* no dependency if public */

  81. if (target.objectId != ACL_ID_PUBLIC)

  82. recordSharedDependencyOn(&myself, &target,

  83. SHARED_DEPENDENCY_POLICY);

  84. }

  85. InvokeObjectPostCreateHook(PolicyRelationId, policy_id, 0);

  86. /* Invalidate Relation Cache */

  87. CacheInvalidateRelcache(target_table);

  88. /* Clean up. */

  89. heap_freetuple(policy_tuple);

  90. free_parsestate(qual_pstate);

  91. free_parsestate(with_check_pstate);

  92. systable_endscan(sscan);

  93. relation_close(target_table, NoLock);

  94. table_close(pg_policy_rel, RowExclusiveLock);

  95. return myself;

  96. }

在SQL执行时,查询重写阶段会将对应的安全策略拼接到parsetree里,最后生成执行计划去执行。
从执行计划来看SQL没有where条件,但是执行计划中存在 Filter: (ename = CURRENT_USER),证明了这个过程。

  1. rlsdb=> explain analyze select * from employee ;

  2. QUERY PLAN

  3. -----------------------------------------------------------------------------------------------------

  4. Seq Scan on employee (cost=0.00..19.15 rows=3 width=104) (actual time=0.010..0.012 rows=1 loops=1)

  5. Filter: (ename = CURRENT_USER)

  6. Rows Removed by Filter: 2

  7. Planning Time: 0.416 ms

  8. Execution Time: 0.036 ms

  9. (5 rows)


  10. rlsdb=>

再debug验证下这个过程。
给fireRIRrules函数设置断点,进入断点后从stack可以看到目前是在QueryRewrite阶段,结合一些规则进行查询重写。

观察这个时候的parsetree,可以看到还没有将安全策略对应的OPEXPR拼接进来。

等执行到get_row_security_policies函数已获取到表对应安全策略securityQuals。
打印securityQuals可以看到和我们查询pg_policy中的OPEXPR是一致的。

接着将securityQuals加入到rte的list中,这样我们再去打印parsetree就可以看到安全策略securityQuals对应的OPEXPR已经被拼接进来。

然后就是去生成执行计划并执行。

小结

PG的RLS也是将对应的策略动态转换为where子句,在查询重写阶段将安全策略拼接到parsetree,生成执行计划去执行。

行级安全策略,可以提供更精细粒度的表数据权限管理,在一定的场景下,比如只让用户看到自己对应的数据,能做到更安全的权限把控。



文章转载自开源软件联盟PostgreSQL分会,如果涉嫌侵权,请发送邮件至:contact@modb.pro进行举报,并提供相关证据,一经查实,墨天轮将立刻删除相关内容。

评论