Source code for boomi_cicd.util.merge_request

import json

import boomi_cicd
from boomi_cicd import logger


# https://developer.boomi.com/docs/api/platformapi/MergeRequest#tag/MergeRequest/operation/ExecuteMergeRequest

# The Modifies or Update a MergeRequest object is not implemented because of the complexity of the request. 
# Create the payload and use atomsphere_request directly.

[docs]def query_merge_request(source_branch_id=None, destination_branch_id=None, stage=None): """ Query merge requests based on sourch branch ID, target branch ID, or stage. Source branch ID or target branch ID must be supplied. stage is optional. :param source_branch_id: The source branch ID to filter by. :type source_branch_id: str :param destination_branch_id: The destination branch ID to filter by. :type destination_branch_id: str :param stage: The stage to filter by. Possible values are "NOT_EXIST", "DRAFTING", "FAILED_TO_DRAFT," "FAILED_TO_REDRAFT", "DRAFTED", "REVIEWING", "MERGING", "MERGED", "FAILED_TO_MERGE", "DELETED", "REDRAFTING", and "REVERTED". :type stage: str :return: The merge request query response. Return contains original response data from API. :rtype: dict """ if not source_branch_id and not destination_branch_id: raise ValueError( "Either source_branch_id or destionation_branch_id must be provided." ) resource_path = "/MergeRequest/query" payload = { "QueryFilter": {"expression": {"operator": "and", "nestedExpression": []}} } if source_branch_id: payload["QueryFilter"]["expression"]["nestedExpression"].append( { "argument": [source_branch_id], "operator": "EQUALS", "property": "sourceBranchId", } ) if destination_branch_id: payload["QueryFilter"]["expression"]["nestedExpression"].append( { "argument": [destination_branch_id], "operator": "EQUALS", "property": "destinationBranchId", } ) if stage: payload["QueryFilter"]["expression"]["nestedExpression"].append( {"argument": [stage], "operator": "EQUALS", "property": "stage"} ) # TODO: Enabled queryMore results response = boomi_cicd.atomsphere_request( method="post", resource_path=resource_path, payload=payload ) json_response = json.loads(response.text) return json_response
[docs]def get_merge_request(merge_request_id): """ Get a merge request by its ID. :param merge_request_id: The merge request ID. :type merge_request_id: str :return: The merge request retrieval response. Return contains original response data from API. :rtype: dict """ if not merge_request_id: raise ValueError("merge_request_id must be provided.") resource_path = f"/MergeRequest/{merge_request_id}" response = boomi_cicd.atomsphere_request(method="get", resource_path=resource_path) json_response = json.loads(response.text) return json_response
[docs]def create_merge_request( source_branch_id, destination_branch_id, strategy, priority_branch ): """ You can use the Merge Request object to merge a development branch into the main branch. \ To create a merge request, you need the branch IDs for the source and destination branches. \ The source branch is the branch you want to merge into the destination branch. :param source_branch_id: The source branch ID. :type source_branch_id: str :param destination_branch_id: The destination branch ID. :type destination_branch_id: str :param strategy: The merge strategy. Must be 'OVERRIDE' or 'CONFLICT_RESOLVE'. \ There are two merge request strategies you can choose from: OVERRIDE or CONFLICT_RESOLVE. \ An override merge automatically resolves any merge conflicts by prioritizing the branch \ specified in the priorityBranch field. If you choose the CONFLICT_RESOLVE strategy, \ you have the opportunity to review any conflicts and choose which version you want to keep. :type strategy: str :param priority_branch: The priority branch for conflict resolution. Must be 'SOURCE' or 'DESTINATION'. :type priority_branch: str :return: The merge request creation response id. Return contains the merge request ID. :rtype: str """ if not source_branch_id or not destination_branch_id: raise ValueError( "Both source_branch_id and destionation_branch_id must be provided." ) if strategy not in ["OVERRIDE", "CONFLICT_RESOLVE"]: raise ValueError("Invalid strategy. Must be 'OVERRIDE' or 'CONFLICT_RESOLVE'.") if priority_branch not in ["SOURCE", "DESTINATION"]: raise ValueError("Invalid priority_branch. Must be 'SOURCE' or 'DESTINATION'.") resource_path = "/MergeRequest" payload = { "sourceBranchId": source_branch_id, "destinationBranchId": destination_branch_id, "strategy": strategy, "priorityBranch": priority_branch, } response = boomi_cicd.atomsphere_request( method="post", resource_path=resource_path, payload=payload ) json_response = json.loads(response.text) if "id" not in json_response: raise ValueError(f"Merge request creation failed. Response: {json_response}") return json_response["id"]
[docs]def execute_merge_request(merge_request_id, action): """ Execute a merge request by its ID. :param merge_request_id: The merge request ID. :type merge_request_id: str :return: The merge request execution response. Return contains original response data from API. :rtype: dict """ if not merge_request_id: raise ValueError("merge_request_id must be provided.") if action not in ["MERGE", "REVERT", "RETRY_DRAFTING"]: raise ValueError( "Invalid action. Must be 'MERGE', 'REVERT', or 'RETRY_DRAFTING'." ) # Check to merge stage to ensure that the merge can be executed merge_stage = get_merge_request(merge_request_id)["stage"] if action == "MERGE" and merge_stage not in ["REVIEWING", "FAILED_TO_MERGE"]: raise ValueError( f"Merge request cannot be merged in its current stage: {merge_stage}" ) if action == "REVERT" and merge_stage not in ["MERGED", "DELETED"]: raise ValueError( f"Merge request cannot be reverted in its current stage: {merge_stage}" ) if action == "RETRY_DRAFTING" and merge_stage not in [ "FAILED_TO_DRAFT", "FAILED_TO_REDRAFT", ]: raise ValueError( f"Merge request drafting cannot be retried in its current stage: {merge_stage}" ) resource_path = f"/MergeRequest/execute/{merge_request_id}" payload = {"id": merge_request_id, "mergeRequestAction": action} response = boomi_cicd.atomsphere_request( method="post", resource_path=resource_path, payload=payload ) json_response = json.loads(response.text) return json_response