Alteryx IO Discussions

Customize and extend the power of Alteryx with SDKs, APIs, custom tools, and more.

Post workflow to 2023.2 api

paul_houghton
12 - Quasar

I'm creating a Python wrapper for the Server API using HTTPX as the Python library for web queries.

When implementing the workflow post method I'm getting a 415 error indicating a media type error. If I test the same code using requests the process works and I have mapped it across to httpx correctly (I think), but I can't find why the 415 error is being produced. I think it is due to the way the file is being presented to the API but i cant identify what is incorrect. Any assistance would be appreciated

 

The code I'm using to post the workflow is: (full code on github)

 

from pathlib import Path
from typing import Any, Dict, Optional, Tuple

import httpx


def _post(
    self, endpoint: str, params: Optional[Dict[str, Any]] = None, **kwargs
) -> Tuple[httpx.Response, Dict[str, Any]]:
    self._update_auth_header()
    params = params or {}  # Ensure params is a dictionary
    endpoint = endpoint.lstrip("/")  # Remove leading slash if present
    response = self.http_client.post(
        f"{self.base_url}/{endpoint}", params=params, **kwargs
    )
    response.raise_for_status()
    return response, response.json()


def post_publish_workflow(
    self,
    file_path: Path,
    name: str,
    owner_id: str,
    is_public: bool = False,
    is_ready_for_migration: bool = False,
    others_may_download: bool = True,
    others_can_execute: bool = True,
    execution_mode: str = "Standard",
    workflow_credential_type: str = "Default",
    **kwargs,
) -> Tuple[httpx.Response, Dict[str, Any]]:
    file_path = Path(file_path)

    ## Removed Validation Code for brevity

    data = {
        "name": name,
        "ownerId": owner_id,
        "isPublic": is_public,
        "isReadyForMigration": is_ready_for_migration,
        "othersMayDownload": others_may_download,
        "othersCanExecute": others_can_execute,
        "executionMode": execution_mode,
        "workflowCredentialType": workflow_credential_type,
    }

    # Add keyword arguments to the data dictionary
    for key, value in kwargs.items():
        data[key] = value

    # Update the authorization header
    self._update_auth_header()
    # Make the POST request
    with open(file_path, "rb") as file:
        # content_file = file
        files = {"file": (file.name, file, "application/yxzp")}
        headers = {
            **self.http_client.headers,
            "Content-Type": "application/x-www-form-urlencoded",  # "multipart/form-data",
        }
        headers["Accept"] = "application/json"

        response = self._post(
            "v3/workflows",
            data=data,
            files=files,
            headers=headers,
        )
        return response

 

 

0 REPLIES 0