You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
501 lines
21 KiB
501 lines
21 KiB
# Copyright 2021 Google LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
"""Downscoping with Credential Access Boundaries
|
|
|
|
This module provides the ability to downscope credentials using
|
|
`Downscoping with Credential Access Boundaries`_. This is useful to restrict the
|
|
Identity and Access Management (IAM) permissions that a short-lived credential
|
|
can use.
|
|
|
|
To downscope permissions of a source credential, a Credential Access Boundary
|
|
that specifies which resources the new credential can access, as well as
|
|
an upper bound on the permissions that are available on each resource, has to
|
|
be defined. A downscoped credential can then be instantiated using the source
|
|
credential and the Credential Access Boundary.
|
|
|
|
The common pattern of usage is to have a token broker with elevated access
|
|
generate these downscoped credentials from higher access source credentials and
|
|
pass the downscoped short-lived access tokens to a token consumer via some
|
|
secure authenticated channel for limited access to Google Cloud Storage
|
|
resources.
|
|
|
|
For example, a token broker can be set up on a server in a private network.
|
|
Various workloads (token consumers) in the same network will send authenticated
|
|
requests to that broker for downscoped tokens to access or modify specific Google
|
|
Cloud Storage buckets.
|
|
|
|
The broker will instantiate downscoped credentials instances that can be used to
|
|
generate short lived downscoped access tokens that can be passed to the token
|
|
consumer. These downscoped access tokens can be injected by the consumer into
|
|
google.oauth2.Credentials and used to initialize a storage client instance to
|
|
access Google Cloud Storage resources with restricted access.
|
|
|
|
Note: Only Cloud Storage supports Credential Access Boundaries. Other Google
|
|
Cloud services do not support this feature.
|
|
|
|
.. _Downscoping with Credential Access Boundaries: https://cloud.google.com/iam/docs/downscoping-short-lived-credentials
|
|
"""
|
|
|
|
import datetime
|
|
|
|
import six
|
|
|
|
from google.auth import _helpers
|
|
from google.auth import credentials
|
|
from google.oauth2 import sts
|
|
|
|
# The maximum number of access boundary rules a Credential Access Boundary can
|
|
# contain.
|
|
_MAX_ACCESS_BOUNDARY_RULES_COUNT = 10
|
|
# The token exchange grant_type used for exchanging credentials.
|
|
_STS_GRANT_TYPE = "urn:ietf:params:oauth:grant-type:token-exchange"
|
|
# The token exchange requested_token_type. This is always an access_token.
|
|
_STS_REQUESTED_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
|
|
# The STS token URL used to exchange a short-lived access token for a downscoped one.
|
|
_STS_TOKEN_URL = "https://sts.googleapis.com/v1/token"
|
|
# The subject token type to use when exchanging a short lived access token for a
|
|
# downscoped token.
|
|
_STS_SUBJECT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:access_token"
|
|
|
|
|
|
class CredentialAccessBoundary(object):
    """Defines a Credential Access Boundary which contains a list of access boundary
    rules. Each rule contains information on the resource that the rule applies to,
    the upper bound of the permissions that are available on that resource and an
    optional condition to further restrict permissions.
    """

    def __init__(self, rules=()):
        """Instantiates a Credential Access Boundary. The number of rules is capped
        at ``_MAX_ACCESS_BOUNDARY_RULES_COUNT``.

        Args:
            rules (Sequence[google.auth.downscoped.AccessBoundaryRule]): The list of
                access boundary rules limiting the access that a downscoped credential
                will have. Defaults to an empty sequence.
        Raises:
            TypeError: If any of the rules are not a valid type.
            ValueError: If the provided rules exceed the maximum allowed.
        """
        # An immutable default is used so that no mutable object is shared between
        # calls; the setter copies the input into a fresh list either way.
        self.rules = rules

    @property
    def rules(self):
        """Returns the list of access boundary rules defined on the Credential
        Access Boundary.

        Returns:
            Tuple[google.auth.downscoped.AccessBoundaryRule, ...]: The list of access
                boundary rules defined on the Credential Access Boundary. These are returned
                as an immutable tuple to prevent modification.
        """
        return tuple(self._rules)

    @rules.setter
    def rules(self, value):
        """Updates the current rules on the Credential Access Boundary. This will overwrite
        the existing set of rules.

        Args:
            value (Sequence[google.auth.downscoped.AccessBoundaryRule]): The list of
                access boundary rules limiting the access that a downscoped credential
                will have.
        Raises:
            TypeError: If any of the rules are not a valid type.
            ValueError: If the provided rules exceed the maximum allowed.
        """
        # The size limit is checked before the element types, so an oversized
        # input is rejected with ValueError regardless of its contents.
        if len(value) > _MAX_ACCESS_BOUNDARY_RULES_COUNT:
            raise ValueError(
                "Credential access boundary rules can have a maximum of {} rules.".format(
                    _MAX_ACCESS_BOUNDARY_RULES_COUNT
                )
            )
        for access_boundary_rule in value:
            if not isinstance(access_boundary_rule, AccessBoundaryRule):
                raise TypeError(
                    "List of rules provided do not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
                )
        # Store a copy so later mutation of the caller's sequence has no effect here.
        self._rules = list(value)

    def add_rule(self, rule):
        """Adds a single access boundary rule to the existing rules.

        Args:
            rule (google.auth.downscoped.AccessBoundaryRule): The access boundary rule,
                limiting the access that a downscoped credential will have, to be added to
                the existing rules.
        Raises:
            TypeError: If any of the rules are not a valid type.
            ValueError: If the provided rules exceed the maximum allowed.
        """
        # Read the backing list directly: going through the ``rules`` property
        # would build a throwaway tuple just to measure its length.
        if len(self._rules) >= _MAX_ACCESS_BOUNDARY_RULES_COUNT:
            raise ValueError(
                "Credential access boundary rules can have a maximum of {} rules.".format(
                    _MAX_ACCESS_BOUNDARY_RULES_COUNT
                )
            )
        if not isinstance(rule, AccessBoundaryRule):
            raise TypeError(
                "The provided rule does not contain a valid 'google.auth.downscoped.AccessBoundaryRule'."
            )
        self._rules.append(rule)

    def to_json(self):
        """Generates the dictionary representation of the Credential Access Boundary.
        This uses the format expected by the Security Token Service API as documented in
        `Defining a Credential Access Boundary`_.

        .. _Defining a Credential Access Boundary:
            https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary

        Returns:
            Mapping: Credential Access Boundary Rule represented in a dictionary object.
        """
        rules = [access_boundary_rule.to_json() for access_boundary_rule in self._rules]
        return {"accessBoundary": {"accessBoundaryRules": rules}}
