Using the API

This section provides examples of how to use the API.

Example 1

The following is an example of how to use the API without integrating with a specific tool. It consists of a payload and the request to Databand.

Example
import gzip
import json
import uuid

from datetime import datetime, timedelta

import pytz
import requests


# Run ID of the parent (DAG-level) event. The task-level event below refers to
# it through its "parent" facet.
run_uid = uuid.uuid4()

# One timezone-aware UTC timestamp shared by every time field, so that both
# events agree with each other. datetime.utcnow() is deprecated since
# Python 3.12; datetime.now(tz) yields the same aware value directly.
now_utc = datetime.now(pytz.utc)

# The payload is a list of run events. The datetime and UUID values are turned
# into strings when the payload is serialized with json.dumps(..., default=str).
simple_payload = [
    # Event 1: the pipeline-level run ("my_dag").
    {
        "eventType": "FAIL",
        "eventTime": now_utc,
        "inputs": [],
        "job": {"facets": {}, "namespace": "airflow-prod", "name": "my_dag"},
        "outputs": [],
        "run": {
            "facets": {
                "nominalTime": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/NominalTimeRunFacet.json",
                    "nominalStartTime": now_utc,
                },
                "log": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                    "logBody": "very helpful log.. and very long",
                    "logUrl": "https://bucket.s3.somewhere.com/.../file.log",
                },
                "startTime": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                    "startTime": now_utc - timedelta(minutes=5),
                },
                "errorMessage": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ErrorMessageRunFacet.json",
                    "message": "org.apache.spark.sql.AnalysisException: Table or view not found: wrong_table_name; line 1 pos 14",
                    "programmingLanguage": "JAVA",
                    "stackTrace": 'Exception in thread "main" java.lang.RuntimeException: A test exception\nat io.openlineage.SomeClass.method(SomeClass.java:13)\nat io.openlineage.SomeClass.anotherMethod(SomeClass.java:9)',
                },
                "tags": {
                    "projectName": "test_project",
                    "runName": "test_run_name",
                    "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                    "_producer": "https://some.producer.com/version/1.0",
                },
            },
            "runId": run_uid,
        },
        "producer": "https://custom.api",
        "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
    },
    # Event 2: a task-level run that belongs to the run above.
    {
        "eventTime": now_utc,
        "eventType": "FAIL",
        "job": {
            "facets": {},
            "namespace": "airflow-prod",
            "name": "my_dag.failing_task_with_log",
        },
        "run": {
            "facets": {
                "nominalTime": {
                    "_producer": "https://some.producer.com/version/1.0",
                    # Same facet schema as the nominalTime facet of event 1.
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/NominalTimeRunFacet.json",
                    "nominalStartTime": now_utc,
                },
                "log": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                    "logBody": "very helpful log.. and very long",
                    "logUrl": "https://bucket.s3.somewhere.com/.../file.log",
                },
                "startTime": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet",
                    "startTime": now_utc - timedelta(minutes=5),
                },
                "errorMessage": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/ErrorMessageRunFacet.json",
                    "message": "org.apache.spark.sql.AnalysisException: Table or view not found: wrong_table_name; line 1 pos 14",
                    "programmingLanguage": "JAVA",
                    "stackTrace": 'Exception in thread "main" java.lang.RuntimeException: A test exception\nat io.openlineage.SomeClass.method(SomeClass.java:13)\nat io.openlineage.SomeClass.anotherMethod(SomeClass.java:9)',
                },
                # Links this task run to the pipeline run of event 1.
                "parent": {
                    "_producer": "https://some.producer.com/version/1.0",
                    "_schemaURL": "https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ParentRunFacet",
                    "job": {"name": "my_dag", "namespace": "airflow-prod"},
                    "run": {"runId": run_uid},
                },
            },
            "runId": uuid.uuid4(),
        },
        "producer": "https://custom.api",
        "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
    },
]
# Placeholders: replace with your Databand access token and with the endpoint
# URL of your custom integration.
ACCESS_TOKEN = "<your_access_token>"
DATABAND_CUSTOM_INTEGRATION_FULL_API = "<your_api_endpoint_provided_in_ui>"

