Prerequisites:
- The Astro CLI
- A locally running Airflow environment created with the Astro CLI
- A DB2 database
- A DB2 driver based on your DB2 version
Get Connection Details:
- DB2 Host
- DB2 Port
- DB2 Database
- DB2 Username
- DB2 Password
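Before configuring anything in Airflow, it can save time to confirm that the DB2 host and port are reachable from the machine running Airflow. A minimal sketch (the host and port in the example call are placeholders):

```python
import socket


def db2_port_reachable(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to the DB2 host/port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Example (placeholder values):
# db2_port_reachable("db2.example.com", 50000)
```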
Create your Connection:
- Download the correct version of db2jcc.jar from IBM DB2 Driver Downloads and place it in the include directory of your Airflow project.
- Add the following to your packages.txt file:
  default-jre
  default-jdk
- Add the following to your requirements.txt file:
  apache-airflow-providers-jdbc
- Append the following to your Dockerfile:
  ENV JAVA_HOME /usr/lib/jvm/java-1.8.openjdk
- Add the connection details as shown in the image below.
  (The connection URL should be of the format jdbc:db2://host:port/database, and remember to include the driver_class and driver_path in Extra.)
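Instead of the UI, you can also define the connection as an environment variable. The sketch below builds the JSON form that Airflow accepts for AIRFLOW_CONN_* variables; the connection id, host, credentials, and jar path are placeholders you should replace, and com.ibm.db2.jcc.DB2Driver is the standard IBM JCC driver class.

```python
import json

# Placeholder connection details; replace with your own.
host, port, database = "db2.example.com", 50000, "SAMPLE"

conn = {
    "conn_type": "jdbc",
    # For JDBC connections, the full JDBC URL goes in the host field.
    "host": f"jdbc:db2://{host}:{port}/{database}",
    "login": "db2user",
    "password": "db2password",
    # Extra holds the driver class and the path to the jar inside the container.
    "extra": json.dumps(
        {
            "driver_class": "com.ibm.db2.jcc.DB2Driver",
            "driver_path": "/usr/local/airflow/include/db2jcc.jar",
        }
    ),
}

# Add this line to your .env file; the part after AIRFLOW_CONN_ is the connection id.
print("AIRFLOW_CONN_DB2_DEFAULT='" + json.dumps(conn) + "'")
```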
How to Test:
Since the Test connection functionality in Airflow is disabled by default for security reasons, you can test your connection with the following DAG:
from datetime import datetime

from airflow.decorators import dag, task
from airflow.providers.jdbc.hooks.jdbc import JdbcHook
from airflow.providers.jdbc.operators.jdbc import JdbcOperator


@task
def exe_db2_hook():
    sql = """SELECT id, name
             FROM TABLE_NAME
             FETCH FIRST 10 ROWS ONLY"""
    print("---------- jdbc_hook ----------")
    # Hooks take a connection id, not a task_id
    jdbc_hook = JdbcHook(jdbc_conn_id="xxxxxxxxxx")
    print("---------- jdbc get_records ----------")
    results = jdbc_hook.get_records(sql=sql)
    print("++++++++++ results ++++++++++")
    print(results)


@task
def exe_db2_query():
    """
    An operator simply executes a query. Its most common use case is to update,
    insert, or delete data. It can run a SELECT query, but it doesn't return the
    results. Instead, a hook is used to read and return data.
    """
    sql = """SELECT id, name
             FROM TABLE_NAME
             FETCH FIRST 10 ROWS ONLY"""
    # Note: instantiating an operator inside a @task only builds it; the query
    # is not executed here. Prefer the hook-based task above for testing.
    result = JdbcOperator(task_id="jdbc_op", jdbc_conn_id="xxxxxxxxxx", sql=sql, autocommit=True)
    print("---------- result ----------")
    print(result.output)


@dag(
    schedule_interval=None,
    start_date=datetime(2024, 1, 10),
    catchup=False,
)
def db2_test():
    # task_result = exe_db2_query()
    task_result = exe_db2_hook()
    task_result


_ = db2_test()
How it works:
- Airflow uses the Python package jaydebeapi to connect to your database.
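Under the hood, the JdbcHook call in the DAG above boils down to roughly the following jaydebeapi usage. This is a sketch with placeholder URL and credentials, not code you need to write yourself; the driver class and jar path mirror what you put in the connection's Extra field.

```python
def query_db2_directly():
    """Rough equivalent of what JdbcHook.get_records() does for this connection."""
    import jaydebeapi  # installed as a dependency of apache-airflow-providers-jdbc

    conn = jaydebeapi.connect(
        "com.ibm.db2.jcc.DB2Driver",                # driver_class from Extra
        "jdbc:db2://db2.example.com:50000/SAMPLE",  # placeholder JDBC URL
        ["db2user", "db2password"],                 # placeholder credentials
        "/usr/local/airflow/include/db2jcc.jar",    # driver_path from Extra
    )
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, name FROM TABLE_NAME FETCH FIRST 10 ROWS ONLY")
        return cursor.fetchall()
    finally:
        conn.close()
```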