|
|
|
|
|
|
class AccessBoundaryRule(object):
    """A single rule of a Credential Access Boundary.

    A rule names the resource it applies to, the upper bound of permissions
    available on that resource, and optionally a condition that restricts
    the permissions further.
    """

    def __init__(
        self, available_resource, available_permissions, availability_condition=None
    ):
        """Builds one access boundary rule; every argument is validated by the
        corresponding property setter.

        Args:
            available_resource (str): The full resource name of the Cloud Storage bucket
                that the rule applies to. Use the format
                "//storage.googleapis.com/projects/_/buckets/bucket-name".
            available_permissions (Sequence[str]): The upper bound of permissions the
                downscoped token will have on the resource. Each entry identifies an
                IAM predefined or custom role and carries the "inRole:" prefix, e.g.
                "inRole:roles/storage.objectViewer". Only the permissions in these
                roles will be available.
            availability_condition (Optional[google.auth.downscoped.AvailabilityCondition]):
                Optional condition that restricts the availability of permissions to
                specific Cloud Storage objects.

        Raises:
            TypeError: If any of the parameters are not of the expected types.
            ValueError: If any of the parameters are not of the expected values.
        """
        self.available_resource = available_resource
        self.available_permissions = available_permissions
        self.availability_condition = availability_condition

    @property
    def available_resource(self):
        """The resource this rule applies to.

        Returns:
            str: The current available resource.
        """
        return self._available_resource

    @available_resource.setter
    def available_resource(self, value):
        """Replaces the available resource.

        Args:
            value (str): The updated value of the available resource.

        Raises:
            TypeError: If the value is not a string.
        """
        if isinstance(value, six.string_types):
            self._available_resource = value
            return
        raise TypeError("The provided available_resource is not a string.")

    @property
    def available_permissions(self):
        """The permissions available under this rule.

        Returns:
            Tuple[str, ...]: The current available permissions, handed out as an
                immutable tuple so callers cannot modify the stored list.
        """
        return tuple(self._available_permissions)

    @available_permissions.setter
    def available_permissions(self, value):
        """Replaces the available permissions.

        Args:
            value (Sequence[str]): The updated value of the available permissions.

        Raises:
            TypeError: If the value is not a list of strings.
            ValueError: If the value is not valid.
        """
        # Validation stops at the first offending entry: its type is checked
        # first, then the mandatory "inRole:" prefix.
        for entry in value:
            if not isinstance(entry, six.string_types):
                raise TypeError(
                    "Provided available_permissions are not a list of strings."
                )
            if not entry.startswith("inRole:"):
                raise ValueError(
                    "available_permissions must be prefixed with 'inRole:'."
                )
        # Keep a private copy rather than a reference to the caller's sequence.
        self._available_permissions = list(value)

    @property
    def availability_condition(self):
        """The optional condition attached to this rule.

        Returns:
            Optional[google.auth.downscoped.AvailabilityCondition]: The current
                availability condition.
        """
        return self._availability_condition

    @availability_condition.setter
    def availability_condition(self, value):
        """Replaces the availability condition.

        Args:
            value (Optional[google.auth.downscoped.AvailabilityCondition]): The updated
                value of the availability condition.

        Raises:
            TypeError: If the value is not of type google.auth.downscoped.AvailabilityCondition
                or None.
        """
        if value is not None and not isinstance(value, AvailabilityCondition):
            raise TypeError(
                "The provided availability_condition is not a 'google.auth.downscoped.AvailabilityCondition' or None."
            )
        self._availability_condition = value

    def to_json(self):
        """Generates the dictionary representation of the access boundary rule.
        This uses the format expected by the Security Token Service API as documented in
        `Defining a Credential Access Boundary`_.

        .. _Defining a Credential Access Boundary:
            https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary

        Returns:
            Mapping: The access boundary rule represented in a dictionary object.
        """
        representation = {
            "availablePermissions": list(self.available_permissions),
            "availableResource": self.available_resource,
        }
        condition = self.availability_condition
        # The condition key is only emitted when a condition is set.
        if condition:
            representation["availabilityCondition"] = condition.to_json()
        return representation
