Create JDBC Connection in Airflow to connect to DB2 Database

Prerequisites:

Get Connection Details:

  • DB2 Host
  • DB2 Port
  • DB2 Database
  • DB2 username
  • DB2 password

Create your Connection:

  1. Download the correct version of db2jcc.jar from IBM DB2 Driver Downloads and place it in the include directory of your Airflow project.

  2. Add the following to your packages.txt file:

default-jre
default-jdk

  3. Add the following to your requirements.txt file:

apache-airflow-providers-jdbc

  4. Append the following to your Dockerfile:

ENV JAVA_HOME /usr/lib/jvm/java-1.8.openjdk

  5. Add the connection details in the Airflow UI under Admin > Connections.
    (The connection URL should be of the format jdbc:db2://host:port/database, and remember to include the driver_class and driver_path in Extra.)
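The URL and Extra fields from step 5 can be sketched programmatically as well. A minimal sketch — the host, port, database, and jar path below are placeholder assumptions (the jar path assumes the project's include directory inside the image); substitute your own values:

```python
import json

# Placeholder connection details -- substitute your own DB2 values.
host, port, database = "db2.example.com", 50000, "SAMPLE"

# Connection URL in the jdbc:db2://host:port/database format.
conn_url = f"jdbc:db2://{host}:{port}/{database}"

# Extra field: the driver class shipped in db2jcc.jar, plus the path
# where the jar was placed (assumed: the project's include directory).
extra = json.dumps({
    "driver_class": "com.ibm.db2.jcc.DB2Driver",
    "driver_path": "/usr/local/airflow/include/db2jcc.jar",
})

print(conn_url)
print(extra)
```

Paste the URL into the connection's Host/URL field and the JSON object into Extra.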

How to Test:
Since the connection Test button in the Airflow UI is disabled by default for security reasons, you can test your connection using the following DAG:

from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.jdbc.hooks.jdbc import JdbcHook
from airflow.providers.jdbc.operators.jdbc import JdbcOperator

@task
def exe_db2_hook():
    sql = """SELECT id, name
             FROM TABLE_NAME
             FETCH FIRST 10 ROWS ONLY"""

    print("---------- jdbc_hook ----------")
    # Hooks take a connection ID, not a task_id.
    jdbc_hook = JdbcHook(jdbc_conn_id="xxxxxxxxxx")

    print("---------- jdbc get_records ----------")
    results = jdbc_hook.get_records(sql=sql)

    print("++++++++++ results ++++++++++")
    print(results)


@task
def exe_db2_query():
    """
    An operator simply executes a query. Its most common use case is to update, insert, or
    delete data. It can run a SELECT query, but it doesn't return the results;
    a hook is used instead to read and return data.
    """

    sql = """SELECT id, name
             FROM TABLE_NAME
             FETCH FIRST 10 ROWS ONLY"""

    # Instantiating an operator inside a task does not run it by itself,
    # so call execute() directly to issue the query.
    jdbc_op = JdbcOperator(task_id="jdbc_op", jdbc_conn_id="xxxxxxxxxx", sql=sql, autocommit=True)
    jdbc_op.execute(context={})


@dag(
    schedule=None,
    start_date=datetime(2024, 1, 10),
    catchup=False,
)
def db2_test():
    exe_db2_hook()
    # exe_db2_query()


_ = db2_test()

How it works:

  • Airflow's JDBC provider uses the Python package jaydebeapi, which starts a JVM (via JPype) to load the JDBC driver jar and connect to your database. This is why the JDK packages and the JAVA_HOME variable are required.
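Under the hood, the hook's connection roughly amounts to a jaydebeapi.connect call. A minimal sketch — the driver class is the real one inside db2jcc.jar, but the jar path, URL, table name, and credentials are placeholder assumptions, and the connect is guarded so the sketch degrades gracefully when jaydebeapi or a JVM is absent:

```python
driver_class = "com.ibm.db2.jcc.DB2Driver"          # class inside db2jcc.jar
jar_path = "/usr/local/airflow/include/db2jcc.jar"  # assumed jar location
url = "jdbc:db2://db2.example.com:50000/SAMPLE"     # placeholder URL

try:
    import jaydebeapi  # needs a JVM, hence the JDK packages and JAVA_HOME

    # connect(driver_class, url, [user, password], jar) -- placeholder credentials
    conn = jaydebeapi.connect(driver_class, url, ["db2user", "db2pass"], jar_path)
    curs = conn.cursor()
    curs.execute("SELECT id, name FROM TABLE_NAME FETCH FIRST 10 ROWS ONLY")
    print(curs.fetchall())
    curs.close()
    conn.close()
except ImportError:
    print("jaydebeapi not installed; add apache-airflow-providers-jdbc")
```

This is essentially what JdbcHook.get_records does for you, using the driver_class and driver_path stored in the connection's Extra field.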
