Tag Archives: Cloud Composer

Producer java library for Data Lineage is now open source

Integrating OpenLineage producers with GCP Lineage just got a lot easier


What is Data Lineage

Data Lineage is a GCP feature that allows tracking data movement. This tool helps data owners and analysts detect anomalies in data flows, find connections between data sources and verify the potential consequences of planned changes in data pipelines.

Lineage is injected automatically for some Google Cloud products (BigQuery, Cloud Data Fusion, Cloud Composer, Dataproc, Vertex AI). That means, if Lineage integration with any of those products is enabled in the projects, data movements coming from executing jobs by these products will be reported to GCP Lineage.

For custom integrations, the API can be used to report and fetch lineage.

After injecting, lineage can be viewed in the Google Cloud console (available from DataCatalog UI, BigQuery UI, Vertex UI). There are two representations: graph view, with data sources as nodes and data movements as edges, and list view, a tabular representation. Lineage information can also be fetched from the API.

More information is available in the documentation.


GCP Lineage information model

We describe data flows using the following concepts:

  • Process is a definition of some data transformation. For example, a SQL or Spark script.
  • Run is an execution of a Process.
  • Lineage Event is a data transformation event. It is reported in context of a Run.
  • A Link represents a connection between two data sources, when data in the link’s Target depends on its Source. A Lineage Event contains a list of Links.

OpenLineage support

OpenLineage is an open standard for reporting lineage information. It unifies lineage reporting between systems, which means the events generated in this format can be consumed by any product supporting it. This leads to more flexibility: adding or replacing a lineage producer does not imply changing the consumer, and vice versa.

OpenLineage format is adopted by a number of lineage producers and consumers, meaning there is already tooling available to report lineage from/to those systems. GCP Lineage is one of those consumers: users can report events in OpenLineage format, see the resulting lineage on the UI, and query it via the API.

OpenLineage is the preferred method for reporting lineage in GCP Lineage. It is used by the Dataproc lineage integration. To find out more about sending OpenLineage events to GCP Lineage refer to the documentation.

After injecting lineage in OpenLineage format, it can be accessed in the same way as if it was injected via other API methods or automatically: from the Google Cloud console or the API.


Why producer library

The GCP Lineage producer library is an extension of the client library. Client libraries are recommended for calling Cloud APIs programmatically. They handle low level API call details, leaving the necessary user code simpler and shorter.

The producer library further simplifies integration by providing ready to use code needed to call the API from Java. It adds additional functionality such as synchronous and asynchronous clients, translating OpenLineage JSON messages to the API friendly format, error handling etc.

Using the producer library, all the code needed to send a request to GCP Lineage API is:

SyncLineageProducerClient client = SyncLineageProducerClient.create();
ProcessOpenLineageRunEventRequest request =
        ProcessOpenLineageRunEventRequest.newBuilder()
            .setParent(parent)
            .setOpenLineage(openLineageMessage)
            .build();
client.processOpenLineageRunEvent(request);

The field openLineageMessage here is a protobuf Struct that includes information about job execution, inputs and outputs and other metadata. The object model is described in the documentation. An example message is:

{
  "eventType": "START",
  "eventTime": "2023-04-04T13:21:16.098Z",
  "run": {
    "runId": "502483d6-3e3d-474f-9380-da565eaa7516",
    "facets": {
       "spark_properties": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
        "properties": {
          "spark.master": "yarn",
          "spark.app.name": "sparkJobTest.py"
        }
      }
    }
  },
  "job": {
    "namespace": "project-name",
    "name": "cluster-name",
    "facets": {
    "jobType": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
        "processingType": "BATCH",
        "integration": "SPARK",
        "jobType": "SQL_JOB"
      },

    }
  },
  "inputs": [
    {
      "namespace": "bigquery",
      "name": "project.dataset.input_table",
    }],
  "outputs": [
   {
      "namespace": "bigquery",
      "name": "project.dataset.output_table",
    }],
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.18.0/integration/spark",
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
}

Learn more about building an OpenLineage message.


Best Practices for Constructing OpenLineage Messages

The openLineageMessage should follow the OpenLineage format. The fields that are required for correct parsing by the GCP Lineage API are:

job

mapped to Process

job.namespace

used to construct Process name

job.name

used to construct Process name

run

mapped to Run

run.runId

used to construct Run name

producer

URI identifying the producer of this metadata

eventTime

time of the data movement

schemaURL

URL pointing to the schema definition for this message

In addition to those, the fields used to create lineage are:

eventType

corresponds to the status of the Run

inputs

mapped to sources of links. Must be specified according to the naming conventions

outputs

mapped to targets of links. Must be specified according to the naming conventions

The GCP Lineage API supports OpenLineage major versions 1 and 2. For more information please refer to the documentation.


How to access GCP Lineage?

The code is now publicly available on GitHub. The library is also published to Maven.


GcpLineageTransport

To simplify integration with GCP Lineage, we offer GcpLineageTransport. It is available on the OpenLineage GitHub repository and is built to a separate maven artifact. It is built on top of the producer library mentioned above.

Using the transport minimises the code for sending events to GCP Lineage. The GcpLineageTransport can be configured as the event sink for any existing OpenLineage producer such as Airflow, Spark, and Flink. Find more information and examples on GCP Lineage.

