Source code for boomi_cicd.util.deployed_package

import json

import boomi_cicd
import boomi_cicd.util.json.deployed_package
from boomi_cicd import logger


# https://help.boomi.com/bundle/developer_apis/page/r-atm-Deployed_Package_object.html


def create_deployed_package(release, package_id, environment_id):
    """
    Create a deployed package in the Boomi environment.

    Fills the payload template returned by
    ``boomi_cicd.util.json.deployed_package.create()`` and POSTs it to the
    ``/DeployedPackage`` resource.

    :param release: The release information. ``notes`` is read with a plain
        subscript and is therefore required; ``listenerStatus`` is optional.
    :type release: dict
    :param package_id: The ID of the package.
    :type package_id: str
    :param environment_id: The ID of the environment.
    :type environment_id: str
    :return: The deployment ID of the created package.
    :rtype: str
    :raises KeyError: If ``release`` has no ``notes`` entry.
    """
    resource_path = "/DeployedPackage"
    payload = boomi_cicd.util.json.deployed_package.create()
    payload["environmentId"] = environment_id
    payload["packageId"] = package_id
    payload["notes"] = release["notes"]
    # A single ``.get`` handles both cases: the release's value when the key
    # is present, ``None`` when it is absent. A second conditional assignment
    # of the same value would be redundant.
    payload["listenerStatus"] = release.get("listenerStatus")

    response = boomi_cicd.requests_post(resource_path, payload)
    return json.loads(response.text)["deploymentId"]
def query_deployed_package(package_id, environment_id, currently_deployed=True):
    """
    Check whether a package is deployed to an environment.

    Builds a ``/DeployedPackage/query`` request from the template returned by
    ``boomi_cicd.util.json.deployed_package.query()`` and reports whether the
    response lists at least one result.

    :param package_id: The ID of the package.
    :type package_id: str
    :param environment_id: The ID of the environment.
    :type environment_id: str
    :param currently_deployed: When truthy, an extra ``active EQUALS True``
        filter is appended to the query; when falsy, no ``active`` filter is
        added (default: True).
    :type currently_deployed: bool
    :return: True if the query returned one or more results, False otherwise.
    :rtype: bool
    """
    query = boomi_cicd.util.json.deployed_package.query()

    # The template's first two nested expressions take the environment ID
    # and the package ID, in that order.
    filters = query["QueryFilter"]["expression"]["nestedExpression"]
    filters[0]["argument"][0] = environment_id
    filters[1]["argument"][0] = package_id

    if currently_deployed:
        filters.append(
            {"argument": [True], "operator": "EQUALS", "property": "active"}
        )

    response = boomi_cicd.requests_post("/DeployedPackage/query", query)
    if not json.loads(response.text)["numberOfResults"]:
        return False

    logger.info("Package has already been deployed.")
    return True
def delete_deployed_package(deployment_id):
    """
    Delete a deployed package in the Boomi environment.

    Issues a delete request against ``/DeployedPackage/<deployment_id>``.

    :param deployment_id: The ID of the deployment.
    :type deployment_id: str
    :return: The response text.
    :rtype: str
    """
    response = boomi_cicd.requests_delete(f"/DeployedPackage/{deployment_id}")
    return response.text