Source code for boomi_cicd.util.execution_connector

import json
import time

import boomi_cicd
import boomi_cicd.util.json.execution_connector
from boomi_cicd.util import logging

# https://help.boomi.com/docs/Atomsphere/Integration/AtomSphere%20API/int-Execution_connector_object_abb28203-9946-4c03-9205-8dc95b8f3f41


[docs]def query_execution_connector( execution_id, connector_type="return", request_interval_sec=10, max_wait_sec=300 ): """ Query the ExecutionConnector by the Execution ID and connector type. This function sends a request to the AtomSphere API to query for an ExecutionConnector with the specified execution ID and connector type. It retries the request at specified intervals until a record is found or the maximum wait time is reached. The default connector type is "return" because it is mainly intended for use within automated testing. :param execution_id: The Boomi execution ID. :type execution_id: str :param connector_type: The type of the connector to query. Default is "return". :type connector_type: str, optional :param request_interval_sec: The time to wait between retries in seconds. Default is 10 seconds. :type request_interval_sec: int, optional :param max_wait_sec: The maximum wait time in seconds. Default is 300 seconds (5 minutes). :type max_wait_sec: int, optional :return: The JSON response containing the queried ExecutionConnector. :rtype: dict :raises ValueError: If the ExecutionConnector is not found after max wait time. :raises requests.RequestException: If the request fails. """ resource_path = "/ExecutionConnector/query" payload = boomi_cicd.util.json.execution_connector.query() payload["QueryFilter"]["expression"]["nestedExpression"][0]["argument"][ 0 ] = execution_id payload["QueryFilter"]["expression"]["nestedExpression"][1]["argument"][ 0 ] = connector_type # The ExecutionConnector may not be immediately available. Loop if needed. attempt = 0 number_of_records = 0 response = None timeout = time.time() + max_wait_sec while number_of_records == 0 and time.time() < timeout: response = boomi_cicd.atomsphere_request( method="post", resource_path=resource_path, payload=payload ) json_response = json.loads(response.text) number_of_records = json_response["numberOfResults"] attempt += 1 if number_of_records == 0: logging.info( f"ExecutionConnector not found. Attempt {attempt}. Retrying in {request_interval_sec} seconds." ) time.sleep(request_interval_sec) return json.loads(response.text)