if __name__ == "__main__":
    # default=str serializes the datetime and UUID values in the payload.
    request_body = gzip.compress(
        json.dumps(simple_payload, default=str).encode("utf-8")
    )
    request_headers = {
        "Authorization": f"Bearer {ACCESS_TOKEN}",
        "Content-Type": "application/json",
        "Content-Encoding": "gzip",
    }
    response = requests.post(
        DATABAND_CUSTOM_INTEGRATION_FULL_API,
        data=request_body,
        headers=request_headers,
        timeout=30,
    )
    # Raises requests.HTTPError for a 4xx/5xx answer; otherwise the call succeeded.
    response.raise_for_status()
    print("success")

Example 2

The following example is a Python script that shows how to map from other tools (in this example - GitLab) to the Databand schema. The example shows how you can create a monitor and provide the data to the Databand API. You do not have to use the following example to map from your orchestration or data integration tools, but any code you supply must include the following information:

  • How often to pull the data
  • A way to pull the data from your orchestration or data integration tool
  • A conversion to the Databand schema
  • A request to the custom integration API

This example represents the request to the custom integration API. It uses an openlineage_api function that sends a compressed JSON payload, which contains tracking data, to the specified endpoint. The example uses a POST request with gzip compression and authorization headers.

In this example, a run corresponds to a single execution of a CI/CD pipeline. From the point that this pipeline begins until it either completes successfully, fails, or is cancelled, everything that happens during that time is considered part of a single run. Over time, Databand will build a history of runs for this pipeline. As a result, users will be able to go back to any historical run of this pipeline to see all of the metrics and logs that were collected at the time.

Within a GitLab pipeline, there are stages. Stages act as groups of tasks within a pipeline, but the individual stages are not reported as their own jobs. With Databand’s API, each stage can be mapped as a task within its parent pipeline. This means that in Databand, all metadata is logically divided and assigned to its corresponding stage. In the event of a pipeline incident, the user will be able to:

  • Tell the specific state and duration of each individual stage
  • View logs and error messages in the context of the stage in which they occurred
Example
import gzip
import json

import requests


class DbndClient:
    """Sends tracking events to a Databand custom integration endpoint."""

    def __init__(
        self, access_token: str, databand_custom_integration_full_api: str
    ) -> None:
        """
        Stores the endpoint URL and prepares the headers sent with every request.

        Args:
            access_token (str): Token placed in the "Authorization: Bearer" header.
            databand_custom_integration_full_api (str): URL the events are posted to.
        """
        self.databand_custom_integration_full_api = databand_custom_integration_full_api
        self.default_headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

    def openlineage_api(self, data: list[dict]) -> None:
        """
        Posts the events as a gzip-compressed JSON document.

        Args:
            data (list[dict]): The events to send; must be JSON serializable.

        Raises:
            requests.HTTPError: If the endpoint answers with a 4xx/5xx status.
        """
        compressed_body = gzip.compress(json.dumps(data).encode("utf-8"))
        # The body is compressed, so the encoding is declared next to the defaults.
        request_headers = {**self.default_headers, "Content-Encoding": "gzip"}
        response = requests.post(
            self.databand_custom_integration_full_api,
            data=compressed_body,
            headers=request_headers,
            timeout=30,
        )
        response.raise_for_status()

This example code interacts with GitLab, using a GitlabClient class to retrieve and process information about your pipelines and jobs.

import logging

from collections import defaultdict
from datetime import datetime
from functools import partial
from time import sleep
from typing import Any

from requests import Session


# Module-level logger named after the module (not after its file path).
logger = logging.getLogger(__name__)


class GitlabPipelineInfo:
    """
    Holds one GitLab pipeline together with its jobs grouped by stage.

    The class only declares annotations; GitlabClient.get_pipeline_full_info
    creates an empty instance and assigns both attributes.
    """

    # Result of GitlabClient.get_pipeline() for the pipeline.
    pipeline: dict
    # Stage name -> list of that stage's jobs, ordered by "created_at".
    job_stages: dict


