Source code for config.application_agent

from indykite_sdk.indykite.config.v1beta1 import config_management_api_pb2 as pb2
from indykite_sdk.model.application_agent import ApplicationAgent
from indykite_sdk.indykite.config.v1beta1.model_pb2 import UniqueNameIdentifier
from indykite_sdk.indykite.config.v1beta1.model_pb2 import google_dot_protobuf_dot_wrappers__pb2 as wrappers
from indykite_sdk.model.create_application_agent import CreateApplicationAgent
from indykite_sdk.model.update_application_agent import UpdateApplicationAgent
import sys
import indykite_sdk.utils.logger as logger


def read_application_agent_by_id(self, application_agent_id, bookmarks=None):
    """
    Read an ApplicationAgent identified by its id.

    Side effect: installs ``logger.handle_excepthook`` as ``sys.excepthook``
    before the request is sent.

    :param self: object exposing the ``stub`` used for the ReadApplicationAgent call
    :param application_agent_id: string gid id (passed through ``str()`` before sending)
    :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ ;
        ``None`` (the default) is sent as an empty list
    :return: the value of ``ApplicationAgent.deserialize`` for the returned agent,
        ``None`` when the response is falsy, or the result of
        ``logger.logger_error`` when the call raises
    """
    sys.excepthook = logger.handle_excepthook
    # Build a fresh list per call instead of sharing one mutable default object.
    bookmarks = [] if bookmarks is None else bookmarks
    try:
        response = self.stub.ReadApplicationAgent(
            pb2.ReadApplicationAgentRequest(
                id=str(application_agent_id),
                bookmarks=bookmarks
            )
        )
    except Exception as exception:
        return logger.logger_error(exception)

    if not response:
        return None

    return ApplicationAgent.deserialize(response.application_agent)
def read_application_agent_by_name(self, app_space_id, application_agent_name, bookmarks=None):
    """
    Read an ApplicationAgent identified by its name within a location.

    Side effect: installs ``logger.handle_excepthook`` as ``sys.excepthook``
    before the request is sent.

    :param self: object exposing the ``stub`` used for the ReadApplicationAgent call
    :param app_space_id: string gid id, sent as the ``location`` of the unique name identifier
    :param application_agent_name: string, sent as the ``name`` of the unique name identifier
    :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ ;
        ``None`` (the default) is sent as an empty list
    :return: the value of ``ApplicationAgent.deserialize`` for the returned agent,
        ``None`` when the response is falsy, or the result of
        ``logger.logger_error`` when the call raises
    """
    sys.excepthook = logger.handle_excepthook
    # Build a fresh list per call instead of sharing one mutable default object.
    bookmarks = [] if bookmarks is None else bookmarks
    try:
        response = self.stub.ReadApplicationAgent(
            pb2.ReadApplicationAgentRequest(
                name=UniqueNameIdentifier(
                    location=app_space_id,
                    name=application_agent_name
                ),
                bookmarks=bookmarks
            )
        )
    except Exception as exception:
        return logger.logger_error(exception)

    if not response:
        return None

    return ApplicationAgent.deserialize(response.application_agent)
def create_application_agent(self, application_id, name, display_name, description="", bookmarks=None):
    """
    Create an ApplicationAgent under the given application.

    Side effect: installs ``logger.handle_excepthook`` as ``sys.excepthook``
    before the request is sent.

    :param self: object exposing the ``stub`` used for the CreateApplicationAgent call
    :param application_id: string gid id
    :param name: string pattern: ^[a-z](?:[-a-z0-9]{0,61}[a-z0-9])$
    :param display_name: string, wrapped in a ``StringValue`` before sending
    :param description: string, wrapped in a ``StringValue`` before sending
    :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ ;
        ``None`` (the default) is sent as an empty list
    :return: the value of ``CreateApplicationAgent.deserialize`` for the response,
        ``None`` when the response is falsy, or the result of
        ``logger.logger_error`` when the call raises
    """
    sys.excepthook = logger.handle_excepthook
    # Build a fresh list per call instead of sharing one mutable default object.
    bookmarks = [] if bookmarks is None else bookmarks
    try:
        response = self.stub.CreateApplicationAgent(
            pb2.CreateApplicationAgentRequest(
                application_id=application_id,
                name=name,
                display_name=wrappers.StringValue(value=display_name),
                description=wrappers.StringValue(value=description),
                bookmarks=bookmarks
            )
        )
    except Exception as exception:
        return logger.logger_error(exception)

    if not response:
        return None

    return CreateApplicationAgent.deserialize(response, application_id, name)