By Mary Idamkina – Data Lineage

DAGify: Accelerate Your Journey from Control-M to Apache Airflow


In the dynamic world of data engineering and workflow orchestration, organizations are increasingly migrating from legacy enterprise schedulers like Control-M to the open-source powerhouse, Apache Airflow. However, this transition often involves a complex and time-consuming process of converting existing job definitions. DAGify emerges as a beacon of efficiency in this scenario, offering an open-source solution to automate the conversion of Control-M XML files into Airflow's native DAG format.

DAGify isn't just a simple conversion tool; it's a migration accelerator, designed to significantly reduce the manual effort and potential errors associated with transitioning to Airflow. While it might not provide a perfect 1:1 migration in every case, its primary goal is to expedite the process, allowing developers to focus on optimizing their workflows in the new environment.


Introduction

Control-M has served as a reliable workhorse for many organizations, but its proprietary nature and limitations can become roadblocks in today's cloud-centric and agile data landscape. Apache Airflow, with its flexibility, scalability, and thriving community, presents a compelling alternative. However, the migration journey can be daunting, especially when dealing with intricate Control-M job definitions.

DAGify steps in to bridge this gap, offering an intuitive and extensible solution. By automating the conversion process, it empowers organizations to embrace Airflow's capabilities without the burden of manual translation. This translates to faster migrations, reduced errors, and a smoother transition overall.


Technical Details

Under the hood, DAGify employs a template-driven approach, making it adaptable to various Control-M configurations and Airflow requirements. It parses Control-M XML files, extracting crucial information about jobs, dependencies, and schedules. This data is then intelligently mapped to Airflow's operators, tasks, and dependencies, preserving the essence of the original workflow. While still under active development, DAGify already supports key Control-M features like job and dependency mapping. The project roadmap includes further enhancements, such as handling custom calendars and expanding support for other enterprise schedulers.


Template-driven conversion

DAGify employs a flexible template system that empowers you to define the mapping between Control-M jobs and Airflow operators. These user-defined YAML templates specify how Control-M attributes translate into Airflow operator parameters. For instance, the control-m-command-to-airflow-ssh template maps Control-M's "Command" task type to Airflow's SSHOperator, outlining how attributes like JOBNAME and CMDLINE are incorporated into the generated DAG.

The template's structure field utilizes Jinja2 templating to dynamically construct the Airflow operator code, seamlessly integrating Control-M job attributes.

Example:

A Control-M task like:

<JOB 
  APPLICATION="my_application" 
  SUB_APPLICATION="my_sub_application" 
  JOBNAME="job_1" 
  DESCRIPTION="job_1_reports"  
  TASKTYPE="Command" 
  CMDLINE="./hello_world.sh" 
  PARENT_FOLDER="my_folder">
  <OUTCOND NAME="job_1_completed" ODATE="ODAT" SIGN="+" />
</JOB>

is converted to an Airflow operator using the control-m-command-to-airflow-ssh-gce template:

job_1 = SSHOperator(
    task_id="x_job_1",
    command="./hello_world.sh",
    dag=dag,
)

The repository includes several pre-defined templates for common Control-M task types. The config.yaml file at the project's root allows you to customize which templates are applied during the conversion process.


Leveraging Google Cloud Composer

For organizations seeking a fully managed Airflow experience, Google Cloud Composer provides a compelling solution. It eliminates the complexities of managing Airflow infrastructure, allowing you to focus on building and orchestrating your data pipelines. DAGify seamlessly integrates with Google Cloud Composer, making it even easier to migrate your Control-M workflows to a cloud-native environment.


Try it yourself

Eager to experience the power of DAGify? It's readily available as an open-source project on GitHub: https://github.com/GoogleCloudPlatform/dagify. The repository provides detailed instructions on setting up and running DAGify locally or within a Docker container.

Key steps to get started:
  1. Clone the repository: git clone https://github.com/GoogleCloudPlatform/dagify.git
  2. Install dependencies: make clean (This sets up a virtual environment and installs required packages)
  3. Run DAGify: python3 DAGify.py --source-path=[YOUR-SOURCE-XML-FILE]

Remember, DAGify is an ongoing project, and community contributions are welcome! If you encounter any issues or have feature requests, feel free to open an issue on GitHub.


Conclusion

DAGify represents a significant leap forward in simplifying enterprise scheduler migrations to Apache Airflow. By automating the conversion process and seamlessly integrating with Google Cloud Composer, it empowers organizations to embrace the benefits of Airflow more rapidly and efficiently. Whether you're a seasoned Airflow developer or just starting your migration journey, DAGify is a valuable tool to explore.

Remember:

  • Thorough testing is crucial: Always test your converted DAGs in a staging environment before deploying them to production.
  • Leverage Airflow's ecosystem: Explore the vast array of Airflow plugins and integrations to further enhance your workflows.
  • Stay engaged with the community: Keep an eye on DAGify's development and contribute to its growth if you can!

Happy migrating!

By Konrad Schieban and Tim Hiatt – Google Cloud


Acknowledgments

Thank you to the following team members who made this solution possible: Shreya Prabhu, Harish S, Slava Guzanov and Joanna Rajaseharan from Google Cloud.