class GitlabClient:
    """Small client for the pipelines and jobs of one GitLab project (REST API v4)."""

    def __init__(self, gitlab_project_id: str, gitlab_token: str) -> None:
        """
        Creates a session that authenticates with a private token.

        Args:
            gitlab_project_id (str): ID of the project whose pipelines are read.
            gitlab_token (str): Token sent in the "Private-Token" header.
        """
        self.gitlab_project_id = gitlab_project_id

        self.session = Session()
        # Give every request made through this session a default timeout.
        self.session.request = partial(self.session.request, timeout=15)
        self.session.headers.update({"Private-Token": gitlab_token})  # type: ignore

    @property
    def gitlab_base_url(self) -> str:
        """Base URL of the project's API endpoints."""
        return f"https://gitlab.com/api/v4/projects/{self.gitlab_project_id}"

    def _get(self, url: str):
        """
        Performs a GET request to a specified URL with retry logic for rate limits.

        A 429 answer is retried after 60 seconds, without a retry limit. Any other
        answer, successful or not, is returned to the caller.

        Args:
            url (str): The URL to send the GET request to.

        Returns:
            requests.Response: The response object from the request.
        """
        while True:
            resp = self.session.get(url)
            if not resp.ok:
                logger.warning("Failed fetching url %s: %s", url, resp)
            if resp.status_code != 429:
                return resp
            logger.info("got 429, sleeping")
            sleep(60)

    def list_pipelines(self, last_updated_after: datetime) -> Any:
        """
        Lists GitLab pipelines updated after a specified time.

        Args:
            last_updated_after (datetime): Starting point for fetching pipelines.
                It is formatted with a literal "Z" suffix, so it must hold UTC time.

        Returns:
            list[dict]: Up to 100 pipelines, ordered by ascending update time.
            []: An empty list if no pipelines are found or the request fails.
        """
        updated_after = last_updated_after.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
        resp = self._get(
            f"{self.gitlab_base_url}/pipelines?updated_after={updated_after}&order_by=updated_at&sort=asc&per_page=100"
        )
        if resp.ok:
            return resp.json()
        return []

    def get_pipeline(self, pipeline_id: int) -> Any:
        """
        Fetches details of a specific GitLab pipeline by its ID.

        Args:
            pipeline_id (int): The ID of the pipeline to retrieve.

        Returns:
            dict: A dictionary containing pipeline details if the request is successful.
            None: If the pipeline is not found or the request fails.
        """
        resp = self._get(f"{self.gitlab_base_url}/pipelines/{pipeline_id}")
        if resp.ok:
            return resp.json()
        return None

    def get_jobs(self, pipeline_id: int) -> Any:
        """
        Retrieves the jobs associated with a given GitLab pipeline.

        Retried jobs are included. Only the first page (up to 100 jobs) is read.

        Args:
            pipeline_id (int): The ID of the pipeline to retrieve jobs from.

        Returns:
            list[dict]: A list of dictionaries containing job details.
            []: An empty list if no jobs are found or the request fails.
        """
        resp = self._get(
            f"{self.gitlab_base_url}/pipelines/{pipeline_id}/jobs?per_page=100&include_retried=true"
        )
        if resp.ok:
            return resp.json()
        return []

    def get_pipeline_full_info(self, pipeline_id: int) -> "GitlabPipelineInfo":
        """
        Retrieves a pipeline together with its jobs grouped by stage.

        Failures are logged, not raised: "pipeline" is None when the pipeline
        cannot be fetched, and "job_stages" is empty when no jobs are returned.

        Args:
            pipeline_id (int): pipeline ID to retrieve info for.

        Returns:
            GitlabPipelineInfo: pipeline full info (pipeline + jobs details)
        """
        pipeline_run = GitlabPipelineInfo()
        pipeline_run.pipeline = self.get_pipeline(pipeline_id)
        if not pipeline_run.pipeline:
            logger.error(
                "Can't get pipeline %s information from Gitlab", pipeline_id
            )

        jobs = self.get_jobs(pipeline_id)
        if not jobs:
            logger.info("No jobs found for pipeline %s on Gitlab", pipeline_id)

        sorted_jobs: list[Any] = sorted(jobs, key=lambda x: x["created_at"])

        # Group by stage; each stage keeps its jobs in creation order.
        stages: dict[str, list[Any]] = defaultdict(list)
        for job in sorted_jobs:
            stages[job["stage"]].append(job)

        pipeline_run.job_stages = stages

        return pipeline_run

    @staticmethod
    def get_last_pipeline_datetime(
        pipeline: "GitlabPipelineInfo", last_pipeline_datetime: datetime
    ) -> datetime:
        """
        Returns the later of the current cursor and the pipeline's creation time.

        Args:
            pipeline (GitlabPipelineInfo): Pipeline whose "created_at" is compared.
            last_pipeline_datetime (datetime): Current cursor; may be None.

        Returns:
            datetime: Naive datetime parsed from "created_at", or the unchanged
            cursor when the cursor is not older than it.
        """
        created_at = datetime.strptime(
            pipeline.pipeline.get("created_at"), "%Y-%m-%dT%H:%M:%S.%fZ"
        )

        if not last_pipeline_datetime:
            return created_at

        if last_pipeline_datetime < created_at:
            return created_at

        return last_pipeline_datetime