|
|
|
|
|
|
class AvailabilityCondition(object):
    """An optional condition that can be attached to a Credential Access Boundary
    rule to further restrict permissions."""

    def __init__(self, expression, title=None, description=None):
        """Builds an availability condition; every argument is validated by the
        corresponding property setter.

        Args:
            expression (str): A condition expression that specifies the Cloud Storage
                objects where permissions are available. For example, this expression
                makes permissions available for objects whose name starts with "customer-a":
                "resource.name.startsWith('projects/_/buckets/example-bucket/objects/customer-a')"
            title (Optional[str]): An optional short string that identifies the purpose of
                the condition.
            description (Optional[str]): Optional details about the purpose of the condition.

        Raises:
            TypeError: If any of the parameters are not of the expected types.
            ValueError: If any of the parameters are not of the expected values.
        """
        self.expression = expression
        self.title = title
        self.description = description

    @property
    def expression(self):
        """The condition expression.

        Returns:
            str: The current condition expression.
        """
        return self._expression

    @expression.setter
    def expression(self, value):
        """Replaces the condition expression.

        Args:
            value (str): The updated value of the condition expression.

        Raises:
            TypeError: If the value is not of type string.
        """
        if isinstance(value, six.string_types):
            self._expression = value
            return
        raise TypeError("The provided expression is not a string.")

    @property
    def title(self):
        """The optional title.

        Returns:
            Optional[str]: The current title.
        """
        return self._title

    @title.setter
    def title(self, value):
        """Replaces the title.

        Args:
            value (Optional[str]): The updated value of the title.

        Raises:
            TypeError: If the value is not of type string or None.
        """
        if value is not None and not isinstance(value, six.string_types):
            raise TypeError("The provided title is not a string or None.")
        self._title = value

    @property
    def description(self):
        """The optional description.

        Returns:
            Optional[str]: The current description.
        """
        return self._description

    @description.setter
    def description(self, value):
        """Replaces the description.

        Args:
            value (Optional[str]): The updated value of the description.

        Raises:
            TypeError: If the value is not of type string or None.
        """
        if value is not None and not isinstance(value, six.string_types):
            raise TypeError("The provided description is not a string or None.")
        self._description = value

    def to_json(self):
        """Generates the dictionary representation of the availability condition.
        This uses the format expected by the Security Token Service API as documented in
        `Defining a Credential Access Boundary`_.

        .. _Defining a Credential Access Boundary:
            https://cloud.google.com/iam/docs/downscoping-short-lived-credentials#define-boundary

        Returns:
            Mapping[str, str]: The availability condition represented in a dictionary
                object.
        """
        payload = {"expression": self.expression}
        # Optional fields are left out entirely when unset or empty.
        optional_fields = (("title", self.title), ("description", self.description))
        for key, text in optional_fields:
            if text:
                payload[key] = text
        return payload
