主要是想尝试用 Rust 实现一个 check 方法,所以先研究一下 cerbos 在 Golang 中的内部实现
- CheckResources
// CheckResources answers a batch authorization request. It enforces the
// resource/action count limits, extracts the auxiliary data (e.g. JWT related
// data) from the request, hands one CheckInput per resource to the engine and
// converts the engine outputs into the response format.
//
// It returns the limit error as-is when a limit is exceeded, an
// InvalidArgument status when auxData extraction fails, a FailedPrecondition
// status when the engine reports a policy compilation error and an Internal
// status for any other engine failure.
func (cs *CerbosService) CheckResources(ctx context.Context, req *requestv1.CheckResourcesRequest) (*responsev1.CheckResourcesResponse, error) {
	log := logging.ReqScopeLog(ctx)

	if err := cs.checkNumResourcesLimit(len(req.Resources)); err != nil {
		log.Error("Request too large", zap.Error(err))
		return nil, err
	}

	// Extract the auxiliary data (such as JWT related data) once; it is shared by every input.
	auxData, err := cs.auxData.Extract(ctx, req.AuxData)
	if err != nil {
		log.Error("Failed to extract auxData", zap.Error(err))
		return nil, status.Error(codes.InvalidArgument, "invalid auxData")
	}

	// One engine input per requested resource, in request order.
	inputs := make([]*enginev1.CheckInput, 0, len(req.Resources))
	for _, requested := range req.Resources {
		if err := cs.checkNumActionsLimit(len(requested.Actions)); err != nil {
			log.Error("Request too large", zap.Error(err))
			return nil, err
		}

		inputs = append(inputs, &enginev1.CheckInput{
			RequestId: req.RequestId,
			Actions:   requested.Actions,
			Principal: req.Principal,
			Resource:  requested.Resource,
			AuxData:   auxData,
		})
	}

	// The engine's Check method does the actual policy evaluation.
	outputs, err := cs.eng.Check(logging.ToContext(ctx, log), inputs)
	if err != nil {
		log.Error("Policy check failed", zap.Error(err))

		code, msg := codes.Internal, "Policy check failed"
		if errors.Is(err, compile.PolicyCompilationErr{}) {
			code, msg = codes.FailedPrecondition, "Check failed due to invalid policy"
		}
		return nil, status.Error(code, msg)
	}

	// Convert each engine output into a response entry; outputs[i] corresponds to inputs[i].
	entries := make([]*responsev1.CheckResourcesResponse_ResultEntry, len(outputs))
	for idx, out := range outputs {
		res := inputs[idx].Resource

		effects := make(map[string]effectv1.Effect, len(out.Actions))

		// Metadata is only populated when the caller asked for it.
		var meta *responsev1.CheckResourcesResponse_ResultEntry_Meta
		if req.IncludeMeta {
			meta = &responsev1.CheckResourcesResponse_ResultEntry_Meta{
				EffectiveDerivedRoles: out.EffectiveDerivedRoles,
				Actions:               make(map[string]*responsev1.CheckResourcesResponse_ResultEntry_Meta_EffectMeta, len(out.Actions)),
			}
		}

		for action, ae := range out.Actions {
			effects[action] = ae.Effect
			if meta != nil {
				meta.Actions[action] = &responsev1.CheckResourcesResponse_ResultEntry_Meta_EffectMeta{
					MatchedPolicy: ae.Policy,
					MatchedScope:  ae.Scope,
				}
			}
		}

		entry := &responsev1.CheckResourcesResponse_ResultEntry{
			Resource: &responsev1.CheckResourcesResponse_ResultEntry_Resource{
				Id:            res.Id,
				Kind:          res.Kind,
				PolicyVersion: res.PolicyVersion,
				Scope:         res.Scope,
			},
			ValidationErrors: out.ValidationErrors,
			Actions:          effects,
			Meta:             meta,
		}
		if len(out.Outputs) > 0 {
			entry.Outputs = out.Outputs
		}

		entries[idx] = entry
	}

	return &responsev1.CheckResourcesResponse{
		RequestId: req.RequestId,
		Results:   entries,
	}, nil
}
- engine check 处理
// Check evaluates the given inputs and returns one output per input. The
// evaluation is timed with the engine check latency metric, the batch size is
// recorded, and the outcome (outputs and error) is passed through
// logCheckDecision, whose return values are returned to the caller.
func (engine *Engine) Check(ctx context.Context, inputs []*enginev1.CheckInput, opts ...CheckOpt) ([]*enginev1.CheckOutput, error) {
	runCheck := func() ([]*enginev1.CheckOutput, error) {
		spanCtx, span := tracing.StartSpan(ctx, "engine.Check")
		defer span.End()

		checkOpts := newCheckOptions(spanCtx, engine.conf, opts...)

		// If the number of inputs is less than the threshold, do a serial execution as it is
		// usually faster. Ditto if the worker pool is not initialized.
		runSerially := len(inputs) < parallelismThreshold || len(engine.workerPool) == 0

		var (
			results []*enginev1.CheckOutput
			err     error
		)
		if runSerially {
			results, err = engine.checkSerial(spanCtx, inputs, checkOpts)
		} else {
			results, err = engine.checkParallel(spanCtx, inputs, checkOpts)
		}

		if err != nil {
			tracing.MarkFailed(span, http.StatusBadRequest, err)
		}

		return results, err
	}

	outputs, err := metrics.RecordDuration2(metrics.EngineCheckLatency(), runCheck)

	metrics.EngineCheckBatchSize().Record(context.Background(), int64(len(inputs)))

	// The decision is logged with the caller's context, not the span context used above.
	return engine.logCheckDecision(ctx, inputs, outputs, err)
}
- checkSerial 以及checkParallel 核心的计算(evaluate)
// evaluate runs the policy evaluation for a single input. It returns early
// with the context error if ctx is already done, builds the evaluation context
// for the input, evaluates it, and then produces an output holding one effect
// per requested action: the evaluated effect when there is one, otherwise
// defaultEffect with noPolicyMatch as the policy.
func (engine *Engine) evaluate(ctx context.Context, input *enginev1.CheckInput, checkOpts *CheckOptions) (*enginev1.CheckOutput, error) {
	ctx, span := tracing.StartSpan(ctx, "engine.Evaluate")
	defer span.End()

	span.SetAttributes(tracing.RequestID(input.RequestId), tracing.ReqResourceID(input.Resource.Id))

	// Exit early if the context is cancelled.
	if ctxErr := ctx.Err(); ctxErr != nil {
		tracing.MarkFailed(span, http.StatusRequestTimeout, ctxErr)
		return nil, ctxErr
	}

	output := &enginev1.CheckOutput{
		RequestId:  input.RequestId,
		ResourceId: input.Resource.Id,
		Actions:    make(map[string]*enginev1.CheckOutput_ActionEffect, len(input.Actions)),
	}

	// Build the evaluation context first.
	evalCtx, err := engine.buildEvaluationCtx(ctx, checkOpts.evalParams, input)
	if err != nil {
		return nil, err
	}

	// Evaluate the policies.
	traceCtx := tracer.Start(checkOpts.tracerSink)
	result, err := evalCtx.evaluate(ctx, traceCtx, input)
	if err != nil {
		logging.FromContext(ctx).Error("Failed to evaluate policies", zap.Error(err))
		return nil, fmt.Errorf("failed to evaluate policies: %w", err)
	}

	// Fill in the per-action effects, starting from the defaults.
	for _, action := range input.Actions {
		effect := &enginev1.CheckOutput_ActionEffect{
			Effect: defaultEffect,
			Policy: noPolicyMatch,
		}

		if matched, found := result.effects[action]; found {
			effect.Effect = matched.Effect
			effect.Policy = matched.Policy
			effect.Scope = matched.Scope
		}

		output.Actions[action] = effect
	}

	output.EffectiveDerivedRoles = result.effectiveDerivedRoles
	output.ValidationErrors = result.validationErrors
	output.Outputs = result.outputs

	return output, nil
}
- buildEvaluationCtx 处理
func (engine *Engine) buildEvaluationCtx(ctx context.Context, eparams evalParams, input *enginev1.CheckInput) (*evaluationCtx, error) {
ec := &evaluationCtx{}
principal 以及resource policy 的处理,后边实际执行需要使用具体的实现进行evaluate
// get the principal policy check
ppName, ppVersion, ppScope := engine.policyAttr(input.Principal.Id, input.Principal.PolicyVersion, input.Principal.Scope)
ppCheck, err := engine.getPrincipalPolicyEvaluator(ctx, eparams, ppName, ppVersion, ppScope)
if err != nil {
return nil, fmt.Errorf("failed to get check for [%s.%s]: %w", ppName, ppVersion, err)
}
ec.addCheck(ppCheck)
// get the resource policy check
rpName, rpVersion, rpScope := engine.policyAttr(input.Resource.Kind, input.Resource.PolicyVersion, input.Resource.Scope)
rpCheck, err := engine.getResourcePolicyEvaluator(ctx, eparams, rpName, rpVersion, rpScope)
if err != nil {
return nil, fmt.Errorf("failed to get check for [%s.%s]: %w", rpName, rpVersion, err)
}
ec.addCheck(rpCheck)
return ec, nil
}
- evaluate实际执行
// evaluate runs the registered checks in order, merging each check's result
// into the accumulated result. It returns as soon as a merge reports that the
// result is no longer incomplete. If there are no checks, or every check ran
// and the result is still incomplete, the span is marked with
// errNoPoliciesMatched and defaults are set for the unmatched actions.
//
// It returns a wrapped error if any check fails to evaluate.
func (ec *evaluationCtx) evaluate(ctx context.Context, tctx tracer.Context, input *enginev1.CheckInput) (*evaluationResult, error) {
	ctx, span := tracing.StartSpan(ctx, "engine.EvalCtxEvaluate")
	defer span.End()

	merged := &evaluationResult{}

	// With zero checks the loop body never runs and control falls through to the
	// "no policies matched" handling below.
	for idx := 0; idx < ec.numChecks; idx++ {
		checkResult, err := ec.checks[idx].Evaluate(ctx, tctx, input)
		if err != nil {
			logging.FromContext(ctx).Error("Failed to evaluate policy", zap.Error(err))
			tracing.MarkFailed(span, http.StatusInternalServerError, err)
			return nil, fmt.Errorf("failed to execute policy: %w", err)
		}

		if incomplete := merged.merge(checkResult); !incomplete {
			return merged, nil
		}
	}

	tracing.MarkFailed(span, http.StatusNotFound, errNoPoliciesMatched)
	merged.setDefaultsForUnmatchedActions(tctx, input)

	return merged, nil
}
对于 Evaluate,目前有三种实现;下面给出其中的 principal 与 resource 两种实现
- principalPolicyEvaluator 的处理
内部处理都基于 Google 的 cel-go 包
// Evaluate applies the principal policy to the input. The policies in the set
// are visited in order; for each one, only the actions that have no result yet
// are considered, and evaluation stops as soon as no such actions remain.
// Within a policy, resource rules whose glob matches the input resource kind
// are examined, and every action rule whose condition is satisfied sets the
// effect for the actions it matches. A rule that was activated and carries an
// output expression also contributes an output entry. Finally
// EFFECT_NO_MATCH is passed to setDefaultEffect as the default.
//
// It returns a wrapped error if the variables of a policy cannot be evaluated.
func (ppe *principalPolicyEvaluator) Evaluate(ctx context.Context, tctx tracer.Context, input *enginev1.CheckInput) (*PolicyEvalResult, error) {
	_, span := tracing.StartSpan(ctx, "principal_policy.Evaluate")
	span.SetAttributes(tracing.PolicyFQN(ppe.policy.Meta.Fqn))
	defer span.End()

	fqn := ppe.policy.Meta.Fqn
	policyKey := namer.PolicyKeyFromFQN(fqn)
	evalCtx := newEvalContext(ppe.evalParams, checkInputToRequest(input))
	result := newEvalResult(input.Actions)
	policyTrace := tctx.StartPolicy(fqn)

	for _, scoped := range ppe.policy.Policies {
		pending := result.unresolvedActions()
		if len(pending) == 0 {
			return result, nil
		}

		scopeTrace := policyTrace.StartScope(scoped.Scope)

		// Evaluate the variables of this policy.
		vars, err := evalCtx.evaluateVariables(scopeTrace.StartVariables(), scoped.OrderedVariables)
		if err != nil {
			scopeTrace.Failed(err, "Failed to evaluate variables")
			return nil, fmt.Errorf("failed to evaluate variables: %w", err)
		}

		for kindGlob, rules := range scoped.ResourceRules {
			resourceTrace := scopeTrace.StartResource(kindGlob)
			if !util.MatchesGlob(kindGlob, input.Resource.Kind) {
				resourceTrace.Skipped(nil, "Did not match input resource kind")
				continue
			}

			for _, rule := range rules.ActionRules {
				activated := false

				for _, action := range util.FilterGlob(rule.Action, pending) {
					actionTrace := resourceTrace.StartAction(action)

					satisfied, err := evalCtx.satisfiesCondition(actionTrace.StartCondition(), rule.Condition, vars)
					switch {
					case err != nil:
						actionTrace.Skipped(err, "Error evaluating condition")
					case !satisfied:
						actionTrace.Skipped(nil, "Condition not satisfied")
					default:
						result.setEffect(action, EffectInfo{Effect: rule.Effect, Policy: policyKey, Scope: scoped.Scope})
						actionTrace.AppliedEffect(rule.Effect, "")
						activated = true
					}
				}

				// Only evaluate the output expression if the rule was activated.
				if !activated || rule.Output == nil {
					continue
				}

				outputTrace := resourceTrace.StartOutput(rule.Name)
				entry := &enginev1.OutputEntry{
					Src: namer.RuleFQN(ppe.policy.Meta, scoped.Scope, rule.Name),
					Val: evalCtx.evaluateProtobufValueCELExpr(rule.Output.Checked, vars),
				}
				result.Outputs = append(result.Outputs, entry)
				outputTrace.ComputedOutput(entry)
			}
		}
	}

	result.setDefaultEffect(policyTrace, EffectInfo{Effect: effectv1.Effect_EFFECT_NO_MATCH})

	return result, nil
}
- resourcePolicyEvaluator 的处理
内部处理都基于 Google 的 cel-go 包
// Evaluate applies the resource policy to the input.
//
// The input is first validated against the policy's schemas. If validation
// reports errors they are recorded on the result, and when the validation
// result asks for rejection every requested action is set to EFFECT_DENY and
// the function returns immediately.
//
// Otherwise the policies in the set are visited in order. For each one the
// effective derived roles are computed, the policy variables are evaluated,
// and every rule whose roles or derived roles match has its condition checked
// for each still-unresolved action matched by the rule's action globs. A rule
// that was activated and carries an output expression also contributes an
// output entry. Finally EFFECT_DENY is passed to setDefaultEffect as the
// default for actions that were not matched.
//
// It returns a wrapped error if validation itself fails or if the variables of
// a policy cannot be evaluated.
func (rpe *resourcePolicyEvaluator) Evaluate(ctx context.Context, tctx tracer.Context, input *enginev1.CheckInput) (*PolicyEvalResult, error) {
_, span := tracing.StartSpan(ctx, "resource_policy.Evaluate")
span.SetAttributes(tracing.PolicyFQN(rpe.policy.Meta.Fqn))
defer span.End()
policyKey := namer.PolicyKeyFromFQN(rpe.policy.Meta.Fqn)
request := checkInputToRequest(input)
result := newEvalResult(input.Actions)
// the roles sent with the principal, as a set for the intersection checks below
effectiveRoles := internal.ToSet(input.Principal.Roles)
pctx := tctx.StartPolicy(rpe.policy.Meta.Fqn)
// validate the input
vr, err := rpe.schemaMgr.ValidateCheckInput(ctx, rpe.policy.Schemas, input)
if err != nil {
pctx.Failed(err, "Error during validation")
return nil, fmt.Errorf("failed to validate input: %w", err)
}
if len(vr.Errors) > 0 {
// validation errors are always reported on the result, whether or not the request is rejected
result.ValidationErrors = vr.Errors.SchemaErrors()
pctx.Failed(vr.Errors, "Validation errors")
if vr.Reject {
// rejection: deny every requested action and skip policy evaluation entirely
for _, action := range input.Actions {
actx := pctx.StartAction(action)
result.setEffect(action, EffectInfo{Effect: effectv1.Effect_EFFECT_DENY, Policy: policyKey})
actx.AppliedEffect(effectv1.Effect_EFFECT_DENY, "Rejected due to validation failures")
}
return result, nil
}
}
// evaluate policies in the set
for _, p := range rpe.policy.Policies {
// Get the actions that are yet to be resolved. This is to implement first-match-wins semantics.
// Within the context of a single policy, later rules can potentially override the result for an action (unless it was DENY).
actionsToResolve := result.unresolvedActions()
if len(actionsToResolve) == 0 {
return result, nil
}
sctx := pctx.StartScope(p.Scope)
// a fresh evaluation context is created for every policy in the set
evalCtx := newEvalContext(rpe.evalParams, request)
// calculate the set of effective derived roles
effectiveDerivedRoles := make(internal.StringSet, len(p.DerivedRoles))
for drName, dr := range p.DerivedRoles {
dctx := sctx.StartDerivedRole(drName)
if !internal.SetIntersects(dr.ParentRoles, effectiveRoles) {
dctx.Skipped(nil, "No matching roles")
continue
}
// evaluate variables of this derived roles set
drVariables, err := evalCtx.evaluateVariables(dctx.StartVariables(), dr.OrderedVariables)
if err != nil {
// an error here only skips this derived role; it does not fail the evaluation
dctx.Skipped(err, "Error evaluating variables")
continue
}
ok, err := evalCtx.satisfiesCondition(dctx.StartCondition(), dr.Condition, drVariables)
if err != nil {
dctx.Skipped(err, "Error evaluating condition")
continue
}
if !ok {
dctx.Skipped(nil, "Condition not satisfied")
continue
}
// record the activated role both for this policy and on the overall result
effectiveDerivedRoles[drName] = struct{}{}
result.EffectiveDerivedRoles[drName] = struct{}{}
dctx.Activated()
}
evalCtx = evalCtx.withEffectiveDerivedRoles(effectiveDerivedRoles)
// evaluate the variables of this policy
variables, err := evalCtx.evaluateVariables(sctx.StartVariables(), p.OrderedVariables)
if err != nil {
sctx.Failed(err, "Failed to evaluate variables")
return nil, fmt.Errorf("failed to evaluate variables: %w", err)
}
// evaluate each rule until all actions have a result
for _, rule := range p.Rules {
rctx := sctx.StartRule(rule.Name)
// a rule applies when either its roles or its derived roles match
if !internal.SetIntersects(rule.Roles, effectiveRoles) && !internal.SetIntersects(rule.DerivedRoles, evalCtx.effectiveDerivedRoles) {
rctx.Skipped(nil, "No matching roles or derived roles")
continue
}
ruleActivated := false
for actionGlob := range rule.Actions {
matchedActions := util.FilterGlob(actionGlob, actionsToResolve)
for _, action := range matchedActions {
actx := rctx.StartAction(action)
ok, err := evalCtx.satisfiesCondition(actx.StartCondition(), rule.Condition, variables)
if err != nil {
// a condition error skips this action for this rule instead of failing the evaluation
actx.Skipped(err, "Error evaluating condition")
continue
}
if !ok {
actx.Skipped(nil, "Condition not satisfied")
continue
}
result.setEffect(action, EffectInfo{Effect: rule.Effect, Policy: policyKey, Scope: p.Scope})
actx.AppliedEffect(rule.Effect, "")
ruleActivated = true
}
}
// evaluate output expression if the rule was activated
if ruleActivated && rule.Output != nil {
octx := rctx.StartOutput(rule.Name)
output := &enginev1.OutputEntry{
Src: namer.RuleFQN(rpe.policy.Meta, p.Scope, rule.Name),
Val: evalCtx.evaluateProtobufValueCELExpr(rule.Output.Checked, variables),
}
result.Outputs = append(result.Outputs, output)
octx.ComputedOutput(output)
}
}
}
// set the default effect for actions that were not matched
result.setDefaultEffect(pctx, EffectInfo{Effect: effectv1.Effect_EFFECT_DENY, Policy: policyKey})
return result, nil
}
说明
cerbos 内部check 的处理还是比较复杂的,而且属于cerbos 的核心部分
参考资料
https://cerbos.dev/
https://cerbos.dev/product-cerbos-hub
https://github.com/cerbos/cerbos
https://github.com/google/cel-spec