# GitLab job status -> event type sent to Databand.
GITLAB_JOB_STATUS_MAP = {
    "failed": "FAIL",
    "warning": "COMPLETE",
    "pending": "START",
    "running": "RUNNING",
    "manual": "START",
    "scheduled": "START",
    "canceled": "ABORT",
    "success": "COMPLETE",
    "skipped": "ABORT",
    "created": "START",
}
# GitLab pipeline status -> event type sent to Databand.
GITLAB_PIPELINE_STATUS_MAP = {
    "created": "START",
    "waiting_for_resource": "START",
    "preparing": "START",
    "pending": "START",
    "running": "RUNNING",
    "success": "COMPLETE",
    "failed": "FAIL",
    "canceled": "ABORT",
    "skipped": "ABORT",
    "manual": "START",
    "scheduled": "START",
}


def map_job_status(gitlab_job_status: str) -> str:
    """
    Maps a GitLab job status to a corresponding status for OLRunState.

    Args:
        gitlab_job_status (str): The GitLab job status.

    Returns:
        str: The mapped status for OLRunState; "START" for an unknown status.
    """
    return GITLAB_JOB_STATUS_MAP.get(gitlab_job_status, "START")


def map_pipeline_status(gitlab_pipeline_status: str) -> str:
    """
    Maps a GitLab pipeline status to a corresponding status for OLRunState.

    Args:
        gitlab_pipeline_status (str): The GitLab pipeline status.

    Returns:
        str: The mapped status for OLRunState; "START" for an unknown status.
    """
    # The fallback must be an event type like every other value of the map
    # ("scheduled" is a GitLab status), and it matches map_job_status.
    return GITLAB_PIPELINE_STATUS_MAP.get(gitlab_pipeline_status, "START")


# mapping between gitlab statuses to openlineage statuses
def calc_stage_status(jobs: list[Any]):
    job_statuses = [job["status"] for job in jobs]

    if all(js == "skipped" for js in job_statuses):
        return "skipped", None

    not_started = {"queued", "skipped", "scheduled"}
    if all(js in not_started for js in job_statuses):
        return "scheduled", None

    finished_statuses = {"failed", "skipped", "cancelled", "success"}
    if all(js in finished_statuses for js in job_statuses):
        try:
            finished_at = max(job["finished_at"] for job in jobs if job["finished_at"])
        except ValueError:
            finished_at = None

        for status in ("failed", "cancelled", "success"):
            if status in job_statuses:
                return status, finished_at
        return "skipped", finished_at

    return "running", None


# Event types that describe a run that has ended.
finished_states_str = {"FAIL", "COMPLETE", "ABORT"}


def calc_event_time(job: Any, status: str) -> str:
    """
    Picks the timestamp that best describes the given state of a job.

    Args:
        job (Any): Dictionary with "created_at", "started_at" and "finished_at".
        status (str): The mapped event type (for example "FAIL"), not the raw
            GitLab status.

    Returns:
        str: "finished_at" for an ended run when it is set; otherwise
        "started_at" when it is set; otherwise "created_at".
    """
    # A run can end without a finish time (for example a skipped job), so the
    # earlier timestamps remain the fallback instead of returning None.
    if status in finished_states_str and job["finished_at"]:
        return job["finished_at"]
    return job["started_at"] or job["created_at"]