def update_application_agent(self, application_agent_id, etag, display_name, description="", bookmarks=None):
    """
    Update an existing ApplicationAgent.

    Side effect: installs ``logger.handle_excepthook`` as ``sys.excepthook``
    before the request is sent.

    :param self: object exposing the ``stub`` used for the UpdateApplicationAgent call
    :param application_agent_id: string gid id
    :param etag: string, wrapped in a ``StringValue`` before sending
    :param display_name: string, wrapped in a ``StringValue`` before sending
    :param description: string, wrapped in a ``StringValue`` before sending
    :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ ;
        ``None`` (the default) is sent as an empty list
    :return: the value of ``UpdateApplicationAgent.deserialize`` for the response,
        ``None`` when the response is falsy, or the result of
        ``logger.logger_error`` when the call raises
    """
    sys.excepthook = logger.handle_excepthook
    # Build a fresh list per call instead of sharing one mutable default object.
    bookmarks = [] if bookmarks is None else bookmarks
    try:
        response = self.stub.UpdateApplicationAgent(
            pb2.UpdateApplicationAgentRequest(
                id=application_agent_id,
                etag=wrappers.StringValue(value=etag),
                display_name=wrappers.StringValue(value=display_name),
                description=wrappers.StringValue(value=description),
                bookmarks=bookmarks
            )
        )
    except Exception as exception:
        return logger.logger_error(exception)

    if not response:
        return None

    return UpdateApplicationAgent.deserialize(response)
def list_application_agents(self, app_space_id, match=None, bookmarks=None):
    """
    List the ApplicationAgents of an application space, filtered by ``match``.

    Side effect: installs ``logger.handle_excepthook`` as ``sys.excepthook``
    before the request is sent.

    :param self: object exposing the ``stub`` used for the ListApplicationAgents call
    :param app_space_id: string gid id
    :param match: list of strings (names to match); ``None`` (the default)
        is sent as an empty list
    :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ ;
        ``None`` (the default) is sent as an empty list
    :return: list of the responses yielded by the ListApplicationAgents call,
        ``None`` when the call returns a falsy value, or the result of
        ``logger.logger_error`` when the call or the iteration raises
    """
    sys.excepthook = logger.handle_excepthook
    # Build fresh lists per call instead of sharing mutable default objects.
    match = [] if match is None else match
    bookmarks = [] if bookmarks is None else bookmarks
    try:
        streams = self.stub.ListApplicationAgents(
            pb2.ListApplicationAgentsRequest(
                app_space_id=app_space_id,
                match=match,
                bookmarks=bookmarks
            )
        )
    except Exception as exception:
        return logger.logger_error(exception)

    if not streams:
        return None

    # Iterating the returned object can itself raise, so the consumption of
    # the stream is guarded separately from the call above.
    try:
        return list(streams)
    except Exception as exception:
        return logger.logger_error(exception)
def delete_application_agent(self, application_agent_id, etag, bookmarks=None):
    """
    Delete an ApplicationAgent.

    Side effect: installs ``logger.handle_excepthook`` as ``sys.excepthook``
    before the request is sent.

    :param self: object exposing the ``stub`` used for the DeleteApplicationAgent call
    :param application_agent_id: string gid id
    :param etag: string, wrapped in a ``StringValue`` before sending
    :param bookmarks: list of strings with pattern: ^[a-zA-Z0-9_-]{40,}$ ;
        optional, ``None`` (the default) is sent as an empty list
    :return: the response object returned by the DeleteApplicationAgent call
        as-is, ``None`` when that response is falsy, or the result of
        ``logger.logger_error`` when the call raises
    """
    sys.excepthook = logger.handle_excepthook
    # bookmarks is optional here, as in the other application-agent calls.
    bookmarks = [] if bookmarks is None else bookmarks
    try:
        response = self.stub.DeleteApplicationAgent(
            pb2.DeleteApplicationAgentRequest(
                id=application_agent_id,
                etag=wrappers.StringValue(value=etag),
                bookmarks=bookmarks
            )
        )
    except Exception as exception:
        return logger.logger_error(exception)

    if not response:
        return None

    return response