|
|
|
|
|
|
class Credentials(credentials.CredentialsWithQuotaProject):
    """Google credentials that are downscoped from an existing set of Google
    OAuth2 credentials, restricting the Identity and Access Management (IAM)
    permissions that a short-lived credential can use.

    The common pattern of usage is to have a token broker with elevated access
    generate these downscoped credentials from higher access source credentials and
    pass the downscoped short-lived access tokens to a token consumer via some
    secure authenticated channel for limited access to Google Cloud Storage
    resources.
    """

    def __init__(
        self, source_credentials, credential_access_boundary, quota_project_id=None
    ):
        """Instantiates a downscoped credentials object using the provided source
        credentials and credential access boundary rules.

        The constructor only stores its arguments and creates the STS client;
        the token exchange itself is performed by ``refresh``.

        Args:
            source_credentials (google.auth.credentials.Credentials): The source credentials
                to be downscoped based on the provided Credential Access Boundary rules.
            credential_access_boundary (google.auth.downscoped.CredentialAccessBoundary):
                The Credential Access Boundary which contains a list of access boundary
                rules. Each rule contains information on the resource that the rule applies to,
                the upper bound of the permissions that are available on that resource and an
                optional condition to further restrict permissions.
            quota_project_id (Optional[str]): The optional quota project ID.
        Raises:
            google.auth.exceptions.RefreshError: If the source credentials
                return an error on token refresh.
            google.auth.exceptions.OAuthError: If the STS token exchange
                endpoint returned an error during downscoped token generation.
        """
        super(Credentials, self).__init__()
        self._source_credentials = source_credentials
        self._credential_access_boundary = credential_access_boundary
        self._quota_project_id = quota_project_id
        self._sts_client = sts.Client(_STS_TOKEN_URL)

    @_helpers.copy_docstring(credentials.Credentials)
    def refresh(self, request):
        # No docstring here on purpose: one is supplied by the copy_docstring
        # decorator above.
        source = self._source_credentials
        # Obtain a fresh access token from the source credentials first; it is
        # the subject token of the exchange below.
        source.refresh(request)
        # Captured before the exchange so a lifetime reported by STS is counted
        # from this instant.
        issued_at = _helpers.utcnow()
        # Trade the source access token for a downscoped one.
        sts_response = self._sts_client.exchange_token(
            request=request,
            grant_type=_STS_GRANT_TYPE,
            subject_token=source.token,
            subject_token_type=_STS_SUBJECT_TOKEN_TYPE,
            requested_token_type=_STS_REQUESTED_TOKEN_TYPE,
            additional_options=self._credential_access_boundary.to_json(),
        )
        self.token = sts_response.get("access_token")
        # The STS endpoint may omit "expires_in" for some flows. In that case the
        # expiry of the source credentials is used instead, since the downscoped
        # token should expire together with the token it was derived from.
        expires_in = sts_response.get("expires_in")
        if expires_in:
            self.expiry = issued_at + datetime.timedelta(seconds=expires_in)
        else:
            self.expiry = source.expiry

    @_helpers.copy_docstring(credentials.CredentialsWithQuotaProject)
    def with_quota_project(self, quota_project_id):
        # No docstring here on purpose: one is supplied by the copy_docstring
        # decorator above.
        # Build a new instance of the same (possibly derived) class that shares
        # the source credentials and boundary but carries the new quota project.
        cls = self.__class__
        return cls(
            self._source_credentials,
            self._credential_access_boundary,
            quota_project_id=quota_project_id,
        )
|
|
|