This example code synchronizes GitLab pipeline data with Databand. It uses the GitlabClient class to interact with the GitLab API and the DbndClient class to send the data to Databand.

# © Copyright Databand.ai, an IBM Company 2024
import logging

from datetime import datetime, timedelta
from time import sleep
from typing import Any
from uuid import NAMESPACE_URL, UUID, uuid5

from docs.custom_integration.databand_client import DbndClient
from docs.custom_integration.gitlab_client import (
    GitlabClient,
    GitlabPipelineInfo,
    calc_event_time,
    calc_stage_status,
    map_job_status,
    map_pipeline_status,
)


logger = logging.getLogger(__name__)

# Placeholders: credentials of the GitLab project to read pipelines from.
GITLAB_TOKEN = "<your_gitlab_token>"
GITLAB_PROJECT_ID = "<your_gitlab_project_id>"

# Placeholders: Databand access token and the endpoint that receives the events.
DATABAND_ACCESS_TOKEN = "<your_databand_access_token>"
DATABAND_CUSTOM_INTEGRATION_FULL_API = "<your_api_endpoint_provided_in_ui>"  # example: https://client.databand.ai/api/v1/tracking/open-lineage/3a8bc7f8-0c65-115f-98f6-aa662e60cbe7/events/bulk

# Both clients are created once, when this module is loaded, and shared by the
# functions below.
DBND_CLIENT = DbndClient(
    access_token=DATABAND_ACCESS_TOKEN,
    databand_custom_integration_full_api=DATABAND_CUSTOM_INTEGRATION_FULL_API,
)

GITLAB_CLIENT = GitlabClient(GITLAB_PROJECT_ID, GITLAB_TOKEN)

# Values written into the "_producer"/"producer" and "_schemaURL"/"schemaURL"
# fields of every event and facet built in this script.
_PRODUCER = "https://some.producer.com/version/1.0"
_SCHEMA_URL = (
    "https://github.com/OpenLineage/OpenLineage/blob/main/spec/facets/SQLJobFacet.json"
)
# Root namespace for generated run IDs. It is derived from the endpoint URL, so
# the same endpoint always yields the same namespace.
RUN_ID_NAMESPACE = uuid5(NAMESPACE_URL, DATABAND_CUSTOM_INTEGRATION_FULL_API)


def generate_run_id(job_name: str, parent_run_id: UUID | None):
    return str(uuid5(UUID(parent_run_id or RUN_ID_NAMESPACE), job_name))


