Source code for config.config_node

from indykite_sdk.indykite.config.v1beta1 import config_management_api_pb2 as pb2
from indykite_sdk.indykite.config.v1beta1.model_pb2 import google_dot_protobuf_dot_wrappers__pb2 as wrappers
from indykite_sdk.model.create_config_node import CreateConfigNode
from indykite_sdk.model.update_config_node import UpdateConfigNode
from indykite_sdk.model.config_node import ConfigNode
from indykite_sdk.model.conveyance_preference import ConveyancePreference
from indykite_sdk.model.user_verification_requirement import UserVerificationRequirement
from indykite_sdk.model.authenticator_attachment import AuthenticatorAttachment
from indykite_sdk.model.authorization_policy_config_status import Status
from indykite_sdk.indykite.config.v1beta1 import model_pb2
import sys
import indykite_sdk.utils.logger as logger


[docs]def create_email_service_config_node(self, location, name, display_name, description, email_service_config, bookmarks=[]): """ create email service :param self: :param location: string gid id :param name: string pattern: ^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])$ :param display_name: string :param description: string :param email_service_config: EmailServiceConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized CreateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: response = self.stub.CreateConfigNode( pb2.CreateConfigNodeRequest( location=location, name=name, display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), email_service_config=email_service_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return CreateConfigNode.deserialize(response)
[docs]def read_config_node(self, config_node_id, bookmarks=[]): """ read a specific config node :param self: :param config_node_id: string gid id :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized ConfigNode instance """ sys.excepthook = logger.handle_excepthook try: response = self.stub.ReadConfigNode( pb2.ReadConfigNodeRequest( id=str(config_node_id), bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return ConfigNode.deserialize(response.config_node)
[docs]def update_email_service_config_node(self, config_node_id, etag, display_name, description, email_service_config, bookmarks=[]): """ update email service :param self: :param config_node_id: string gid id :param etag: string :param display_name: string :param description: string :param email_service_config: EmailServiceConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized UpdateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: response = self.stub.UpdateConfigNode( pb2.UpdateConfigNodeRequest( id=config_node_id, etag=wrappers.StringValue(value=etag), display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), email_service_config=email_service_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return UpdateConfigNode.deserialize(response)
[docs]def delete_config_node(self, config_node_id, etag, bookmarks=[]): """ delete a specific config node :param self: :param config_node_id: string gid id :param etag: etag :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: DeleteConfigNodeResponse object """ sys.excepthook = logger.handle_excepthook try: response = self.stub.DeleteConfigNode( pb2.DeleteConfigNodeRequest( id=str(config_node_id), etag=wrappers.StringValue(value=etag), bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return response
[docs]def create_auth_flow_config_node(self, location, name, display_name, description, auth_flow_config, bookmarks=[]): """ create auth flow :param self: :param location: string gid id :param name: string pattern: ^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])$ :param display_name: string :param description: string :param auth_flow_config: AuthFlowConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized CreateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: response = self.stub.CreateConfigNode( pb2.CreateConfigNodeRequest( location=location, name=name, display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), auth_flow_config=auth_flow_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return CreateConfigNode.deserialize(response)
[docs]def update_auth_flow_config_node(self, config_node_id, etag, display_name, description, auth_flow_config, bookmarks=[]): """ update auth flow :param self: :param config_node_id: string gid id :param etag: string :param display_name: string :param description: string :param auth_flow_config: AuthFlowConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized UpdateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: response = self.stub.UpdateConfigNode( pb2.UpdateConfigNodeRequest( id=config_node_id, etag=wrappers.StringValue(value=etag), display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), auth_flow_config=auth_flow_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return UpdateConfigNode.deserialize(response)
[docs]def auth_flow_config(self, source_format, source, default): """ create AuthFlowConfig :param self: :param source_format: AuthFlowConfig.Format enum value :param source: bytes of json string :param default: google.protobuf.BoolValue :return: """ sys.excepthook = logger.handle_excepthook try: auth_flow_config = model_pb2.AuthFlowConfig( source_format=source_format, source=bytes(source), default=wrappers.BoolValue(value=default) ) return auth_flow_config except Exception as exception: return logger.logger_error(exception)
[docs]def create_oauth2_client_config_node(self, location, name, display_name, description, oauth2_client_config, bookmarks=[]): """ create oauth2 client :param self: :param location: string gid id :param name: string pattern: ^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])$ :param display_name: string :param description: string :param oauth2_client_config: OAuth2ClientConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized CreateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: response = self.stub.CreateConfigNode( pb2.CreateConfigNodeRequest( location=location, name=name, display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), oauth2_client_config=oauth2_client_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return CreateConfigNode.deserialize(response)
[docs]def update_oauth2_client_config_node(self, config_node_id, etag, display_name, description, oauth2_client_config, bookmarks=[]): """ update OAuth2 client :param self: :param config_node_id: string gid id :param etag: string :param display_name: string :param description: string :param oauth2_client_config: OAuth2ClientConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized UpdateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: response = self.stub.UpdateConfigNode( pb2.UpdateConfigNodeRequest( id=config_node_id, etag=wrappers.StringValue(value=etag), display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), oauth2_client_config=oauth2_client_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return UpdateConfigNode.deserialize(response)
[docs]def create_webauthn_provider_config_node(self, location, name, display_name, description, webauthn_provider_config, bookmarks=[]): """ create webauthn provider :param self: :param location: string gid id :param name: string pattern: ^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])$ :param display_name: string :param description: string :param webauthn_provider_config: WebAuthnProviderConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized CreateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: if webauthn_provider_config and webauthn_provider_config.attestation_preference: self.validate_conveyance(webauthn_provider_config.attestation_preference) if webauthn_provider_config and webauthn_provider_config.user_verification: self.validate_user_verification(webauthn_provider_config.attestation_preference) if webauthn_provider_config and webauthn_provider_config.authenticator_attachment: self.validate_authenticator_attachment(webauthn_provider_config.authenticator_attachment) response = self.stub.CreateConfigNode( pb2.CreateConfigNodeRequest( location=location, name=name, display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), webauthn_provider_config=webauthn_provider_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return CreateConfigNode.deserialize(response)
[docs]def update_webauthn_provider_config_node(self, config_node_id, etag, display_name, description, webauthn_provider_config, bookmarks=[]): """ update webauthn provider :param self: :param config_node_id: string gid id :param etag: string :param display_name: string :param description: string :param webauthn_provider_config: WebAuthnProviderConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized UpdateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: if webauthn_provider_config and webauthn_provider_config.attestation_preference: self.validate_conveyance(webauthn_provider_config.attestation_preference) if webauthn_provider_config and webauthn_provider_config.user_verification: self.validate_user_verification(webauthn_provider_config.user_verification) if webauthn_provider_config and webauthn_provider_config.authenticator_attachment: self.validate_authenticator_attachment(webauthn_provider_config.authenticator_attachment) response = self.stub.UpdateConfigNode( pb2.UpdateConfigNodeRequest( id=config_node_id, etag=wrappers.StringValue(value=etag), display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), webauthn_provider_config=webauthn_provider_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return UpdateConfigNode.deserialize(response)
[docs]def webauthn_provider_config(self, relying_parties, attestation_preference, authenticator_attachment, require_resident_key, user_verification, registration_timeout, authentication_timeout): """ create WebAuthnProviderConfig :param self: :param relying_parties: map<string, string> :param attestation_preference: ConveyancePreference enum value :param authenticator_attachment: AuthenticatorAttachment enum value :param require_resident_key: bool :param user_verification: UserVerificationRequirement enum value :param registration_timeout: google.protobuf.Duration (google.protobuf.duration_pb2.Duration) :param authentication_timeout: google.protobuf.Duration (google.protobuf.duration_pb2.Duration) :return: WebAuthnProviderConfig object """ sys.excepthook = logger.handle_excepthook try: webauthn_provider = model_pb2.WebAuthnProviderConfig( relying_parties=relying_parties, attestation_preference=attestation_preference, authenticator_attachment=authenticator_attachment, require_resident_key=bool(require_resident_key), user_verification=user_verification, registration_timeout=registration_timeout, authentication_timeout=authentication_timeout ) return webauthn_provider except Exception as exception: return logger.logger_error(exception)
[docs]def create_authorization_policy_config_node(self, location, name, display_name, description, authorization_policy_config, bookmarks=[]): """ create authorization policy :param self: :param location: string gid id :param name: string pattern: ^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])$ :param display_name: string :param description: string :param authorization_policy_config: AuthorizationPolicyConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized CreateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: valid = True if authorization_policy_config and authorization_policy_config.status: valid = self.validate_authorization_policy_status(authorization_policy_config.status) if valid: response = self.stub.CreateConfigNode( pb2.CreateConfigNodeRequest( location=location, name=name, display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), authorization_policy_config=authorization_policy_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return CreateConfigNode.deserialize(response)
[docs]def update_authorization_policy_config_node(self, config_node_id, etag, display_name, description, authorization_policy_config, bookmarks=[]): """ update authorization policy :param self: :param config_node_id: string gid id :param etag: string :param display_name: string :param description: string :param authorization_policy_config: AuthorizationPolicyConfig object :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ :return: deserialized UpdateConfigNode instance """ sys.excepthook = logger.handle_excepthook try: valid = True if authorization_policy_config and authorization_policy_config.status: valid = self.validate_authorization_policy_status(authorization_policy_config.status) if valid: response = self.stub.UpdateConfigNode( pb2.UpdateConfigNodeRequest( id=config_node_id, etag=wrappers.StringValue(value=etag), display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), authorization_policy_config= authorization_policy_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return UpdateConfigNode.deserialize(response)
[docs]def authorization_policy_config(self, policy, status, tags=[]): """ create AuthorizationPolicyConfig :param self: :param policy: JSON string format :param status: AuthorizationPolicyConfig.Status :param tags: list of strings :return: AuthorizationPolicyConfig object """ sys.excepthook = logger.handle_excepthook try: policy_config = model_pb2.AuthorizationPolicyConfig( policy=str(policy), status=status, tags=tags ) return policy_config except Exception as exception: return logger.logger_error(exception)
[docs]def validate_conveyance(self, conveyance): """ validate conveyance :param self: :param conveyance: string :return: True if valid or raises error """ try: conveyances = [c.value for c in ConveyancePreference] if conveyance not in conveyances: raise TypeError("conveyance must be a member of ConveyancePreference") return True except Exception as exception: return logger.logger_error(exception)
[docs]def validate_user_verification(self, user_verification_requirement): """ validate user verification requirement :param self: :param user_verification_requirement: string :return: True if valid or raises error """ try: user_verification_requirements = [u.value for u in UserVerificationRequirement] if user_verification_requirement not in user_verification_requirements: raise TypeError("user_verification_requirements must be a member of UserVerificationRequirement") return True except Exception as exception: return logger.logger_error(exception)
[docs]def validate_authenticator_attachment(self, authenticator_attachment): """ validate authenticator attachment :param self: :param authenticator_attachment: string :return: True if valid or raises error """ try: authenticator_attachments = [a.value for a in AuthenticatorAttachment] if authenticator_attachment not in authenticator_attachments: raise TypeError("authenticator_attachment must be a member of AuthenticatorAttachment") return True except Exception as exception: return logger.logger_error(exception)
[docs]def validate_authorization_policy_status(self, status): """ validate status :param self: :param status: string :return: True if valid or raises error """ try: statuses = [s.value for s in Status] if status not in statuses: raise TypeError("status must be a member of AuthorizationPolicyConfig.Status") return True except Exception as exception: return logger.logger_error(exception)
[docs]def create_readid_provider_config_node(self, location, name, display_name, description, readid_provider_config, bookmarks=[]): """ create ReadID provider :param self: :param location: appSpace GID id :param name: String min_len: 2, max_len: 63, pattern: "^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])$" :param display_name: String :param description: String :param readid_provider_config: ReadIDProviderConfig object :param bookmarks: list :return: """ sys.excepthook = logger.handle_excepthook try: response = self.stub.CreateConfigNode( pb2.CreateConfigNodeRequest( location=location, name=name, display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), readid_provider_config=readid_provider_config, bookmarks=bookmarks ) ) return CreateConfigNode.deserialize(response) except Exception as exception: return logger.logger_error(exception)
[docs]def update_readid_provider_config_node(self, config_node_id, etag, display_name, description, readid_provider_config, bookmarks=[]): """ update readID provider :param self: :param config_node_id: config node gid ID :param etag: string :param display_name: string :param description: string :param readid_provider_config: ReadIDProviderConfig object :param bookmarks: :return: """ sys.excepthook = logger.handle_excepthook try: response = self.stub.UpdateConfigNode( pb2.UpdateConfigNodeRequest( id=config_node_id, etag=wrappers.StringValue(value=etag), display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), readid_provider_config=readid_provider_config, bookmarks=bookmarks ) ) return UpdateConfigNode.deserialize(response) except Exception as exception: return logger.logger_error(exception)
[docs]def readid_provider_config(self, submitter_secret, manager_secret, submitter_password, host_address, property_map={}, unique_property_name=None ): """ create ReadIDProviderConfig object :param submitter_secret: string min_len: 36 required :param manager_secret: string min_len: 36 required :param submitter_password: string min_len: 4 required :param host_address: string valid url, starts from https://, required :param property_map: map string, ReadIDProviderConfig.Property :param unique_property_name: string :return: ReadIDProviderConfig object """ sys.excepthook = logger.handle_excepthook try: read_id_provider = model_pb2.ReadIDProviderConfig( submitter_secret=str(submitter_secret), manager_secret=str(manager_secret), submitter_password=str(submitter_password), host_address=str(host_address), property_map=property_map, unique_property_name=unique_property_name ) return read_id_provider except Exception as exception: return logger.logger_error(exception)
[docs]def readid_property(self, expression, enabled): """ create ReadIDProviderConfig.Property :param self: :param expression: string min_len: 4 max_len: 512 :param enabled: bool :return: ReadIDProviderConfig.Property object """ sys.excepthook = logger.handle_excepthook try: property = model_pb2.ReadIDProviderConfig.Property( expression=str(expression), enabled=bool(enabled) ) return property except Exception as exception: return logger.logger_error(exception)
[docs]def create_knowledge_graph_schema_config_node(self, location, name, display_name, description, knowledge_graph_schema_config, bookmarks=[]): """ create a schema and a KG DB :param self: :param location: appSpace GID id :param name: String min_len: 2, max_len: 63, pattern: "^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])$" :param display_name: String :param description: String :param knowledge_graph_schema_config: KnowledgeGraphSchemaConfig object :param bookmarks: [] :return: CreateConfigNodeResponse """ sys.excepthook = logger.handle_excepthook try: response = self.stub.CreateConfigNode( pb2.CreateConfigNodeRequest( location=location, name=name, display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), knowledge_graph_schema_config=knowledge_graph_schema_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return CreateConfigNode.deserialize(response)
[docs]def update_knowledge_graph_schema_config_node(self, config_node_id, etag, display_name, description, knowledge_graph_schema_config, bookmarks=[]): """ Update a knowledge graph config node :param self: :param config_node_id: config node gid ID :param etag: string :param display_name: string :param description: string :param knowledge_graph_schema_config: KnowledgeGraphSchemaConfig object :param bookmarks: [] :return: UpdateConfigNodeResponse """ sys.excepthook = logger.handle_excepthook try: response = self.stub.UpdateConfigNode( pb2.UpdateConfigNodeRequest( id=config_node_id, etag=wrappers.StringValue(value=etag), display_name=wrappers.StringValue(value=display_name), description=wrappers.StringValue(value=description), knowledge_graph_schema_config= knowledge_graph_schema_config, bookmarks=bookmarks ) ) except Exception as exception: return logger.logger_error(exception) if not response: return None return UpdateConfigNode.deserialize(response)
[docs]def knowledge_graph_schema_config(self, schema): """ create KnowledgeGraphSchemaConfig object :param schema: string (stringified json) :return: KnowledgeGraphSchemaConfig object """ sys.excepthook = logger.handle_excepthook try: knowledge_graph_schema = model_pb2.KnowledgeGraphSchemaConfig( schema=str(schema) ) return knowledge_graph_schema except Exception as exception: return logger.logger_error(exception)