class RunPayloadBuilder:
    """
    Collects the run events of one GitLab pipeline: the pipeline itself, its
    stages and its jobs.

    set_pipeline() must be called first, because set_stages() and set_jobs()
    read the pipeline run ID, job name and namespace that it stores.
    """

    def __init__(self) -> None:
        self.runs = []
        self.pipeline_run_uid = None
        self.pipeline_job_name = None
        self.pipeline_project_name = None
        self.namespace = None

    def get_data(self) -> list[dict]:
        """Returns the list of events collected so far."""
        return self.runs

    def add_job(self, job: Any) -> None:
        """
        Adds a new job to the run data.

        Args:
            job (Any): A dictionary representing the job to add. It holds the
                keys "created_at", "started_at", "finished_at", "status",
                "name", "parent_name", "paren_run_uid", "run_uid" and
                "upstream_name".

        Returns:
            None
        """
        execution_time = {
            "nominalStartTime": str(job["created_at"]),
            "_producer": _PRODUCER,
            "_schemaURL": _SCHEMA_URL,
        }
        start_time = None
        if job["started_at"]:
            start_time = {
                "startTime": str(job["started_at"]),
                "_producer": _PRODUCER,
                "_schemaURL": _SCHEMA_URL,
            }
        parent_facet = {
            "job": {
                "name": job["parent_name"],
                "namespace": self.namespace,
                "_producer": _PRODUCER,
                "_schemaURL": _SCHEMA_URL,
            },
            "run": {
                "runId": job["paren_run_uid"],
                "_producer": _PRODUCER,
                "_schemaURL": _SCHEMA_URL,
            },
        }
        inputs = []
        if job["upstream_name"]:
            dataset = {
                "name": job["upstream_name"],
                "namespace": self.namespace,
                "_producer": _PRODUCER,
                "_schemaURL": _SCHEMA_URL,
            }
            inputs.append(dataset)

        event_type = map_job_status(job["status"])
        # The event time is chosen from the mapped event type, as set_pipeline
        # does; the raw GitLab status would never select "finished_at". A run
        # that ended without a finish time falls back to the earlier timestamps.
        event_time = (
            calc_event_time(job, event_type) or job["started_at"] or job["created_at"]
        )

        data = {
            "eventTime": str(event_time),
            "eventType": event_type,
            "inputs": inputs,
            "job": {"name": job["name"], "namespace": self.namespace},
            "outputs": [],
            "run": {
                "runId": job["run_uid"],
                "facets": {
                    "startTime": start_time,
                    "nominalTime": execution_time,
                    "parent": parent_facet,
                    "_producer": _PRODUCER,
                    "_schemaURL": _SCHEMA_URL,
                },
            },
            "producer": _PRODUCER,
            "schemaURL": _SCHEMA_URL,
        }
        self.runs.append(data)

    def set_jobs(self, jobs: Any) -> None:
        """
        Adds every GitLab job as a run whose parent is the run of its stage.

        Args:
            jobs (Any): An iterable of GitLab job dictionaries.
        """
        for job in jobs:
            # Run IDs are passed on as strings, the form uuid.UUID() parses.
            # NOTE(review): job and stage IDs are both derived from the pipeline
            # run ID and a name, so a job named like its stage gets the stage's ID.
            run_uid = generate_run_id(job["name"], self.pipeline_run_uid)
            parent_run_uid = generate_run_id(job["stage"], self.pipeline_run_uid)

            virtual_job = {
                "created_at": job["created_at"],
                "started_at": job["started_at"],
                "status": job["status"],
                "finished_at": job["finished_at"],
                "name": job["name"],
                "parent_name": job["stage"],
                "paren_run_uid": parent_run_uid,
                "run_uid": run_uid,
                "upstream_name": None,
            }
            self.add_job(virtual_job)

    def set_stages(self, stages: Any) -> None:
        """
        This function iterates over the provided stages and their associated jobs
        to create virtual jobs representing the stages in the pipeline run.
        Each stage is represented as a virtual job with metadata.

        In GitLab, stages act as task groups but are not directly reported as jobs.
        This function bridges the gap by creating virtual jobs for each stage, allowing them to be tracked as tasks in Databand.

        Args:
            stages (Any): An iterable of tuples where each tuple contains the stage name and a list of jobs associated with that stage.

        """
        prev_stage_task_id = None
        for stage_name, jobs in stages:
            created_at = min(job["created_at"] for job in jobs if job["created_at"])

            # No job of the stage may have started yet.
            started_at = min(
                (job["started_at"] for job in jobs if job["started_at"]), default=None
            )

            # Calculate the status and finish time of the stage based on its jobs
            stage_status, finished_at = calc_stage_status(jobs)

            # Generate the run ID of the stage (the pipeline run ID is a string)
            run_uid = generate_run_id(stage_name, self.pipeline_run_uid)

            # Create a virtual job representing the stage
            virtual_job = {
                "created_at": created_at,
                "started_at": started_at,
                "status": stage_status,
                "finished_at": finished_at,
                "name": stage_name,
                "parent_name": self.pipeline_job_name,
                "paren_run_uid": self.pipeline_run_uid,
                "run_uid": run_uid,
                "upstream_name": prev_stage_task_id,
            }

            # Add the virtual job to the pipeline
            self.add_job(virtual_job)

            # Update the previous stage task ID for establishing upstream relationship
            prev_stage_task_id = stage_name

    def set_pipeline(self, pipeline: Any) -> None:
        """
        Sets up the initial pipeline details in the run data.

        Also stores the pipeline run ID, job name, project name and namespace
        on the instance for set_stages() and set_jobs().

        Args:
            pipeline (Any): A dictionary representing the pipeline details.

        Returns:
            None
        """
        status = map_pipeline_status(pipeline["status"])
        # A pipeline can end without a finish time; fall back to the earlier
        # timestamps instead of sending "None" as the event time.
        event_time = (
            calc_event_time(pipeline, status)
            or pipeline["started_at"]
            or pipeline["created_at"]
        )
        start_time_facet = None
        if pipeline["started_at"]:
            start_time_facet = {
                "startTime": str(pipeline["started_at"]),
                "_producer": _PRODUCER,
                "_schemaURL": _SCHEMA_URL,
            }
        # Pipeline run IDs are rooted in RUN_ID_NAMESPACE, passed as a string
        # (the form uuid.UUID() parses).
        self.pipeline_run_uid = generate_run_id(
            str(pipeline["id"]), str(RUN_ID_NAMESPACE)
        )
        self.pipeline_job_name = pipeline["ref"]
        # The fifth "/"-separated part of the pipeline URL serves as both the
        # project name and the namespace.
        project_name = pipeline["web_url"].split("/")[4]
        self.pipeline_project_name = project_name
        self.namespace = project_name
        data = {
            "eventTime": str(event_time),
            "eventType": status,
            "inputs": [],
            "job": {"name": self.pipeline_job_name, "namespace": self.namespace},
            "outputs": [],
            "run": {
                "runId": self.pipeline_run_uid,
                "facets": {
                    "startTime": start_time_facet,
                    "nominalTime": {
                        "nominalStartTime": str(pipeline["created_at"]),
                        "_producer": _PRODUCER,
                        "_schemaURL": _SCHEMA_URL,
                    },
                    "tags": {"projectName": self.pipeline_project_name},
                    "_producer": _PRODUCER,
                    "_schemaURL": _SCHEMA_URL,
                },
            },
            "producer": _PRODUCER,
            "schemaURL": _SCHEMA_URL,
        }
        self.runs.append(data)


def sync_pipeline(pipeline_id: int) -> GitlabPipelineInfo:
    """
    Reads one pipeline from GitLab and sends it to Databand.

    The payload holds the pipeline run, then one run per stage, then one run
    per job.

    Args:
        pipeline_id (int): ID of the GitLab pipeline to synchronize.

    Returns:
        GitlabPipelineInfo: The pipeline details and jobs that were read.
    """
    info = GITLAB_CLIENT.get_pipeline_full_info(pipeline_id)

    builder = RunPayloadBuilder()
    builder.set_pipeline(info.pipeline)
    builder.set_stages(info.job_stages.items())

    # Flatten the per-stage job lists into a single list, keeping their order.
    all_jobs = []
    for stage_jobs in info.job_stages.values():
        all_jobs.extend(stage_jobs)
    builder.set_jobs(all_jobs)

    DBND_CLIENT.openlineage_api(builder.get_data())
    return info


if __name__ == "__main__":
    from datetime import timezone

    # Without a configured handler, the INFO progress messages below are dropped.
    logging.basicConfig(level=logging.INFO)

    # Start one day back. The cursor is a naive datetime holding UTC time: it is
    # sent to GitLab with a "Z" suffix and compared with the naive values parsed
    # from GitLab's "...Z" timestamps, so local time would shift the window.
    pipeline_cursor = datetime.now(timezone.utc).replace(tzinfo=None) - timedelta(
        days=1
    )

    # Poll GitLab every two minutes until the process is stopped.
    while True:
        pipelines = GITLAB_CLIENT.list_pipelines(pipeline_cursor)

        if not pipelines:
            logger.info("No new pipelines on gitlab")

        for i, pipeline in enumerate(pipelines, start=1):
            logger.info(
                "========= Pipeline %s (%s/%s)", pipeline["id"], i, len(pipelines)
            )
            try:
                pipeline_full_info = sync_pipeline(pipeline["id"])
                # Advance the cursor only after a successful synchronization.
                pipeline_cursor = GITLAB_CLIENT.get_last_pipeline_datetime(
                    pipeline_full_info, pipeline_cursor
                )
            except Exception as e:
                # One failing pipeline must not stop the synchronization loop.
                logger.exception(
                    "Failed syncing pipline %s error: %s", pipeline["id"], e
                )

        logger.info("sleeping %s", datetime.now())
        sleep(2 * 60)

Known issues

When you send a POST request to Databand, remember to add the following header to the request:

"Content-Type": "application/json"